18 Distributed Transactions Besides Xa, What Are Some Other Atomic Commit Algorithms?

18 Distributed Transactions - Besides XA, What Are Some Other Atomic Commit Algorithms? #

I believe this lecture is the most essential part of the entire course because transactions are the most important feature that distinguishes a database from a general storage system. Due to its high difficulty, distributed transactions in databases have always been widely discussed. It can be said that a distributed database without solving the transaction problem would be considered incomplete. The debate over the path of transactions also showcases the different directions of development for distributed databases.

When it comes to distributed transactions, the first concept that comes to mind is atomic commit. Atomic commit refers to a class of algorithms that make a group of operations appear atomic, meaning either all of them succeed or all of them fail, and some of these operations may be remote operations. The XA distributed transaction standard proposed by the Open/X organization is a typical representation of atomic commit. XA has been widely implemented in mainstream databases and has become synonymous with distributed transactions for quite some time.

However, with the emergence of Percolator, atomic commit algorithms based on snapshot isolation have entered the public eye. After TiDB implemented Percolator optimistic transactions, this approach gradually became production-ready.

In this lecture, we will first introduce the traditional two-phase commit and three-phase commit. The former is the core concept of XA, while the latter addresses the problems exposed by the two-phase commit. Finally, we will discuss the optimistic transactions implemented by Percolator and TiDB’s improvements to it.

Two-Phase Commit and Three-Phase Commit #

Two-phase commit is very famous for two reasons: it has a long history, and its definition is ambiguous. It is not a protocol or specification but only exists as a concept. Therefore, we can see its presence in both traditional relational databases and the latest DistributedSQL systems.

Two-phase commit consists of two roles: coordinator and participants. In the first phase, the coordinator sends the data to be committed to the participants and asks whether they can commit the data. The participants then return the voting results. In the second phase, based on the voting results from the participants, the coordinator decides to commit or abort the transaction, and then sends the result to each participant. The participants then commit their local transactions based on the result.

As we can see, the core of the two-phase commit is the coordinator. It is generally implemented as a leader node. You may recall from the lecture on leader election that various schemes can be used to elect the leader node, and the health status of the leader node can be determined by a fault detection mechanism to decide whether to select another leader node as the coordinator. Another common implementation is to let the transaction initiator act as the coordinator. The benefit of doing so is that coordination work is distributed to multiple nodes, reducing the load on distributed transactions.

The entire transaction is divided into two stages:

  1. The prepare phase. The coordinator sends Propose messages to all participating nodes, which include all the information about the transaction. After receiving the information, each participant makes a commitment decision - whether the transaction can be committed or not. If they decide to commit the transaction, they inform the coordinator that they agree to commit. Otherwise, they tell the coordinator to abort the transaction. The coordinator and all participants store the decision results separately for fault recovery purposes.
  2. The commit or abort phase. If any participant aborts the transaction, all participants will receive the result to abort the transaction, even if they think the transaction can be committed. Only when all participants vote to commit the transaction, the coordinator will notify them to commit the transaction. This is the core idea of atomic commit: either all succeed or all fail.

We can see that the two-phase commit is easy to understand, but it lacks a lot of details. For example, whether the data is written to the database during the prepare phase or the commit phase. Each database implements this differently, and currently, the majority of implementations write the data during the prepare phase.

The normal flow of the two-phase commit is easy to understand. Its interesting aspect lies in its exceptional flows. Since there are two roles and two phases, there are four types of exceptional flows:

  1. Participant failure in the prepare phase. When the coordinator initiates a vote, if a participant does not respond (times out), the coordinator marks the transaction as failed, which has the same result as terminating the transaction during this phase. Although this ensures transaction consistency, it reduces the overall availability of distributed transactions. In the next lecture, I will introduce how Spanner uses Paxos groups to improve participant availability.

  2. Participant Failure after Voting. This scenario describes a participant failing after voting in favor. In this case, it is necessary to ensure that the node can be recovered. In its recovery process, the participant needs to first contact the coordinator to confirm the final outcome of the transaction. Then, based on the result, the participant can either cancel or commit the transaction.

  3. Coordinator Failure after Voting. This is the second phase where both the coordinator and participants have recorded the voting results. If the coordinator fails, we can start a backup coordinator and then retrieve the voting results of that transaction, and send messages to all participants to either cancel or commit the transaction.

  4. Coordinator Failure during the Preparation Phase. This happens in the first phase where a drawback of the two-phase commit exists. In this phase, if the coordinator fails to receive the voting result message, meaning the result is not recorded in the log, the coordinator fails. In this case, the backup coordinator cannot recover the transaction due to the lack of the voting result log. It doesn’t even know which participants have participated in this transaction, causing participants to wait indefinitely. Therefore, the two-phase commit is also known as the blocking commit algorithm.

The three-phase commit is designed to solve the blocking state described in point 4 above. Its solution is to insert an additional phase between the two phases. In the first phase, voting is still conducted; in the second phase, the voting results are distributed to all participants; and in the third phase, the commit operation is performed. The key point is that in the second phase, if the coordinator crashes before the second phase and cannot recover, participants can release the transaction through a timeout mechanism. Once all nodes have gone through the second phase, it means they all know the current state of the transaction. Therefore, even if the coordinator or participants crash, it will not affect the execution of the transaction.

We can see that the three-phase commit has a problem that the two-phase commit does not have. During the second phase, some participants lose contact with the coordinator and interrupt the transaction due to the timeout mechanism. However, if other participants have received the instruction to commit, they will submit the data, causing a brain split situation.

In addition to brain splits, the three-phase commit also suffers from the problem of high interaction volume, leading to excessive message load in the system. Therefore, the three-phase commit is rarely used in practical distributed transaction designs.

Both the two-phase and three-phase commit are atomic commit protocols that can achieve various levels of isolation requirements. In actual production, we can use a special transaction isolation level to improve the performance of distributed transactions and implement non-blocking transactions. This isolation level is called snapshot isolation.

Snapshot Isolation #

We have mentioned snapshot isolation in Lesson 11. It has a higher isolation level than “read committed” and solves the unavoidable read skew problem where a piece of data read in a transaction may change after repeated reads.

Let’s take an example of reading under snapshot isolation. Suppose there are two transactions, A and B, both modifying the same data X with an initial value of 2. Transaction A starts but neither commits nor rolls back. At this time, transaction B changes the value of X to 10 and commits. Afterwards, transaction A re-reads X, and its value remains 2, meaning it did not read the latest committed data.

What about concurrent commits of the same data? Without locks, there will be write conflicts, and typically only one of the transactions can commit data. This feature is known as the first-commit-wins mechanism.

The difference between snapshot isolation and serializability is that the former cannot solve the problem of write skew, meaning the data sets operated by concurrent transactions do not intersect, and once a transaction is committed, the consistency of the data set cannot be guaranteed. For example, consider two transactions T1: b = a + 1 and T2: a = b + 1, with the initialization of a = b = 0. Under serializability isolation level, the result can only be (a = 2, b = 1) or (a = 1, b = 2), while under snapshot isolation, the result can be (a = 1, b = 1). This is unacceptable in certain business scenarios. Of course, there are currently many ways to solve the write skew problem under snapshot isolation, known as serializable snapshot isolation (SSI).

There are many ways to implement SSI, such as using a unified transaction manager to check whether the data read by the transaction has been overwritten by the commit of another transaction when it is committed. If so, the current transaction should be marked as failed. Another way is to lock the data row to prevent other transactions from reading the locked data row, thus avoiding the occurrence of write skew.

The next topic to be introduced, Percolator, implements snapshot isolation but does not implement SSI. This is because SSI, regardless of its implementation, will affect the system’s throughput. Moreover, Percolator itself is a client-side transaction solution and cannot adequately maintain state.

Percolator Optimistic Transaction #

Percolator is a toolset proposed by Google. It is based on BigTable and supports the aforementioned snapshot isolation. Snapshot isolation requires multiple versions, so we need timestamps. The Percolator system uses a globally increasing timestamp server to generate monotonically increasing timestamps for transactions. Each transaction starts with a timestamp t1, and during its execution, it can read data before t1; when committing, it takes another timestamp t2 as the commit timestamp for the transaction. Now let’s introduce the process of transaction execution. Similar to the two-phase commit, we use the client as the coordinator and BigTable’s Tablet Server as the participant. In addition to the data of each cell existing in BigTable, the coordinator also stores the cell lock information and transaction version number in BigTable. In simple terms, if we need to write the “bal” column (balance). In BigTable, there are actually three columns: bal:data, bal:lock, and bal:write. The information they store is as follows:

  1. The “commit_ts=>start_ts” timestamp of the transaction is stored in bal:write.
  2. The “start_ts=>actual column data” is stored in the map called bal:data, which represents the actual column data for the given start timestamp.
  3. The “start_ts=>(primary cell)” is stored in bal:lock. The primary cell is a combination of Rowkey and column names. It is used in fault tolerance processing and transaction conflict resolution to clean up the lock information left behind by transaction failures caused by coordinator failures.

Now let’s use an example to explain the whole process. Please refer to the following diagram.

Drawing 0.png

In an account table, Bob has $10 and Joe has $2. We can see that the latest data for Bob in the write column is “data@5”, which indicates that the current latest data is from version with timestamp 5. The data in version with timestamp 5 is $10, which means that read operations will read this $10. Similarly, Joe’s account has $2.

Drawing 1.png

Now, let’s perform a transfer operation, transferring $7 from Bob’s account to Joe’s account. This operation requires manipulating multiple rows, which are two rows in this case. Locks need to be acquired first. Percolator randomly selects one row from the rows to be operated as the primary row, while the rest are secondary rows. The primary row is locked first, and then the secondary rows are locked after the primary row lock is successfully acquired. From the above diagram, we can see that a lock “I am primary” is written in the lock column of the row with timestamp 7. The write column of this row is empty, and the data column value is 3 (10 - 7 = 3). At this moment, timestamp 7 is the start_ts.

Drawing 2.png

Then, the lock is acquired for Joe’s account, also at timestamp 7. The lock information for Joe’s account includes a reference to the primary lock. In this way, the rows that belong to the same transaction are associated with each other. The data column of Joe’s account is written as 9 (2 + 7 = 9), and the write column is empty. This completes the Prewrite phase.

Drawing 3.png

Next, the transaction is ready to commit. The primary row performs the Commit operation first. As long as the primary row commits successfully, the transaction is considered successful. If the secondary row fails, it is not a problem as there will be remedies later. The Commit operation first clears the lock of the primary row, and then writes a row with timestamp 8 (since time is monotonically increasing, this can be called commit_ts). This row can be called the Commit Row because it does not contain data. It only writes “data@7” in the write column, indicating that the data with timestamp 7 is now visible. From this moment on, read operations can read the data with version timestamp 7.

Drawing 4.png

Next, we commit the secondary row, which follows the same logic as the primary row. If the secondary row commits successfully, the transaction is considered complete.

If the Primary Row commit is successful but the Secondary Row commit fails, what will happen and how is data consistency guaranteed? Since Percolator does not have a centralized transaction manager component to handle such exceptions, the only option is to handle it during the next read operation. If a read request finds that the data it wants to read has a Secondary lock, it will check if the corresponding Primary Row lock still exists based on the Secondary Row lock. If it exists, it means the transaction is not yet complete; if it does not exist, it means the Primary Row has already been committed. In this case, it will clear the Secondary Row lock and make the row data visible (commit). This is a concept called Roll forward.

In such a storage system, not all rows contain data. There are also transaction control rows, also known as Commit Rows. Their data column is empty, but the write column contains the TS (timestamp) of the visible data. Their purpose is to indicate transaction completion and guide read requests to read new data. Over time, a large number of redundant data rows will be generated, and useless data rows will be cleaned up periodically by the GC thread.

Another issue with this transaction is conflict resolution. In the previous introduction to snapshot isolation, we mentioned that conflict operations on the same row can be resolved by using the “first commit wins” strategy, which means that the later committing transaction will fail. So what happens if the database encounters highly concurrent modifications of the same data? Now let me introduce how TiDB, which implements optimistic transactions based on the Percolator model, handles this.

Conflict Handling in TiDB’s Optimistic Transactions #

In TiDB, write conflicts are detected during the commit phase. In Lecture 11, we introduced the conflict resolution modes of a MVCC-like database, which are forward detection and backward detection. Since TiDB uses the Percolator model, it adopts backward detection during the commit phase. From a theoretical point of view, there is no problem with this approach. However, TiDB claims to fully compatible with MySQL, and as we all know, MySQL uses pessimistic transactions in distributed transactions. This means that conflicts can be detected during the SQL execution phase, which is the forward mode. As a result, if users migrate from MySQL to TiDB, they must carefully examine whether their use of the database depends on this mode, thus increasing the migration cost for users.

Based on the above reasons, TiDB provides several solutions to address the difference between backward detection and forward detection.

  1. Retry. As the name suggests, when a conflict occurs, TiDB can retry non-query operations in the failed transaction. This is a very concise and efficient solution, but it is not universal. If the transaction involves updating data based on the read result, it is likely to cause data anomalies. This is because the read operation will not be retried, thus violating the “read committed” isolation level. Therefore, retry can only be applied in non-reading scenarios, especially in small transactions where each SQL statement is a separate transaction.
  2. Conflict pre-checking. Another approach is to perform conflict pre-checking during the prewrite phase, turning the backward detection into forward detection. TiDB relies on TiKV, which uses memory to store the keys in the transaction in order to check if the key is modified by other transactions, avoiding concurrent modifications to the key. The reason for this is that TiDB itself is stateless, which means that transactions cannot perceive each other, so it can only be resolved through lower-level means. This structure is a type of memory lock, and if there are too many transactions, it can cause the lock acquisition operation to block writes, resulting in decreased throughput.
  3. Pessimistic transactions. Finally, in order to fully implement the features of MySQL, pessimistic transactions can also be used.

The above solutions are provided by TiDB when implementing the Percolator model, making it easier for users to migrate from MySQL. In addition, with the emergence of such databases like TiDB, the Percolator transaction model is becoming increasingly recognized by the industry.

Summary #

In this lecture, we introduced the typical atomic commit operation: two-phase commit. It is the foundation of XA, but two-phase commit has inherent problems and low performance. Under snapshot isolation, we can use the Percolator model to describe and implement a new atomic commit operation, which has good performance in scenarios with low conflicts.

In the next lecture, we will introduce the competitors of a pair of distributed transaction solutions: Spanner vs Calvin. Thank you for learning, and hope to see you on time next time.

00:00

Understanding Distributed Databases in 24 Lectures