29 Distributed Transactions How to Integrate Support for Strong Consistency Transactions and Flexible Transactions in ShardingSphere Below #

In the previous lesson, we discussed XAShardingTransactionManager in detail; it supports strong consistency transactions in ShardingSphere. Today, we will cover the remaining content of this class and introduce SeataATShardingTransactionManager, which supports flexible transactions.

XAShardingTransactionManager #

Regarding XAShardingTransactionManager, in the previous lesson, we introduced core classes such as XADataSource, XAConnection, and XATransactionDataSource.

Next, based on the previous lesson, we will provide the implementation process of XATransactionManager and ShardingConnection.

1. XATransactionManager #

Let’s go back to XAShardingTransactionManager. We have already introduced its variables, so now let’s look at the methods it implements, starting with the init method:

public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
    for (ResourceDataSource each : resourceDataSources) {
        // Create XATransactionDataSource and cache it
        cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager));
    }
    // Initialize XATransactionManager
    xaTransactionManager.init();
}

The above method constructs XATransactionDataSource based on the incoming ResourceDataSource and puts it into the cache. At the same time, it also initializes the XATransactionManager created through the SPI mechanism.

The getTransactionType, isInTransaction, and getConnection methods in XAShardingTransactionManager are all relatively simple, as shown below:

@Override
public TransactionType getTransactionType() {
    return TransactionType.XA;
}

@Override
@SneakyThrows
public boolean isInTransaction() {
    return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus();
}

@Override
public Connection getConnection(final String dataSourceName) throws SQLException {
    try {
        return cachedDataSources.get(dataSourceName).getConnection();
    } catch (final SystemException | RollbackException ex) {
        throw new SQLException(ex);
    }
}

The implementation of begin, commit, and rollback methods related to transaction operations is also relatively simple. They directly delegate to the TransactionManager stored in XATransactionManager to complete, as shown below:

@Override
@SneakyThrows
public void begin() {
    xaTransactionManager.getTransactionManager().begin();
}

@Override
@SneakyThrows
public void commit() {
    xaTransactionManager.getTransactionManager().commit();
}

@Override
@SneakyThrows
public void rollback() {
    xaTransactionManager.getTransactionManager().rollback();
}
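
To make the delegation pattern concrete, here is a minimal, self-contained sketch. MiniTransactionManager and DelegatingShardingManager are hypothetical stand-ins, not ShardingSphere or JTA classes, and the status constants are arbitrary values that only mimic the idea behind Status.STATUS_NO_TRANSACTION.

```java
// Hypothetical stand-in for a JTA-style TransactionManager (not a real JTA class).
final class MiniTransactionManager {

    // Arbitrary status values chosen for this sketch.
    static final int STATUS_ACTIVE = 0;
    static final int STATUS_NO_TRANSACTION = 6;

    private int status = STATUS_NO_TRANSACTION;

    void begin() {
        if (status != STATUS_NO_TRANSACTION) {
            throw new IllegalStateException("nested transactions are not supported in this sketch");
        }
        status = STATUS_ACTIVE;
    }

    void commit() {
        requireActive();
        status = STATUS_NO_TRANSACTION;
    }

    void rollback() {
        requireActive();
        status = STATUS_NO_TRANSACTION;
    }

    int getStatus() {
        return status;
    }

    private void requireActive() {
        if (status != STATUS_ACTIVE) {
            throw new IllegalStateException("no active transaction");
        }
    }
}

// Hypothetical facade: every transactional call is forwarded to the wrapped manager.
final class DelegatingShardingManager {

    private final MiniTransactionManager delegate;

    DelegatingShardingManager(final MiniTransactionManager delegate) {
        this.delegate = delegate;
    }

    boolean isInTransaction() {
        // "In a transaction" simply means the status is anything but NO_TRANSACTION.
        return MiniTransactionManager.STATUS_NO_TRANSACTION != delegate.getStatus();
    }

    void begin() {
        delegate.begin();
    }

    void commit() {
        delegate.commit();
    }

    void rollback() {
        delegate.rollback();
    }
}
```

The facade keeps no transaction state of its own; it only asks the wrapped manager for its status, which is why the real methods can stay as one-liners.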

So far, we have covered all the content in the sharding-transaction-xa-core project. Let’s move on to the sharding-transaction-xa-atomikos-manager project and look at AtomikosTransactionManager, which is the default implementation of XATransactionManager in ShardingSphere.

But before that, let’s first take a look at the implementation of AtomikosXARecoverableResource, which represents a resource, as shown below:

public final class AtomikosXARecoverableResource extends JdbcTransactionalResource {

    private final String resourceName;

    AtomikosXARecoverableResource(final String serverName, final XADataSource xaDataSource) {
        super(serverName, xaDataSource);
        resourceName = serverName;
    }

    @Override
    public boolean usesXAResource(final XAResource xaResource) {
        return resourceName.equals(((SingleXAResource) xaResource).getResourceName());
    }
}

As you can see, the usesXAResource method here actually compares the ResourceName of SingleXAResource to determine whether the resource is being used. This is why we need a wrapper class SingleXAResource that wraps XAResource.
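
The name-based matching can be sketched in isolation. NamedResource and RecoverableResource below are hypothetical stand-ins for SingleXAResource and AtomikosXARecoverableResource; the sketch assumes only that the wrapped resource carries the name of the data source it belongs to, which a plain XAResource does not expose.

```java
// Hypothetical stand-in for SingleXAResource: a resource tagged with its data source name.
final class NamedResource {

    private final String resourceName;

    NamedResource(final String resourceName) {
        this.resourceName = resourceName;
    }

    String getResourceName() {
        return resourceName;
    }
}

// Hypothetical stand-in for AtomikosXARecoverableResource.
final class RecoverableResource {

    private final String resourceName;

    RecoverableResource(final String resourceName) {
        this.resourceName = resourceName;
    }

    // A resource is "used" only when it carries the same name as this recoverable resource.
    boolean usesResource(final NamedResource resource) {
        return resourceName.equals(resource.getResourceName());
    }
}
```

With this shape, a resource registered for "ds_0" matches a NamedResource created for "ds_0" and rejects one created for "ds_1".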

AtomikosTransactionManager uses AtomikosXARecoverableResource. The implementation process is as follows:

public final class AtomikosTransactionManager implements XATransactionManager {

    private final UserTransactionManager transactionManager = new UserTransactionManager();

    private final UserTransactionService userTransactionService = new UserTransactionServiceImp();

    @Override
    public void init() {
        userTransactionService.init();
    }

    @Override
    public TransactionManager getTransactionManager() {
        return transactionManager;
    }

    // The remaining XATransactionManager methods are omitted here
}

2. ShardingConnection #

Next, let’s look at how ShardingConnection works with ShardingTransactionManager, starting with its setAutoCommit method:

@Override
public void setAutoCommit(final boolean autoCommit) throws SQLException {
    if (TransactionType.LOCAL == transactionType) {
        super.setAutoCommit(autoCommit);
        return;
    }
    if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) {
        return;
    }
    if (autoCommit && shardingTransactionManager.isInTransaction()) {
        shardingTransactionManager.commit();
        return;
    }
    if (!autoCommit && !shardingTransactionManager.isInTransaction()) {
        closeCachedConnections();
        shardingTransactionManager.begin();
    }
}

In the above method, when the transaction type is LOCAL, the setAutoCommit method of AbstractConnectionAdapter, the parent class of ShardingConnection, is called directly to handle auto-commit for the local transaction.

For distributed transactions, nothing happens if the requested autoCommit value already matches the transaction state, that is, autoCommit is true and no transaction is active, or autoCommit is false and a transaction is already active. When autoCommit is true and a transaction is active, shardingTransactionManager.commit() is called to commit it; when autoCommit is false and no transaction is active, the cached connections are closed and shardingTransactionManager.begin() is called to start a transaction.
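
The branching in setAutoCommit can be restated as a small decision function, which makes the four cases easier to check. AutoCommitDecision and its Action enum are hypothetical names introduced for illustration only; the logic mirrors the conditions in the listing above.

```java
// Hypothetical helper that restates the setAutoCommit branches as a pure function.
final class AutoCommitDecision {

    enum Action { DELEGATE_TO_LOCAL, NOTHING, COMMIT, BEGIN }

    private AutoCommitDecision() {
    }

    static Action decide(final boolean localTransaction, final boolean autoCommit, final boolean inTransaction) {
        if (localTransaction) {
            // Local transactions are handled by the parent connection class.
            return Action.DELEGATE_TO_LOCAL;
        }
        if (autoCommit != inTransaction) {
            // The requested state already matches: (true, not in tx) or (false, in tx).
            return Action.NOTHING;
        }
        // Remaining cases: (true, in tx) commits, (false, not in tx) begins.
        return autoCommit ? Action.COMMIT : Action.BEGIN;
    }
}
```

For example, decide(false, true, true) yields COMMIT and decide(false, false, false) yields BEGIN, while the two mixed combinations yield NOTHING.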

Finally, commit and rollback are handled much like setAutoCommit: the transaction type determines whether a distributed commit or rollback is performed, as shown below:

@Override
public void commit() throws SQLException {
    if (TransactionType.LOCAL == transactionType) {
        super.commit();
    } else {
        shardingTransactionManager.commit();
    }
}

@Override
public void rollback() throws SQLException {
    if (TransactionType.LOCAL == transactionType) {
        super.rollback();
    } else {
        shardingTransactionManager.rollback();
    }
}

As we mentioned in the previous lesson, while ShardingSphere provides an XA protocol implementation solution for two-phase commit, it also implements flexible transactions.

Having covered XAShardingTransactionManager, let’s continue with SeataATShardingTransactionManager, the ShardingTransactionManager implementation for flexible transactions, which is based on the Seata framework.

SeataATShardingTransactionManager #

Because SeataATShardingTransactionManager relies entirely on Alibaba’s Seata framework for its distributed transaction features rather than implementing a specification such as XA, its code is much simpler than the class structure behind XAShardingTransactionManager; the complexity is hidden inside the framework.

To integrate Seata, we first need to initialize two client objects, TMClient and RMClient. Inside Seata, these two clients communicate over RPC with the Seata server (the transaction coordinator).

Therefore, in the init method of SeataATShardingTransactionManager, ShardingSphere implements an initSeataRPCClient method to initialize these two client objects, as shown below:

// Create the configuration object based on the seata.conf configuration file
private final FileConfiguration configuration = new FileConfiguration("seata.conf");

private void initSeataRPCClient() {
    String applicationId = configuration.getConfig("client.application.id");
    Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file");
    String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");
    TMClient.init(applicationId, transactionServiceGroup);
    RMClient.init(applicationId, transactionServiceGroup);
}

Recalling the introduction to Seata usage in “09 | Distributed Transaction: How to Use Strong Consistency Transaction and Flexible Transaction?”, it is easy to see that this initialization is driven by the client.application.id and client.transaction.service.group items in the seata.conf configuration file. The application id is mandatory, while the transaction service group falls back to "default" when it is not set.
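
The "required key plus key with a default" lookup can be sketched with the JDK alone. This is only an illustration: SeataClientSettings is a hypothetical class, and java.util.Properties stands in for Seata’s FileConfiguration, which reads seata.conf in its own format. Only the two key names and the "default" fallback come from the listing above.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical value object for the two settings read before initializing the clients.
final class SeataClientSettings {

    final String applicationId;

    final String transactionServiceGroup;

    private SeataClientSettings(final String applicationId, final String transactionServiceGroup) {
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
    }

    // Properties is an assumption used for the sketch; it is not how Seata parses seata.conf.
    static SeataClientSettings from(final Properties properties) {
        String applicationId = properties.getProperty("client.application.id");
        // Fail fast when the mandatory application id is missing.
        Objects.requireNonNull(applicationId, "please config application id within seata.conf file");
        // The service group is optional and falls back to "default".
        String group = properties.getProperty("client.transaction.service.group", "default");
        return new SeataClientSettings(applicationId, group);
    }
}
```

Validating the mandatory key before any client is initialized keeps a misconfiguration from surfacing later as an obscure connection error.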

At the same time, for Seata, it also provides a set of implementation strategies built on top of the JDBC specification, which is similar to the compatibility between ShardingSphere and the JDBC specification introduced in “03 | Specification Compatibility: What is the Relationship between the JDBC Specification and ShardingSphere?”.

Seata’s naming is more direct, though: it uses proxy objects such as DataSourceProxy and ConnectionProxy. Taking DataSourceProxy as an example, we can outline its class structure as follows:

[Figure: class structure of DataSourceProxy]

It can be seen that DataSourceProxy implements its own Resource interface and extends the abstract class AbstractDataSourceProxy, which in turn implements the DataSource interface in JDBC.

Therefore, when we initialize the Seata framework, we also need to construct DataSourceProxy based on the input DataSource object, and obtain ConnectionProxy through DataSourceProxy. The relevant code in the SeataATShardingTransactionManager class is as follows:

@Override
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
    // Initialize Seata client
    initSeataRPCClient();
    // Create DataSourceProxy and put it into the Map
    for (ResourceDataSource each : resourceDataSources) {
        dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));
    }
}

@Override
public Connection getConnection(final String dataSourceName) throws SQLException {
    // Get ConnectionProxy based on DataSourceProxy
    return dataSourceMap.get(dataSourceName).getConnection();
}
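
The wrap-then-look-up idea behind init and getConnection can be sketched without Seata. SimpleDataSource, SimpleDataSourceProxy, and ProxyRegistry are hypothetical, heavily simplified stand-ins; in particular, a connection is modeled as a plain String so that the sketch stays self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for javax.sql.DataSource.
interface SimpleDataSource {

    String getConnection();
}

// Hypothetical proxy in the spirit of DataSourceProxy: same interface, extra behavior around the target.
final class SimpleDataSourceProxy implements SimpleDataSource {

    private final SimpleDataSource target;

    SimpleDataSourceProxy(final SimpleDataSource target) {
        this.target = target;
    }

    @Override
    public String getConnection() {
        // A real proxy would hand out a connection proxy that intercepts SQL; here we only tag the result.
        return "proxy(" + target.getConnection() + ")";
    }
}

// Hypothetical registry mirroring the dataSourceMap usage: wrap once in init, look up by name later.
final class ProxyRegistry {

    private final Map<String, SimpleDataSource> dataSourceMap = new LinkedHashMap<>();

    void init(final Map<String, SimpleDataSource> originals) {
        for (Map.Entry<String, SimpleDataSource> each : originals.entrySet()) {
            dataSourceMap.put(each.getKey(), new SimpleDataSourceProxy(each.getValue()));
        }
    }

    String getConnection(final String dataSourceName) {
        SimpleDataSource dataSource = dataSourceMap.get(dataSourceName);
        if (dataSource == null) {
            throw new IllegalArgumentException("unknown data source: " + dataSourceName);
        }
        return dataSource.getConnection();
    }
}
```

Because the proxy implements the same interface as its target, callers of getConnection never need to know that they received a proxied connection.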

Having covered initialization, let’s look at the entry points that SeataATShardingTransactionManager provides for starting and committing transactions. In Seata, GlobalTransaction is a core interface that encapsulates access to distributed transactions at the user level. Its definition is as follows, and the meaning of each operation is clear from the method name:

public interface GlobalTransaction {
    void begin() throws TransactionException;
    void begin(int timeout) throws TransactionException;
    void begin(int timeout, String name) throws TransactionException;
    void commit() throws TransactionException;
    void rollback() throws TransactionException;
    GlobalStatus getStatus() throws TransactionException;
    String getXid();
}

As a user of GlobalTransaction, ShardingSphere also performs its distributed transaction operations through this interface. However, it does not use the interface directly; instead, it introduces a SeataTransactionHolder class that holds one GlobalTransaction object per thread.

The SeataTransactionHolder class is located in the sharding-transaction-base-seata-at project and is defined as follows:

final class SeataTransactionHolder {
    private static final ThreadLocal<GlobalTransaction> CONTEXT = new ThreadLocal<>();

    static void set(final GlobalTransaction transaction) {
        CONTEXT.set(transaction);
    }

    static GlobalTransaction get() {
        return CONTEXT.get();
    }

    static void clear() {
        CONTEXT.remove();
    }
}

Here, the ThreadLocal utility class is used to ensure the thread safety of accessing GlobalTransaction.
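
The per-thread isolation that ThreadLocal provides can be observed with a small experiment. PerThreadHolder is a hypothetical class that stores a String instead of a GlobalTransaction; valueSeenByOtherThread is a helper added only for this demonstration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder with the same set/get/clear shape as SeataTransactionHolder.
final class PerThreadHolder {

    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private PerThreadHolder() {
    }

    static void set(final String value) {
        CONTEXT.set(value);
    }

    static String get() {
        return CONTEXT.get();
    }

    static void clear() {
        CONTEXT.remove();
    }

    // Reads the holder from a second thread and returns what that thread saw.
    static String valueSeenByOtherThread() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        Thread other = new Thread(() -> seen.set(get()));
        other.start();
        try {
            other.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen.get();
    }
}
```

A value set on one thread is invisible to another thread, which reads null. This is also why clear matters: on pooled threads, a value that is never removed would leak into the next task that runs on the same thread.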

The next question is how to determine whether the current operation is within a global transaction.

In Seata, there is a context object called RootContext, which is used to store the Xid propagated between participants and initiators. When the transaction initiator starts a global transaction, the Xid will be filled in the RootContext. Then, the Xid will be propagated along the service invocation chain and filled in the RootContext of each transaction participant process. When a transaction participant finds an Xid in the RootContext, it knows that it is within a global transaction.

Based on this principle, we can determine whether we are within a global transaction by using the following method:

@Override
public boolean isInTransaction() {
    return null != RootContext.getXID();
}
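
The XID-based check can be imitated with a thread-bound context. XidContext below is a hypothetical stand-in for RootContext, and handleCall is an invented helper that simulates a participant receiving an XID along the invocation chain; it is a sketch of the principle, not Seata’s implementation.

```java
// Hypothetical stand-in for Seata's RootContext: one XID slot per thread.
final class XidContext {

    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    private XidContext() {
    }

    static void bind(final String xid) {
        XID.set(xid);
    }

    static void unbind() {
        XID.remove();
    }

    static String getXid() {
        return XID.get();
    }

    // Same rule as isInTransaction above: an XID in the context means a global transaction.
    static boolean inGlobalTransaction() {
        return null != getXid();
    }

    // Simulates a participant handling one call: bind the propagated XID (if any), check, clean up.
    static boolean handleCall(final String incomingXid) {
        if (incomingXid != null) {
            bind(incomingXid);
        }
        try {
            return inGlobalTransaction();
        } finally {
            unbind();
        }
    }
}
```

A call that arrives with an XID is treated as part of a global transaction, and a call without one is not; unbinding in finally keeps the XID from leaking into later calls on the same thread.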

At the same time, Seata also provides a GlobalTransactionContext class for global transactions. With this context class, we can use the getCurrent method to get a GlobalTransaction object or use the getCurrentOrCreate method to create a new one when unable to obtain a GlobalTransaction object.

With this understanding, we can easily understand the implementation process of the begin method in SeataATShardingTransactionManager, as shown below:

@Override
@SneakyThrows
public void begin() {
    SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
    SeataTransactionHolder.get().begin();
    SeataTransactionBroadcaster.collectGlobalTxId();
}

Here, a GlobalTransaction is created through the GlobalTransactionContext.getCurrentOrCreate() method, and then it is saved in the SeataTransactionHolder. Next, a GlobalTransaction is obtained from the SeataTransactionHolder and the begin method is called to start the transaction.

Notice that there is also a SeataTransactionBroadcaster class, which is a container class for storing the Seata global Xid. We collect and save the global Xid when the transaction is started, and we clear these Xids when the transaction is committed or rolled back.

Therefore, the implementation of the commit, rollback, and close methods, as shown below, becomes easy to understand:

@Override
@SneakyThrows
public void commit() {
    try {
        SeataTransactionHolder.get().commit();
    } finally {
        SeataTransactionBroadcaster.clear();
        SeataTransactionHolder.clear();
    }
}

@Override
@SneakyThrows
public void rollback() {
    try {
        SeataTransactionHolder.get().rollback();
    } finally {
        SeataTransactionBroadcaster.clear();
        SeataTransactionHolder.clear();
    }
}

@Override
public void close() {
    dataSourceMap.clear();
    SeataTransactionHolder.clear();
    TmRpcClient.getInstance().destroy();
    RmRpcClient.getInstance().destroy();
}
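
The try/finally structure in commit and rollback guarantees that the thread-bound state is cleaned up even when the underlying call fails. The following sketch shows that behavior in isolation; TxLifecycle and its nested GlobalTx interface are hypothetical, and unchecked exceptions are used so that no Lombok annotation is needed.

```java
// Hypothetical lifecycle helper: hold the transaction per thread, always clean up on completion.
final class TxLifecycle {

    // Hypothetical, reduced counterpart of the GlobalTransaction interface.
    interface GlobalTx {

        void begin();

        void commit();

        void rollback();
    }

    private static final ThreadLocal<GlobalTx> HOLDER = new ThreadLocal<>();

    private TxLifecycle() {
    }

    static void begin(final GlobalTx transaction) {
        HOLDER.set(transaction);
        transaction.begin();
    }

    static void commit() {
        try {
            HOLDER.get().commit();
        } finally {
            // Runs whether commit succeeded or threw.
            HOLDER.remove();
        }
    }

    static void rollback() {
        try {
            HOLDER.get().rollback();
        } finally {
            HOLDER.remove();
        }
    }

    static boolean hasTransaction() {
        return HOLDER.get() != null;
    }
}
```

If commit throws, the exception still reaches the caller, but the holder is already empty, so the next begin on the same thread starts from a clean state.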

The code in the sharding-transaction-base-seata-at project actually only contains these contents, which constitute the implementation process of integrating the Seata framework into ShardingSphere.

From Source Code Analysis to Daily Development #

Today’s content provides a detailed process of how to integrate the Seata distributed transaction framework into an application. ShardingSphere provides us with a template implementation. In daily development, if we want to integrate Seata into our business code, we can refer to the code in core classes such as SeataTransactionHolder and SeataATShardingTransactionManager without making too many modifications.

Summary and Next Steps #

This tutorial is the last part of ShardingSphere distributed transactions. We have covered the remaining part of XAShardingTransactionManager and the complete implementation of SeataATShardingTransactionManager.

Looking back on the previous tutorial, we found that understanding XAShardingTransactionManager is challenging because of the integration and compatibility work from ShardingConnection down to the underlying JDBC specification. As for SeataATShardingTransactionManager, we need some familiarity with the Seata framework in order to fully understand today’s content.

Here is a question for you to think about: If you were asked to integrate with the Seata framework, what are the core steps you need to take? Feel free to discuss with everyone in the comments section, and I will review and comment on the answers.

Having covered distributed transactions, we will move on to the “Source Code Analysis of Orchestration and Governance in ShardingSphere” module. Starting from the next tutorial, I will explain the implementation principle of the data masking module.