09 Distributed Transactions How Are 2 PC and Tcc Implemented in Multi Services

09 Distributed Transactions How Are 2PC and TCC Implemented in Multi-Services #

Hello, I am Xu Changlong. In today’s class, we will talk about distributed transactions.

Currently, microservices are popular in the industry, and Domain-Driven Design (DDD) has also become popular as a result. DDD is a method of splitting microservices by looking at the business process from top to bottom, splitting domains, associating multiple domains with aggregate roots, and aggregating multiple processes together to form independent services. Compared to microservices designed based on data table structures, this approach is more reasonable, but it also increases the difficulty of implementing distributed transactions.

In the traditional way of implementing distributed transactions, we usually place a complete transaction in a separate project for centralized maintenance and manage all operations in a single database. This ensures data consistency and uniformity by rolling back together when problems occur.

However, this approach has poor service reusability and isolation. Many core businesses have to be aggregated together for transaction consistency.

To ensure consistency, transactions lock a large amount of data during execution, resulting in overall performance bottlenecks. For non-core businesses to implement cross-service transactions in a system with high isolation requirements, the difficulty is even greater, because core businesses are unlikely to cooperate with non-core businesses for modifications and core businesses often change due to business requirements (too many aggregations). As a result, non-core businesses cannot implement transactions and core businesses cannot be personalized.

Because of this, it is a headache for multiple systems to maintain transaction consistency while interacting with each other. Many non-core businesses in the industry cannot start transactions with core modules, and often encounter situations where manual compensation and repair are required when operations fail.

Especially in systems implemented using microservices architecture or DDD, services are split into finer-grained units and independently deployed with separate databases. This makes it even more difficult to maintain transaction consistency, and therefore achieving distributed transactions across multiple services has become an urgent need.

Fortunately, there are many ways to implement distributed transactions in the industry, such as 2PC, 3PC, TCC, etc. However, which one is more suitable? This is what we need to focus on. Therefore, in this class, I will discuss distributed transactions with you to deepen your understanding and help you make better decisions.

XA Protocol #

Before discussing distributed transactions, let’s first understand the XA protocol.

XA protocol is a widely used distributed transaction protocol that supports the implementation of distributed transactions such as 2PC, 3PC, etc. This protocol is suitable for coordinating distributed transactions in multiple databases. Currently, Oracle, DB2, and MySQL versions above 5.7.7 support it (although there are many bugs). Understanding the XA protocol is helpful for us to deeply understand the essence of distributed transactions.

Databases that support the XA protocol can temporarily store the executed business results when the client is disconnected until another process confirms them, making it easy to achieve transaction consistency across multiple databases.

In the XA protocol, there are three main roles:

  • Application (AP): The application is the specific implementation of business logic code. The business logic requests the transaction coordinator to begin a global transaction. After registering multiple sub-transactions with the transaction coordinator, the business code will sequentially send requests to all participating sub-transactions of the business. After all sub-transactions are successfully submitted, the business code informs the transaction coordinator about the execution status of each sub-transaction based on the return results. The transaction coordinator decides whether the sub-transactions should be committed or rolled back (some implementations require the transaction coordinator to send requests to sub-services).
  • Transaction Coordinator (TM): Used to create the main transaction and coordinate the sub-transactions. Based on the execution status of each sub-transaction, the transaction coordinator decides whether these sub-transactions should be finally committed or rolled back. In addition, the transaction coordinator often automatically assists us in committing transactions.
  • Resource Manager (RM): It is a data resource that supports transactions or XA protocol, such as MySQL, Redis, etc.

In addition, the XA protocol also specifies two stages for distributed transactions: the Prepare stage and the Commit stage.

In the Prepare stage, the transaction coordinator coordinates multiple resource managers to execute sub-transactions using the XID (transaction unique identifier generated by the business or transaction coordinator). After all sub-transactions have been successfully executed, they report to the transaction coordinator.

At this time, the successful execution of a sub-transaction means that the SQL statements within the transaction have been successfully executed, but the transaction has not been finally committed. Whether the sub-transactions should be committed or rolled back requires the decision of the transaction coordinator.

Then the distributed transaction enters the Commit stage: when the transaction coordinator receives the messages indicating the successful execution of all sub-transactions from all resource managers, it records the successful execution of the transaction and performs the real submission for the sub-transactions. If any sub-transaction fails in the Prepare stage or if the transaction coordinator does not receive the messages indicating the successful execution of all sub-transactions within a certain period of time, it will notify all resource managers to roll back the sub-transactions.

It should be noted that each sub-transaction has multiple states, and the transitions of each state are shown in the following diagram:

States of sub-transactions in the XA protocol

As shown in the above diagram, the sub-transactions have four stages of states:

  • ACTIVE: The SQL statements of the sub-transaction are being executed.
  • IDLE: The sub-transaction has finished execution and is waiting to switch to the Prepared state. If this operation does not participate in the rollback, it can be directly committed.
  • PREPARED: The sub-transaction has finished execution and is waiting for other service instances’ sub-transactions to be ready.
  • COMMITTED/FAILED: After all sub-transactions have successfully/failed, they are committed or rolled back together.

Now let’s take a look at the specific process of XA coordinating two transactions. Here we will use the most common 2PC method as an example.

XA Protocol 2PC Flow

As shown in the above figure, when coordinating the two services Application 1 and Application 2, the business first requests the transaction coordinator to create a global transaction and generates a unique identifier XID for the global transaction. Then, in the transaction coordinator, the two sub-transactions are registered separately, and each sub-transaction is assigned its own XID. Here, let me explain that XID consists of gtrid+bqual+formatID. The gtrid of multiple sub-transactions is the same, but the other parts must be distinguished to prevent these services from being in the same database.

With the XID of the sub-transactions, the requested service will open an XA sub-transaction and execute the business operations based on the XID. When all the transactional data operations are completed, the sub-transaction will execute the Prepare instruction to mark itself as Prepared, and then execute transaction xid2 in the same way.

After all sub-transactions are completed, the Prepared XA transaction will be temporarily stored in MySQL. Even if the business is temporarily disconnected, the transaction will still exist. At this time, the business code requests the transaction coordinator to notify that all requested sub-transactions have been successfully executed. At the same time, the TM will notify RM1 and RM2 to perform the final commit (or call each business’s commit interface).

By doing this, the entire transaction process is completed. In the Prepare stage, if any sub-transaction fails to execute, the program or transaction coordinator will notify all sub-transactions in the Prepared state to roll back.

The above is the process of using the XA protocol to achieve transaction consistency among multiple subsystems. It can be said that most distributed transactions are implemented in a similar way. Now let’s take a look at how the XA protocol is used in MySQL through a case study.

MySQL XA Distributed Transactions #

Before we dive into the case study, you can first familiarize yourself with the instruction set for all XA protocols in MySQL, to facilitate further learning:

# Start a subtransaction in XA with transaction ID xid
# gtrid is the main transaction ID, bqual is the subtransaction identifier
# formatid is the data type annotation, similar to format type
XA {START|BEGIN} xid[gtrid[,bqual[,format_id]]] [JOIN|RESUME] 

# End the subtransaction with xid, marking it as IDLE
# If a subtransaction in IDLE state is directly committed with XA COMMIT, it becomes 1PC
XA END xid [SUSPEND [FOR MIGRATE]] 

# Put the subtransaction in a PREPARED state, waiting for other subtransactions to be processed, and then uniformly committed or rolled back in the end
# In addition, if the connection is disconnected before this operation, all previous transactions will be rolled back
XA PREPARE xid 

# Different subtransactions above use different xid (same gtrid, and bqual must be different in the same instance)
# Specify the subtransaction with xid for final commit
XA COMMIT xid [ONE PHASE] 
XA ROLLBACK xid (subtransaction rollback)

# View transactions in the PREPARED state
# We use this to confirm the progress of the transaction and decide whether to commit as a whole
# Even if the committing connection is disconnected, we can still see all transactions in the PREPARED state
# 
XA RECOVER [CONVERT XID] 

Now, let’s get back to the topic. Let’s take a shopping scenario as an example. In the entire transaction process of shopping, three services need to be coordinated: user wallet, product inventory, and user shopping orders. Their data is stored in private databases.

Image

According to the business process, when a user purchases a product, the system needs to perform the following operations: deduct inventory, generate a shopping order, and deduct the user’s account balance. Among them, “deduct inventory” and “deduct the user’s account balance” are performed to ensure data accuracy and consistency. Therefore, during the deduction process, other threads with mutual exclusion need to be locked to ensure consistency. Then, through a 2PC (Two-Phase Commit) approach, transaction coordination is achieved among the three services.

The specific implementation code is as follows:

package main

import (
	"database/sql"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"strconv"
	"time"
)

func main() {
	// Connection to stock database
	stockDb, err := sql.Open("mysql", "root:paswd@tcp(127.0.0.1:3306)/shop_product_stock")
	if err != nil {
		panic(err.Error())
	}
	defer stockDb.Close()

	// Connection to order database
	orderDb, err := sql.Open("mysql", "root:paswd@tcp(127.0.0.1:3307)/shop_order")
	if err != nil {
		panic(err.Error())
	}
	defer orderDb.Close()

	// Connection to money database
	moneyDb, err := sql.Open("mysql", "root:paswd@tcp(127.0.0.1:3308)/user_money_bag")
	if err != nil {
		panic(err.Error())
	}
	defer moneyDb.Close()

	// Generate xid (If it's in the same database, subtransactions cannot use the same xid)
	xid := strconv.FormatInt(time.Now().UnixMilli(), 10)

	// If there is an error in the subsequent execution process, rollback all subtransactions
	defer func() {
		if err := recover(); err != nil {
			stockDb.Exec("XA ROLLBACK ?", xid)
			orderDb.Exec("XA ROLLBACK ?", xid)
			moneyDb.Exec("XA ROLLBACK ?", xid)
		}
	}()

	// First Phase: Prepare
	// Start subtransaction for stock
	if _, err = stockDb.Exec("XA START ?", xid); err != nil {
		panic(err.Error())
	}
	// Deduct stock, omitting data row locking operation here
	if _, err = stockDb.Exec("update product_stock set stock=stock-1 where id =1"); err != nil {
		panic(err.Error())
	}
	// End transaction execution
	if _, err = stockDb.Exec("XA END ?", xid); err != nil {
		panic(err.Error())
	}
	// Set stock task as Prepared state
	if _, err = stockDb.Exec("XA PREPARE ?", xid); err != nil {
		panic(err.Error())
	}

	// Start subtransaction for order
	if _, err = orderDb.Exec("XA START ?", xid); err != nil {
		panic(err.Error())
	}
	// Create order
	if _, err = orderDb.Exec("insert shop_order(id,pid,xx) value (1,2,3)"); err != nil {
		panic(err.Error())
	}
	// End transaction execution
	if _, err = orderDb.Exec("XA END ?", xid); err != nil {
		panic(err.Error())
	}
	// Set task as Prepared state
	if _, err = orderDb.Exec("XA PREPARE ?", xid); err != nil {
		panic(err.Error())
	}

	// Start subtransaction for money
	if _, err = moneyDb.Exec("XA START ?", xid); err != nil {
		panic(err.Error())
	}
	// Deduct user account cash, omitting data row locking operation
	// ...
}

Please note that in the code, certain parts are omitted for the sake of brevity.

if _, err = moneyDb.Exec("update user_money_bag set money=money-1 where id =9527"); err != nil {
   panic(err.Error())
}
// Transaction execution completed
if _, err = moneyDb.Exec("XA END ?", xid); err != nil {
   panic(err.Error())
}
// Set the transaction status to Prepared
if _, err = moneyDb.Exec("XA PREPARE ?", xid); err != nil {
   panic(err.Error())
}
// At this point, if the connection is lost, the Prepared XA transaction will still exist in MySQL
// Calling XA RECOVER on any connection will show these three uncommitted transactions

// --------
// Phase 2: No issues at this point
// So we execute a commit
// --------
if _, err = stockDb.Exec("XA COMMIT ?", xid); err != nil {
   panic(err.Error())
}
if _, err = orderDb.Exec("XA COMMIT ?", xid); err != nil {
   panic(err.Error())
}
if _, err = moneyDb.Exec("XA COMMIT ?", xid); err != nil {
   panic(err.Error())
}
// Everything is completed at this point
}

As you can see, MySQL easily achieves transactional consistency for multiple databases or services using the XA command.

You might wonder why you didn’t see any operations related to the transaction coordinator in the code above. Let’s remove the specific implementation of the sub-business logic and see how it works through API calls instead:

package main

import (
   "database/sql"
   "fmt"
   _ "github.com/go-sql-driver/mysql"
   "strconv"
   "time"
)

func main() {
   // Connection to the inventory database
   stockDb, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/shop_product_stock")
   if err != nil {
      panic(err.Error())
   }
   defer stockDb.Close()

   // Connection to the order database
   orderDb, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/shop_order")
   if err != nil {
      panic(err.Error())
   }
   defer orderDb.Close()

   // Connection to the wallet database
   moneyDb, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3308)/user_money_bag")
   if err != nil {
      panic(err.Error())
   }
   defer moneyDb.Close()
   
   // Generate xid
   xid := strconv.FormatInt(time.Now().UnixMilli(), 10)

   // Roll back all sub-transactions if an error occurs during subsequent execution
   defer func() {
      if err := recover(); err != nil {
         stockDb.Exec("XA ROLLBACK ?", xid)
         orderDb.Exec("XA ROLLBACK ?", xid)
         moneyDb.Exec("XA ROLLBACK ?", xid)
      }
   }()

   // Call the API to deduct money, which executes xa start, sql, xa end, xa prepare internally
   if _, err = API.Call("UserMoneyBagPay", uid, price, xid); err != nil {
      panic(err.Error())
   }

   // Call the API to deduct inventory
   if _, err = API.Call("ShopStockDecr", productId, 1, xid); err != nil {
      panic(err.Error())
   }

   // Call the API to create an order
   if _, err = API.Call("ShopOrderCreate",productId, uid, price, xid); err != nil {
      panic(err.Error())
   }

   // --------
   // Phase 2: No issues at this point
   // So we execute a commit
   // --------
   if _, err = stockDb.Exec("XA COMMIT ?", xid); err != nil {
      panic(err.Error())
   }
   if _, err = orderDb.Exec("XA COMMIT ?", xid); err != nil {
      panic(err.Error())
   }
   if _, err = moneyDb.Exec("XA COMMIT ?", xid); err != nil {
      panic(err.Error())
   }
   // Everything is completed at this point
}

As you can see from the flowchart, the two-phase commit (2PC) is easy to understand and implement.

However, the main drawback of 2PC is that during the Prepare phase, data for many operations needs to be locked in order to ensure data consistency. Additionally, the application and each sub-transaction’s process needs to be blocked, waiting for the entire transaction to complete before releasing resources. This leads to a long time of resource locking and low concurrency, often resulting in a large number of transactions queuing up.

Furthermore, in certain special cases, 2PC can result in data loss. For example, if the commit operation of the transaction coordinator is interrupted during the Commit phase, the XA transaction will be left in MySQL.

Moreover, you may have noticed that 2PC lacks a timeout mechanism. If a Prepared XA sub-transaction remains uncommitted in MySQL for a long time, it can cause the database to be locked for an extended period.

In many open-source implementations, the transaction coordinator of 2PC will automatically roll back or force commit long-running uncommitted transactions. However, if the process restarts or crashes, this operation will be lost, and manual intervention will be required to repair it.

3PC Overview #

In addition, aside from 2PC, there is another method for implementing distributed transactions called 3PC. Compared to 2PC, 3PC introduces transaction timeouts, multiple retry attempts, and the capability to perform commit checks. However, because of the increased number of confirmation steps, the queuing time for many business transactions can be quite lengthy, resulting in a much higher failure rate for 3PC compared to 2PC.

To reduce the duplicate work caused by timeouts while waiting for resource locking in 3PC, a pre-operation is performed, and the overall process is divided into three phases:

  • CanCommit phase: In order to reduce timeouts caused by waiting for data locking and improve the success rate of transactions, the transaction coordinator sends messages to confirm the resource locking status of the resource manager, as well as the database locking status of all sub-transactions.
  • PreCommit phase: Executes the Prepare phase of 2PC.
  • DoCommit phase: Executes the Commit phase of 2PC.

Overall, the 3PC process involves more steps and is more complex, resulting in a slower overall execution. Therefore, it is rarely used in distributed production environments. I won’t delve into it further here.

TCC Protocol #

In fact, both 2PC and 3PC suffer from slow execution and low concurrency. Here, I will introduce TCC, a distributed transaction protocol that has even better performance.

TCC stands for Try-Confirm-Cancel. In terms of the process, it adds an extra phase compared to 2PC. It splits the Prepare phase into two phases: Try and Confirm. TCC can achieve distributed transactions without using XA, but instead using regular transactions.

First, in the Try phase, the business code reserves all the resources required for the transaction. For example, it freezes $100 in a user’s account, deducts a product’s inventory in advance, or creates an order that hasn’t started the transaction yet. This reduces the amount of data locked by each sub-transaction. Once the business receives these resources, the subsequent two phases can be performed without locks.

In the Confirm phase, after the business confirms that all the required resources have been obtained, the sub-transactions will execute these business operations in parallel. The execution can be done without any lock contention or checks. All the resources prepared in the Try phase will be directly executed.

Please note that the protocol requires all operations to be idempotent to support failed retries. In some special situations, such as resource lock timeouts or unstable networks, operations may need to be retried multiple times before succeeding.

Finally, in the Cancel phase, if the sub-transaction fails even after multiple retries in the Try or Confirm phases, the transaction manager (TM) will execute the code in the Cancel phase. It will release the reserved resources from the Try phase and rollback the content from the Confirm phase. It is important to make the code in the Cancel phase idempotent to support multiple executions.

The flowchart of the above process is as follows:

Image

Finally, let’s summarize the advantages of TCC transactions:

  • High concurrency and no long-term resource locks.
  • Implement distributed transaction rollbacks through code invasion, with a large amount of development work needed to provide specific operations for each phase.
  • Relatively good data consistency.
  • Suitable for order-related businesses and businesses with constraints on intermediate states.

Of course, it also has some obvious disadvantages:

  • Only suitable for short transactions and not suitable for multi-phase transactions.
  • Not suitable for multi-level nested services.
  • Requires idempotent transaction logic.
  • There is a risk of data loss if the execution process is interrupted.

Summary #

In general, implementing distributed transactions requires a great deal of effort and time, as well as significant investment in hardware. However, when business needs truly call for distributed transactions, the XA protocol can provide powerful support at the data layer.

There are multiple ways to implement distributed transactions, with common ones being 2PC, 3PC, and TCC. Among them, 2PC can achieve unified submission and rollback of multiple sub-transactions, but due to the need to guarantee data consistency, its concurrency performance is not good. Moreover, 2PC does not have a timeout mechanism, which often leads to many XA sub-transactions being left behind in the database.

Although 3PC has a timeout mechanism, it often experiences timeout situations due to excessive interaction, resulting in poor transaction performance. If 3PC fails and times out multiple times, it will attempt a rollback. If this rollback also times out, data loss may occur.

TCC can pre-reserve the resources needed in a transaction in order to reduce the granularity of business operations. It can coordinate distributed transactions using ordinary transactions, so its performance is relatively good compared to other methods. However, both the final transaction submission and rollback logic need to be idempotent, which requires more manual effort to implement.

Currently, there are many excellent middleware solutions available on the market, such as DTM and Seata. They have made many optimizations for distributed transaction coordination. For example, in case of interruption during the process, they can automatically retry, and in AT mode, they can generate rollback SQL based on the modified SQL of the business logic, which is relatively intelligent.

Furthermore, these middleware solutions can also support more complex multi-level and multi-step transaction coordination, and provide more comprehensive workflow mechanisms. Therefore, when implementing distributed transactions, it is recommended to use mature open-source solutions as an assist, which can help us avoid unnecessary detours.

Thought Exercise #

There are many ways to implement distributed systems in the market. Which one do you think performs better?

Feel free to discuss and exchange ideas with me in the comments section. See you in the next class!