Async Commit

Async commit is an optimization of two-phase commit introduced in TiDB 5.0. It greatly reduces the latency of the two-phase commit process.

This document talks about the implementation of async commit in TiDB. It is recommended that you learn about the theories of async commit first.

This document refers to the code of TiDB v5.2.1, the corresponding client-go, and TiKV v5.2.1.

TiDB part

Preparations

Async commit does not change the behavior during transaction execution. The changes begin from the 2PC execution.

Because we need to record the key list in the primary lock, the async commit protocol is not suitable for large transactions. Binlog does not support async commit either, so we disable async commit when binlog is enabled. These checks can be found here.
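
As a rough sketch (this is not the actual client-go code, and the limit values below are made up), the decision looks like this:

// Hypothetical sketch of the async-commit eligibility check. The real checks
// live in the two-phase committer in client-go and use configurable limits.
func allowAsyncCommit(asyncCommitEnabled, binlogEnabled bool, keyCount, totalKeySize int) bool {
    const maxKeys = 256          // assumed limit on the number of keys
    const maxTotalKeySize = 4096 // assumed limit on the total key size, in bytes
    if !asyncCommitEnabled || binlogEnabled {
        // Binlog does not support async commit.
        return false
    }
    // Large transactions cannot record their whole key list in the primary lock.
    return keyCount <= maxKeys && totalKeySize <= maxTotalKeySize
}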

In the theory blog, we prove that using the latest timestamp from PD can guarantee linearizability. You can find the code here. Actually, it is not always necessary to get this timestamp; the comment explains why:

// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
    // We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
    // because the property is naturally holds:
    // We guarantee the commitTS of any transaction must not exceed the next timestamp from the TSO.
    // An auto-commit transaction fetches its startTS from the TSO so its commitTS > its startTS > the commitTS
    // of any previously committed transactions.
    s.txn.SetOption(kv.GuaranteeLinearizability,
                    sessVars.TxnCtx.IsExplicit && sessVars.GuaranteeLinearizability)
}

Later, we also calculate a maxCommitTS. This will be discussed in the DDL compatibility part below.

Prewrite

If we decide to use async commit, we need to provide some extra information to enable the async commit protocol, namely the UseAsyncCommit flag and the secondary keys:

req := &kvrpcpb.PrewriteRequest{/* omitted */}
if c.isAsyncCommit() {
    if batch.isPrimary {
        req.Secondaries = c.asyncSecondaries()
    }
    req.UseAsyncCommit = true
}

If the prewriting succeeds, there are two cases. If TiKV cannot proceed with the async-commit protocol (probably because the calculated commit TS exceeds maxCommitTS), we fall back to the traditional percolator protocol. Otherwise, the prewrite request succeeds and we can update the global MinCommitTS:

// 0 if the min_commit_ts is not ready or any other reason that async
// commit cannot proceed. The client can then fallback to normal way to
// continue committing the transaction if prewrite are all finished.
if prewriteResp.MinCommitTs == 0 {
    c.setAsyncCommit(false)
} else {
    c.mu.Lock()
    if prewriteResp.MinCommitTs > c.minCommitTS {
        c.minCommitTS = prewriteResp.MinCommitTs
    }
    c.mu.Unlock()
}

However, if any prewrite response is finally lost due to RPC failures, it is impossible for us to know whether the prewriting succeeded, which also means we cannot know whether the transaction succeeded. In this case, we can only return an "undetermined error" and the client connection will be closed:

defer func() {
    if err != nil {
        // If we fail to receive response for async commit prewrite, it will be undetermined whether this
        // transaction has been successfully committed.
        // If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined
        // errors.
        if (c.isAsyncCommit() || c.isOnePC()) && sender.GetRPCError() != nil && atomic.LoadUint32(&c.prewriteCancelled) == 0 {
            c.setUndeterminedErr(errors.Trace(sender.GetRPCError()))
        }
    }
}()

But don't worry, this does not happen very often. It is safe to retry a prewrite which temporarily fails due to network reasons. The above problem only happens if a prewrite request has been sent, but later retries all fail due to RPC errors.

Commit

The whole commit process is done asynchronously in the background. This is why the optimization is called "async commit":

if c.isAsyncCommit() {
    // For async commit protocol, the commit is considered success here.
    c.txn.commitTS = c.commitTS
    go func() {
        commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
        c.commitMutations(commitBo, c.mutations)
    }()
    return nil
}

It does not matter even if some temporary error happens during the commit process: anyone who encounters these uncommitted async-commit locks is able to commit them eventually. We will talk about this next.

Transaction recovery

If a reader encounters an expired async-commit lock, it needs to resolve this lock.

As usual, the primary lock is checked first to get the transaction information. If it uses the async-commit protocol, the primary lock is never cleaned up by CheckTxnStatus. Then we call the resolveLockAsync function to resolve this transaction.

First, it checks all secondary locks. After that, we know the commit TS of this transaction: if all locks exist, or some key has already been committed, we can calculate a real commit TS; if some lock does not exist, the commit TS is zero, which indicates the transaction should be rolled back.

resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err != nil {
    return err
}
status.commitTS = resolveData.commitTs
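
To make this rule concrete, here is a minimal Go sketch of the decision, assuming a simplified view of the secondary lock states (it is not the actual client-go implementation):

// secondaryState is a simplified view of one secondary key: its lock may still
// exist (carrying a min_commit_ts), or the key may already be committed.
type secondaryState struct {
    lockExists  bool
    minCommitTS uint64
    committed   bool
    commitTS    uint64
}

// deriveCommitTS returns the commit TS of the transaction, or zero if the
// transaction has to be rolled back because some lock is missing.
func deriveCommitTS(states []secondaryState) uint64 {
    var maxMinCommitTS uint64
    missing := false
    for _, s := range states {
        switch {
        case s.committed:
            // Some key is already committed, so the whole transaction is
            // committed with that key's commit TS.
            return s.commitTS
        case s.lockExists:
            if s.minCommitTS > maxMinCommitTS {
                maxMinCommitTS = s.minCommitTS
            }
        default:
            missing = true
        }
    }
    if missing {
        // A lock is missing and no key is committed: roll back (zero commit TS).
        return 0
    }
    // All locks exist: the real commit TS is the maximum min_commit_ts.
    return maxMinCommitTS
}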

Then we can use this commit TS to resolve all the locks in this transaction.

Another case is when the transaction is actually not an async-commit transaction: some keys are prewritten with the async-commit protocol while other keys fail and fall back. Such a case can be detected when checking secondary locks:

if !lockInfo.UseAsyncCommit {
    return &nonAsyncCommitLock{}
}

And then, we will retry the lock resolving process assuming the transaction is not an async-commit transaction. Now, CheckTxnStatus can clean up an expired primary lock:

if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
    err = resolve(l, true)
}

DDL compatibility

Without async commit, we check whether the schema has changed before the second phase of the commit. But because an async-commit transaction is committed once all its locks are prewritten, we don't get the chance to check the schema version at that point. Here we use a trick to work around the problem.

For DDLs which involve data reorganization, we delay 3 seconds by default. Then, before doing 2PC, we set MaxCommitTS to 2 seconds later from now:

func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
	// Amend txn with current time first, then we can make sure we have another SafeWindow time to commit
	currentTS := oracle.ComposeTS(int64(time.Since(c.txn.startTime)/time.Millisecond), 0) + c.startTS
	_, _, err := c.checkSchemaValid(ctx, currentTS, c.txn.schemaVer, true)
	if err != nil {
		return errors.Trace(err)
	}

	safeWindow := config.GetGlobalConfig().TiKVClient.AsyncCommit.SafeWindow
	maxCommitTS := oracle.ComposeTS(int64(safeWindow/time.Millisecond), 0) + currentTS

	c.maxCommitTS = maxCommitTS
	return nil
}

Therefore, all async-commit transactions using the old schema should be committed before the DDL reorganization happens, so the reorganization will not miss their data.
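
The timing argument can be illustrated with made-up numbers; the real values come from the DDL delay and the AsyncCommit.SafeWindow configuration:

package main

import "fmt"

func main() {
    // Hypothetical timeline, in milliseconds. Worst case: a transaction validates
    // the old schema at the same instant the schema changes (t = 1000). The DDL
    // data reorganization is delayed by 3 seconds, while the transaction's
    // maxCommitTS is only 2 seconds after its schema check.
    t := int64(1_000)
    reorgStartsAt := t + 3_000 // earliest start of the DDL data reorganization
    maxCommitTS := t + 2_000   // latest possible commit time of the old-schema transaction
    fmt.Println(maxCommitTS < reorgStartsAt) // true: the transaction finishes before reorganization
}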

TiKV part

Concurrency manager

As discussed in the theory blog, TiKV needs to record the max TS and set some memory locks for ongoing prewrite requests.

For simplicity, we use a global component to implement it. We call it the "concurrency manager".

The methods provided by the concurrency manager can be found in this file.

It is very easy to update the max TS. It's just an atomic operation:

pub fn update_max_ts(&self, new_ts: TimeStamp) {
    if new_ts != TimeStamp::max() {
        self.max_ts.fetch_max(new_ts.into_inner(), Ordering::SeqCst);
    }
}

Handling the memory locks is a bit more complex.

The memory locks can have multiple accessors. The first one is, of course, the prewrite process. And because all readers need to check the memory locks, they are accessors too. A lock can be removed from the table when it has no accessors.

So the memory table just owns a weak reference to the lock. We define the table like this:

pub struct LockTable(pub Arc<SkipMap<Key, Weak<KeyHandle>>>);

To add a memory lock and be able to write lock information, the lock_key method needs to be called to get a lock guard. The locking process is a bit tricky because it has to handle various possibilities in the multi-threaded environment. If interested, you can refer to the code for details.

Prewrite

The code of prewrite can be found here. We will talk about some key points of async commit in this code.

In TiKV, secondary_keys and try_one_pc in the prewrite request are used to determine the type of the prewrite:

let commit_kind = match (&self.secondary_keys, self.try_one_pc) {
    (_, true) => CommitKind::OnePc(self.max_commit_ts),
    (&Some(_), false) => CommitKind::Async(self.max_commit_ts),
    (&None, false) => CommitKind::TwoPc,
};

The secondary keys need to be written into the lock only when prewriting the primary key:

let mut secondaries = &self.secondary_keys.as_ref().map(|_| vec![]);
if Some(m.key()) == async_commit_pk {
    secondaries = &self.secondary_keys;
}

In the prewrite action, async commit does not change the checks. What is different is in the write_lock function.

Besides setting the secondary keys in the primary lock, it calls async_commit_timestamps to set min_commit_ts in the lock.

Here is the simplified code:

let final_min_commit_ts = key_guard.with_lock(|l| {
    let max_ts = txn.concurrency_manager.max_ts();

    let min_commit_ts = cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next();
    let min_commit_ts = cmp::max(lock.min_commit_ts, min_commit_ts);

    let max_commit_ts = max_commit_ts;
    if !max_commit_ts.is_zero() && min_commit_ts > max_commit_ts {
        return Err(ErrorInner::CommitTsTooLarge {
            start_ts,
            min_commit_ts,
            max_commit_ts,
        });
    }

    lock.min_commit_ts = min_commit_ts;
    *l = Some(lock.clone());
    Ok(min_commit_ts)
})?;

txn.guards.push(key_guard);

The final min_commit_ts is set to the maximum of (max TS + 1), (start TS + 1) and (for-update TS + 1), and is at least the original min_commit_ts of the lock. If the resulting min_commit_ts is greater than max_commit_ts, a CommitTsTooLarge error is returned, which triggers a fallback to non-async commit.
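
For example, here is a small Go sketch with made-up timestamp values that mirrors the Rust logic above:

// Made-up numbers to illustrate the calculation above (not real TSO values).
func exampleMinCommitTS() (uint64, bool) {
    maxTS, startTS, forUpdateTS := uint64(100), uint64(95), uint64(0)
    lockMinCommitTS := uint64(90) // the min_commit_ts already recorded in the lock
    maxCommitTS := uint64(105)    // the constraint calculated by TiDB

    minCommitTS := maxTS
    if startTS > minCommitTS {
        minCommitTS = startTS
    }
    if forUpdateTS > minCommitTS {
        minCommitTS = forUpdateTS
    }
    minCommitTS++ // one past the largest timestamp seen so far: 101
    if lockMinCommitTS > minCommitTS {
        minCommitTS = lockMinCommitTS
    }
    // Returns (101, true): async commit proceeds with min_commit_ts = 101.
    // With maxCommitTS = 99 it would return (101, false), i.e. CommitTsTooLarge.
    return minCommitTS, minCommitTS <= maxCommitTS
}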

The operation is done while locked to guarantee the atomicity of getting the max TS and setting the min_commit_ts in the lock.

The key guard is held until the lock is successfully written into RocksDB. Before that, readers are able to check the in-memory lock so that no constraint is broken. Once readers can read the lock from RocksDB, the guard can be released to remove the lock from the memory table.

Fallback to non-async commit

The client may provide a max_commit_ts constraint. If the calculated min_commit_ts is larger than the max_commit_ts, we need to fall back to non-async commit.

When the CommitTsTooLarge error happens, the lock will still be written, but there will be no use_async_commit flag in the lock and no secondary keys will be recorded:

if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res {
    lock.use_async_commit = false;
    lock.secondaries = Vec::new();
}

After any key encounters this error, we don't need to do async commit prewrite for later keys:

 Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) | Ok((..)) => {
    // fallback to not using async commit or 1pc
    props.commit_kind = CommitKind::TwoPc;
    async_commit_pk = None;
    self.secondary_keys = None;
    // release memory locks
    txn.guards = Vec::new();
    final_min_commit_ts = TimeStamp::zero();
}

When any key in a transaction falls back to non-async commit mode, the whole transaction is considered a non-async commit transaction.

Memory lock checking

All transactional read requests need to update the max TS and check memory locks. If the min_commit_ts of a lock is not larger than the snapshot timestamp of the read, it is not safe to proceed with the read; an error is returned and the client needs to retry later.

Here is an example in the storage module:

// Update max_ts and check the in-memory lock table before getting the snapshot
if !pb_ctx.get_stale_read() {
    concurrency_manager.update_max_ts(start_ts);
}
let isolation_level = pb_ctx.get_isolation_level();
if isolation_level == IsolationLevel::Si {
    for key in keys.clone() {
        concurrency_manager
            .read_key_check(key, |lock| {
                Lock::check_ts_conflict(Cow::Borrowed(lock), key, start_ts, bypass_locks)
            })
            .map_err(|e| txn::Error::from_mvcc(e))?;
    }
}

Check transaction status

We use CheckTxnStatus to get the status of the primary lock and use CheckSecondaryLocks for secondary locks.

In CheckTxnStatus, we cannot remove the primary lock simply because it has expired, since the transaction may have successfully prewritten all its locks. So for async-commit locks we always just return the lock information:

if lock.use_async_commit {
    if force_sync_commit {
        // The fallback case
    } else {
        return Ok((TxnStatus::uncommitted(lock, false), None));
    }
}

The idea of CheckSecondaryLocks is simple: if any lock in the list of secondary keys does not exist, remove the lock and write a rollback record if necessary; if any lock has been committed, the transaction is committed. You can refer to its implementation for details.

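To make this concrete, here is a toy Go model of the per-key decision; it is hypothetical and much simpler than TiKV's actual Rust implementation:

// store is a toy model of a key's transactional state.
type store struct {
    locks     map[string]uint64 // key -> start_ts of the lock currently on the key
    commits   map[string]uint64 // key -> commit_ts of a committed key
    rollbacks map[string]bool   // keys protected by a rollback record
}

// checkSecondary reports whether the key is still locked by the transaction or
// already committed; otherwise it writes a rollback record so that the key can
// never be prewritten by this transaction again.
func (s *store) checkSecondary(key string, startTS uint64) (locked bool, commitTS uint64) {
    if ts, ok := s.locks[key]; ok && ts == startTS {
        return true, 0 // the async-commit lock of this transaction still exists
    }
    if ts, ok := s.commits[key]; ok {
        return false, ts // the key has been committed with commit_ts = ts
    }
    s.rollbacks[key] = true // no lock and no commit record: write a rollback
    return false, 0
}
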
Update max TS on region changes

In TiKV, we must guarantee that when a key is prewritten with the async-commit protocol, all previous reads of this key have updated the max TS. Normally we update the max TS on the local TiKV, but this misses some cases: a read may happen on another TiKV, after which the region leader is transferred to the current TiKV, or the region is merged into a region whose leader is on this TiKV. In these cases the max TS can be incorrect.

So, for safety, we choose to get the latest timestamp from PD when a region becomes leader or a region is merged.

Before the max TS is updated, the corresponding region is not allowed to proceed with an async-commit prewrite. This property is checked here.
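
The idea can be sketched as follows (the names are hypothetical; in TiKV this is done in Rust when observing role and region changes):

// TSOClient and ConcurrencyManager are hypothetical stand-ins for the PD client
// and the concurrency manager described above.
type TSOClient interface{ GetTimestamp() (uint64, error) }
type ConcurrencyManager interface{ UpdateMaxTS(ts uint64) }

// onRegionChanged sketches what happens when a region becomes leader or is the
// target of a merge: async-commit prewrites are rejected until a fresh timestamp
// from PD has been applied to the max TS.
func onRegionChanged(regionID uint64, pd TSOClient, cm ConcurrencyManager, ready map[uint64]bool) error {
    ready[regionID] = false // async-commit prewrite is not allowed on this region yet
    ts, err := pd.GetTimestamp()
    if err != nil {
        return err
    }
    cm.UpdateMaxTS(ts)     // cover any read the old leader may have served
    ready[regionID] = true // async-commit prewrite may proceed again
    return nil
}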

Summary

This is how the "async commit" optimization is implemented in TiDB.

Due to limited space, some subtle problems, such as non-unique timestamps and compatibility with follower read, are not covered here.

During the implementation of async commit, many problems blocking one-phase commit (1PC) are solved. So it becomes relatively easy to implement 1PC in TiDB. The next document will introduce the implementation details of 1PC.

