Implementation of Typical Operators
This section introduces the implementation details of three typical TiDB operators: Sort, HashAgg, and HashJoin.
First, every operator should implement the three basic interfaces of Executor:
- An initialization method: initializes the operator and sets up the memory tracker, the disk tracker, and other meta-info for the current operator.
- Next: each call returns a chunk of data. Returning an empty chunk indicates that execution is complete for the current executor. Note that Next is not thread-safe; by design, no operator calls Next concurrently.
- A cleanup method: releases all resources held by the executor.
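The contract above can be illustrated with a minimal sketch. It is not TiDB's actual definition: the method names Open and Close, the Chunk type, the sliceExec executor, and the drainAll helper are all assumptions introduced here for illustration.

```go
package main

import "context"

// Chunk is a stand-in for a batch of rows (hypothetical, not TiDB's chunk type).
type Chunk struct{ Rows []int }

// Executor models the three-method contract. The names Open and Close are
// assumptions made for this sketch.
type Executor interface {
	Open(ctx context.Context) error
	Next(ctx context.Context, out *Chunk) error
	Close() error
}

// sliceExec is a toy leaf executor that emits a fixed slice in batches.
type sliceExec struct {
	data      []int
	batchSize int
	pos       int
}

func (e *sliceExec) Open(ctx context.Context) error {
	e.pos = 0
	return nil
}

// Next refills out with the next batch; an empty batch signals completion.
func (e *sliceExec) Next(ctx context.Context, out *Chunk) error {
	end := e.pos + e.batchSize
	if end > len(e.data) {
		end = len(e.data)
	}
	out.Rows = append(out.Rows[:0], e.data[e.pos:end]...)
	e.pos = end
	return nil
}

func (e *sliceExec) Close() error {
	e.data = nil
	return nil
}

// drainAll drives the lifecycle from a single goroutine: initialize, call
// Next until an empty chunk comes back, then release resources.
func drainAll(e Executor) ([]int, error) {
	ctx := context.Background()
	if err := e.Open(ctx); err != nil {
		return nil, err
	}
	defer e.Close()
	var all []int
	chk := &Chunk{}
	for {
		if err := e.Next(ctx, chk); err != nil {
			return nil, err
		}
		if len(chk.Rows) == 0 {
			return all, nil
		}
		all = append(all, chk.Rows...)
	}
}
```

Calling drainAll(&sliceExec{data: []int{1, 2, 3, 4, 5}, batchSize: 2}) pulls three non-empty chunks and stops at the first empty one. The caller reuses a single Chunk across calls, which is one reason a Next of this shape must not be called concurrently.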
The Sort operator is used to arrange the result set of a query in a specific order. In TiDB, the operator implementing sort is SortExec. The fundamental concept behind SortExec is to read all the data from its child executor and then sort the entire data set.
In Next, it invokes fetchRowChunks to read all the data from its child executor. fetchRowChunks aims to store all the data in one SortedRowContainer. The memory usage grows as the input data volume increases. To manage the memory usage, SortExec has spill-to-disk support. The details of this spilling are encapsulated within SortedRowContainer. Every time the insertion of a chunk into the current SortedRowContainer returns ErrCannotAddBecauseSorted, it indicates that the current SortedRowContainer has been spilled. SortExec will then create a new SortedRowContainer and insert the chunk into this new container. Once there's no data coming from its child executor, SortExec will sort the current SortedRowContainer.
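The fetch loop described above can be sketched as follows. This is a simplified model, not TiDB's code: the container type, the row-count limit that stands in for a memory quota, and the names errSpilled and fetchRuns are all hypothetical. The "spill" here only sorts and seals the container in memory, whereas a real spill writes the sorted rows to disk.

```go
package main

import (
	"errors"
	"sort"
)

// errSpilled plays the role of ErrCannotAddBecauseSorted in this sketch.
var errSpilled = errors.New("container already sorted and spilled")

// container is a toy stand-in for a sorted row container.
type container struct {
	rows    []int
	limit   int // row count that triggers a simulated spill (assumption)
	spilled bool
}

// add appends a chunk, or reports errSpilled if the container is sealed.
// Reaching the limit triggers the simulated spill: sort, then seal.
func (c *container) add(chunk []int) error {
	if c.spilled {
		return errSpilled
	}
	c.rows = append(c.rows, chunk...)
	if len(c.rows) >= c.limit {
		sort.Ints(c.rows)
		c.spilled = true
	}
	return nil
}

// fetchRuns consumes all chunks and returns one sorted run per container.
func fetchRuns(chunks [][]int, limit int) ([][]int, error) {
	var runs [][]int
	cur := &container{limit: limit}
	for _, chk := range chunks {
		err := cur.add(chk)
		if errors.Is(err, errSpilled) {
			// The current container was spilled: keep it as a finished
			// run and retry the insert on a fresh container.
			runs = append(runs, cur.rows)
			cur = &container{limit: limit}
			err = cur.add(chk)
		}
		if err != nil {
			return nil, err
		}
	}
	// No more input: sort whatever the current container holds.
	if !cur.spilled {
		sort.Ints(cur.rows)
	}
	if len(cur.rows) > 0 {
		runs = append(runs, cur.rows)
	}
	return runs, nil
}
```

With chunks {5,1}, {4,2}, {3} and a limit of 3, the first container seals after the second chunk, so the third chunk lands in a new container and the result is two sorted runs.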
After fetchRowChunks completes, Next starts producing sorted results. Depending on whether a spill to disk was initiated, there are two ways to produce the final sorted output:
- Spill not initiated: the entire SortedRowContainer is sorted at the end of fetchRowChunks, so Next simply fetches the sorted data from that SortedRowContainer.
- Spill initiated: each spilling round produces an independent SortedRowContainer. In Next, a multi-way merge combines all of these partially sorted data streams into one final sorted data stream.
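Merging several sorted runs into one stream is commonly done with a min-heap keyed on the head of each run. The sketch below shows that general technique with Go's container/heap; the cursor, mergeHeap, and mergeRuns names are hypothetical, and the source does not confirm that TiDB implements its merge this way.

```go
package main

import "container/heap"

// cursor tracks the read position inside one sorted run.
type cursor struct {
	run []int
	pos int
}

// mergeHeap orders cursors by the value each one currently points at.
type mergeHeap []*cursor

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].run[h[i].pos] < h[j].run[h[j].pos] }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// mergeRuns merges already-sorted runs into a single sorted slice.
func mergeRuns(runs [][]int) []int {
	h := &mergeHeap{}
	for _, r := range runs {
		if len(r) > 0 {
			*h = append(*h, &cursor{run: r})
		}
	}
	heap.Init(h)
	var out []int
	for h.Len() > 0 {
		c := (*h)[0] // cursor with the smallest current value
		out = append(out, c.run[c.pos])
		c.pos++
		if c.pos < len(c.run) {
			heap.Fix(h, 0) // head changed: restore heap order
		} else {
			heap.Pop(h) // run exhausted: drop its cursor
		}
	}
	return out
}
```

Each output row costs O(log k) for k runs, and only the head of each run has to be resident at a time, which is what makes this pattern a good fit for runs that live on disk.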
The HashAgg operator uses a hash table to perform grouping and aggregation.
HashAgg has two execution modes: parallel and non-parallel. During the build stage, the planner decides the execution mode for a HashAgg. A HashAgg operates in non-parallel execution mode if any of the following conditions is true:
- The aggregation function contains distinct.
- The aggregation function (GROUP_CONCAT) contains order by.
- The user explicitly sets both hashAggPartialConcurrency and hashAggFinalConcurrency to 1.
Parallel execution mode performs aggregation using multiple threads, dividing the aggregation into two stages:
- Partial stage: each thread aggregates a portion of the input data into partial results.
- Final stage: each thread aggregates the partial results into final results.
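The two stages can be sketched for a COUNT(*) ... GROUP BY query as follows. This is an illustration under stated assumptions, not TiDB's implementation: parallelCount and its parameters are hypothetical, and routing each group key to a final worker by hash is an assumption of this sketch, chosen so that every key is finalized by exactly one worker. Both worker counts are assumed to be at least 1.

```go
package main

import (
	"hash/fnv"
	"sync"
)

// parallelCount computes a per-key row count with partialN partial workers
// and finalN final workers (both assumed >= 1).
func parallelCount(chunks [][]string, partialN, finalN int) map[string]int {
	inputCh := make(chan []string)
	// One channel per final worker; each partial worker sends exactly one
	// message to each, so a buffer of partialN never blocks.
	finalIn := make([]chan map[string]int, finalN)
	for i := range finalIn {
		finalIn[i] = make(chan map[string]int, partialN)
	}

	// A single fetcher feeds chunks to the partial workers.
	go func() {
		for _, c := range chunks {
			inputCh <- c
		}
		close(inputCh)
	}()

	// Partial stage: each worker aggregates the chunks it happens to receive.
	var partialWG sync.WaitGroup
	for w := 0; w < partialN; w++ {
		partialWG.Add(1)
		go func() {
			defer partialWG.Done()
			local := map[string]int{}
			for chunk := range inputCh {
				for _, key := range chunk {
					local[key]++
				}
			}
			// Split the partial result by key hash (assumption of this sketch).
			parts := make([]map[string]int, finalN)
			for i := range parts {
				parts[i] = map[string]int{}
			}
			for key, n := range local {
				h := fnv.New32a()
				h.Write([]byte(key))
				idx := int(h.Sum32() % uint32(finalN))
				parts[idx][key] = n
			}
			for i, p := range parts {
				finalIn[i] <- p
			}
		}()
	}
	go func() {
		partialWG.Wait()
		for _, ch := range finalIn {
			close(ch)
		}
	}()

	// Final stage: each worker merges the partial results routed to it.
	outCh := make(chan map[string]int, finalN)
	var finalWG sync.WaitGroup
	for w := 0; w < finalN; w++ {
		finalWG.Add(1)
		go func(in <-chan map[string]int) {
			defer finalWG.Done()
			merged := map[string]int{}
			for p := range in {
				for key, n := range p {
					merged[key] += n
				}
			}
			outCh <- merged
		}(finalIn[w])
	}
	finalWG.Wait()
	close(outCh)

	result := map[string]int{}
	for m := range outCh {
		for key, n := range m {
			result[key] = n // key sets of final workers are disjoint
		}
	}
	return result
}
```

Which partial worker sees which chunk is nondeterministic, but the final counts are not: every partial count for a given key reaches the same final worker, which sums them.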
[Figure: flow of parallel HashAgg execution]
Three types of threads read data and execute the aggregation:
- A data-fetching thread: its concurrency level is 1. It reads data from the child executor and places it into inputCh, which serves as the input for each partial worker.
- HashAggPartialWorker: its concurrency is determined by hashAggPartialConcurrency. This worker reads the input data, executes partial aggregation on it, and sends the partial results to the final worker.
- HashAggFinalWorker: its concurrency is set by hashAggFinalConcurrency. This worker reads partial results, produces final results, and sends them to finalOutputCh.
Non-parallel execution mode performs aggregation in a single thread. unparallelExec is the core function for non-parallel execution. It first reads all the data from its child executor and aggregates the data using execute. After execute completes, unparallelExec generates results by traversing all the group-by keys and producing one row for each key from the result of each aggregation function.
Similar to Sort, HashAgg is a memory-intensive operator. When HashAgg runs in non-parallel execution mode, it supports spill-to-disk (spill-to-disk in parallel execution mode is not yet available). Unlike Sort, which spills all data to disk, HashAgg spills selectively. In the current implementation, once a HashAgg is flagged for spilling, every subsequent input row is handled according to its group-by key: if the key already exists in the current hash map, the row is inserted into the hash map; if not, the row is spilled to disk.
The HashJoin operator uses a hash table to perform the join operation. HashJoin constructs the results in two distinct stages:
- Fetch data from the build side child and build a hash table.
- Fetch data from the probe side child and probe the hash table using multiple join workers.
The build stage is orchestrated by a single function and involves two threads:
- One reads data from the build side child and funnels it into buildSideResultCh.
- The other retrieves input data from buildSideResultCh and constructs the hash table from it.
The mechanics of building the hash table are encapsulated in a separate component. As of now, TiDB does not support parallel building of hash tables.
The probe stage is executed by another function and involves two types of threads:
- fetchProbeSideChunks operates with a concurrency of 1. It reads data from the probe side child and dispatches it to the probe workers.
- probeWorker instances read data from fetchProbeSideChunks and probe concurrently. Their concurrency level is determined by ExecutorConcurrency.
Each probeWorker contains a joiner, a core data structure implementing the join semantics. Every type of join in TiDB has its own specialized joiner. The currently supported joiners cover:
- Inner join
- Left outer join
- Right outer join
- Semi join
- Anti semi join
- Anti left outer semi join
- Left outer semi join
- Null aware anti semi join
- Null aware anti left outer semi join
The joiner offers three foundational interfaces:
- A match function for the common case: for each row from the probe side, it attempts to match the rows from the build side. It returns true if a match occurs and sets isNull for the special join types AntiSemiJoin, LeftOuterSemiJoin, and AntiLeftOuterSemiJoin.
- A match function exclusive to outer join scenarios where the outer side is used to build the hash table: for each row from the probe (inner) side, it attempts to match the rows from the build (outer) side.
- onMissMatch: used in semi join scenarios to handle cases where no rows from the build side match the probe row.
While running, a probeWorker reads data from the probe side. For every probe row, it attempts to match against the hash table and saves the result into a result chunk. Most join types use join2Chunk for probing; outer joins that use the outer side as the build side use join2ChunkForOuterHashJoin instead.
Within join2Chunk/join2ChunkForOuterHashJoin, the probe work consists of three steps for each probe row:
1. Quick tests are conducted before accessing the hash table to determine whether a probe row cannot find a match. For instance, in an inner join, if the join key contains null, the probe can skip the hash table lookup, since null never matches any value. For such non-matching rows, the onMissMatch function is invoked.
2. The hash table is looked up to identify potential matching rows.
3. If there are no potential matching rows, the onMissMatch function is invoked. Otherwise, the tryToMatch function is executed.
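The build and probe stages, together with the per-row probe steps described in this section, can be sketched as follows. This is a simplified model rather than TiDB's code: the row type, the two-method joiner interface with onMatch and onMiss, the innerJoiner and leftOuterJoiner types, and the hashJoin function are all hypothetical. The output is sorted only to make the result deterministic, and workers is assumed to be at least 1.

```go
package main

import (
	"sort"
	"sync"
)

// row is a toy row with a nullable integer join key.
type row struct {
	key  int
	null bool // true if the join key is NULL
	val  string
}

// joiner isolates join semantics from the probing loop (hypothetical API).
type joiner interface {
	onMatch(probe row, builds []row) []string
	onMiss(probe row) []string
}

// innerJoiner emits one result per matching build row and nothing on a miss.
type innerJoiner struct{}

func (innerJoiner) onMatch(p row, bs []row) []string {
	out := make([]string, 0, len(bs))
	for _, b := range bs {
		out = append(out, p.val+"|"+b.val)
	}
	return out
}

func (innerJoiner) onMiss(row) []string { return nil }

// leftOuterJoiner treats the probe side as the outer side: unmatched probe
// rows are kept and padded with NULL.
type leftOuterJoiner struct{ innerJoiner }

func (leftOuterJoiner) onMiss(p row) []string { return []string{p.val + "|NULL"} }

func hashJoin(build, probe []row, j joiner, workers int) []string {
	// Build stage: a single thread constructs the hash table.
	table := make(map[int][]row)
	for _, b := range build {
		if !b.null {
			table[b.key] = append(table[b.key], b)
		}
	}

	// Probe stage: one fetcher dispatches rows to concurrent probe workers.
	probeCh := make(chan row)
	go func() {
		for _, p := range probe {
			probeCh <- p
		}
		close(probeCh)
	}()

	var (
		mu  sync.Mutex
		out []string
		wg  sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range probeCh {
				var res []string
				if p.null {
					// Step 1: quick test, a NULL key can never match.
					res = j.onMiss(p)
				} else if cands := table[p.key]; len(cands) == 0 {
					// Steps 2 and 3: lookup found no candidates.
					res = j.onMiss(p)
				} else {
					// Step 3: let the joiner handle the candidates.
					res = j.onMatch(p, cands)
				}
				mu.Lock()
				out = append(out, res...)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	sort.Strings(out) // only for deterministic output in this sketch
	return out
}
```

The hash table is read-only once the build stage finishes, so the probe workers can share it without locking; only the shared result slice needs a mutex here. Swapping innerJoiner for leftOuterJoiner changes the join semantics without touching the probing loop, which is the point of keeping the joiner separate.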