27 Distributed Transactions How to Understand the Abstraction Process of Distributed Transactions in Sharding Sphere

27 Distributed Transactions How to Understand the Abstraction Process of Distributed Transactions in ShardingSphere #

Starting today, we will enter a new module, that is ShardingSphere Distributed Transaction. This is a very important topic, and we will introduce ShardingSphere’s transaction implementation mechanism in three lessons.

ShardingTransactionManagerEngine #

In lessons 18 and 19, “Distributed Transaction: How to Use Strong Consistency Transaction and Flexible Transaction?”, we have introduced the basic concepts of distributed transactions.

After understanding some basic concepts, let’s take a look at the code organization structure of the distributed transaction module in ShardingSphere. We found that there are three underlying projects: sharding-transaction-core, sharding-transaction-2pc, and sharding-transaction-base.

From the names, we can see that sharding-transaction-core should include some basic core classes related to distributed transactions, and sharding-transaction-2pc and sharding-transaction-base are based on strong consistency and eventual consistency implementations, respectively.

The naming of these package structures can actually be reflected in the definition of transaction type TransactionType. It is an enumeration representing local transaction, XA two-phase commit transaction, and BASE flexible transaction, as shown below:

public enum TransactionType {
    LOCAL, XA, BASE
}

TransactionType class is located in the sharding-transaction-core project. Let’s take a look at the other contents in this project first. The first thing we notice is the ShardingTransactionManagerEngine interface.

In the previous lessons, we saw this distributed transaction manager for the first time in the ShardingRuntimeContext, as shown below:

public final class ShardingRuntimeContext extends AbstractRuntimeContext<ShardingRule> {
	…
    private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;

	public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule rule, final Properties props, final DatabaseType databaseType) throws SQLException {
       …
        //create ShardingTransactionManagerEngine and initialize
        shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();
        shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
	}
	…
}

In the constructor of ShardingTransactionManagerEngine, the loadShardingTransactionManager method is called, as shown below:

private final Map<TransactionType, ShardingTransactionManager> transactionManagerMap = new EnumMap<>(TransactionType.class);
	
    private void loadShardingTransactionManager() {
         //load ShardingTransactionManager implementation classes via ServiceLoader
        for (ShardingTransactionManager each : ServiceLoader.load(ShardingTransactionManager.class)) {
            if (transactionManagerMap.containsKey(each.getTransactionType())) {
                log.warn("Find more than one {} transaction manager implementation class, use `{}` now",
                    each.getTransactionType(), transactionManagerMap.get(each.getTransactionType()).getClass().getName());
                continue;
            }
            transactionManagerMap.put(each.getTransactionType(), each);
        }
}

As we can see, the JDK’s ServiceLoader utility class is directly used here to load the implementation classes of ShardingTransactionManager. This is the most direct way to implement the micro-kernel architecture using SPI. The above method is used to load ShardingTransactionManager on the classpath and cache it in memory. In ShardingSphere, ShardingTransactionManager is an abstraction of distributed transaction managers. We will elaborate on this in the following content.

Then, in the ShardingRuntimeContext, after constructing the ShardingTransactionManagerEngine object, its init method is called for initialization, as shown below:

public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
        for (Entry<TransactionType, ShardingTransactionManager> entry : transactionManagerMap.entrySet()) {
            entry.getValue().init(databaseType, getResourceDataSources(dataSourceMap));
        }
}

private Collection<ResourceDataSource> getResourceDataSources(final Map<String, DataSource> dataSourceMap) {
        List<ResourceDataSource> result = new LinkedList<>();
        for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
           //create ResourceDataSource object
            result.add(new ResourceDataSource(entry.getKey(), entry.getValue()));
        }
        return result;
}

This part of the code is equivalent to calling the init method of the obtained ShardingTransactionManager to initialize it. During the initialization process, we also see a new data source object ResourceDataSource, as shown below:

public final class ResourceDataSource {
    private final String originalName;
    private String uniqueResourceName;
    private final DataSource dataSource;

    public ResourceDataSource(final String originalName, final DataSource dataSource) {
        this.originalName = originalName;
        this.dataSource = dataSource;
        this.uniqueResourceName = ResourceIDGenerator.getInstance().nextId() + originalName;
    }
}

The purpose of ResourceDataSource is to store the information of the database name and DataSource, and build a unique resource name for this ResourceDataSource. The construction process uses the ResourceIDGenerator utility class.

Let’s take a look at its implementation method below. We can see that it uses singleton pattern and atomic class AtomicInteger:

public final class ResourceIDGenerator {

    private static final ResourceIDGenerator INSTANCE = new ResourceIDGenerator();
	private final AtomicInteger count = new AtomicInteger();
	
	//create singleton
    public static ResourceIDGenerator getInstance() {
        return INSTANCE;
    }

    String nextId() {
        return String.format("resource-%d-", count.incrementAndGet());
    }
}

Let’s go back to ShardingTransactionManagerEngine and look at its getTransactionManager method, as shown below:

public ShardingTransactionManager getTransactionManager(final TransactionType transactionType) {
        ShardingTransactionManager result = transactionManagerMap.get(transactionType);
        if (TransactionType.LOCAL != transactionType) {
            Preconditions.checkNotNull(result, "Cannot find transaction manager of [%s]", transactionType);
        }
        return result;
}

Here, the corresponding ShardingTransactionManager is obtained based on the transaction type. Finally, in ShardingTransactionManagerEngine, there is a close method, as shown below:

public void close() throws Exception {
        for (Entry<TransactionType, ShardingTransactionManager> entry : transactionManagerMap.entrySet()) {
            entry.getValue().close();
        }
}

Based on the above analysis, we can conclude that ShardingTransactionManagerEngine serves as the entry point for distributed transactions and mainly manages and maintains ShardingTransactionManager. It is like a container.

So, how does ShardingTransactionManager work? Let’s take a look together.

ShardingTransactionManager #

The ShardingTransactionManager interface is located in the org.apache.shardingsphere.transaction.spi package in the sharding-transaction-core project and is defined as follows:

public interface ShardingTransactionManager extends AutoCloseable {

    // Initialize based on database type and ResourceDataSource
    void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);

    // Get TransactionType
    TransactionType getTransactionType();

    // Check if in transaction
    boolean isInTransaction();

    // Get a Connection that supports transactions
    Connection getConnection(String dataSourceName) throws SQLException;

    // Begin transaction
    void begin();

    // Commit transaction
    void commit();

    // Rollback transaction
    void rollback();
}

From this interface, we can see that we can get a Connection based on the DataSource name. Moreover, comparing with the TransactionManager interface in JTA to be introduced later, we can find the three basic operations required for a transaction manager: begin, commit, and rollback. ShardingSphere specifically provides an enum called TransactionOperationType for these basic operations.

By looking at the class structure of ShardingTransactionManager in ShardingSphere, we find two implementation classes: XAShardingTransactionManager and SeataATShardingTransactionManager.

1. XAShardingTransactionManager #

To understand the ShardingTransactionManager based on the XA protocol, we also need some theoretical knowledge. XA is a two-phase commit protocol proposed by the X/Open organization. It is a specification for distributed transactions, mainly defining the interfaces between the global transaction manager (TM) and the resource manager (RM) at the local level.

The XA interface is a bidirectional system interface that forms a communication bridge between the transaction manager and one or more resource managers. With this design, the transaction manager controls the global transaction, manages the transaction lifecycle, and coordinates resources, while the resource manager is responsible for controlling and managing various actual resources, including those related to the database.

The overall structure of XA and the interaction process between the transaction manager and the resource manager are illustrated in the following diagram:

Drawing 0.png

Structure diagram of XA protocol

Any introduction to distributed transactions will inevitably mention the two-phase commit because it is the key to implementing XA distributed transactions. We know that the two-phase commit process involves two roles: the coordinator and the participants. In the diagram above, the transaction manager introduced by XA acts as the “coordinator” role in the global transaction, and the resource manager in the diagram plays the role of “participant” in managing its internal resources.

After understanding these concepts, let’s look at the implementation in Java. As a transaction specification in the Java platform, JTA (Java Transaction API) also provides support for XA transactions. In fact, JTA is modeled based on the XA architecture. In JTA, the transaction manager is abstracted into the javax.transaction.TransactionManager interface and implemented through the underlying transaction service.

Like many other Java specifications, JTA only defines interfaces, and the specific implementation is provided by vendors. Currently, JTA implementations can be divided into two categories. One category integrates directly into application servers, such as JBoss. The other category is standalone implementations, such as Atomikos and Bitronix used in ShardingSphere. These implementations can be applied in environments that do not use J2EE application servers (such as regular Java applications) to provide distributed transaction guarantees. On the other hand, the ResourceManager in the JTA interface also requires the database vendor to provide an XA driver implementation.

Next, let’s further analyze the related core classes in JTA, which are the basis for understanding the implementation mechanism of distributed transactions in ShardingSphere.

In JTA, the following core interfaces are provided:

  • UserTransaction

This interface is for developers and allows programmatic control of transaction processing.

  • TransactionManager

This interface allows the application server to control distributed transactions.

  • Transaction

Represents the transaction managing the application.

  • XAResource This is an interface implementation for vendors, it is a Java mapping based on the XA protocol. When vendors provide drivers to access their own resources, they must implement such interfaces.

In addition, there are several core classes related to XA in the javax.sql package, namely XAConnection representing the connection, XADataSource representing the data source, and Xid representing the transaction ID.

We use the above core classes to simulate the common implementation process of XA-based distributed transactions with pseudocode. For a cross-database operation, we can generally implement the following operation sequence based on the UserTransaction interface:

UserTransaction userTransaction = null;
Connection connA = null;
Connection connB = null;
try{
    userTransaction.begin();
    //cross-database operation
    connA.execute("sql1");
    connB.execute("sql2");
    userTransaction.commit();
}catch(){
    userTransaction.rollback();
}

In order for the above code to work, the connection objects Connection need to support the XAResource interface, which involves a series of processes related to XADataSource and XAConnection.

Let’s go back to ShardingSphere and take a look at the XAShardingTransactionManager class, which is the XA implementation class for distributed transactions. It is mainly responsible for managing and adapting the actual DataSource, and delegates the begin/commit/rollback operations of the access point transaction to a specific XA transaction manager. For example, XAShardingTransactionManager will use the TransactionManager in XATransactionManager to complete the commit operation:

@Override
public void commit() {
    xaTransactionManager.getTransactionManager().commit();
}

The XATransactionManager here is an abstraction of various third-party XA transaction managers, encapsulating the implementation of Atomikos, Bitronix, and other third-party tools. We will delve into this XATransactionManager and XAShardingTransactionManager in the next class.

In summary, let’s clarify the relationships between the core classes related to XA two-phase commit in ShardingSphere, as shown in the following diagram:

Drawing 2.png

2.SeataATShardingTransactionManager #

After introducing XAShardingTransactionManager, let’s take a look at another implementation class, SeataATShardingTransactionManager, of the ShardingTransactionManager interface in the diagram above. Because of different technical systems and working principles, the implementation methods in SeataATShardingTransactionManager are completely different. Let’s take a look.

Before introducing SeataATShardingTransactionManager, it’s necessary to first learn about Seata itself. In contrast to XA, a distributed transaction in the Seata framework consists of three roles. In addition to the TransactionManager (TM) and ResourceManager (RM) which are also present in XA, there is also a TransactionCoordinator (TC) that maintains the running state of the global transaction and is responsible for coordinating and driving the submission or rollback of the global transaction.

Among them, TM is the initiator and terminator of a distributed transaction, TC is responsible for maintaining the running state of the distributed transaction, and RM is responsible for running the local transaction.

The overall architecture of Seata is as follows:

Drawing 4.png

Seata Distributed Transaction Composition Structure Diagram (from Seata official website)

Based on the Seata framework, the execution flow of a distributed transaction includes the following five steps:

Drawing 5.png

We will delve into these steps and the core classes involved in the next class.

From Source Code Analysis to Daily Development #

Today’s content mainly focuses on the abstraction process of distributed transactions in ShardingSphere, and does not involve much source code analysis. Our focus is to master the characteristics and core classes of the XA protocol, as well as to understand the process of executing a distributed transaction based on the Seata framework.

Summary and Preview #

This is the first class introducing the implementation principles of distributed transactions in ShardingSphere. We mainly explained how ShardingSphere unifies and abstracts two different distributed transaction implementation methods, XA and Seata, based on the same system.

Here’s a thinking question for you: Which abstractions does ShardingSphere make for different distributed transaction implementation technologies? Feel free to discuss with everyone in the comments, and I will review and comment on the answers.

In the next two classes, we will delve into the implementation principles and process of distributed transactions from a source code perspective based on the concepts and steps discussed today.