28 Distributed Transactions: How to Integrate Support for Strong Consistency Transactions and Flexible Transactions in ShardingSphere (Part 1)


Building on the previous lesson, today we examine how ShardingSphere actually implements distributed transactions. We start with XAShardingTransactionManager, which provides strongly consistent transactions.

### XAShardingTransactionManager

Let’s go back to ShardingSphere and look at the XAShardingTransactionManager class in the sharding-transaction-xa-core project, which is the XA implementation class for distributed transactions.

First, let’s take a look at the definition of the XAShardingTransactionManager class and the variables it contains:

public final class XAShardingTransactionManager implements ShardingTransactionManager {

    private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();

    private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();

}

As you can see, XAShardingTransactionManager implements the ShardingTransactionManager interface introduced in the previous lesson and caches XATransactionDataSource instances in a Map<String, XATransactionDataSource>. The XATransactionManager instance, in turn, is again loaded through the JDK's ServiceLoader, as shown below:

private XATransactionManager load() {
    Iterator<XATransactionManager> xaTransactionManagers = ServiceLoader.load(XATransactionManager.class).iterator();
    // No implementation registered through SPI: fall back to Atomikos
    if (!xaTransactionManagers.hasNext()) {
        return new AtomikosTransactionManager();
    }
    XATransactionManager result = xaTransactionManagers.next();
    // Several implementations registered: keep the first one and log a warning
    if (xaTransactionManagers.hasNext()) {
        log.warn("There are more than one transaction managers existing, chosen first one by default.");
    }
    return result;
}
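The same "first registered provider, otherwise a built-in default" lookup can be reproduced with nothing but the JDK. The sketch below is an illustration, not ShardingSphere code: `TxManagerSpi`, `DefaultTxManager`, and `SpiLoader` are hypothetical names, and the warning goes to `System.err` instead of a logger. With no `META-INF/services` entry on the classpath, the call falls back to the default instance.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical SPI used only for this sketch; it plays the role of XATransactionManager.
interface TxManagerSpi {

    String name();
}

// Hypothetical built-in default; it plays the role of AtomikosTransactionManager.
final class DefaultTxManager implements TxManagerSpi {

    @Override
    public String name() {
        return "default";
    }
}

final class SpiLoader {

    // Returns the first provider registered under META-INF/services,
    // or the supplied default when none is registered.
    static <T> T loadFirstOrDefault(final Class<T> spi, final T defaultInstance) {
        Iterator<T> providers = ServiceLoader.load(spi).iterator();
        if (!providers.hasNext()) {
            return defaultInstance;
        }
        T result = providers.next();
        // Extra providers are ignored, but the ambiguity is reported.
        if (providers.hasNext()) {
            System.err.println("More than one " + spi.getSimpleName() + " found; using the first one.");
        }
        return result;
    }
}
```

Calling `SpiLoader.loadFirstOrDefault(TxManagerSpi.class, new DefaultTxManager())` returns the default here; registering an implementation class name in a `META-INF/services` file for the interface would make that implementation win instead.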

XATransactionManager is an abstraction over the various third-party XA transaction managers. The code above shows that if no implementation is registered through SPI, the system falls back to an AtomikosTransactionManager; if several are registered, it takes the first one and logs a warning. The XATransactionManager interface itself is defined in a separate project, sharding-transaction-xa-spi, as follows:

public interface XATransactionManager extends AutoCloseable {

    // Initialize the XA transaction manager
    void init();

    // Register transaction recovery resource
    void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource);

    // Remove transaction recovery resource
    void removeRecoveryResource(String dataSourceName, XADataSource xaDataSource);

    // Enlist a SingleXAResource in the current transaction
    void enlistResource(SingleXAResource singleXAResource);

    // Get TransactionManager
    TransactionManager getTransactionManager();
}

The purpose of each method can be inferred from its name, but the details only become clear when we look at a concrete XATransactionManager implementation. The interface also refers to SingleXAResource, which is likewise located in the sharding-transaction-xa-spi project. As its name suggests, it implements JTA's XAResource interface. Let's take a look:

public final class SingleXAResource implements XAResource {

    private final String resourceName;

    private final XAResource delegate;

    @Override
    public void start(final Xid xid, final int i) throws XAException {
        delegate.start(xid, i);
    }
    
    @Override
    public void commit(final Xid xid, final boolean b) throws XAException {
        delegate.commit(xid, b);
    }

    @Override
    public void rollback(final Xid xid) throws XAException {
        delegate.rollback(xid);
    }
    
    @Override
    public boolean isSameRM(final XAResource xaResource) {
        SingleXAResource singleXAResource = (SingleXAResource) xaResource;
        return resourceName.equals(singleXAResource.getResourceName());
    }
    ...
}

As you can see, although SingleXAResource implements JTA's XAResource interface, it is essentially a proxy: each XA operation is delegated to the wrapped XAResource. The only logic of its own is isSameRM, which treats two resources as the same resource manager when their resource names are equal.
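To make the proxy role concrete, here is a minimal sketch of a name-based, delegating XAResource that uses only the JDK's `javax.transaction.xa` package. `NamedXAResource` is a hypothetical class, not ShardingSphere's. Unlike the excerpt above, its `isSameRM` checks the type before casting; that defensive check is a choice of this sketch, not something the original code does.

```java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Hypothetical stand-in for SingleXAResource: every XA call is forwarded to the
// wrapped resource; only the resource-manager identity check is local.
final class NamedXAResource implements XAResource {

    private final String resourceName;

    private final XAResource delegate;

    NamedXAResource(final String resourceName, final XAResource delegate) {
        this.resourceName = resourceName;
        this.delegate = delegate;
    }

    String getResourceName() {
        return resourceName;
    }

    // Same resource manager if and only if the logical resource names match.
    @Override
    public boolean isSameRM(final XAResource other) {
        return other instanceof NamedXAResource
                && resourceName.equals(((NamedXAResource) other).getResourceName());
    }

    // The rest of the XAResource contract is pure delegation.
    @Override
    public void start(final Xid xid, final int flags) throws XAException {
        delegate.start(xid, flags);
    }

    @Override
    public void end(final Xid xid, final int flags) throws XAException {
        delegate.end(xid, flags);
    }

    @Override
    public int prepare(final Xid xid) throws XAException {
        return delegate.prepare(xid);
    }

    @Override
    public void commit(final Xid xid, final boolean onePhase) throws XAException {
        delegate.commit(xid, onePhase);
    }

    @Override
    public void rollback(final Xid xid) throws XAException {
        delegate.rollback(xid);
    }

    @Override
    public void forget(final Xid xid) throws XAException {
        delegate.forget(xid);
    }

    @Override
    public Xid[] recover(final int flag) throws XAException {
        return delegate.recover(flag);
    }

    @Override
    public int getTransactionTimeout() throws XAException {
        return delegate.getTransactionTimeout();
    }

    @Override
    public boolean setTransactionTimeout(final int seconds) throws XAException {
        return delegate.setTransactionTimeout(seconds);
    }
}
```

Because identity is decided by name alone, two wrappers around different physical connections that share a resource name are reported as the same resource manager, which a transaction manager may use when deciding whether work can be joined into one branch.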

Next, we will discuss several core classes in XA distributed transactions.

#### 1. XADataSource

XADataSource is part of the JDBC specification, and we have mentioned this interface in “03 | Specification Compatibility: What is the relationship between JDBC specification and ShardingSphere?”. The purpose of this interface is to obtain XAConnection.

So how is an XADataSource constructed? ShardingSphere provides an XADataSourceFactory class, which is responsible for producing concrete XADataSource instances. Its build method is shown below:

public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
    XADataSourceDefinition xaDataSourceDefinition = XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType);
    XADataSource result = createXADataSource(xaDataSourceDefinition);
    Properties xaProperties = xaDataSourceDefinition.getXAProperties(SWAPPER.swap(dataSource));
    PropertyUtils.setProperties(result, xaProperties);
    return result;
}
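The last step of build, `PropertyUtils.setProperties`, pushes the `Properties` onto the newly created XADataSource; its implementation is not shown in this lesson. One plausible approach, sketched here purely as an assumption, maps each key to a JavaBean setter, so a key such as `URL` becomes a call to `setURL(String)`. `SimplePropertySetter` and `FakeXADataSource` are hypothetical; the latter only mimics setter names a vendor class might expose.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical helper, not ShardingSphere's PropertyUtils: for every key it calls
// a public setter named "set" + capitalized key that takes a single String.
final class SimplePropertySetter {

    static void setProperties(final Object target, final Properties props) {
        for (String key : props.stringPropertyNames()) {
            String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
            try {
                Method setter = target.getClass().getMethod(setterName, String.class);
                setter.invoke(target, props.getProperty(key));
            } catch (final NoSuchMethodException ex) {
                throw new IllegalArgumentException("No String setter " + setterName + " on " + target.getClass().getName(), ex);
            } catch (final ReflectiveOperationException ex) {
                throw new IllegalStateException("Failed to set property " + key, ex);
            }
        }
    }
}

// Toy bean standing in for a vendor XADataSource.
final class FakeXADataSource {

    private String url;

    private String user;

    public void setURL(final String url) {
        this.url = url;
    }

    public void setUser(final String user) {
        this.user = user;
    }

    String getUrl() {
        return url;
    }

    String getUser() {
        return user;
    }
}
```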

The build method first obtains an XADataSourceDefinition, which, as its name suggests, describes how to create the XADataSource for a particular database type. It is defined as follows:

public interface XADataSourceDefinition extends DatabaseTypeAwareSPI {

    // Get XA driver class names
    Collection<String> getXADriverClassName();

    // Get XA properties
    Properties getXAProperties(DatabaseAccessConfiguration databaseAccessConfiguration);
}

We can see that this interface inherits from DatabaseTypeAwareSPI. From its name, it is also an SPI interface, which is defined as follows:

public interface DatabaseTypeAwareSPI { 
    // Get database type
    String getDatabaseType();
}

In ShardingSphere, the only interface that inherits from DatabaseTypeAwareSPI is XADataSourceDefinition, and there are several implementation classes for the latter. The overall class hierarchy is as follows:


Implementation classes of XADataSourceDefinition

Taking MySQLXADataSourceDefinition as an example, this class implements all three methods declared by DatabaseTypeAwareSPI and XADataSourceDefinition:

public final class MySQLXADataSourceDefinition implements XADataSourceDefinition {

    @Override
    public String getDatabaseType() {
        return "MySQL";
    }

    @Override
    public Collection<String> getXADriverClassName() {
        return Arrays.asList("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "com.mysql.cj.jdbc.MysqlXADataSource");
    }

    @Override
    public Properties getXAProperties(final DatabaseAccessConfiguration databaseAccessConfiguration) {
        Properties result = new Properties();
        result.setProperty("user", databaseAccessConfiguration.getUsername());
        result.setProperty("password", Optional.fromNullable(databaseAccessConfiguration.getPassword()).or(""));
        result.setProperty("URL", databaseAccessConfiguration.getUrl());
        … 
        return result;
    }
}

This shows that MySQL, as a database vendor, provides two XADataSource driver classes, one for Connector/J 5.x and one for 8.x. In getXAProperties, the URL, username, password, and other settings are taken from a DatabaseAccessConfiguration object, which is introduced later in this article.

In addition, since the DatabaseTypeAwareSPI interface name contains “SPI”, we can guess that the various XADataSourceDefinition implementations are loaded through the SPI mechanism. This is confirmed by XADataSourceDefinitionFactory, the factory class used to obtain an XADataSourceDefinition:

public final class XADataSourceDefinitionFactory {

    private static final Map<DatabaseType, XADataSourceDefinition> XA_DATA_SOURCE_DEFINITIONS = new HashMap<>();

    static {
        // Load XADataSourceDefinition using ServiceLoader
        for (XADataSourceDefinition each : ServiceLoader.load(XADataSourceDefinition.class)) {
            XA_DATA_SOURCE_DEFINITIONS.put(DatabaseTypes.getActualDatabaseType(each.getDatabaseType()), each);
        }
    }

    public static XADataSourceDefinition getXADataSourceDefinition(final DatabaseType databaseType) {
        return XA_DATA_SOURCE_DEFINITIONS.get(databaseType);
    }
}

Similarly, we also found SPI configuration information in the sharding-transaction-xa-core project:


SPI configuration in the sharding-transaction-xa-core project

After obtaining the XADataSourceDefinition for the database type, ShardingSphere creates the concrete XADataSource from the class names returned by getXADriverClassName. A single class name is loaded and instantiated as follows:

private static XADataSource loadXADataSource(final String xaDataSourceClassName) {
    Class xaDataSourceClass;
    try {
        // Load the implementation class of XADataSource
        xaDataSourceClass = Thread.currentThread().getContextClassLoader().loadClass(xaDataSourceClassName);
    } catch (final ClassNotFoundException ignored) {
        try {
            xaDataSourceClass = Class.forName(xaDataSourceClassName);
        } catch (final ClassNotFoundException ex) {
            throw new ShardingException("Failed to load [%s]", xaDataSourceClassName);
        }
    }
    try {
        return (XADataSource) xaDataSourceClass.newInstance();
    } catch (final InstantiationException | IllegalAccessException ex) {
        throw new ShardingException("Failed to instance [%s]", xaDataSourceClassName);
    }
}

Here, the target driver class is first loaded through the current thread's context class loader. If it cannot be found there, Class.forName is tried as a fallback, which uses the class loader of the calling class. The loaded class is then instantiated reflectively through its no-argument constructor and returned as an XADataSource.
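The lookup order can be tried out in isolation. The sketch below approximates it rather than reproducing ShardingSphere code: it uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` has been deprecated since Java 9, and it adds a hypothetical `instantiateFirstAvailable` helper showing how a list of candidate class names, like the two MySQL drivers returned by `getXADriverClassName`, could be tried in turn. `DriverClassLoading` is a made-up name.

```java
import java.util.List;

// Hypothetical helper mirroring the lookup order described above.
final class DriverClassLoading {

    // Context class loader first, Class.forName as a fallback, then reflective instantiation.
    static Object instantiate(final String className) {
        Class<?> clazz;
        try {
            clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
        } catch (final ClassNotFoundException ignored) {
            try {
                clazz = Class.forName(className);
            } catch (final ClassNotFoundException ex) {
                throw new IllegalStateException("Failed to load " + className, ex);
            }
        }
        try {
            // Go through the public no-arg constructor instead of the deprecated Class.newInstance().
            return clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException ex) {
            throw new IllegalStateException("Failed to instantiate " + className, ex);
        }
    }

    // Tries each candidate in order and returns the first instance that could be created.
    static Object instantiateFirstAvailable(final List<String> classNames) {
        IllegalStateException lastFailure = null;
        for (String each : classNames) {
            try {
                return instantiate(each);
            } catch (final IllegalStateException ex) {
                lastFailure = ex;
            }
        }
        throw new IllegalStateException("None of " + classNames + " could be instantiated", lastFailure);
    }
}
```

In a quick test, `java.util.ArrayList` can stand in for a driver class that is present on the classpath, and any made-up class name for one that is missing.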

After the XADataSource instance is created, its properties still have to be set. The DataSourceSwapper class extracts the connection settings (URL, username, password) from the original DataSource, and PropertyUtils then applies the resulting XA properties to the XADataSource. Because each connection pool names these settings differently, ShardingSphere adds a layer of abstraction over the various pool implementations: the DataSourcePropertyProvider interface, which supplies the names of the DataSource properties that hold the URL, username, password, and so on.

The DataSourcePropertyProvider interface is defined as follows:

public interface DataSourcePropertyProvider {
    String getDataSourceClassName();
    String getURLPropertyName();
    String getUsernamePropertyName();
    String getPasswordPropertyName();
}

DataSourcePropertyProvider has two implementation classes, DefaultDataSourcePropertyProvider and HikariCPPropertyProvider. HikariCPPropertyProvider is registered through SPI, as the following configuration file shows:


SPI configuration of DataSourcePropertyProvider

HikariCPPropertyProvider implements the DataSourcePropertyProvider interface and returns the class name and property names used by HikariCP:

public final class HikariCPPropertyProvider implements DataSourcePropertyProvider {

    @Override
    public String getDataSourceClassName() {
        return "com.zaxxer.hikari.HikariDataSource";
    }

    @Override
    public String getURLPropertyName() {
        return "jdbcUrl";
    }

    @Override
    public String getUsernamePropertyName() {
        return "username";
    }

    @Override
    public String getPasswordPropertyName() {
        return "password";
    }
}

DataSourceSwapper then uses the provider that matches the given DataSource to read these properties through reflection and packs them into a DatabaseAccessConfiguration:

@Override
public DatabaseAccessConfiguration swap(final DataSource dataSource) {
    DataSourcePropertyProvider provider = DataSourcePropertyProviderLoader.getProvider(dataSource);
    try {
        String url = (String) findGetterMethod(dataSource, provider.getURLPropertyName()).invoke(dataSource);
        String username = (String) findGetterMethod(dataSource, provider.getUsernamePropertyName()).invoke(dataSource);
        String password = (String) findGetterMethod(dataSource, provider.getPasswordPropertyName()).invoke(dataSource);
        return new DatabaseAccessConfiguration(url, username, password);
    } catch (final ReflectiveOperationException ex) {
        throw new ShardingException("Cannot swap data source type: `%s`, please provide an implementation from SPI `%s`", 
                dataSource.getClass().getName(), DataSourcePropertyProvider.class.getName());
    }
}
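Because a DataSourcePropertyProvider only supplies property names, reading the values comes down to calling getters by name. findGetterMethod is not shown in this lesson, so the sketch below assumes the plain JavaBean convention ("get" plus the capitalized property name). `GetterReader` and `FakeHikariDataSource` are hypothetical; the latter only copies HikariCP's `jdbcUrl`/`username`/`password` getter names.

```java
import java.lang.reflect.Method;

// Hypothetical sketch: read a String property through its JavaBean getter.
final class GetterReader {

    static String readProperty(final Object bean, final String propertyName) {
        String getterName = "get" + Character.toUpperCase(propertyName.charAt(0)) + propertyName.substring(1);
        try {
            Method getter = bean.getClass().getMethod(getterName);
            return (String) getter.invoke(bean);
        } catch (final ReflectiveOperationException ex) {
            throw new IllegalArgumentException("Cannot read property `" + propertyName + "` from " + bean.getClass().getName(), ex);
        }
    }
}

// Toy bean exposing the getter names a HikariCP data source uses.
final class FakeHikariDataSource {

    public String getJdbcUrl() {
        return "jdbc:mysql://localhost:3306/demo";
    }

    public String getUsername() {
        return "root";
    }

    public String getPassword() {
        return "secret";
    }
}
```

Keeping only names in the provider means supporting another pool needs no new reflection code, just another provider that returns that pool's property names.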

#### 2. XAConnection

Next, let's talk about XAConnection, which is also an interface defined in the JDBC specification.

The XAConnectionFactory class, responsible for creating XAConnection, is as follows:

public final class XAConnectionFactory { 
    // Create XAConnection based on a regular Connection
    public static XAConnection createXAConnection(final DatabaseType databaseType, final XADataSource xaDataSource, final Connection connection) {
        switch (databaseType.getName()) {
            case "MySQL":
                return new MySQLXAConnectionWrapper().wrap(xaDataSource, connection);
            case "MariaDB":
                return new MariaDBXAConnectionWrapper().wrap(xaDataSource, connection);
            case "PostgreSQL":
                return new PostgreSQLXAConnectionWrapper().wrap(xaDataSource, connection);
            case "H2":
                return new H2XAConnectionWrapper().wrap(xaDataSource, connection);
            default:
                throw new UnsupportedOperationException(String.format("Cannot support database type: `%s`", databaseType));
        }
    }
}

As you can see, compared to XADataSource, the process of creating XAConnection is more straightforward. Here, a switch statement is used to create the corresponding ConnectionWrapper based on the database type, and then the wrap method is called to return XAConnection.

Let's take MySQLXAConnectionWrapper as an example to analyze the specific implementation process.

MySQLXAConnectionWrapper implements the XAConnectionWrapper interface, so let's first look at the definition of the XAConnectionWrapper interface:

public interface XAConnectionWrapper { 
    // Wrap the Connection into XAConnection based on XADataSource
    XAConnection wrap(XADataSource xaDataSource, Connection connection);
}

The XAConnectionWrapper interface only has one method, which is to create a new XAConnection object based on the passed XADataSource and a regular Connection object. The class hierarchy of the XAConnectionWrapper interface is as follows:

XAConnectionWrapper interface implementation classes

The wrap method in MySQLXAConnectionWrapper is as follows:

// Lombok's @SneakyThrows rethrows the checked reflection exceptions without declaring them
@SneakyThrows
@Override
public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
    // Get the actual (physical) Connection object
    Connection physicalConnection = unwrapPhysicalConnection(xaDataSource.getClass().getName(), connection);
    // wrapConnection is not public, so make it accessible before invoking it
    Method method = xaDataSource.getClass().getDeclaredMethod("wrapConnection", Connection.class);
    method.setAccessible(true);
    // Wrap the Connection object through reflection
    return (XAConnection) method.invoke(xaDataSource, physicalConnection);
}

The method above first uses unwrapPhysicalConnection to unwrap the passed Connection (typically a connection-pool proxy) into the driver's physical connection, and then invokes the non-public wrapConnection method of XADataSource through reflection to wrap that physical connection into an XAConnection.
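The reflection step can be isolated in a few lines. This sketch is illustrative only: `VendorDataSource` is a hypothetical class with a protected `wrapConnection` method standing in for the driver's XADataSource, and Strings stand in for connections. Note that `getDeclaredMethod` only finds methods declared on the exact runtime class, not inherited ones.

```java
import java.lang.reflect.Method;

// Hypothetical helper: look up a non-public method, open it, and invoke it.
final class ReflectiveCall {

    static Object invokeDeclared(final Object target, final String methodName,
                                 final Class<?> parameterType, final Object argument) {
        try {
            Method method = target.getClass().getDeclaredMethod(methodName, parameterType);
            method.setAccessible(true);
            return method.invoke(target, argument);
        } catch (final ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot invoke " + methodName, ex);
        }
    }
}

// Toy class standing in for a vendor XADataSource with a protected factory method.
class VendorDataSource {

    protected String wrapConnection(final String physicalConnection) {
        return "XA(" + physicalConnection + ")";
    }
}
```

`setAccessible(true)` succeeds here because both classes live in the unnamed module; on Java 9 and later it can fail for classes in named modules that do not open their packages.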

For MySQL, as we already know from the previous content, it has two XADataSource driver classes. In MySQLXAConnectionWrapper, we also find the class name definitions of these two driver classes as follows:

private static final String MYSQL_XA_DATASOURCE_5 = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";

private static final String MYSQL_XA_DATASOURCE_8 = "com.mysql.cj.jdbc.MysqlXADataSource";

The two driver classes belong to different driver generations, which use different internal connection classes. The unwrapPhysicalConnection method therefore chooses the class to unwrap to based on the XADataSource class name:

@SneakyThrows
private Connection unwrapPhysicalConnection(final String xaDataSourceClassName, final Connection connection) {
    switch (xaDataSourceClassName) {
        case MYSQL_XA_DATASOURCE_5:
            return (Connection) connection.unwrap(Class.forName("com.mysql.jdbc.Connection"));
        case MYSQL_XA_DATASOURCE_8:
            return (Connection) connection.unwrap(Class.forName("com.mysql.cj.jdbc.JdbcConnection"));
        default:
            throw new UnsupportedOperationException(String.format("Cannot support xa datasource: `%s`", xaDataSourceClassName));
    }
}


For comparison, here is the simpler `wrap` method of `PostgreSQLXAConnectionWrapper`. Following these differences requires some familiarity with the internals of each database's JDBC driver.

@SneakyThrows
@Override
public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
    BaseConnection physicalConnection = (BaseConnection) connection.unwrap(Class.forName("org.postgresql.core.BaseConnection"));
    return new PGXAConnection(physicalConnection);
}


#### 3. XATransactionDataSource

Having covered how `XADataSource` and `XAConnection` are created, let's go back to `XAShardingTransactionManager`. Note that the `DataSource` it caches is not JDBC's native `XADataSource` but an `XATransactionDataSource`.

Let's take a look at the `XATransactionDataSource` class, which has the following variables and constructor:

private final DatabaseType databaseType;

private final String resourceName;

private final DataSource dataSource;

private XADataSource xaDataSource;

private XATransactionManager xaTransactionManager;

public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
    this.databaseType = databaseType;
    this.resourceName = resourceName;
    this.dataSource = dataSource;
    // Data sources listed in CONTAINER_DATASOURCE_NAMES are used as-is; otherwise build an XADataSource and register it for recovery
    if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        this.xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
        this.xaTransactionManager = xaTransactionManager;
        xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
    }
}


We are already familiar with these variables. In the constructor, unless the class of the `DataSource` is listed in `CONTAINER_DATASOURCE_NAMES`, an `XADataSource` is built with `XADataSourceFactory` and registered as a recovery resource through the `registerRecoveryResource` method of `XATransactionManager`.

Next, let's take a look at the core method `getConnection` in `XATransactionDataSource`, as shown below:

public Connection getConnection() throws SQLException, SystemException, RollbackException {
    if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        return dataSource.getConnection();
    }
    // Build a Connection from the DataSource
    Connection result = dataSource.getConnection();
    // Create an XAConnection with XAConnectionFactory
    XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
    // Get the Transaction object from XATransactionManager
    final Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
    // Check if the Transaction is already in the current thread
    if (!enlistedTransactions.get().contains(transaction)) {
        // Associate the XAResource in XAConnection with the target Transaction object
        transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));
        // Register a Synchronization interface in Transaction
        transaction.registerSynchronization(new Synchronization() {

            @Override
            public void beforeCompletion() {
                enlistedTransactions.get().remove(transaction);
            }

            @Override
            public void afterCompletion(final int status) {
                enlistedTransactions.get().clear();
            }
        });
        // Put the Transaction object into the current thread
        enlistedTransactions.get().add(transaction);
    }
    return result;
}


Here, we first obtain a `Connection` from the `DataSource` and then create an `XAConnection` from it with `XAConnectionFactory`. Next, we get the current `Transaction` object from `XATransactionManager`. Note that `XATransactionDataSource` holds a `ThreadLocal` variable, `enlistedTransactions`, which stores the set of `Transaction` objects the current thread has already enlisted resources in:

private final ThreadLocal<Set<Transaction>> enlistedTransactions = new ThreadLocal<Set<Transaction>>() {

    @Override
    public Set<Transaction> initialValue() {
        return new HashSet<>();
    }
};


In the above method, after getting the `Transaction` object from `XATransactionManager`, it will first check if the `Transaction` object exists in `enlistedTransactions`. If not, it associates the `XAResource` in `XAConnection` with the target `Transaction` object.
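The check-then-enlist logic relies on every thread having its own set. Below is a minimal sketch of that guard, with a hypothetical `EnlistmentGuard` class and a generic type standing in for `Transaction`. `ThreadLocal.withInitial` replaces the anonymous subclass, and `Set.add` performs the "contains" check and the insertion in one call, returning `true` only the first time a transaction is seen on the current thread.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical per-thread "enlist only once" guard.
final class EnlistmentGuard<T> {

    // Each thread lazily gets its own empty set, so no locking is required.
    private final ThreadLocal<Set<T>> enlisted = ThreadLocal.withInitial(HashSet::new);

    // Returns true when the caller should enlist its XAResource in this transaction.
    boolean markEnlisted(final T transaction) {
        return enlisted.get().add(transaction);
    }

    // Mirrors afterCompletion(): forget all transactions once one completes.
    void clear() {
        enlisted.get().clear();
    }
}
```

One design difference is worth noting: `getConnection` adds the transaction only after `enlistResource` and `registerSynchronization` have succeeded, whereas `markEnlisted` records it up front. A real implementation that uses this shortcut would have to undo the mark if enlisting fails.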

Next, let's take a look at the usage of the `registerSynchronization` method in the `Transaction` object. This method registers a `Synchronization` interface, which contains the `beforeCompletion` and `afterCompletion` methods.

The `TransactionManager` calls the `beforeCompletion` method of `Synchronization` before the two-phase commit starts, and calls `afterCompletion` once the transaction has completed. In `getConnection`, `beforeCompletion` removes the current `Transaction` from `enlistedTransactions`, and `afterCompletion` clears the set. Finally, the `Transaction` object is added to `enlistedTransactions`; because this variable is a `ThreadLocal`, each thread only ever sees its own set, so no additional synchronization is needed.
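The callback order can be illustrated without a real JTA implementation. The toy below is hypothetical: `SyncCallback` and `ToyTransaction` are not JTA types (`javax.transaction.Synchronization` is not part of the JDK), and no XA work actually happens. It only models the ordering described above: `beforeCompletion` before the two phases, `afterCompletion` with the final status afterwards. The status value 3 mirrors `javax.transaction.Status.STATUS_COMMITTED`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface shaped like javax.transaction.Synchronization.
interface SyncCallback {

    void beforeCompletion();

    void afterCompletion(int status);
}

// Toy transaction that only models when the callbacks fire.
final class ToyTransaction {

    // Same value as javax.transaction.Status.STATUS_COMMITTED.
    static final int STATUS_COMMITTED = 3;

    private final List<SyncCallback> callbacks = new ArrayList<>();

    private final List<String> events = new ArrayList<>();

    void registerSynchronization(final SyncCallback callback) {
        callbacks.add(callback);
    }

    List<String> events() {
        return events;
    }

    void commit() {
        callbacks.forEach(SyncCallback::beforeCompletion);
        events.add("prepare");   // phase 1 of the two-phase commit would run here
        events.add("commit");    // phase 2 would run here
        callbacks.forEach(each -> each.afterCompletion(STATUS_COMMITTED));
    }
}
```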

Finally, let's take a look at the `close` method in `XATransactionDataSource`, as shown below:

@Override
public void close() {
    if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        xaTransactionManager.removeRecoveryResource(resourceName, xaDataSource);
    } else {
        close(dataSource);
    }
}


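For an ordinary data source, `close` calls the `removeRecoveryResource` method of `XATransactionManager` to deregister the recovery resource registered in the constructor; for a data source listed in `CONTAINER_DATASOURCE_NAMES`, it closes the underlying data source instead.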

So far, we have walked through how a `Connection` is obtained from `XATransactionDataSource`.

### From Source Code Analysis to Daily Development

ShardingSphere is distributed database middleware that fully complies with the JDBC specification, including the objects involved in distributed transactions. Today's lesson deepened our understanding of the JDBC specification and of how its core interfaces can be extended. In the `MySQLXAConnectionWrapper` class, we once again saw reflection used to create `XAConnection` objects. These techniques are worth learning and applying in daily development.

### Summary and Preview

Distributed transactions are a relatively complex concept, and ShardingSphere provides two implementation solutions for strong consistency and eventual consistency in distributed environments. In today's lesson, we discussed the `XAShardingTransactionManager` based on the XA protocol. When understanding the core objects such as `XADataSource` and `XAConnection` in `XAShardingTransactionManager`, it is still important to have a foundation in the JDBC specification and grasp the entire process of integration and compatibility with distributed transactions. This lesson provides a detailed introduction to this process.

Here's a question for you to think about: along which dimensions does the ShardingSphere framework build its abstractions to achieve strong consistency in a distributed environment? Feel free to discuss with others in the comments section, and I will reply to each comment.

There is still a lot of content in `XAShardingTransactionManager`. In the next lesson, we will continue to explore the remaining parts of `XAShardingTransactionManager` based on today's lesson, as well as another sharding transaction manager `SeataATShardingTransactionManager` in ShardingSphere.