Rule-based Optimization

As stated in the planner overview, rule-based optimization (usually used interchangeably with logical optimization in TiDB code) consists of logical optimization rules. These rules have predefined order to be iterated. Each rule has a responding flag, and a rule will be applied only if it is flagged and not disabled. The flag is set according to the SQL in the plan building stage.

The rule-based optimization will produce a logical plan tree that is logically equal to the original one. Besides the original plan tree, it will also make use of table schema information to make optimizations, but it doesn't rely on the statistics to do optimization (join reorder is the only exception, we'll talk about it later).

Implementation Patterns

Code for each rule is placed in a separated file named "rule_xxx_xxx.go".

All logical rule implements the logicalOptRule interface.

It is defined as follows:

type logicalOptRule interface {
	optimize(context.Context, LogicalPlan) (LogicalPlan, error)
	name() string
}

The overall logic of a rule is traversing the plan tree, matching a specific operator (or a pattern), and modifying the plan tree.

The traversal logic is implemented mainly in two ways:

Rules

Column Pruning

This is a very fundamental optimization. It will prune unneeded columns for each operator.

This main logic is in PruneColumns(parentUsedCols []*expression.Column) error method of the LogicalPlan interface. It traverses the plan tree from top to bottom. Each operator receives which columns are used by the parent operator, then uses this information to prune unneeded columns from itself (different kinds of operator would have different behaviors), then collect and pass columns needed by itself to its children.

Decorrelation

As stated in the planner overview, the correlated subquery in the SQL becomes the Apply operator, which is a special kind of Join operator, in the plan tree. If we can transform it to a normal Join and keep it logically equal to the Apply, we can do make more optimizations that are only available to normal join operators.

An Apply is equivalent to a Join if there are no correlated columns in its inner side. Here we try to pull up operators with correlated columns in the inner side across the Apply, then Apply can be changed to Join directly. So this kind of transformation is called decorrelation.

The main logic is in (*decorrelateSolver).optimize(). It finds Apply and tries to decorrelate it.

Currently, there're several cases we can decorrelate.

If the inner side is a Selection, we can directly change it to a join condition of the Apply.

Example:

CREATE TABLE t1(a INT, b INT);
CREATE TABLE t2(a INT, b INT);
EXPLAIN SELECT * FROM t1 WHERE t1.a IN (SELECT t2.a FROM t2 WHERE t2.b = t1.b);
+------------------------------+----------+-----------+---------------+--------------------------------------------------------------------------+
| id                           | estRows  | task      | access object | operator info                                                            |
+------------------------------+----------+-----------+---------------+--------------------------------------------------------------------------+
| HashJoin_22                  | 7984.01  | root      |               | semi join, equal:[eq(test2.t1.b, test2.t2.b) eq(test2.t1.a, test2.t2.a)] |
| ├─TableReader_28(Build)      | 9990.00  | root      |               | data:Selection_27                                                        |
| │ └─Selection_27             | 9990.00  | cop[tikv] |               | not(isnull(test2.t2.a))                                                  |
| │   └─TableFullScan_26       | 10000.00 | cop[tikv] | table:t2      | keep order:false, stats:pseudo                                           |
| └─TableReader_25(Probe)      | 9980.01  | root      |               | data:Selection_24                                                        |
|   └─Selection_24             | 9980.01  | cop[tikv] |               | not(isnull(test2.t1.a)), not(isnull(test2.t1.b))                         |
|     └─TableFullScan_23       | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo                                           |
+------------------------------+----------+-----------+---------------+--------------------------------------------------------------------------+

If the inner side is a MaxOneRow and its child can assure there will be one row at most, which means the MaxOneRow is unneeded, we can remove the MaxOneRow.

Example:

CREATE TABLE t1(a INT UNIQUE NOT NULL, b INT);
CREATE TABLE t2(a INT, b INT);
EXPLAIN SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) FROM t2;
+-----------------------------+----------+-----------+----------------------+-----------------------------------------------------+
| id                          | estRows  | task      | access object        | operator info                                       |
+-----------------------------+----------+-----------+----------------------+-----------------------------------------------------+
| HashJoin_19                 | 12500.00 | root      |                      | left outer join, equal:[eq(test2.t2.a, test2.t1.a)] |
| ├─IndexReader_26(Build)     | 10000.00 | root      |                      | index:IndexFullScan_25                              |
| │ └─IndexFullScan_25        | 10000.00 | cop[tikv] | table:t1, index:a(a) | keep order:false, stats:pseudo                      |
| └─TableReader_22(Probe)     | 10000.00 | root      |                      | data:TableFullScan_21                               |
|   └─TableFullScan_21        | 10000.00 | cop[tikv] | table:t2             | keep order:false, stats:pseudo                      |
+-----------------------------+----------+-----------+----------------------+-----------------------------------------------------+

If the inner side is a Projection, we can move the calculation in the Projection into the Apply and add a new Projection above Apply if needed.

If the inner side is an Aggregation, it will be more complicated to decorrelate it. To assure correctness, there are many requirements. For example, the output schema of the outer side must be unique, the join type must be InnerJoin or LeftOuterJoin, there cannot be any join conditions in the Apply, and so on. We can pull up the Aggregation only when all of them are met. During pulling up, we cannot directly move the Aggregation to above the Apply. To assure correctness, its GroupByItems should also be set to the unique key of the outer side, and the join type of the Apply should also be set to LeftOuterJoin.

Example:

CREATE TABLE t1(a INT UNIQUE NOT NULL, b INT);
CREATE TABLE t2(a INT, b INT);
EXPLAIN SELECT a, (SELECT sum(t2.b) FROM t2 WHERE t2.a = t1.a) FROM t1;
+----------------------------------+----------+-----------+----------------------+--------------------------------------------------------------------------------------------+
| id                               | estRows  | task      | access object        | operator info                                                                              |
+----------------------------------+----------+-----------+----------------------+--------------------------------------------------------------------------------------------+
| HashAgg_11                       | 8000.00  | root      |                      | group by:Column#13, funcs:firstrow(Column#11)->test2.t1.a, funcs:sum(Column#12)->Column#10 |
| └─Projection_22                  | 12487.50 | root      |                      | test2.t1.a, cast(test2.t2.b, decimal(32,0) BINARY)->Column#12, test2.t1.a                  |
|   └─HashJoin_13                  | 12487.50 | root      |                      | left outer join, equal:[eq(test2.t1.a, test2.t2.a)]                                        |
|     ├─TableReader_21(Build)      | 9990.00  | root      |                      | data:Selection_20                                                                          |
|     │ └─Selection_20             | 9990.00  | cop[tikv] |                      | not(isnull(test2.t2.a))                                                                    |
|     │   └─TableFullScan_19       | 10000.00 | cop[tikv] | table:t2             | keep order:false, stats:pseudo                                                             |
|     └─IndexReader_18(Probe)      | 10000.00 | root      |                      | index:IndexFullScan_17                                                                     |
|       └─IndexFullScan_17         | 10000.00 | cop[tikv] | table:t1, index:a(a) | keep order:false, stats:pseudo                                                             |
+----------------------------------+----------+-----------+----------------------+--------------------------------------------------------------------------------------------+

There is one more case we can decorrelate when we fail to decorrelate Apply with an Aggregation in the inner side directly. That's when the inner side is an Aggregation, and the Aggregation's child operator is a Selection. Here we try to pull up the equal conditions in the Selection to above Aggregation, then change it to a join condition of the Apply. To assure correctness, correlated columns of the pulled-up conditions should also be added into LogicalAggregation.GroupByItems. Note that we'll do this transformation only if the Apply is guaranteed can be changed to a Join. Otherwise, we'll keep the plan tree unchanged.

Example:

CREATE TABLE t1(a INT, b INT);
CREATE TABLE t2(a INT, b INT);
EXPLAIN SELECT a, (SELECT sum(t2.b) FROM t2 WHERE t2.a = t1.a) FROM t1;
+----------------------------------+----------+-----------+---------------+----------------------------------------------------------------------------------------------+
| id                               | estRows  | task      | access object | operator info                                                                                |
+----------------------------------+----------+-----------+---------------+----------------------------------------------------------------------------------------------+
| HashJoin_11                      | 10000.00 | root      |               | left outer join, equal:[eq(test2.t1.a, test2.t2.a)]                                          |
| ├─HashAgg_20(Build)              | 7992.00  | root      |               | group by:test2.t2.a, funcs:sum(Column#11)->Column#10, funcs:firstrow(test2.t2.a)->test2.t2.a |
| │ └─TableReader_21               | 7992.00  | root      |               | data:HashAgg_15                                                                              |
| │   └─HashAgg_15                 | 7992.00  | cop[tikv] |               | group by:test2.t2.a, funcs:sum(test2.t2.b)->Column#11                                        |
| │     └─Selection_19             | 9990.00  | cop[tikv] |               | not(isnull(test2.t2.a))                                                                      |
| │       └─TableFullScan_18       | 10000.00 | cop[tikv] | table:t2      | keep order:false, stats:pseudo                                                               |
| └─TableReader_14(Probe)          | 10000.00 | root      |               | data:TableFullScan_13                                                                        |
|   └─TableFullScan_13             | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo                                                               |
+----------------------------------+----------+-----------+---------------+----------------------------------------------------------------------------------------------+

This rule will keep trying to decorrelate an Apply until it can't be decorrelated anymore. If there are no correlated columns in its inner side now, it is converted to a Join.

Decorrelation can't guarantee a better plan

It might be intuitive to think that a decorrelated Join is more efficient than the nested-loop style Apply. That's probably true in most cases. However, as we said above, decorrelation just enables us to make more optimizations that are only available for normal Join. This doesn't mean Apply is always a worse plan.

The decorrelation involves some "pull-up" operation. This usually makes the execution of the inner sub-tree of the Apply/Join becomes less efficient. And in some cases, for example, when the outer side of the Apply only has one row of data, the nested-loop style Apply execution won't incur inefficiency compared with a normal Join. In such cases, the decorrelated plan is worse than the original one.

Aggregation Elimination

This rule finds Aggregations and tries to remove useless Aggregation operator or useless DISTINCT of aggregate functions.

A DISTINCT of an aggregate function is useless when the argument of the aggregate function is a unique column. In this case, we can set the AggFuncDesc.HasDistinct to false directly.

Example:

CREATE TABLE t(a INT, b INT UNIQUE);
EXPLAIN SELECT count(distinct b) FROM t;
+----------------------------+----------+-----------+---------------------+----------------------------------+
| id                         | estRows  | task      | access object       | operator info                    |
+----------------------------+----------+-----------+---------------------+----------------------------------+
| StreamAgg_20               | 1.00     | root      |                     | funcs:count(Column#6)->Column#4  |
| └─IndexReader_21           | 1.00     | root      |                     | index:StreamAgg_8                |
|   └─StreamAgg_8            | 1.00     | cop[tikv] |                     | funcs:count(test2.t.b)->Column#6 |
|     └─IndexFullScan_19     | 10000.00 | cop[tikv] | table:t, index:b(b) | keep order:false, stats:pseudo   |
+----------------------------+----------+-----------+---------------------+----------------------------------+

This part is implemented in (*aggregationEliminateChecker).tryToEliminateDistinct().

An Aggregation is useless if its GroupByItems are unique column(s). In this case, we can remove this Aggregation. But we still need a Projection in the same place. Because for most aggregate function, its arguments and result have different types and we need a Projection to keep their types correct. And we also need to rewrite some expressions to correctly handle NULL values.

Example:

CREATE TABLE t(a INT, b INT UNIQUE NOT NULL);
EXPLAIN SELECT count(a), sum(a), max(a) FROM t GROUP BY b;
+-------------------------+----------+-----------+---------------+---------------------------------------------------------------------------------------------------+
| id                      | estRows  | task      | access object | operator info                                                                                     |
+-------------------------+----------+-----------+---------------+---------------------------------------------------------------------------------------------------+
| Projection_5            | 10000.00 | root      |               | if(isnull(test2.t.a), 0, 1)->Column#4, cast(test2.t.a, decimal(32,0) BINARY)->Column#5, test2.t.a |
| └─TableReader_7         | 10000.00 | root      |               | data:TableFullScan_6                                                                              |
|   └─TableFullScan_6     | 10000.00 | cop[tikv] | table:t       | keep order:false, stats:pseudo                                                                    |
+-------------------------+----------+-----------+---------------+---------------------------------------------------------------------------------------------------+

This part is implemented in (*aggregationEliminateChecker).tryToEliminateAggregation().

Projection Elimination

Projection elimination finds Projection and try to remove useless Projections. The main logic is in (*projectionEliminator).eliminate(p LogicalPlan, replace map[string]*expression.Column, canEliminate bool).

Generally, there are two cases we can optimize. First, if there are two Projections in a row, we can merge them into one Projection. Second, if all expressions of a Projection are Column, which means there are no extra calculations, we can remove this Projection.

Note that for the second case, not all Projection can be eliminated. For example, the Projection at the top of the plan tree or below UnionAll can't be removed. This is indicated by the canEliminate parameter.

Max/Min Elimination

Max/Min elimination finds Aggregation with max() or min() aggregate function. It doesn't actually "eliminate" the Aggregation. It adds Limit and Sort to get the same effect of max() and min(), but the Aggregagation is remained to assure correctness.

Example:

CREATE TABLE t(a int, b int UNIQUE NOT NULL);
EXPLAIN SELECT MAX(a) FROM t;
+--------------------------------+----------+-----------+---------------+-----------------------------------+
| id                             | estRows  | task      | access object | operator info                     |
+--------------------------------+----------+-----------+---------------+-----------------------------------+
| StreamAgg_10                   | 1.00     | root      |               | funcs:max(test2.t.a)->Column#4    |
| └─TopN_11                      | 1.00     | root      |               | test2.t.a:desc, offset:0, count:1 |
|   └─TableReader_19             | 1.00     | root      |               | data:TopN_18                      |
|     └─TopN_18                  | 1.00     | cop[tikv] |               | test2.t.a:desc, offset:0, count:1 |
|       └─Selection_17           | 9990.00  | cop[tikv] |               | not(isnull(test2.t.a))            |
|         └─TableFullScan_16     | 10000.00 | cop[tikv] | table:t       | keep order:false, stats:pseudo    |
+--------------------------------+----------+-----------+---------------+-----------------------------------+
EXPLAIN SELECT MIN(a) FROM t;
+--------------------------------+----------+-----------+---------------+--------------------------------+
| id                             | estRows  | task      | access object | operator info                  |
+--------------------------------+----------+-----------+---------------+--------------------------------+
| StreamAgg_10                   | 1.00     | root      |               | funcs:min(test2.t.a)->Column#4 |
| └─TopN_11                      | 1.00     | root      |               | test2.t.a, offset:0, count:1   |
|   └─TableReader_19             | 1.00     | root      |               | data:TopN_18                   |
|     └─TopN_18                  | 1.00     | cop[tikv] |               | test2.t.a, offset:0, count:1   |
|       └─Selection_17           | 9990.00  | cop[tikv] |               | not(isnull(test2.t.a))         |
|         └─TableFullScan_16     | 10000.00 | cop[tikv] | table:t       | keep order:false, stats:pseudo |
+--------------------------------+----------+-----------+---------------+--------------------------------+

This change enables TiDB to make use of indexes, which are ordered by certain column(s). In the optimal case, we only need to scan one row in TiKV.

This optimization will become a little more complicated when there are more than one max() or min() function. In this case, we will compose a plan tree for every max() or min() function, then put them together with a Join. Note that we'll only do this when we can make sure every max() or min() function can benefit from index. This is checked in (*maxMinEliminator).splitAggFuncAndCheckIndices().

Example:

CREATE TABLE t(a int, b int, INDEX ia(a), INDEX ib(b));
EXPLAIN SELECT MAX(a), MIN(b) FROM t;
+----------------------------------+---------+-----------+----------------------+-------------------------------------+
| id                               | estRows | task      | access object        | operator info                       |
+----------------------------------+---------+-----------+----------------------+-------------------------------------+
| HashJoin_18                      | 1.00    | root      |                      | CARTESIAN inner join                |
| ├─StreamAgg_34(Build)            | 1.00    | root      |                      | funcs:min(test2.t.b)->Column#5      |
| │ └─Limit_38                     | 1.00    | root      |                      | offset:0, count:1                   |
| │   └─IndexReader_45             | 1.00    | root      |                      | index:Limit_44                      |
| │     └─Limit_44                 | 1.00    | cop[tikv] |                      | offset:0, count:1                   |
| │       └─IndexFullScan_43       | 1.00    | cop[tikv] | table:t, index:ib(b) | keep order:true, stats:pseudo       |
| └─StreamAgg_21(Probe)            | 1.00    | root      |                      | funcs:max(test2.t.a)->Column#4      |
|   └─Limit_25                     | 1.00    | root      |                      | offset:0, count:1                   |
|     └─IndexReader_32             | 1.00    | root      |                      | index:Limit_31                      |
|       └─Limit_31                 | 1.00    | cop[tikv] |                      | offset:0, count:1                   |
|         └─IndexFullScan_30       | 1.00    | cop[tikv] | table:t, index:ia(a) | keep order:true, desc, stats:pseudo |
+----------------------------------+---------+-----------+----------------------+-------------------------------------+

Predicate Pushdown

This is a very fundamental and important optimization. It traverses the plan tree from top to bottom, collects predicates (filter conditions), and tries to push them down as deep as possible.

The main logic is in the PredicatePushDown([]expression.Expression) ([]expression.Expression, LogicalPlan) method of LogicalPlan interface. The parament is the pushed-down predicates. The return values are predicates that can't be pushed down anymore and the child operator after pushing down predicates.

The predicates mainly come from Selection. The predicates can be pushed across some operators, like Projection and UnionAll. For some operators, we can only push down predicates when some requirements are met. For example, we can only push predicates across Window if all Columns in the predicates are Window's PartitionBy columns. For some operators, we can't push predicates across them, like Limit.

In the optimal case, the predicates reach DataSource and can be pushed down to the storage layer in the physical optimization stage.

Join is a special case in this rule. We not only push down predicates for Join, but we also make some other optimizations here. They are implemented in (*LogicalJoin).PredicatePushDown. Two of them are important and explained as follows.

First, we will try to "simplify" outer joins, which means convert outer joins to inner joins. As we know, outer join is different from inner join because we will pad NULLs for unmatched rows from the outer side. If the predicates are guaranteed to filter such rows, this join makes no difference from an inner join. In this case, we can directly change it to an inner join.

Example:

CREATE TABLE t(a int, b int);
CREATE TABLE t1(a int, b int);
EXPLAIN SELECT * FROM t LEFT JOIN t1 ON t.a = t1.a WHERE t1.a IS NOT NULL;
+------------------------------+----------+-----------+---------------+-----------------------------------------------+
| id                           | estRows  | task      | access object | operator info                                 |
+------------------------------+----------+-----------+---------------+-----------------------------------------------+
| HashJoin_8                   | 12487.50 | root      |               | inner join, equal:[eq(test2.t.a, test2.t1.a)] |
| ├─TableReader_15(Build)      | 9990.00  | root      |               | data:Selection_14                             |
| │ └─Selection_14             | 9990.00  | cop[tikv] |               | not(isnull(test2.t1.a))                       |
| │   └─TableFullScan_13       | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo                |
| └─TableReader_12(Probe)      | 9990.00  | root      |               | data:Selection_11                             |
|   └─Selection_11             | 9990.00  | cop[tikv] |               | not(isnull(test2.t.a))                        |
|     └─TableFullScan_10       | 10000.00 | cop[tikv] | table:t       | keep order:false, stats:pseudo                |
+------------------------------+----------+-----------+---------------+-----------------------------------------------+

Second, we will also try to derive some extra conditions that are the common conditions from the existing OR predicates or try to add NOT NULL when possible. This enables us to push more predicates down.

Example:

EXPLAIN SELECT * FROM t1 JOIN t ON t1.b = t.b WHERE (t1.a=1 AND t.a=1) OR (t1.a=2 AND t.a=2);
+---------------------------------+----------+-----------+----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                              | estRows  | task      | access object        | operator info                                                                                                                                                                                                            |
+---------------------------------+----------+-----------+----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| IndexJoin_13                    | 24.98    | root      |                      | inner join, inner:IndexLookUp_12, outer key:test2.t1.b, inner key:test2.t2.b, equal cond:eq(test2.t1.b, test2.t2.b), other cond:or(and(eq(test2.t1.a, 1), eq(test2.t2.a, 1)), and(eq(test2.t1.a, 2), eq(test2.t2.a, 2))) |
| ├─TableReader_27(Build)         | 19.98    | root      |                      | data:Selection_26                                                                                                                                                                                                        |
| │ └─Selection_26                | 19.98    | cop[tikv] |                      | not(isnull(test2.t1.b)), or(eq(test2.t1.a, 1), eq(test2.t1.a, 2))                                                                                                                                                        |
| │   └─TableFullScan_25          | 10000.00 | cop[tikv] | table:t1             | keep order:false, stats:pseudo                                                                                                                                                                                           |
| └─IndexLookUp_12(Probe)         | 1.00     | root      |                      |                                                                                                                                                                                                                          |
|   ├─IndexRangeScan_9(Build)     | 1.00     | cop[tikv] | table:t2, index:b(b) | range: decided by [eq(test2.t2.b, test2.t1.b)], keep order:false, stats:pseudo                                                                                                                                           |
|   └─Selection_11(Probe)         | 1.00     | cop[tikv] |                      | or(eq(test2.t2.a, 1), eq(test2.t2.a, 2))                                                                                                                                                                                 |
|     └─TableRowIDScan_10         | 1.00     | cop[tikv] | table:t2             | keep order:false, stats:pseudo                                                                                                                                                                                           |
+---------------------------------+----------+-----------+----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Outer Join Elimination

This rule finds and tries to eliminate outer Join. Specifically, it removes the Join and its inner side sub-plan tree.

We can do this only when the operators above Join only need columns from their outer side. But this is not enough. We also need at least one of the following requirements to be met:

  1. The join keys from the inner side are unique. This means the LogicalJoin has no effects on the rows from the outer side;

  2. Duplicated rows from the output of Join have no effect on the calculation results. Specifically, this is when there's a Aggregation above the Join and the aggregation functions in it have DISTINCT or are max(), min(), firstrow() or approx_count_distinct().

Example:

CREATE TABLE t1(a INT, b INT);
CREATE TABLE t2(a INT, b INT UNIQUE NOT NULL);
EXPLAIN SELECT t1.a, t1.b FROM t1 LEFT JOIN t2 on t1.b = t2.b;
+-----------------------+----------+-----------+---------------+--------------------------------+
| id                    | estRows  | task      | access object | operator info                  |
+-----------------------+----------+-----------+---------------+--------------------------------+
| TableReader_7         | 10000.00 | root      |               | data:TableFullScan_6           |
| └─TableFullScan_6     | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo |
+-----------------------+----------+-----------+---------------+--------------------------------+
EXPLAIN SELECT count(distinct t1.a), max(t1.b) FROM t1 LEFT JOIN t2 on t1.b = t2.b;
+--------------------------+----------+-----------+---------------+-----------------------------------------------------------------------------+
| id                       | estRows  | task      | access object | operator info                                                               |
+--------------------------+----------+-----------+---------------+-----------------------------------------------------------------------------+
| StreamAgg_8              | 1.00     | root      |               | funcs:count(distinct test2.t1.a)->Column#7, funcs:max(test2.t1.b)->Column#8 |
| └─TableReader_12         | 10000.00 | root      |               | data:TableFullScan_11                                                       |
|   └─TableFullScan_11     | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo                                              |
+--------------------------+----------+-----------+---------------+-----------------------------------------------------------------------------+

Partition Pruning

This rule finds DataSource containing a partitioned table. For each partition, there will be a separated DataSource, and they will be composed together by a special PartitionUnionAll operator. This rule is responsible for this work, and during this process, it will try to prune unneeded partitions according to the pushed-down filter conditions.

The rationale of this rule is rather simple, but there are different kinds of partition types and the pushed-down conditions can be very complicated. So the implementation details of this rule are also complicated. Some descriptions of these details can be found in the official docs.

Note that there is a feature called dynamic pruning. As of this section is written, it is an experimental feature and is not enabled by default. In this mode, we no longer build a DataSource for every partition. Accessing partitions is done in one operator, and the partition pruning work is done at the execution stage. So this rule is not needed in this mode.

Aggregation Pushdown

This rule finds LogicalAggregation and tries to push it down. Currently, we can push it across Join, Projection, UnionAll, and PartitionUnionAll. Note that pushdown here doesn't mean "move this operator below other operators". There should be one Aggregation remained in the original position and another Aggregation pushed down to assure correctness.

Pushing Aggregation across Join is the most complicated case of them. The aggregate functions are separated into left and right sides and we try to push them to the left and right side of Join respectively. There are many requirements to make this transformation. For example, the join type should be among InnerJoin, LeftOuterJoin and RightOuterJoin. Only specific types of aggregate functions can be pushed down. And when we try to push aggregate functions down to one side of the Join, there can't be count() and sum() functions in the other side. If all requirements are met, we can generate and push down Aggregation. The new Aggregation is also transformed and different from the original Aggregation. For example, the columns in the join conditions should be added into GroupByItems.

Example:

CREATE TABLE t1(a int, b int);
CREATE TABLE t2(a int, b int);
set @@tidb_opt_agg_push_down=1;
explain select max(t1.b), min(t2.b) from t1 left join t2 on t1.a = t2.a;
+------------------------------------+----------+-----------+---------------+----------------------------------------------------------------------------------------------+
| id                                 | estRows  | task      | access object | operator info                                                                                |
+------------------------------------+----------+-----------+---------------+----------------------------------------------------------------------------------------------+
| HashAgg_9                          | 1.00     | root      |               | funcs:max(Column#10)->Column#7, funcs:min(Column#9)->Column#8                                |
| └─HashJoin_10                      | 8000.00  | root      |               | left outer join, equal:[eq(test2.t1.a, test2.t2.a)]                                          |
|   ├─HashAgg_25(Build)              | 7992.00  | root      |               | group by:test2.t2.a, funcs:min(Column#13)->Column#9, funcs:firstrow(test2.t2.a)->test2.t2.a  |
|   │ └─TableReader_26               | 7992.00  | root      |               | data:HashAgg_20                                                                              |
|   │   └─HashAgg_20                 | 7992.00  | cop[tikv] |               | group by:test2.t2.a, funcs:min(test2.t2.b)->Column#13                                        |
|   │     └─Selection_24             | 9990.00  | cop[tikv] |               | not(isnull(test2.t2.a))                                                                      |
|   │       └─TableFullScan_23       | 10000.00 | cop[tikv] | table:t2      | keep order:false, stats:pseudo                                                               |
|   └─HashAgg_16(Probe)              | 8000.00  | root      |               | group by:test2.t1.a, funcs:max(Column#11)->Column#10, funcs:firstrow(test2.t1.a)->test2.t1.a |
|     └─TableReader_17               | 8000.00  | root      |               | data:HashAgg_12                                                                              |
|       └─HashAgg_12                 | 8000.00  | cop[tikv] |               | group by:test2.t1.a, funcs:max(test2.t1.b)->Column#11                                        |
|         └─TableFullScan_15         | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo                                                               |
+------------------------------------+----------+-----------+---------------+----------------------------------------------------------------------------------------------+

Pushing Aggregation across Projection is rather simple. It directly moves the calculation in the Projection into Aggregation. Then the Projection can be removed. And there will be only one Aggregation.

Pushing Aggregation across UnionAll and PartitionUnionAll share the same logic. It's similar to the LogicalJoin case. There will be some checks. If all requirements are met. We can generate and push down a new LogicalAggregation across UnionAll or PartitionUnionAll. Note that the original Aggregation may also be modified here.

Example:

CREATE TABLE t1(a int, b int);
CREATE TABLE t2(a int, b int);
-- Note that we need to turn on this variable to enable this optimization. We will explain the reason later.
set @@tidb_opt_agg_push_down=1;
explain select count(a) from (select * from t1 union all select * from t2);
+----------------------------------+----------+-----------+----------------------+------------------------------------+
| id                               | estRows  | task      | access object        | operator info                      |
+----------------------------------+----------+-----------+----------------------+------------------------------------+
| HashAgg_14                       | 1.00     | root      |                      | funcs:count(Column#10)->Column#9   |
| └─Union_15                       | 2.00     | root      |                      |                                    |
|   ├─StreamAgg_31                 | 1.00     | root      |                      | funcs:count(Column#12)->Column#10  |
|   │ └─IndexReader_32             | 1.00     | root      |                      | index:StreamAgg_19                 |
|   │   └─StreamAgg_19             | 1.00     | cop[tikv] |                      | funcs:count(test2.t1.a)->Column#12 |
|   │     └─IndexFullScan_30       | 10000.00 | cop[tikv] | table:t1, index:a(a) | keep order:false, stats:pseudo     |
|   └─StreamAgg_48                 | 1.00     | root      |                      | funcs:count(Column#14)->Column#10  |
|     └─TableReader_49             | 1.00     | root      |                      | data:StreamAgg_40                  |
|       └─StreamAgg_40             | 1.00     | cop[tikv] |                      | funcs:count(test2.t2.a)->Column#14 |
|         └─TableFullScan_47       | 10000.00 | cop[tikv] | table:t2             | keep order:false, stats:pseudo     |
+----------------------------------+----------+-----------+----------------------+------------------------------------+

Aggregation Pushdown can't guarantee a better plan

As we know, Aggregation usually involves heavy calculations. After aggregation pushdown, the original Aggregation operator will become two different Aggregation operators (except for the Projection case), so it's possible that the plan with Aggregation pushed down is worse than the original plan.

In TiDB's current implementation, this kind of scenario mainly happens when we push an Aggregation across Join. Because there will be additional group by keys added into the pushed-down Aggregation. And the NDV (number of distinct values) of the new group by keys may be very high. That will make this Aggregation waste lots of calculation resources. So currently we use a system variable tidb_opt_agg_push_down to control this optimization, which is disabled by default.

TopN Pushdown

TopN is an operator not directly corresponding to any syntax in the SQL. Its semantic is equal to a Limit above a Sort. We can execute it more efficiently when they are together, so we create a new operator for this case.

type LogicalTopN struct {
	baseLogicalPlan

	ByItems    []*util.ByItems
	Offset     uint64
	Count      uint64
	limitHints limitHintInfo
}

type LogicalLimit struct {
	logicalSchemaProducer

	Offset     uint64
	Count      uint64
	limitHints limitHintInfo
}

type LogicalSort struct {
	baseLogicalPlan

	ByItems []*util.ByItems
}

This rule is mainly implemented by the pushDownTopN(topN *LogicalTopN) LogicalPlan method of the LogicalPlan interface. Like the predicate push down, it traverses the plan tree from top to bottom and collects TopN information from operators.

When it meets a Limit, the Limit itself is converted into a TopN and pushed down. This is where the TopN operator appears for the first time in a plan tree.

For most kinds of operators, the pushed-down TopN just can't be pushed down anymore, and it becomes a TopN operator above this operator.

There are several cases we can optimize:

When it meets a Sort, it is merged into ByItems of the pushed-down TopN. If the TopN already has ByItems, this Sort becomes useless and can be removed directly.

When it meets a Projection, the TopN can be directly pushed down across it.

When it meets a UnionAll or Join. The TopN can be pushed down. Like in the aggregation push down, we put one TopN above and push down another one across the operator. The pushed-down one should be modified. Its Offset should be added into Count and set to 0. Also note that for Join, we can only push down for outer joins and only push down across the outer side.

Example:

CREATE TABLE t1(a INT, b INT);
CREATE TABLE t2(a INT, b INT);
EXPLAIN SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a ORDER BY t1.b LIMIT 20, 10;
+----------------------------------+----------+-----------+---------------+-----------------------------------------------------+
| id                               | estRows  | task      | access object | operator info                                       |
+----------------------------------+----------+-----------+---------------+-----------------------------------------------------+
| TopN_12                          | 10.00    | root      |               | test2.t1.b, offset:20, count:10                     |
| └─HashJoin_18                    | 37.50    | root      |               | left outer join, equal:[eq(test2.t1.a, test2.t2.a)] |
|   ├─TopN_19(Build)               | 30.00    | root      |               | test2.t1.b, offset:0, count:30                      |
|   │ └─TableReader_26             | 30.00    | root      |               | data:TopN_25                                        |
|   │   └─TopN_25                  | 30.00    | cop[tikv] |               | test2.t1.b, offset:0, count:30                      |
|   │     └─TableFullScan_24       | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo                      |
|   └─TableReader_29(Probe)        | 9990.00  | root      |               | data:Selection_28                                   |
|     └─Selection_28               | 9990.00  | cop[tikv] |               | not(isnull(test2.t2.a))                             |
|       └─TableFullScan_27         | 10000.00 | cop[tikv] | table:t2      | keep order:false, stats:pseudo                      |
+----------------------------------+----------+-----------+---------------+-----------------------------------------------------+
EXPLAIN SELECT * FROM (select * from t1 union all select * from t2) ORDER BY b LIMIT 20, 10;
+----------------------------------+----------+-----------+---------------+--------------------------------+
| id                               | estRows  | task      | access object | operator info                  |
+----------------------------------+----------+-----------+---------------+--------------------------------+
| TopN_17                          | 10.00    | root      |               | Column#8, offset:20, count:10  |
| └─Union_22                       | 60.00    | root      |               |                                |
|   ├─TopN_24                      | 30.00    | root      |               | test2.t1.b, offset:0, count:30 |
|   │ └─TableReader_31             | 30.00    | root      |               | data:TopN_30                   |
|   │   └─TopN_30                  | 30.00    | cop[tikv] |               | test2.t1.b, offset:0, count:30 |
|   │     └─TableFullScan_29       | 10000.00 | cop[tikv] | table:t1      | keep order:false, stats:pseudo |
|   └─TopN_33                      | 30.00    | root      |               | test2.t2.b, offset:0, count:30 |
|     └─TableReader_40             | 30.00    | root      |               | data:TopN_39                   |
|       └─TopN_39                  | 30.00    | cop[tikv] |               | test2.t2.b, offset:0, count:30 |
|         └─TableFullScan_38       | 10000.00 | cop[tikv] | table:t2      | keep order:false, stats:pseudo |
+----------------------------------+----------+-----------+---------------+--------------------------------+

Join Reorder

Join reorder tries to find the most efficient order to join several tables together. In fact, it's not a rule-based optimization. It makes use of statistics to estimate row counts of join results. We put join reorder in this stage out of convenience.

Currently, we have implemented two join reorder algorithms: greedy and dynamic programming. The dynamic programming one is not mature enough now and is disabled by default. We focus on the greedy algorithm here.

There are three files relevant to join reorder. rule_join_reorder.go contains the entry and common logic of join reorder. rule_join_reorder_dp.go contains the dynamic programming algorithm. rule_join_reorder_greedy.go contains the greedy algorithm.

At the beginning of join reorder, we extract "join groups" from the plan tree. A join group is some sub-trees connected together by inner Joins directly, which means there can't exist any other kind of operator between inner Joins. The join reorder algorithm optimizes one join group at a time. And join groups are optimized from bottom to top.

For every node in a join group, the row count is estimated. The join result row count is estimated using the simple and classic leftRowCount * rightRowCount / max(leftNDV, rightNDV) formula. Then two of them, which can get the minimum cost (calculated in (*baseSingleGroupJoinOrderSolver).baseNodeCumCost()), are chosen, connected by an inner join, then added into the join group. This process is repeated until all nodes in the join group are joined together.

Build Key Information

This one is actually not an optimization rule. It collects information from bottom to top that is needed by other optimizations. Two kinds of information are collected and set up for each operator.

The first information is the unique key. This is collected in DataSource from table schema information and stored as KeyInfo in the Schema for each operator. There is one thing tricky about the unique key: when you declare UNIQUE for one column when creating a table, there can be duplicated NULLs in this column actually. You should declare UNIQUE NOT NULL to get "true" uniqueness.

The second is the MaxOneRow attribute, which means if this operator is guaranteed to output no more than one row.

Ending

Currently, our rule-based optimization is a batch of rules executed in a fixed order. This is not enough to make some optimizations when the query is complicated. So we usually do more things than what the name of a rule implies. As stated above, we specially optimize Joins in predicate pushdown. Except for that, we also try to eliminate aggregations in aggregation pushdown and build key information for the newly generated Aggregations. There are more examples like that.

Some optimization rules are also not guaranteed to produce a better plan like decorrelation and aggregation push down. In theory, the physical distribution of data should be considered when making such optimizations. However, we don't have such a fine-grained strategy for these rules. Now we mainly rely on heuristics and variables that control the behaviors.

As this section is written, TiDB doesn't record transformation steps in rule-based optimization and doesn't support printing logical plans. But usually, the transformation steps are reliably reproducible given query and table schema. So the most effective method to learn about it in depth or investigate a bug is to place breakpoints in logicalOptimize() and see the runtime information using debug tools.

Last updated