23 Database Middleware the Transition From Traditional Databases to Distributed Databases

23 Database Middleware - The Transition from Traditional Databases to Distributed Databases #

In the previous lesson, we discussed the attempts to transform traditional single-node databases into distributed databases. In today’s lesson, let’s talk about the helper for constructing traditional databases into distributed databases, which is also an important part of the evolution of distributed databases: database middleware. Middleware mentioned here generally refers to a database middle layer that has sharding functionality.

Relational databases themselves are relatively easy to become performance bottlenecks in a system. The storage capacity, connection count, processing power, and other aspects are all very limited. The “statefulness” of the database itself makes it not as easy to scale as web and application servers. Under the challenge of massive data and high-concurrency access in the Internet industry, application service technicians proposed sharding technology (also known as sharding and database sharding). At the same time, popular distributed system databases, especially the distributed databases we introduced in the previous lesson, which transitioned from traditional databases, inherently support sharding in a friendly manner. The principles and ideas are mostly the same.

Successful database middleware not only supports sharding but also requires support for features such as globally unique primary keys, cross-shard queries, and distributed transactions to ensure data availability in sharding scenarios. Now I will introduce these technologies to you one by one.

Globally Unique Primary Keys #

In a single-node database, we often directly use the auto-increment feature of the database to generate primary key IDs, which is indeed relatively simple. However, in the environment of sharding and database sharding, data is distributed across different shards, and we can no longer rely on the auto-increment feature of the database to generate IDs directly, otherwise, it will cause duplicate primary keys in different shards.

Now let me briefly introduce several ID generation algorithms that I have used and familiar with:

  1. Twitter’s Snowflake (also known as the “Snowflake algorithm”)
  2. UUID/GUID (generally supported by applications and databases)
  3. MongoDB ObjectID (similar to UUID)

Among them, Twitter’s Snowflake algorithm is the one I have used the most in distributed system projects in recent years, and I have not encountered any duplicate or concurrency problems. The ID generated by this algorithm is a 64-bit unique ID (composed of 41 bits of timestamp + 10 bits of custom machine code + 13 bits of incremental counter). I have discussed the details of ShardingSphere implementing Snowflake in “03 | Data Sharding: How to Store Massive Scale Data”. You can review it again.

With globally unique primary keys solved, we can start sharding the data. Now let me introduce commonly used sharding strategies.

Sharding Strategies #

The sharding modes I introduced are: range sharding and hash sharding.

When we need to use the sharding field for range queries, range sharding can quickly locate the shard for efficient querying. In most cases, it can effectively avoid the problem of cross-shard queries. When we want to scale the entire sharding cluster later, we only need to add nodes without the need to migrate data from other shards.

However, range sharding may also have the problem of data hotspots. Some nodes may be queried frequently and face greater pressure, making them the bottleneck of the entire cluster. And some nodes may store historical data that is rarely queried.

For hash sharding, we use the modulo of a hash function for sharding and splitting. The data in hash sharding is relatively evenly distributed, making it less likely to have hotspots and bottlenecks for concurrent access.

However, scaling the sharding cluster later requires migrating old data. Using consistent hashing algorithm can largely avoid this problem, so many middleware sharding clusters adopt consistent hashing algorithm. Discrete sharding also easily faces the complex issue of cross-shard queries.

Few projects will consider sharding design at the initial stage. It is generally prepared when the business is rapidly developing and facing performance and storage bottlenecks. Therefore, inevitably, the issue of migrating historical data needs to be considered. The usual practice is to first read out the historical data through the program, and then write the data into each shard node according to the specified sharding rules. We previously introduced ShardingSphere’s elastic scalability, which is a powerful tool to solve this problem. In addition, we need to do capacity planning based on the current data volume and QPS, and take into account various cost factors to estimate the number of shards needed (it is generally recommended not to exceed 10 million records per shard).

When data is distributed across different databases and tables, querying across multiple shards will inevitably bring some challenges. Below, I will introduce several strategies for querying across shards.

Cross-Shard Query #

Middleware cross-shard query essentially shifts the data aggregation process from the database layer to the middleware layer. The following solutions are based on storage engine level principles.

Pagination Query #

In general, when paging, we need to sort by a specified field. When the sorting field is the sharding field, we can easily locate the specified shard using the sharding rules. However, when the sorting field is not the sharding field, the situation becomes more complicated. In order to ensure the accuracy of the final result, we need to sort the data in different shard nodes and return the results, and then aggregate and sort the result sets returned by different shards before returning them to the user.

In a distributed scenario, the query “LIMIT 10000000, 10” needs to be rewritten as “LIMIT 0, 10000010” in order to ensure the correctness of the data. Why is that? You can think about it carefully. The result is that this pattern will load a large amount of unnecessary data into memory, putting great pressure on the memory. The general approach to solve this issue is to avoid using the LIMIT keyword and use the following pattern directly:

SELECT * FROM t_order WHERE id > 100000 AND id <= 100010 ORDER BY id;

When paging, you can reduce the amount of data loaded by keeping track of the position of the last record in the previous page.

Aggregation Functions #

When using functions like Max, Min, Sum, Count, and Avg for statistics and calculations, the corresponding function needs to be executed on each shard data source, and then the result sets are processed for a second time before returning the final result. Note that the implementation of the Avg function is special, as it requires the cooperation of the Sum and Count functions.

Cross-Shard Join

Join is the most commonly used feature in relational databases, but in sharded clusters, Join becomes very complex, and we should try to avoid cross-shard Join queries (this scenario is more complex than cross-shard pagination and has a significant impact on performance).

There are usually two ways to optimize cross-shard Join:

  1. Global tables: The basic idea of a global table is to put information about tables that are similar to data dictionaries and may require Join queries into each shard, thereby avoiding cross-shard Join.
  2. ER sharding: In relational databases, tables often have some related relationships. If we can determine the relationship in advance and store the records of tables with related relationships on the same shard, we can effectively avoid cross-shard Join problems. In the case of one-to-many relationships, we usually choose to split according to the side with more data.

These are some details of how distributed middleware implements cross-shard queries. Next, I will introduce the biggest challenge faced by middleware - distributed transactions.

Distributed Transaction #

The challenges faced by distributed transactions are similar to those faced by traditional databases in the development of distributed databases discussed in the previous lecture. The middleware can only interact with database nodes and cannot affect the underlying data structure. Therefore, the problems can only be solved at a higher level. Hence, the various solutions to be introduced below all have their own shortcomings.

Client One-Phase #

This is a transaction scheme initiated by the client, which removes the Prepare process in the two-phase commit. A typical implementation is: in a business thread, iterate over all the database connections and sequentially perform Commit or Rollback. This scheme assumes that the underlying database transaction performs “forwarding detection” (module 2 transaction), which means that conflicts can be detected in the SQL execution stage. The probability of failure when committing on the client side is very low, so it can be inferred that the overall probability of transaction failure is very low. This scheme was adopted by the early stage of the Yuewen Group and was able to achieve an SLA of 99.99%.

Compared to the other schemes introduced below, this scheme has low performance overhead. However, during the execution of the transaction submission, if there are unexpected exceptions such as network failure or database downtime, it will result in data inconsistency and cannot be rolled back.

XA Two-Phase #

The two-phase commit is the standard implementation of XA. Let’s review the two-phase commit. It splits the submission of distributed transactions into two phases: Prepare and Commit/Rollback.

After starting a global XA transaction, all sub-transactions will lock resources according to the local default isolation level, and record undo and redo logs, and then the TM initiates a Prepare vote to ask all sub-transactions if they can be committed. When the results of all sub-transactions are “Yes”, TM initiates Commit; if any sub-transaction’s result is “No”, TM initiates Rollback. If the feedback result during the Prepare phase is “Yes”, and a failure occurs during Commit due to server crash or other exceptions, after the node service restarts, Commit compensation can be performed again using XA Recover to ensure data consistency.

In the 2PC model, it is necessary to wait for the feedback from all participating sub-transactions during the Prepare phase, which may cause the database resource lock time to be too long, which is not suitable for business scenarios with high concurrency and long sub-transaction lifecycles.

ShardingSphere supports XA-based strong consistency transaction solution, which can inject different third-party components as transaction managers to implement the XA protocol, such as Atomikos.

Best-Effort Delivery #

Best-effort delivery is a compensating strategy for the client one-phase. It uses a transaction table to record all transactional SQL operations. If a sub-transaction is successfully committed, the transaction log will be deleted. If it fails, it will attempt to submit again according to the configured retry times, that is, it will make the best effort to submit and try to ensure data consistency. Here, C and A can be balanced according to different business scenarios by using synchronous or asynchronous retries. This is similar to the retry approach in the Percolator transaction implemented in TiDB.

The advantage of this strategy is that there is no resource lock time and it has low performance overhead. The disadvantage is that after multiple attempts to submit fail, it cannot be rolled back. It is only suitable for business scenarios where the transaction will definitely succeed in the end. Therefore, best-effort delivery is a compromise on the transaction rollback functionality in exchange for performance improvement.

TCC #

The TCC model completely delegates the granularity of locks to the business logic. Each sub-transaction business needs to implement the Try-Confirm/Cancel interface.

  • Try: Attempts to execute the business logic. It completes all business checks and reserves necessary business resources.
  • Confirm: Confirms the business execution. It actually executes the business logic without any business checks. It only uses the business resources reserved during the Try phase. The Confirm operation satisfies idempotence.
  • Cancel: Cancels the business execution. It releases the business resources reserved during the Try phase. The Cancel operation satisfies idempotence.

All three phases are executed in a local transaction manner. Unlike XA Prepare, TCC does not need to suspend all resources during the XA voting period, thus greatly improving throughput. However, the disadvantage is that Cancel operation needs to be implemented, which not only brings a lot of trouble to the implementation but also some operations cannot be canceled.

Saga #

Saga originated from the paper “Sagas” published by Hector & Kenneth in 1987.

The Saga model splits a distributed transaction into multiple local transactions, each with its corresponding execution module and compensation module (Confirm and Cancel in TCC). When any local transaction in the Saga transaction fails, the transaction can be restored by calling the corresponding compensation method to achieve eventual transaction consistency.

The difference between Saga and TCC is that Saga operates at the level of database transactions, while TCC operates at the level of services.

When each Saga sub-transaction “T1, T2, …, Tn” has a corresponding compensation definition “C1, C2, …, Cn-1”, the Saga system can ensure that the sub-transaction sequence “T1, T2, …, Tn” is completed (best case) or the sequence “T1, T2, …, Tj, Cj, …, C2, C1” is completed, which means cancelling all transaction operations.

Since the Saga model does not have a Prepare phase, it cannot guarantee isolation between transactions. When multiple Saga transactions operate on the same resource, problems such as update loss and dirty data reading may occur. In this case, concurrency needs to be controlled at the business layer, for example, by locking at the application layer or pre-freezing resources at the application layer.

Saga supports backward and forward recovery.

  • Backward recovery: if any sub-transaction fails, compensate for all completed transactions.
  • Forward recovery: assuming each sub-transaction will eventually succeed, retry failed transactions.

Obviously, there is no need to provide compensation transactions for forward recovery. If in your business, sub-transactions will always succeed or compensation transactions are difficult to define or impossible, forward recovery will be more suitable for your needs. In theory, compensation transactions never fail. However, in the distributed world, servers may crash, networks may fail, and even data centers may experience power outages. In such cases, a mechanism for rollback after failure recovery, such as manual intervention, is needed.

In summary, TCC handles distributed transactions based on the application service layer, while XA, Bed, and Saga handle distributed processing based on the database layer. Therefore, middleware generally tends to use the latter to achieve finer-grained control.

Transition of Distributed Transactions in Apache ShardingSphere #

Before version 3.0, ShardingSphere implemented client-one-phase (weak XA), best-effort and TCC. Among them, best-effort requires asynchronous execution of scheduled tasks. As the default implementation mode, weak XA balances practicability and implementation difficulty, but it will lead to inconsistency issues in the scenario described by the distributed failure model.

After version 3.0, the team clarified the transaction model and implemented XA two-phase and Saga. These two types of transactions are database-oriented and have complete theoretical support, which is more in line with the design style of modern distributed databases. At the same time, the transaction module also supports SPI like other modules, that is, third-party transaction models can be implemented. The JD Transaction (JDTX) engine of JD.com is integrated into ShardingSphere through SPI. In the next lecture, I will introduce the relevant content of JDTX.

Summary #

In this lecture, we discussed several techniques for implementing database middleware, including global unique primary keys, sharding strategies, and cross-shard queries. The most important aspect is distributed transactions.

Different from distributed databases, middleware’s distributed transactions have many characteristics of application services, such as client-one-phase and TCC. They are more inclined towards the service layer, revealing that most of the middleware is the result of development and iteration by application research and development or application architecture teams. And as middleware evolves, it inevitably evolves towards distributed data, such as Alibaba Cloud’s DRDS and PolarDB-X, which have evolved from the middleware TDDL.

Database middleware is a transitional product. With the development of technology in recent years, more and more native NewSQL databases have emerged. In the next lecture, I will introduce several typical NewSQL databases and see what characteristics they have.