26 Read-Write Separation: How Are the Common Master-Slave Architecture and the Sharded Master-Slave Architecture Implemented? #

In the lesson “17 | Routing Engine: How to Understand the Operation Mechanism of ShardingRouter in ShardingSphere?” we introduced the routing engine of ShardingSphere and mentioned the class ShardingMasterSlaveRouter, which applies read/write separation to the routing result produced by sharding.

Today, we will focus on this topic and see how ShardingSphere implements read/write separation routing in a master-slave architecture.

ShardingMasterSlaveRouter #

Let’s take a look at the ShardingMasterSlaveRouter class. In terms of its effect, read/write separation is actually a routing strategy, so this class is also located under the sharding-core-route project.

The entry function route of ShardingMasterSlaveRouter is as follows:

public SQLRouteResult route(final SQLRouteResult sqlRouteResult) {
    for (MasterSlaveRule each : masterSlaveRules) {
        // Execute routing method based on each MasterSlaveRule
        route(each, sqlRouteResult);
    }
    return sqlRouteResult;
}

This method introduces a rule class called MasterSlaveRule. For each MasterSlaveRule it calls a separate route method, which adjusts the same SQLRouteResult in place, and it finally returns that SQLRouteResult.

The route method is as follows:

private void route(final MasterSlaveRule masterSlaveRule, final SQLRouteResult sqlRouteResult) {
    Collection<RoutingUnit> toBeRemoved = new LinkedList<>();
    Collection<RoutingUnit> toBeAdded = new LinkedList<>();
    for (RoutingUnit each : sqlRouteResult.getRoutingResult().getRoutingUnits()) {
        if (!masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceName())) {
            continue;
        }
        toBeRemoved.add(each);
        String actualDataSourceName;
        // Determine whether to use the master database
        if (isMasterRoute(sqlRouteResult.getSqlStatementContext().getSqlStatement())) {
            MasterVisitedManager.setMasterVisited();
            actualDataSourceName = masterSlaveRule.getMasterDataSourceName();
        } else {
            // With multiple slave databases, round-robin is the default strategy; random access can be configured instead
            actualDataSourceName = masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), 
                masterSlaveRule.getMasterDataSourceName(), 
                new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())
            );
        }
        toBeAdded.add(createNewRoutingUnit(actualDataSourceName, each));
    }
    sqlRouteResult.getRoutingResult().getRoutingUnits().removeAll(toBeRemoved);
    sqlRouteResult.getRoutingResult().getRoutingUnits().addAll(toBeAdded);
}

Read/write separation has to adjust the routing information, so this code builds two temporary collections, toBeRemoved and toBeAdded, which hold the RoutingUnits to be removed and added. Collecting the changes first and applying them after the loop avoids modifying the RoutingUnits collection while it is being iterated.
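The collect-then-apply pattern can be tried in isolation. The following is only a minimal sketch: the Unit class is a hypothetical stand-in for RoutingUnit, and the data source and table names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

final class RoutingUnitSwapSketch {

    /** Hypothetical stand-in for RoutingUnit: a data source name plus a table name. */
    static final class Unit {
        final String dataSourceName;
        final String tableName;

        Unit(final String dataSourceName, final String tableName) {
            this.dataSourceName = dataSourceName;
            this.tableName = tableName;
        }

        @Override
        public String toString() {
            return dataSourceName + "." + tableName;
        }
    }

    /** Replaces every unit that targets logicalName with a unit that targets actualName. */
    static void swap(final Collection<Unit> units, final String logicalName, final String actualName) {
        Collection<Unit> toBeRemoved = new LinkedList<>();
        Collection<Unit> toBeAdded = new LinkedList<>();
        for (Unit each : units) {
            if (!logicalName.equalsIgnoreCase(each.dataSourceName)) {
                continue;
            }
            toBeRemoved.add(each);
            toBeAdded.add(new Unit(actualName, each.tableName));
        }
        // The collection is only changed after the iteration has finished.
        units.removeAll(toBeRemoved);
        units.addAll(toBeAdded);
    }

    static List<String> demo() {
        Collection<Unit> units = new ArrayList<>();
        units.add(new Unit("ms_ds0", "t_order_0"));
        units.add(new Unit("ds1", "t_order_1"));
        swap(units, "ms_ds0", "slave_ds0_1");
        List<String> result = new ArrayList<>();
        for (Unit each : units) {
            result.add(each.toString());
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [ds1.t_order_1, slave_ds0_1.t_order_0]
    }
}
```

Only the unit whose data source name matches the logical master-slave name is replaced; units that point at other data sources are left as they are.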

Next, the code works out actualDataSourceName, the name of the database that will actually be accessed, which first requires deciding whether to use the master database. Please note that in the current 4.X version, ShardingSphere supports only a single master database, although there can be multiple slave databases.

The isMasterRoute method, which decides whether the master database should be used, is as follows:

private boolean isMasterRoute(final SQLStatement sqlStatement) {
    return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
}

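As you can see, there are four conditions here, and if any one of them is met, the statement is routed to the master database. The first two are relatively easy to understand: the statement contains a lock segment (typically a locking read such as SELECT ... FOR UPDATE), or it is not a SELECT statement at all. MasterVisitedManager is a thread-safe, per-thread holder that records whether the current thread has already accessed the master database; once it has, later statements on that thread are also routed to the master.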

As we saw when discussing the Hint concept and the forced routing mechanism in the lesson “08 | Read/Write Separation: How to Integrate Sharding with Database Master-Slave Architecture?”, HintManager is the implementation class of the database hint mechanism in ShardingSphere, which can be used to force access to the master database or to route non-query operations to the master database.
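The four-way decision can be sketched without ShardingSphere on the classpath. The sketch below rests on several assumptions: SQL is classified with a crude string check rather than a parsed SQLStatement, and the two ThreadLocal flags are hypothetical stand-ins for MasterVisitedManager and HintManager, not their real implementations.

```java
import java.util.Locale;

final class MasterRouteSketch {

    // Per-thread flag in the spirit of MasterVisitedManager (assumption: ThreadLocal-based).
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);

    // Per-thread flag standing in for HintManager.isMasterRouteOnly().
    private static final ThreadLocal<Boolean> MASTER_ROUTE_ONLY = ThreadLocal.withInitial(() -> false);

    static void setMasterVisited() {
        MASTER_VISITED.set(true);
    }

    static void setMasterRouteOnly() {
        MASTER_ROUTE_ONLY.set(true);
    }

    static void clear() {
        MASTER_VISITED.remove();
        MASTER_ROUTE_ONLY.remove();
    }

    /** Crude textual classification; the real code inspects a parsed SQLStatement instead. */
    static boolean isMasterRoute(final String sql) {
        String normalized = sql.trim().toUpperCase(Locale.ROOT);
        boolean isSelect = normalized.startsWith("SELECT");
        boolean hasLock = isSelect && normalized.contains("FOR UPDATE");
        return hasLock || !isSelect || MASTER_VISITED.get() || MASTER_ROUTE_ONLY.get();
    }

    /** Returns "master" or "slave" and remembers a master visit on the current thread. */
    static String route(final String sql) {
        if (isMasterRoute(sql)) {
            setMasterVisited();
            return "master";
        }
        return "slave";
    }

    public static void main(final String[] args) {
        System.out.println(route("SELECT * FROM t_order"));          // slave
        System.out.println(route("INSERT INTO t_order VALUES (1)")); // master
        System.out.println(route("SELECT * FROM t_order"));          // master: this thread has visited the master
        clear();
        System.out.println(route("SELECT * FROM t_order"));          // slave again
    }
}
```

The third call shows why the master-visited flag exists: once a thread has written to the master, later reads on the same thread stay on the master, so they do not hit a slave that may not yet have replicated the write.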

If the statement is not routed to the master database, the process moves on to slave database routing. If there are multiple slave databases, some strategy is needed to pick a specific one. ShardingSphere provides the MasterSlaveLoadBalanceAlgorithm interface for selecting a slave database. Please note that this interface is located in the sharding-core-api project and is defined as follows:

public interface MasterSlaveLoadBalanceAlgorithm extends TypeBasedSPI {
    // Select a slave database from the list of slave databases for routing
    String getDataSource(String name, String masterDataSourceName, List<String> slaveDataSourceNames);
}

You can see that the MasterSlaveLoadBalanceAlgorithm interface extends the TypeBasedSPI interface, which indicates that it is an SPI. Its getDataSource method takes the rule name, the master data source name, and a list of slave data source names as parameters, and returns the name of the selected slave data source.

ShardingSphere provides two implementation classes of MasterSlaveLoadBalanceAlgorithm: RandomMasterSlaveLoadBalanceAlgorithm, which implements a random algorithm, and RoundRobinMasterSlaveLoadBalanceAlgorithm, which implements a round-robin algorithm.

The corresponding ServiceLoader class, MasterSlaveLoadBalanceAlgorithmServiceLoader, is located in the sharding-core-common project, and the concrete implementation class of MasterSlaveLoadBalanceAlgorithm is obtained in MasterSlaveRule.

Please note that in daily development, load balancing works even if we do not set a load balancing algorithm through the configuration system.

The createMasterSlaveLoadBalanceAlgorithm method in MasterSlaveRule shows why:

private MasterSlaveLoadBalanceAlgorithm createMasterSlaveLoadBalanceAlgorithm(final LoadBalanceStrategyConfiguration loadBalanceStrategyConfiguration) {
    // Get the MasterSlaveLoadBalanceAlgorithmServiceLoader
    MasterSlaveLoadBalanceAlgorithmServiceLoader serviceLoader = new MasterSlaveLoadBalanceAlgorithmServiceLoader();
    // Dynamically load the load balancing algorithm implementation class according to the configuration
    return null == loadBalanceStrategyConfiguration
            ? serviceLoader.newService() : serviceLoader.newService(loadBalanceStrategyConfiguration.getType(), loadBalanceStrategyConfiguration.getProperties());
}

As you can see, when no loadBalanceStrategyConfiguration is provided, the SPI instance is created directly with the serviceLoader.newService() method. If we review the lesson “13 | Microkernel Architecture: How does ShardingSphere achieve system scalability?”, we will recall that this method returns the first available SPI instance in the system.
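The “first available instance, unless a type is configured” behavior can be illustrated with a small registry. This is a sketch under stated assumptions, not ShardingSphere’s service loader: the Algorithm interface, the registry, the type names, and the registration order are all hypothetical and chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

final class FirstAvailableSpiSketch {

    /** Hypothetical stand-in for a type-based SPI. */
    interface Algorithm {
        String getType();
    }

    static final class RoundRobinAlgorithm implements Algorithm {
        @Override
        public String getType() {
            return "ROUND_ROBIN";
        }
    }

    static final class RandomAlgorithm implements Algorithm {
        @Override
        public String getType() {
            return "RANDOM";
        }
    }

    // A LinkedHashMap keeps registration order, so "first available" is well defined.
    private static final Map<String, Supplier<Algorithm>> REGISTRY = new LinkedHashMap<>();

    static {
        REGISTRY.put("ROUND_ROBIN", RoundRobinAlgorithm::new);
        REGISTRY.put("RANDOM", RandomAlgorithm::new);
    }

    /** No type given: fall back to the first registered implementation. */
    static Algorithm newService() {
        return REGISTRY.values().iterator().next().get();
    }

    /** Type given: look the implementation up by its type name. */
    static Algorithm newService(final String type) {
        Supplier<Algorithm> supplier = REGISTRY.get(type);
        if (null == supplier) {
            throw new IllegalArgumentException("No algorithm registered for type: " + type);
        }
        return supplier.get();
    }

    /** Mirrors the null check in createMasterSlaveLoadBalanceAlgorithm. */
    static Algorithm create(final String configuredType) {
        return null == configuredType ? newService() : newService(configuredType);
    }

    public static void main(final String[] args) {
        System.out.println(create(null).getType());     // ROUND_ROBIN
        System.out.println(create("RANDOM").getType()); // RANDOM
    }
}
```

With this design the default is decided purely by registration order, which is why the order of entries in the SPI configuration matters.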

We also found the SPI configuration information in the sharding-core-common project, as shown below:

Figure: SPI configuration for MasterSlaveLoadBalanceAlgorithm

According to the configuration information here, the first obtained SPI instance should be RoundRobinMasterSlaveLoadBalanceAlgorithm, which is the round-robin strategy. Its getDataSource method is implemented as follows:

@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
    AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
    COUNTS.putIfAbsent(name, count);
    count.compareAndSet(slaveDataSourceNames.size(), 0);
    return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size());
}
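To see the rotation and the counter reset in action, here is a self-contained sketch of the same idea. It is an adaptation rather than the original class: the unused masterDataSourceName parameter is dropped, the pick helper is added for the demonstration, and computeIfAbsent is used so that concurrent callers always share one counter per rule name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    // One counter per master-slave rule name.
    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();

    static String getDataSource(final String name, final List<String> slaveDataSourceNames) {
        AtomicInteger count = COUNTS.computeIfAbsent(name, key -> new AtomicInteger(0));
        // Reset to 0 once the counter reaches the number of slaves, which keeps the value small.
        count.compareAndSet(slaveDataSourceNames.size(), 0);
        return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size());
    }

    /** Helper for the demonstration: calls getDataSource the given number of times. */
    static List<String> pick(final String name, final List<String> slaves, final int times) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            result.add(getDataSource(name, slaves));
        }
        return result;
    }

    public static void main(final String[] args) {
        // Prints [slave0, slave1, slave2, slave0, slave1]
        System.out.println(pick("ms_ds", Arrays.asList("slave0", "slave1", "slave2"), 5));
    }
}
```

Because the counters are keyed by rule name, each master-slave rule rotates through its own slaves independently of the others.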

Of course, we can also choose the random access strategy through configuration. The getDataSource method of RandomMasterSlaveLoadBalanceAlgorithm is simpler, as shown below:

@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
    return slaveDataSourceNames.get(ThreadLocalRandom.current().nextInt(slaveDataSourceNames.size()));
}

That completes our introduction to ShardingMasterSlaveRouter. With this class, we can apply master-slave routing to the sharding result and thus achieve read-write separation.

ShardingSphere also has a MasterSlaveRouter class that does not involve sharding. Its implementation is very similar to that of ShardingMasterSlaveRouter. Let’s take a look together.

MasterSlaveRouter #

As its name suggests, the ShardingMasterSlaveRouter class performs master-slave routing when sharding is in use. From the previous introduction, we know that it is used mainly in the routing engine, where it adds a layer of read-write separation routing on top of the ordinary ShardingRouter. This can be seen as a lower-level read-write separation mechanism, in which we only adjust the target database during routing.

Next, let’s approach read-write separation from another angle and control the whole process from a higher level. Let’s go to the sharding-jdbc-core project, where we discussed the ShardingDataSourceFactory class before. This time our target is MasterSlaveDataSourceFactory, a factory class whose role is to create a MasterSlaveDataSource, as shown below:

public final class MasterSlaveDataSourceFactory {

    public static DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
        return new MasterSlaveDataSource(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfig), props);
    }
}

MasterSlaveDataSource is defined as follows, and you can see that this class also extends AbstractDataSourceAdapter. We discussed AbstractDataSourceAdapter and the adapter classes for Connection and Statement in detail in “03 | Specification Compatibility: What is the relationship between JDBC specifications and ShardingSphere?”, so we will not go into detail here.

public class MasterSlaveDataSource extends AbstractDataSourceAdapter {

    private final MasterSlaveRuntimeContext runtimeContext;

    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRule masterSlaveRule, final Properties props) throws SQLException {
        super(dataSourceMap);
        runtimeContext = new MasterSlaveRuntimeContext(dataSourceMap, masterSlaveRule, props, getDatabaseType());
    }

    @Override
    public final MasterSlaveConnection getConnection() {
        return new MasterSlaveConnection(getDataSourceMap(), runtimeContext);
    }
}

Similar to other data sources, the MasterSlaveDataSource is responsible for creating the RuntimeContext and Connection objects. Let’s first look at the MasterSlaveRuntimeContext. Compared to ShardingRuntimeContext, this class is simpler and only builds and caches the required DatabaseMetaData.

Next, let’s take a look at the MasterSlaveConnection. Like other Connection classes, it also has a set of createStatement and prepareStatement methods for obtaining Statement and PreparedStatement, respectively, which correspond to MasterSlaveStatement and MasterSlavePreparedStatement.

Let’s start with the implementation of executeQuery in MasterSlaveStatement:

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    if (Strings.isNullOrEmpty(sql)) {
        throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
    }
    // Clear relevant variables in StatementExecutor
    clearPrevious();
    // Get the target DataSource through MasterSlaveRouter
    Collection<String> dataSourceNames = masterSlaveRouter.route(sql, false);
    Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL");
    // Get the Statement from the Connection
    Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
    routedStatements.add(statement);
    // Execute the query and return the result
    return statement.executeQuery(sql);
}

Unlike ShardingStatement, the above method does not obtain the target dataSourceNames through sharding routing, but directly through the MasterSlaveRouter. Also, notice that instead of using ShardingSphere’s execution and merging engines to execute the SQL and merge results, it directly calls the executeQuery method of the statement to execute the SQL. Clearly, this core step is implemented by the routing mechanism of MasterSlaveRouter.

The route(sql, false) call above ultimately reaches the following private route method of MasterSlaveRouter, which works on the parsed SQLStatement:

private Collection<String> route(final SQLStatement sqlStatement) {
    // If it is forced to route to the master database
    if (isMasterRoute(sqlStatement)) {
        MasterVisitedManager.setMasterVisited();
        return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
    }
    // Route to the slave database through load balancing
    return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
            masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
}

This code should look familiar: we covered the same processing flow and the load balancing algorithm behind it when we introduced the ShardingMasterSlaveRouter class. Back in executeQuery, the precondition guarantees that dataSourceNames contains exactly one data source name, which is used to obtain a Connection and create a Statement for query execution.

Next, let’s look at the executeUpdate method in MasterSlaveStatement:

@Override
public int executeUpdate(final String sql) throws SQLException {
    // Clear relevant variables in StatementExecutor
    clearPrevious();
    int result = 0;
    for (String each : masterSlaveRouter.route(sql, false)) {
        // Get the Statement from the Connection
        Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(statement);
        // Execute the update
        result += statement.executeUpdate(sql);
    }
    return result;
}

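The process here is to obtain the target databases directly from the masterSlaveRouter and then create a Statement for each of them and execute it. Since the route method shown earlier returns a single-element list in both branches, this loop runs once in practice.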

Similarly, let’s take a look at one of the constructors of MasterSlavePreparedStatement (the others are similar):

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
    if (Strings.isNullOrEmpty(sql)) {
        throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
    }
    this.connection = connection;
    // Create MasterSlaveRouter
    masterSlaveRouter = new MasterSlaveRouter(connection.getRuntimeContext().getRule(), connection.getRuntimeContext().getParseEngine(), 
            connection.getRuntimeContext().getProps().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
    for (String each : masterSlaveRouter.route(sql, true)) {
        // For each target DataSource, get the PreparedStatement from the Connection
        PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(preparedStatement);
    }
}

As we can see, the MasterSlaveRouter is created here, and then for each database obtained through the MasterSlaveRouter, a PreparedStatement is created and saved in the routedStatements list.

Next, let’s look at the executeQuery method in MasterSlavePreparedStatement:

@Override
public ResultSet executeQuery() throws SQLException {
    Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL");
    return routedStatements.iterator().next().executeQuery();
}

In this executeQuery method, the precondition ensures that routedStatements holds exactly one PreparedStatement, so we simply execute that one. For update operations, the execution process of MasterSlavePreparedStatement is the same as that of MasterSlaveStatement:

@Override
public int executeUpdate() throws SQLException {
    int result = 0;
    for (PreparedStatement each : routedStatements) {
        result += each.executeUpdate();
    }
    return result;
}

So far, we have completed the introduction to the core classes and main processes related to read-write splitting in ShardingSphere. Overall, this part of the code is relatively direct and clear because it does not involve sharding operations. In particular, after understanding the ShardingDataSource, ShardingConnection, ShardingStatement, and ShardingPreparedStatement related to sharding, it becomes particularly easy to understand today’s content. Many underlying concepts such as the adapter pattern have already been introduced.

To wrap up, the following figure summarizes the class hierarchy related to read-write splitting:

Figure: class hierarchy related to read-write splitting

From Source Code Analysis to Daily Development #

In today’s content, we encountered load balancing, a very common topic in the development of distributed systems. Routing a query to one of several slave databases is a typical load balancing scenario, and it relies on a load balancing algorithm. ShardingSphere provides two common implementations, random and round-robin, which we can refer to in our daily development.

Of course, because the MasterSlaveLoadBalanceAlgorithm interface is an SPI, we can also customize new load balancing algorithms and dynamically load them into ShardingSphere.
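As an illustration of what such a custom algorithm might look like, here is a minimal weighted round-robin sketch. Everything in it is an assumption made for illustration: the LoadBalanceAlgorithm interface is a local stand-in that only mirrors the getDataSource signature shown earlier, the "WEIGHTED_ROUND_ROBIN" type name and the weight map are invented, and falling back to the master when no slave is eligible is a design choice of this sketch. A real implementation would implement MasterSlaveLoadBalanceAlgorithm, including the methods it inherits from TypeBasedSPI, and would be registered through a provider-configuration file under META-INF/services.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class CustomLoadBalanceSketch {

    /** Local stand-in mirroring the getDataSource signature; not the real SPI interface. */
    interface LoadBalanceAlgorithm {
        String getType();

        String getDataSource(String name, String masterDataSourceName, List<String> slaveDataSourceNames);
    }

    /** Hypothetical strategy: a slave with weight n is picked n times per cycle. */
    static final class WeightedRoundRobinAlgorithm implements LoadBalanceAlgorithm {

        private final Map<String, Integer> weights;

        private final AtomicInteger counter = new AtomicInteger();

        WeightedRoundRobinAlgorithm(final Map<String, Integer> weights) {
            this.weights = new HashMap<>(weights);
        }

        @Override
        public String getType() {
            return "WEIGHTED_ROUND_ROBIN";
        }

        @Override
        public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
            // Expand each slave according to its weight; unlisted slaves default to weight 1.
            List<String> expanded = new ArrayList<>();
            for (String each : slaveDataSourceNames) {
                int weight = weights.getOrDefault(each, 1);
                for (int i = 0; i < weight; i++) {
                    expanded.add(each);
                }
            }
            if (expanded.isEmpty()) {
                // Sketch-specific fallback: read from the master when no slave is eligible.
                return masterDataSourceName;
            }
            return expanded.get(Math.floorMod(counter.getAndIncrement(), expanded.size()));
        }
    }

    static List<String> demo() {
        Map<String, Integer> weights = new HashMap<>();
        weights.put("slave0", 2);
        weights.put("slave1", 1);
        LoadBalanceAlgorithm algorithm = new WeightedRoundRobinAlgorithm(weights);
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            picks.add(algorithm.getDataSource("ms_ds", "master", Arrays.asList("slave0", "slave1")));
        }
        return picks;
    }

    public static void main(final String[] args) {
        // Prints [slave0, slave0, slave1, slave0, slave0, slave1]
        System.out.println(demo());
    }
}
```

With weights of 2 and 1, slave0 receives two out of every three reads, which can be useful when the slaves have different capacities.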

Conclusion and Preview #

Read-write splitting is the last part of the sharding engine in ShardingSphere. In the actual application process, we can embed the read-write splitting mechanism under the sharding engine or use this feature separately.

Therefore, in terms of implementation, ShardingSphere also provides two different implementation classes: one is the ShardingMasterSlaveRouter for the sharding environment, and the other is the MasterSlaveRouter for independent use. We have analyzed and explained the principles of these two implementation classes in detail.

Finally, here is a question for you to think about: how does ShardingSphere integrate the read-write splitting engine with the load balancing algorithm?

Starting from the next lesson, we will move on to the source code analysis of another core module in ShardingSphere, which is distributed transactions.