Implementation of Typical Operators

This section introduces the implementation details of three typical TiDB operators: Sort, HashAgg, and HashJoin.

Firstly, every operator should implement three basic interfaces of Executor:

  • Open - Initializes the operator, sets up the memory tracker/disk tracker, and other meta-info for the current operator.

  • Next - Each call to Next returns a chunk of data. Returning an empty chunk indicates that the execution is complete for the current executor. Note that Next is not thread-safe: by design, Next is never called concurrently on any operator.

  • Close - Responsible for releasing all resources held by the executor.
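
As a rough sketch of this contract (simplified; the real interface in TiDB's executor package carries additional methods, and Next fills a caller-provided chunk.Chunk), the three interfaces look roughly like this:

    package sketch

    import "context"

    // Chunk stands in for TiDB's chunk.Chunk: a batch of rows in
    // columnar layout. An empty chunk from Next means "exhausted".
    type Chunk struct{ rows int }

    func (c *Chunk) NumRows() int { return c.rows }

    // Executor is a simplified rendering of the three interfaces.
    type Executor interface {
        // Open initializes the operator: memory/disk trackers and
        // other per-operator metadata.
        Open(ctx context.Context) error
        // Next fills req with the next batch of rows. It is not
        // thread-safe and is never called concurrently.
        Next(ctx context.Context, req *Chunk) error
        // Close releases every resource the executor holds.
        Close() error
    }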

Sort

The Sort operator is used to arrange the result set of a query in a specific order. In TiDB, the operator implementing sort is SortExec. The fundamental concept behind SortExec is to read all the data from its child executor and then sort the entire data set.

In Next, it invokes fetchRowChunks to read all the data from its child executor. fetchRowChunks aims to store all the data in one SortedRowContainer. The memory usage grows as the input data volume increases. To manage memory usage, SortExec has spill-to-disk support. The details of this spilling are encapsulated within SortedRowContainer. Every time the insertion of a chunk into the current SortedRowContainer returns ErrCannotAddBecauseSorted, it indicates that the current SortedRowContainer has been spilled. SortExec then creates a new SortedRowContainer and inserts the chunk into the new container. Once no more data is coming from its child executor, SortExec sorts the current SortedRowContainer.

After fetchRowChunks completes, Next starts producing sorted results. Depending on whether a spill to disk was initiated, there are two methods to produce the final sorted outcomes:

  • Spill not initiated: In this straightforward scenario, the entire SortedRowContainer gets sorted at the end of fetchRowChunks, so during Next, it simply invokes GetSortedRowAndAlwaysAppendToChunk to fetch the sorted data from the SortedRowContainer.

  • Spill initiated: When a spill occurs, each spilling round produces an independent SortedRowContainer, stored in partitionList. In Next, an external multi-way merge sort merges all the partially sorted data streams into one final sorted data stream.
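
To illustrate the idea behind that final merge (not TiDB's actual code; sorted int slices stand in for the spilled SortedRowContainer partitions), a minimal k-way merge over a heap looks like this:

    package sketch

    import "container/heap"

    // mergeItem tracks the current head of one sorted partition.
    type mergeItem struct {
        val       int // head value
        partition int // which partition it came from
        pos       int // index of val within that partition
    }

    type mergeHeap []mergeItem

    func (h mergeHeap) Len() int           { return len(h) }
    func (h mergeHeap) Less(i, j int) bool { return h[i].val < h[j].val }
    func (h mergeHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *mergeHeap) Push(x any)        { *h = append(*h, x.(mergeItem)) }
    func (h *mergeHeap) Pop() any {
        old := *h
        it := old[len(old)-1]
        *h = old[:len(old)-1]
        return it
    }

    // mergeSorted merges several individually sorted partitions into
    // one fully sorted stream: repeatedly pop the smallest head and
    // push that partition's next element.
    func mergeSorted(partitions [][]int) []int {
        h := &mergeHeap{}
        for i, p := range partitions {
            if len(p) > 0 {
                *h = append(*h, mergeItem{val: p[0], partition: i, pos: 0})
            }
        }
        heap.Init(h)
        var out []int
        for h.Len() > 0 {
            it := heap.Pop(h).(mergeItem)
            out = append(out, it.val)
            if next := it.pos + 1; next < len(partitions[it.partition]) {
                heap.Push(h, mergeItem{
                    val:       partitions[it.partition][next],
                    partition: it.partition,
                    pos:       next,
                })
            }
        }
        return out
    }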

HashAgg

The HashAgg operator uses a hash table to perform grouping and aggregation. In TiDB, the operator implementing hash aggregation is HashAggExec.

HashAgg has two execution modes: parallel and non-parallel execution. During the build stage, the planner is responsible for deciding the execution mode for a HashAgg. A HashAgg will operate in non-parallel execution mode if one of the following conditions is true:

  • The aggregation function contains distinct.

  • The GROUP_CONCAT aggregation function contains order by.

  • The user explicitly sets both hashAggPartialConcurrency and hashAggFinalConcurrency to 1.

Non-parallel Execution

Non-parallel execution mode performs aggregation in a single thread. unparallelExec is the core function for non-parallel execution. In unparallelExec, it first reads all the data from its child executor, then aggregates the data using execute. After execute completes, unparallelExec starts to generate results by traversing all the group-by keys, generating one row for each key by calling AppendFinalResult2Chunk for each aggregation function.
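
As a toy illustration of this single-threaded flow (not TiDB's actual code; a plain Go map stands in for the aggregation hash table), consider SELECT key, SUM(val) ... GROUP BY key:

    package sketch

    // row is one input row for the toy aggregation below.
    type row struct {
        key string
        val int64
    }

    // sumAgg sketches single-threaded hash aggregation: one pass
    // builds the hash map (the "execute" phase), then the caller
    // traverses the group-by keys to emit one result row per group.
    func sumAgg(input []row) map[string]int64 {
        groups := make(map[string]int64)
        for _, r := range input {
            groups[r.key] += r.val // fold into the group's partial result
        }
        return groups
    }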

Parallel Execution

Parallel execution mode performs aggregation using multiple threads, dividing the aggregation into two stages:

  • Partial stage: each thread aggregates a portion of the input data into partial results.

  • Final stage: each thread aggregates the partial results into final results.

The flow of parallel execution is illustrated in the following graph:

                            +-------------+
                            | Main Thread |
                            +------+------+
                                   ^
                                   |
                                   +
                              +-+-            +-+
                              | |    ......   | |  finalOutputCh
                              +++-            +-+
                               ^
                               |
                               +---------------+
                               |               |
                 +--------------+             +--------------+
                 | final worker |     ......  | final worker |
                 +------------+-+             +-+------------+
                              ^                 ^
                              |                 |
                             +-+  +-+  ......  +-+
                             | |  | |          | |
                             ...  ...          ...    partialOutputChs
                             | |  | |          | |
                             +++  +++          +++
                              ^    ^            ^
          +-+                 |    |            |
          | |        +--------o----+            |
 inputCh  +-+        |        +-----------------+---+
          | |        |                              |
          ...    +---+------------+            +----+-----------+
          | |    | partial worker |   ......   | partial worker |
          +++    +--------------+-+            +-+--------------+
           |                     ^                ^
           |                     |                |
      +----v---------+          +++ +-+          +++
      | data fetcher | +------> | | | |  ......  | |   partialInputChs
      +--------------+          +-+ +-+          +-+

There are three types of threads that read data and execute the aggregation:

  • fetchChildData: This thread's concurrency level is set to 1. It reads data from the child executor and places it into inputCh, serving as the input for each partial worker.

  • HashAggPartialWorker: The concurrency of HashAggPartialWorker is determined by hashAggPartialConcurrency. This worker reads the input data, executes partial aggregation on it, produces partial results, and sends them to the final worker.

  • HashAggFinalWorker: The concurrency of HashAggFinalWorker is set by hashAggFinalConcurrency. This worker reads partial results, produces final results, and sends them to finalOutputCh.

Similar to Sort, HashAgg is also a memory-intensive operator. When HashAgg runs in non-parallel execution mode, it supports spill-to-disk functionality (spill-to-disk in parallel execution mode is currently under development). Unlike Sort, which spills all data to disk, HashAgg takes a different approach: in the current implementation, once a HashAgg is flagged for spilling, then for all subsequent input, if the group-by key of a row already exists in the current hash map, the row is inserted into the hash map; otherwise, the row is spilled to disk.
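
To make the partial/final split concrete, here is a toy two-stage SUM pipeline (a deliberate simplification: real HashAgg shuffles groups to multiple final workers by key hash, while this sketch merges all partial results in one place; kv, twoStageSum, and the channel names are illustrative only, loosely mirroring inputCh and partialOutputChs above):

    package sketch

    import "sync"

    // kv is one input row for the toy aggregation below.
    type kv struct {
        key string
        val int64
    }

    // twoStageSum sketches HashAgg's partial/final split for a
    // SUM ... GROUP BY query: partial workers each aggregate the
    // chunks they pull from inputCh, and the final stage (a single
    // merger here, for brevity) combines the partial maps.
    func twoStageSum(chunks [][]kv, partialWorkers int) map[string]int64 {
        inputCh := make(chan []kv, len(chunks))
        for _, c := range chunks {
            inputCh <- c
        }
        close(inputCh)

        partialOutputCh := make(chan map[string]int64, partialWorkers)
        var wg sync.WaitGroup
        for i := 0; i < partialWorkers; i++ {
            wg.Add(1)
            go func() { // partial worker: aggregate a share of the input
                defer wg.Done()
                partial := make(map[string]int64)
                for chunk := range inputCh {
                    for _, r := range chunk {
                        partial[r.key] += r.val
                    }
                }
                partialOutputCh <- partial
            }()
        }
        go func() { wg.Wait(); close(partialOutputCh) }()

        final := make(map[string]int64) // final stage: merge partials
        for partial := range partialOutputCh {
            for k, v := range partial {
                final[k] += v
            }
        }
        return final
    }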

HashJoin

The HashJoin operator uses a hash table to perform the join operation. In TiDB, the operator that implements hash join is HashJoinExec.

HashJoin constructs the results in two distinct stages:

  1. Fetch data from the build side child and build a hash table.

  2. Fetch data from the probe side child and probe the hash table using multiple join workers.

Build stage

The fetchAndBuildHashTable function orchestrates the build stage. Two threads are engaged in this work:

  • fetchBuildSideRows reads data from the build side child and funnels it into buildSideResultCh.

  • buildHashTableForList retrieves input data from buildSideResultCh and subsequently constructs the hash table based on this input.

Detailed mechanics of building the hash table are encapsulated within the hashRowContainer. It's worth noting that, as of now, TiDB does not support the parallel building of hash tables.
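
Conceptually (a simplification of what hashRowContainer does; string keys and an in-memory map stand in for hashed join keys and row pointers), the build stage amounts to:

    package sketch

    // buildHashTable sketches the build stage: it maps each join key
    // to the positions of all build-side rows carrying that key, so
    // the probe stage can find candidate matches in O(1) per row.
    func buildHashTable(buildKeys []string) map[string][]int {
        ht := make(map[string][]int, len(buildKeys))
        for pos, key := range buildKeys {
            ht[key] = append(ht[key], pos)
        }
        return ht
    }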

Probe stage

The fetchAndProbeHashTable function executes the probe stage. This stage engages two types of threads:

  • fetchProbeSideChunks operates with a concurrency of 1. It reads data from the probe child and dispatches it to the various probe workers.

  • probeWorker instances read data from fetchProbeSideChunks and probe concurrently. The concurrency level is determined by ExecutorConcurrency.

Each probeWorker contains a joiner, a core data structure implementing various join semantics. Every type of join in TiDB has its specialized joiner. The currently supported joiners include:

  • innerJoiner - for inner join

  • leftOuterJoiner - for left outer join

  • rightOuterJoiner - for right outer join

  • semiJoiner - for semi join

  • antiSemiJoiner - for anti semi join

  • antiLeftOuterSemiJoiner - for anti left outer semi join

  • leftOuterSemiJoiner - for left outer semi join

  • nullAwareAntiSemiJoiner - for null aware anti semi join

  • nullAwareAntiLeftOuterSemiJoiner - for null aware anti left outer semi join

The joiner offers three foundational interfaces:

  • tryToMatchInners - For each row from the probe side, it attempts to match the rows from the build side. It returns true if a match occurs, and sets isNull for the special join types AntiSemiJoin, LeftOuterSemiJoin, and AntiLeftOuterSemiJoin.

  • tryToMatchOuters - Exclusive to outer join scenarios where the outer side acts as the build hash table. For each row from the probe (inner) side, it attempts to match the rows from the build (outer) side.

  • onMissMatch - Used in semi join scenarios to manage cases where no rows from the build side match the probe row.

During its operation, a probeWorker reads data from the probe side. For every probe row, it attempts to match against the hash table and saves the result into a result chunk. Most of these operations use the join2Chunk function for probing. However, for outer joins that use the outer side as the build side, the join2ChunkForOuterHashJoin function is called instead.
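
In spirit (signatures simplified and hypothetical; the real joiner operates on chunk.Row and chunk.Iterator values), the contract looks like:

    package sketch

    // joinerSketch is a simplified rendering of the three interfaces.
    type joinerSketch interface {
        // tryToMatchInners matches one probe row against candidate
        // build rows; isNull applies to the special join types
        // AntiSemiJoin / LeftOuterSemiJoin / AntiLeftOuterSemiJoin.
        tryToMatchInners(probeRow int, buildRows []int) (matched bool, isNull bool, err error)
        // tryToMatchOuters is the counterpart for outer joins that
        // build the hash table on the outer side.
        tryToMatchOuters(probeRow int, buildRows []int) (matched bool, err error)
        // onMissMatch handles a probe row with no build-side match
        // (e.g. emit nothing for semi join, emit the row for anti semi join).
        onMissMatch(hasNull bool, probeRow int)
    }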

Within join2Chunk/join2ChunkForOuterHashJoin, the probe work consists of three steps for each probe row:

  1. Quick tests are conducted before accessing the hash table to determine whether a probe row can possibly find a match. For instance, in an inner join, if the join key contains null, the probe can bypass the hash table probing since null never matches any value. For such non-matching rows, the onMissMatch function is invoked.

  2. The hash table is looked up to identify potential matching rows.

  3. In the absence of potential matching rows, the onMissMatch function is invoked. Otherwise, the tryToMatch function is executed.
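
Putting the three steps together, a probe loop shaped like join2Chunk might look roughly as follows (a sketch reusing the joinerSketch interface and the map-based hash table from the sketches above; probeKeys and keyIsNull are illustrative stand-ins for the computed join keys and their null flags):

    package sketch

    // probeOneChunk sketches the three per-row probe steps: a
    // null-key fast path, a hash table lookup, and the
    // tryToMatch / onMissMatch dispatch.
    func probeOneChunk(
        probeKeys []string,
        keyIsNull []bool,
        ht map[string][]int,
        j joinerSketch,
    ) error {
        for row, key := range probeKeys {
            // Step 1: quick test — a null join key can never match.
            if keyIsNull[row] {
                j.onMissMatch(true, row)
                continue
            }
            // Step 2: look up candidate build rows in the hash table.
            candidates := ht[key]
            // Step 3: no candidates means a miss; otherwise try to match.
            if len(candidates) == 0 {
                j.onMissMatch(false, row)
                continue
            }
            if _, _, err := j.tryToMatchInners(row, candidates); err != nil {
                return err
            }
        }
        return nil
    }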

