09 How to Use Strong Consistency Transactions and Flexible Transactions in Distributed Transactions

09 How to Use Strong Consistency Transactions and Flexible Transactions in Distributed Transactions #

Hello, welcome to Lesson 09. Today, we will introduce an important topic in distributed environments, distributed transactions. Before diving into the specifics of how to use ShardingSphere, let’s briefly introduce the basic concepts of distributed transactions.

How to Understand Distributed Transactions? #

In traditional relational databases, transactions are a standard component, and almost all mature relational databases provide native support for local transactions. Local transactions provide ACID transaction properties. Based on local transactions, to ensure data consistency, we first start a transaction, then perform data operations, and finally commit or roll back. Furthermore, with the help of integrated frameworks like Spring, developers only need to focus on the business logic that causes data changes.

However, in a distributed environment, things become more complicated. Assuming there are multiple independent databases in the system, in order to ensure data consistency across these databases, we need to include them in the same transaction. Local transactions are not sufficient in this case, and we need to use distributed transactions.

The industry has some common mechanisms for implementing distributed transactions, such as the XA protocol that supports two-phase commit and flexible transactions represented by Saga. For different implementation mechanisms, there are also various vendors and development tools available. Because these development tools have significant differences in usage and implementation principles, developers have a strong demand for a unified solution that can abstract these differences. At the same time, we also hope that this solution can provide friendly system integration.

As a distributed database middleware, ShardingSphere must consider the implementation of distributed transactions. From the beginning, ShardingSphere has fully considered the demands of developers in its design. Let’s take a look.

Distributed Transactions in ShardingSphere #

In ShardingSphere, in addition to local transactions, there are two implementation options for distributed transactions: XA transactions and flexible (BASE) transactions. This can be verified from the enumeration values of TransactionType:

public enum TransactionType {
    LOCAL, XA, BASE
}

XA Transactions #

XA transactions provide an implementation mechanism based on the two-phase commit protocol. As the name suggests, the two-phase commit has two phases: prepare phase and commit phase. In the prepare phase, the coordinator initiates a proposal and asks each participant if they accept it. In the commit phase, the coordinator commits or aborts the transaction based on the feedback from the participants. If all participants agree, the transaction is committed. If any participant disagrees, the transaction is aborted.

Two-phase commit diagram

Two-phase commit diagram

Currently, there are also mainstream tool libraries for implementing XA transactions in the industry, such as Atomikos, Narayana, and Bitronix. ShardingSphere integrates these three tool libraries and defaults to using Atomikos to complete the two-phase commit.

BASE Transactions #

XA transactions are typical strong consistency transactions, which strictly adhere to the ACID principles of transactions. In contrast, flexible transactions adhere to the BASE design theory and pursue eventual consistency. The acronym BASE stands for Basically Available, Soft State, and Eventual Consistency.

For implementing flexible transactions based on the BASE principles, the industry also has excellent frameworks, such as Seata provided by Alibaba. ShardingSphere also integrates support for Seata internally. Of course, we can also integrate other open-source frameworks for distributed transactions as needed and embed them into the ShardingSphere runtime environment based on a microkernel architecture.

After introducing the theoretical knowledge, let’s now use XA transactions and BASE transactions separately to achieve data consistency in a distributed environment.

Using XA Transactions #

Adding support for XA transactions in a Spring application is relatively simple. Both the Spring framework and ShardingSphere itself provide us with low-cost development mechanisms.

Preparation of the Development Environment #

To use XA transactions, we first need to add two dependencies, sharding-jdbc-core and sharding-transaction-xa-core, to the pom.xml file:

<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-core</artifactId>
</dependency> 

<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-transaction-xa-core</artifactId>
</dependency>

In today’s case, we will demonstrate how to implement distributed transactions in a sharding environment. Therefore, we need to create a .properties file in Spring Boot and include all the configuration items required for sharding:

spring.shardingsphere.datasource.names=ds0,ds1 
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/ds0
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=root
spring.shardingsphere.datasource.ds0.autoCommit: false 
spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://localhost:3306/ds1
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=root
spring.shardingsphere.datasource.ds0.autoCommit: false 
spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 2}
spring.shardingsphere.sharding.binding-tables=health_record,health_task
spring.shardingsphere.sharding.broadcast-tables=health_level 
spring.shardingsphere.sharding.tables.health_record.actual-data-nodes=ds$->{0..1}.health_record
spring.shardingsphere.sharding.tables.health_record.key-generator.column=record_id
spring.shardingsphere.sharding.tables.health_record.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.health_record.key-generator.props.worker.id=33
spring.shardingsphere.sharding.tables.health_task.actual-data-nodes=ds$->{0..1}.health_task
spring.shardingsphere.sharding.tables.health_task.key-generator.column=task_id
spring.shardingsphere.sharding.tables.health_task.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.health_task.key-generator.props.worker.id=33 
spring.shardingsphere.props.sql.show=true

Implementing XA Transactions #

With the sharding configuration, we will obtain the target DataSource for SQL execution. Since we are using the Spring framework for transaction management instead of native JDBC, we need to associate the DataSource with the transaction manager PlatformTransactionManager in Spring.

On the other hand, to better integrate with the distributed transaction support in ShardingSphere, we can simplify the SQL execution process using the JdbcTemplate template class provided by the Spring framework. One common approach is to create a transaction configuration class to initialize the required PlatformTransactionManager and JdbcTemplate objects:

    @Configuration
    @EnableTransactionManagement
    public class TransactionConfiguration {
    
        @Bean
        public PlatformTransactionManager txManager(final DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }
    
        @Bean
        public JdbcTemplate jdbcTemplate(final DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }
    }
    

Once the JdbcTemplate is initialized, you can inject this template class in your business code to execute various SQL operations. A common practice is to pass a PreparedStatementCallback and execute specific SQL within this callback:
    
```java
@Autowired
JdbcTemplate jdbcTemplate; 
jdbcTemplate.execute(SQL, (PreparedStatementCallback<Object>) preparedStatement -> {
    ...
    return preparedStatement;
});

In the above code, we obtain a PreparedStatement object through the PreparedStatementCallback callback. Alternatively, we can use the JdbcTemplate’s another code style to execute SQL by using a more basic ConnectionCallback callback interface:

jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {
    ...
    return connection;
});

To embed distributed transaction mechanism in your business code with the least development cost, ShardingSphere also provides a @ShardingTransactionType annotation to configure the transaction type to be executed:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ShardingTransactionType {

    TransactionType value() default TransactionType.LOCAL;
}

As we know, there are three transaction types provided by ShardingSphere: LOCAL, XA, and BASE, with LOCAL being the default. Therefore, if you need to use distributed transactions, you need to explicitly add this annotation to your business methods:

@Transactional
@ShardingTransactionType(TransactionType.XA)
public void insert(){
    ...
}

Another way to set the TransactionType is by using the TransactionTypeHolder utility class. TransactionTypeHolder class uses ThreadLocal to store the TransactionType:

public final class TransactionTypeHolder {

    private static final ThreadLocal<TransactionType> CONTEXT = new ThreadLocal<TransactionType>() {

        @Override
        protected TransactionType initialValue() {
            return TransactionType.LOCAL;
        }
    };

    public static TransactionType get() {
        return CONTEXT.get();
    }

    public static void set(final TransactionType transactionType) {
        CONTEXT.set(transactionType);
    }

    public static void clear() {
        CONTEXT.remove();
    }
}

As you can see, TransactionTypeHolder defaults to using local transactions. We can change the initial setting by using the set method:

TransactionTypeHolder.set(TransactionType.XA);

Now that the overall structure of developing distributed transactions using XA has been outlined, we can create an insertHealthRecords method and add the data insertion code for HealthRecord and HealthTask within it:

private List<Long> insertHealthRecords() throws SQLException {
       List<Long> result = new ArrayList<>(10);
       jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {
           connection.setAutoCommit(false);

         try {
```java
for (Long i = 1L; i <= 10; i++) {
    HealthRecord healthRecord = createHealthRecord(i);
    insertHealthRecord(healthRecord, connection);

    HealthTask healthTask = createHealthTask(i, healthRecord);
    insertHealthTask(healthTask, connection);

    result.add(healthRecord.getRecordId());
}
connection.commit();
} catch (final SQLException ex) {
    connection.rollback();
    throw ex;
}

return connection;
});

return result;
}

可以看到在执行插入操作之前我们关闭了 Connection 的自动提交功能在 SQL 执行完毕之后手动通过 Connection commit 方法执行事务提交一旦在 SQL 的执行过程中出现任何异常时就调用 Connection 的 rollback 方法回滚事务

这里有必要介绍执行数据插入的具体实现过程我们以 insertHealthRecord 方法为例进行展开

private void insertHealthRecord(HealthRecord healthRecord, Connection connection) throws SQLException {
    try (PreparedStatement preparedStatement = connection.prepareStatement(sql_health_record_insert, Statement.RETURN_GENERATED_KEYS)) {
        preparedStatement.setLong(1, healthRecord.getUserId());
        preparedStatement.setLong(2, healthRecord.getLevelId() % 5 );
        preparedStatement.setString(3, "Remark" + healthRecord.getUserId());
        preparedStatement.executeUpdate();

        try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
            if (resultSet.next()) {
                healthRecord.setRecordId(resultSet.getLong(1));
            }
        }
    }
}

首先通过 Connection 对象构建一个 PreparedStatement请注意由于我们需要通过 ShardingSphere 的主键自动生成机制所以在创建 PreparedStatement 时需要进行特殊地设置

connection.prepareStatement(sql_health_record_insert, Statement.RETURN_GENERATED_KEYS)

通过这种方式在 PreparedStatement 完成 SQL 执行之后我们就可以获取自动生成的主键值

try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
    if (resultSet.next()) {
        healthRecord.setRecordId(resultSet.getLong(1));
    }
}
当获取这个主键值之后就将这个主键值设置回 HealthRecord这是使用自动生成主键的常见做法

最后我们在事务方法的入口处需要设置 TransactionType

@Override
public void processWithXA() throws SQLException {
    TransactionTypeHolder.set(TransactionType.XA);

    insertHealthRecords();
}

现在让我们执行这个 processWithXA 方法看看数据是否已经按照分库的配置写入到目标数据库表中下面是 ds0 中的 health_record 表和 health_task 表

![Drawing 2.png](../images/Ciqc1F8MB4yADpvNAAAn7gHRWyw024.png)

ds0 中的 health_record 表

![Drawing 3.png](../images/Ciqc1F8MCEuAUA1NAAAuoAPD9w4209.png)

ds0 中的 health_task 表

下面则是 ds1 中的 health_record 表和 health_task 表 ![Drawing 4.png](../images/CgqCHl8MB6SAOFIhAAAoGKuCLOw688.png)

ds1 中的 health_record 表

![Drawing 5.png](../images/Ciqc1F8MCFiAH4szAAAvGNmTj1Y923.png)

ds1 中的 health_task 表

我们也可以通过控制台日志来跟踪具体的 SQL 执行过程

2020-06-01 20:11:52.043 INFO 10720 --- [ main] ShardingSphere-SQL : Rule Type: sharding
2020-06-01 20:11:52.043 INFO 10720 --- [ main] ShardingSphere-SQL : Logic SQL: INSERT INTO health_record (user_id, level_id, remark) VALUES (?, ?, ?)

2020-06-01 20:11:52.043 INFO 10720 --- [ main] ShardingSphere-SQL : Actual SQL: ds1 ::: INSERT INTO health_record (user_id, level_id, remark, record_id) VALUES (?, ?, ?, ?) ::: [1, 1, Remark1, 474308304135393280]

Now, let's simulate a scenario where a transaction fails. We can intentionally throw an exception during the execution of the code to achieve this:

try { for (Long i = 1L; i <= 10; i++) { HealthRecord healthRecord = createHealthRecord(i); insertHealthRecord(healthRecord, connection);

    HealthTask healthTask = createHealthTask(i, healthRecord);
    insertHealthTask(healthTask, connection);

    result.add(healthRecord.getRecordId());

    // Manually throw an exception
    throw new SQLException("Database execution error!");
}
connection.commit();

} catch (final SQLException ex) { connection.rollback(); throw ex; }


Executing the `processWithXA` method again, based on the `rollback` method provided by the `connection`, we find that the partially executed SQL statements have not been committed to any database.

### Using BASE Transaction

Compared to XA transactions, integrating BASE transactions into the business code is relatively more complex because we need to use an external framework to accomplish this. Here, we will demonstrate how to use BASE transactions based on the Seata framework provided by Alibaba.

#### Development Environment Setup

Similarly, to use BASE transactions based on Seata, we first need to add the following dependencies to the pom file: 

Since we are using the Seata framework, we also need to include the relevant components of the Seata framework:

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-rm-datasource</artifactId>
</dependency> 
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-tm</artifactId>
</dependency> 
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-codec-all</artifactId>
</dependency>

Then, we download and start the Seata server. During this process, the registry.conf file in the config directory of the Seata server needs to be set to specify the registry center. Here, we use ZooKeeper as the registry center. You can refer to the official documentation of Seata for the process of starting the Seata server. Note that, according to the operational requirements of Seata, we need to create an undo_log table in each shard database instance. Lastly, we need to add a seata.conf configuration file in the classpath of the code project:

client {
    application.id = health-base
    transaction.service.group = health-base-group
}

Now, the file organization in the src/main/resources directory should be as follows:

Drawing 6.png

Of course, here we will continue to use the sharding configuration introduced earlier.

Implementing BASE Transactions #

Based on the distributed transaction abstraction provided by ShardingSphere, the only thing we need to do to switch from XA transactions to BASE transactions is to reset the TransactionType, which is to modify one line of code:

@Override
public void processWithBASE() throws SQLException {
    TransactionTypeHolder.set(TransactionType.BASE);

    insertHealthRecords();
}

Upon re-executing the test case, we find that, in both cases of normal submission and exception rollback, the distributed transactions based on Seata still work effectively.

Summary #

Distributed transaction is a core feature provided by ShardingSphere and is an essential topic to consider in data processing in a distributed environment. ShardingSphere provides two implementation approaches for distributed transactions: XA transactions based on strong consistency and BASE transactions based on eventual consistency. Today, we have provided a detailed introduction to the usage of these two transaction types based on examples.

Here’s a question for you to consider: When using ShardingSphere, what are the development approaches for incorporating distributed transactions into business code?

That concludes the content of this lesson. In the next lesson, we will introduce a topic related to data access security provided by ShardingSphere, which is secure access to sensitive data through data desensitization.