14 Skip Index: A New Start with ClickHouse #

Hello, I am Xu Changlong.

Through the previous lessons, we have seen how powerful Elasticsearch is. In technology selection, however, price is also an important factor. Elasticsearch is convenient to use, but it consumes a large amount of hardware resources. Even well-funded companies wince when they see the monthly server bill.

ClickHouse, on the other hand, is a new-generation OLAP database that has tried many interesting implementations. It still has many shortcomings: limited support for data updates, poor dynamic indexing, queries that are hard to optimize, and distributed deployments that must be designed by hand. Even so, its simple architecture and relatively low cost have gradually won recognition from many teams, and many internet companies have joined the community to keep improving it.

ClickHouse is a columnar database, mostly used in write-heavy, read-light scenarios. It provides flexible distributed storage engines and several deployment modes, such as sharding and clustering, that we can choose from as needed when building a system.

In this lesson, I will introduce ClickHouse from the perspectives of writing, sharding, indexing, and query implementation. As you learn, I recommend comparing it with the specific implementations of Elasticsearch, MySQL, and RocksDB, and thinking about the advantages, disadvantages, and suitable scenarios of each. I believe the comparison will give you more insight.

Parallelism: CPU Throughput and Performance #

Let me first talk about the most unexpected aspect of using ClickHouse.

Let’s start with a familiar reference point: MySQL. MySQL can generally use only one CPU core to process a single SQL request. ClickHouse, in contrast, takes full advantage of multiple cores to run fast calculations over massive local data, which gives it high data processing throughput (2 to 30 GB/s of uncompressed data). The flip side is low concurrency, because a single request can consume all system resources.

When we first started using ClickHouse, we often found that a single SQL query over several years of user behavior would freeze the entire ClickHouse instance, leaving it unresponsive for several minutes.

The official recommendation is to keep ClickHouse at around 100 queries per second (QPS). If a query uses the index well, it can work through tens to hundreds of billions of rows and return within 1 second; for reference, the same job would take MySQL a minute or more. If the query is poorly designed, however, it can take half an hour or even hang without ever finishing.

Therefore, if you use ClickHouse in a user-facing scenario, it is best to cache the results of such queries. The interface should also allow a loading timeout of more than 30 seconds, because a request may be queued behind other queries.

If we have a large number of users, we can add nodes and spread the query load through partitions, replicas, and multiple sub-clusters that hold the same data. Keep in mind, though, that at roughly 100 QPS per node, serving 10,000 QPS could in the extreme case require 100 ClickHouse storage nodes holding the same data. It is therefore better to push query results into a cache and serve them from there whenever possible.
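To make the caching advice concrete, here is a minimal sketch of a time-limited result cache placed in front of a slow query. Everything in it is hypothetical: QueryCache, fake_clickhouse, and the 60-second lifetime are illustration names and values, not part of any ClickHouse client, and a real service would more likely use a shared external cache.

```python
import time
from typing import Any, Callable, Dict, Tuple


class QueryCache:
    """Tiny in-process cache with a time-to-live, keyed by the SQL text."""

    def __init__(self, run_query: Callable[[str], Any], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._run_query = run_query      # the slow call we want to protect
        self._ttl = ttl_seconds
        self._clock = clock              # injectable so the demo can fake time
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, sql: str) -> Any:
        now = self._clock()
        hit = self._entries.get(sql)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]                # still fresh: skip the database
        result = self._run_query(sql)
        self._entries[sql] = (now, result)
        return result


# Hypothetical stand-in for a real ClickHouse call; it counts invocations.
calls = []

def fake_clickhouse(sql: str) -> int:
    calls.append(sql)
    return len(calls)


fake_time = [0.0]
cache = QueryCache(fake_clickhouse, ttl_seconds=60, clock=lambda: fake_time[0])

first = cache.get("SELECT count() FROM events")   # miss: runs the query
second = cache.get("SELECT count() FROM events")  # hit: served from cache
fake_time[0] = 61.0                               # entry has now expired
third = cache.get("SELECT count() FROM events")   # miss again
```

Only two of the three requests reach the database here, which is the whole point: the QPS that ClickHouse sees is decided by the cache lifetime rather than by the number of users.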

However, if the cluster holds a small amount of data and we can guarantee that every query is well controlled, ClickHouse can support tens of thousands of QPS, depending on how much time we invest in optimization and analysis.

For this, I recommend the following optimization approach: first filter with a range query on the sorted fields, and then run the aggregation. Also note that services needing high-concurrency queries should be isolated from services running slow queries, so that each gets better performance.

Having shared my user experience, let’s proceed step by step to analyze the characteristics of ClickHouse in terms of writing, storage, querying, etc. Only in this way can you have a more comprehensive and in-depth understanding of it.

Batch Write Optimization #

ClickHouse’s client driver is interesting. The client maintains multiple write buffers. When we batch insert data, the client caches it locally until the buffer reaches the configured block_size, and then submits it to the server in bulk, which improves write performance.

If we have strict real-time requirements, we can set a smaller block_size, at the cost of lower write throughput.
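The buffering behavior described above can be sketched roughly as follows. This is not the code of any real ClickHouse driver: BatchWriter and send_block are hypothetical names, and only block_size comes from the text.

```python
from typing import Callable, List, Tuple

Row = Tuple[int, str]


class BatchWriter:
    """Collects rows locally and ships them in blocks of block_size rows."""

    def __init__(self, send_block: Callable[[List[Row]], None], block_size: int) -> None:
        if block_size <= 0:
            raise ValueError("block_size must be positive")
        self._send_block = send_block    # hypothetical network call to the server
        self._block_size = block_size
        self._buffer: List[Row] = []

    def insert(self, row: Row) -> None:
        self._buffer.append(row)
        if len(self._buffer) >= self._block_size:
            self.flush()                 # buffer is full: submit one block

    def flush(self) -> None:
        if self._buffer:
            self._send_block(self._buffer)
            self._buffer = []            # start a fresh buffer for the next block


sent_blocks: List[List[Row]] = []
writer = BatchWriter(sent_blocks.append, block_size=3)
for i in range(7):
    writer.insert((i, f"event-{i}"))
writer.flush()                           # push the final, partially filled block

block_lengths = [len(block) for block in sent_blocks]
```

Seven rows with a block_size of 3 leave the client as three requests instead of seven. A smaller block_size means rows wait less time in the buffer, but the server receives more and smaller blocks.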

To optimize high-concurrency writes, ClickHouse’s MergeTree engine performs a similar kind of batching on the server side, in addition to the batching done by the client. Thanks to this optimization, the batch write performance of a single ClickHouse instance can reach 280 MB/s (subject to hardware performance and input data volume).

MergeTree uses batch writing to disk and periodic merging (batch write-merge). This design reminds us of RocksDB, which has extremely strong write performance. In fact, when ClickHouse was first released, it did not use memory for caching and wrote directly to disk.

Over the past two years, ClickHouse has added in-memory caching and write-ahead logging (WAL). Writes still put heavy pressure on the disk, so if you use ClickHouse, high-performance SSDs are recommended as the storage for writes.

In fact, OLAP data comes from two different kinds of sources: business systems and big data.

Data from business systems usually has many attribute fields, but its update volume is small. In this case, ClickHouse is often used to filter historical data and perform attribute-based calculations. Data from big data sources, on the other hand, usually has many columns, each representing a different user behavior, and the data volume is generally large.

Since the data volume differs in these two cases, the optimization methods are naturally different as well. Let’s continue analyzing with the help of the following image:

Image

When a batch of input data is smaller than the min_bytes_for_wide_part setting, it is flushed to disk as a compact part. This approach puts all of the flushed data into a single data.bin file, so writing and merging are efficient. It is suitable for filtering small amounts of business data.

When the batch exceeds the configured size, it is flushed to disk as a wide part, with a separate file generated for each field. This approach suits data with many fields. Merging may be relatively slow, but it offers stronger parallel throughput for writing and computing, which makes it suitable for analyzing a small set of specific columns.

As we can see, these two approaches have specific optimizations for data storage and querying. The number of fields, the amount of data updated each time, and the number of columns involved in statistical queries all affect the efficiency of our service.

When most of our data is small, splitting one record into multiple columns may waste some disk IO. In this case, since we won’t allocate too many resources to it, it is recommended to use the compact parts approach. When our data has a large number of columns and we need to perform statistical analysis on a few of them, the wide parts approach for column storage has its advantages.
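The choice between the two layouts can be pictured with a small sketch. It is a simplification under stated assumptions: the threshold value is only illustrative, the helper names are hypothetical, and real parts also contain mark and index files that are left out here.

```python
from typing import List

# Illustrative threshold only; check the setting on your own server.
MIN_BYTES_FOR_WIDE_PART = 10 * 1024 * 1024


def choose_part_format(batch_bytes: int, threshold: int = MIN_BYTES_FOR_WIDE_PART) -> str:
    """Small batches become compact parts, larger ones become wide parts."""
    return "compact" if batch_bytes < threshold else "wide"


def data_files(part_format: str, columns: List[str]) -> List[str]:
    """Data files written for one part (mark and index files omitted)."""
    if part_format == "compact":
        return ["data.bin"]                      # all columns share one file
    return [f"{column}.bin" for column in columns]  # one file per column


columns = ["prod_id", "type", "name", "price"]
small = choose_part_format(200 * 1024)           # a 200 KiB batch
large = choose_part_format(64 * 1024 * 1024)     # a 64 MiB batch
small_files = data_files(small, columns)
large_files = data_files(large, columns)
```

With a wide part, a query that only needs price can read price.bin and ignore the other three files, which is where the advantage for column-oriented statistics comes from.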

How to Improve Query Efficiency in ClickHouse #

As we can see, how a database stores data is closely tied to how we query it. Periodic flushing to disk gives good write performance, but it also generates a large number of data part files, which can seriously hurt query efficiency. So how does ClickHouse improve query efficiency?

Let’s analyze it in more detail. Newly written data is stored in data part folders, and once a part has been written, it is never modified.

Generally, the folder name format of a data part is partition_min_block_max_block_level. In order to improve query efficiency, ClickHouse regularly performs merge operations on data parts.

Image

As shown in the figure above, merges are performed level by level. They reduce the number of folders that must be scanned, and they tidy up, delete, and combine the data along the way. Note as well that parts in different partitions cannot be merged with each other, so if we want to improve a table’s write performance, having more partitions helps.
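Here is a minimal sketch of how the folder name changes when parts are merged, assuming the four-field partition_min_block_max_block_level format given above. The Part class and both helper functions are hypothetical, and the rule that the merged level is the highest input level plus one is my reading of the layered merge, not a guarantee about every ClickHouse version.

```python
from typing import List, NamedTuple


class Part(NamedTuple):
    partition: str
    min_block: int
    max_block: int
    level: int

    @property
    def folder(self) -> str:
        return f"{self.partition}_{self.min_block}_{self.max_block}_{self.level}"


def parse_part(folder: str) -> Part:
    # Split from the right so a partition ID containing "_" stays intact.
    partition, min_block, max_block, level = folder.rsplit("_", 3)
    return Part(partition, int(min_block), int(max_block), int(level))


def merge_parts(parts: List[Part]) -> Part:
    """Combine parts of ONE partition into a single part one level higher."""
    if len({p.partition for p in parts}) != 1:
        raise ValueError("parts from different partitions cannot be merged")
    return Part(
        partition=parts[0].partition,
        min_block=min(p.min_block for p in parts),
        max_block=max(p.max_block for p in parts),
        level=max(p.level for p in parts) + 1,
    )


fresh = [parse_part(name) for name in ("202301_1_1_0", "202301_2_2_0", "202301_3_3_0")]
merged = merge_parts(fresh)
merged_folder = merged.folder

try:
    merge_parts([parse_part("202301_4_4_0"), parse_part("202302_5_5_0")])
    cross_partition_rejected = False
except ValueError:
    cross_partition_rejected = True
```

Three freshly written level-0 folders collapse into 202301_1_3_1, so a later query has one folder to open instead of three.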

If too much data is written too quickly, folders are generated faster than the background merges can absorb them, and ClickHouse reports a “Too many parts” error. After all, the number of data part folders cannot grow indefinitely.

To address this issue, you can adjust the min_bytes_for_wide_part parameter or increase the number of partitions. If the amount of data being written is not large, you can consider generating more compact parts data, which can speed up the merge process.

In addition, because distributed ClickHouse tables rely on ZooKeeper for distributed coordination, ZooKeeper may become a bottleneck when a table is written with high concurrency. In that case, it is recommended to upgrade to a newer ClickHouse version that supports multiple ZooKeeper clusters. This does, however, mean investing more resources.

Sparse Index and Skip Index #

Queries in ClickHouse rely on index support. ClickHouse has two types of indexes: the primary key index, which is specified when the table is created, and the skip index, which is used to skip over data that cannot match. I recommend using the primary key index for queries whenever possible.

Primary Key Index #

Using the primary key index can significantly improve query performance on ClickHouse tables, because both the data and the index are stored in primary key order. A query that uses the primary key index can locate and process the data quickly. ClickHouse follows the “left-prefix” lookup model: it first narrows the data range using indexes and partitions, and then computes the result by traversing the remaining data. The traversal runs in parallel across multiple nodes and multiple CPU cores.

So how does ClickHouse retrieve data? To understand this, let’s first take a look at the main components of the data parts folder, as shown in the diagram below:

Image

Based on the diagram, let’s examine the structure of the data parts folder from the largest unit down to the smallest.

Within the data parts folder, each bin file stores the data of one or more fields. A bin file contains multiple blocks, which are the smallest unit of disk interaction. The size of a block depends on the min_compress_block_size setting.

Let’s now look inside a block. It contains multiple granules, which are the smallest unit of data scanning. By default each granule holds 8192 rows, and the primary key values of its first row are what the primary key index records. The primary key index in the data parts folder stores these entries for all granules in sorted order, with the sort order specified when the table is created.

To speed up queries, the primary key index (that is, the sparse index) of each data part is loaded into memory. To locate data on disk quickly, ClickHouse also saves multiple mark files in the data parts folder, named after the fields. Each mark stores the offset of a compressed block in the bin file and the offset of the granule within the decompressed block. The overall query process is illustrated in the following diagram:

Image

The specific query process is as follows. We first binary-search the primary key index in memory to find the granules that may contain the data, and use their positions to look up the corresponding marks in the mark files. We then use the marks to find the corresponding block and load it into memory. Finally, we locate the specified granule within the block and scan it until we find the desired data.
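The lookup can be sketched in a few lines of Python. This is a toy model under stated assumptions: granules hold 4 rows instead of 8192, a single integer column is the primary key, and a list slice stands in for the mark file and block decompression. All function names are hypothetical.

```python
from bisect import bisect_left, bisect_right
from typing import List

GRANULE_SIZE = 4  # ClickHouse defaults to 8192 rows; 4 keeps the demo readable


def build_sparse_index(sorted_keys: List[int]) -> List[int]:
    """Keep only the key of the first row of every granule."""
    return sorted_keys[::GRANULE_SIZE]


def candidate_granules(index: List[int], lo: int, hi: int) -> range:
    """Granules that may hold keys in [lo, hi], found by binary search."""
    # Step back one granule: rows equal to lo may sit at the end of the
    # granule that precedes the first index entry >= lo.
    first = max(0, bisect_left(index, lo) - 1)
    last = bisect_right(index, hi) - 1
    return range(first, last + 1)  # empty when nothing can match


def lookup(sorted_keys: List[int], index: List[int], lo: int, hi: int) -> List[int]:
    hits = []
    for granule in candidate_granules(index, lo, hi):
        start = granule * GRANULE_SIZE  # stands in for the mark file offset
        for key in sorted_keys[start:start + GRANULE_SIZE]:
            if lo <= key <= hi:
                hits.append(key)
    return hits


keys = list(range(0, 40, 2))            # 20 sorted rows -> 5 granules
index = build_sparse_index(keys)        # [0, 8, 16, 24, 32]
selected = list(candidate_granules(index, 10, 14))
found = lookup(keys, index, 10, 14)
nothing = list(candidate_granules(index, -5, -1))
```

For the range 10 to 14, only granule 1 is scanned and the other four are never touched. The index holds 5 entries for 20 rows, which is why it is called sparse and why it fits in memory even for very large tables.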

Please note that since ClickHouse allows multiple inserts for the same primary key, the query results may contain duplicate data for the same primary key. We need to manually deduplicate the results after the query.

Skip Index #

You may have noticed that, besides the primary key, ClickHouse has no other conventional index. This means a query that filters on anything other than the primary key cannot use an index, and a full table scan is needed for the calculation. However, databases often store billions of rows every day, so a full table scan would perform poorly.

Therefore, as a performance trade-off, ClickHouse takes the opposite approach and provides skip indexes, which reduce the resources wasted on traversing granules. Common skip index types in ClickHouse include:

  • minmax: assists range queries on numeric fields by storing the minimum and maximum values within each indexed block.
  • set: records the distinct values that appear in a field, with a configurable upper limit on how many values are stored.
  • Bloom Filter: uses a Bloom filter to check whether a value may exist in the current block.
  • func: supports a variety of functions in WHERE conditions. For more details, refer to the official documentation.

Skip indexes are stored in the data parts folder, organized by the types above and the fields they cover. A skip index does not locate data the way the primary key index does. Instead, it rules out granules that cannot satisfy the filter condition, which speeds up the query.
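To illustrate the minimum/maximum variant, here is a small sketch that summarizes each granule of an unsorted column and uses the summary to decide which granules a range filter must read. It simplifies in one important way: it keeps one index entry per granule, whereas a real skip index can cover several granules per entry. The function names and sample prices are hypothetical.

```python
from typing import List, Tuple


def build_minmax_index(column: List[int], granule_size: int) -> List[Tuple[int, int]]:
    """Store (min, max) of the column for every granule."""
    index = []
    for start in range(0, len(column), granule_size):
        granule = column[start:start + granule_size]
        index.append((min(granule), max(granule)))
    return index


def granules_to_read(index: List[Tuple[int, int]], lo: int, hi: int) -> List[int]:
    """Keep a granule only if its [min, max] overlaps the filter [lo, hi]."""
    return [g for g, (low, high) in enumerate(index) if high >= lo and low <= hi]


# A non-key column: values are not globally sorted, but they cluster by granule.
prices = [5, 7, 6, 8, 50, 52, 51, 49, 9, 4, 6, 7]
price_index = build_minmax_index(prices, granule_size=4)

expensive = granules_to_read(price_index, 40, 60)  # WHERE price BETWEEN 40 AND 60
cheap = granules_to_read(price_index, 6, 6)        # WHERE price = 6
```

The first filter skips two of three granules. The second can only skip one, because the value 6 falls inside the range of two granules. The index pays off only when values cluster within granules, which connects directly to the point about data differentiation later in this lesson.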

Now let’s take a step back and examine the overall query workflow in ClickHouse:

  1. Based on the query conditions, filter out the data parts folder range that needs to be read for the query.
  2. Use the primary key index in the data parts to filter out the granules to be queried.
  3. Use skip indexes to skip granules that do not meet the conditions.
  4. Perform calculations, summarization, statistics, filtering, and sorting on the data within the filtered range.
  5. Return the results.

Let me explain further. Among the five steps above, only some of the operations in step 4 run in parallel; all the other steps are executed sequentially.
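The shape of the parallel part of step 4 can be sketched as below: each worker reduces one granule to a partial result, and the partial results are then combined. The function names are hypothetical, and this only shows the structure. Python threads do not give real CPU parallelism for this kind of work, whereas ClickHouse runs it on multiple cores.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def partial_sum(granule: List[int]) -> int:
    """Work done independently for one granule."""
    return sum(granule)


def parallel_total(granules: List[List[int]], workers: int = 4) -> int:
    """Fan the granules out to workers, then combine the partial results."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(partial_sum, granules))


surviving_granules = [[1, 2, 3], [4, 5], [6]]  # what steps 1 to 3 left over
total = parallel_total(surviving_granules)
```

The fewer granules survive steps 1 to 3, the less work there is to fan out, which is why index design matters more than raw core count.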

After using ClickHouse in practice, you will find it difficult to optimize index queries. It often performs full table scans, and you might wonder why.

The main reason is that most of our data lacks distinct characteristics, and the index created does not provide enough differentiation. Therefore, the data we write has low differentiation within each granule. As a result, the sparse index cannot exclude most of the granules, forcing ClickHouse to perform a full table scan for calculations.

In addition, when there are too many directories, the data is scattered across many data part folders. ClickHouse has to load the index of every data part and query them one by one, which consumes significant resources. These two factors make ClickHouse queries hard to optimize. If, however, our input data has distinct characteristics and we insert it in the same order as the sort key, performance will be somewhat better.

Real-time Statistics #

As we mentioned earlier, ClickHouse often needs to perform a full table scan for statistics. This limitation also affects its metric analysis capabilities. To address this, the ClickHouse team has provided another engine. Let’s take a closer look at it.

Similar to the in-memory computing we discussed before, ClickHouse can use one of its own tables as a data source and create a materialized view on top of it. The view applies aggregate functions to the data arriving in the source table and maintains the statistics in real time. Each time we query the view, we get the predefined statistical results.

Let me show you a simple example to demonstrate how it is used:

-- Create the data source table
CREATE TABLE products_orders
(
    prod_id     UInt32 COMMENT 'Product ID',
    type        UInt16 COMMENT 'Product type',
    name        String COMMENT 'Product name',
    price       Decimal32(2) COMMENT 'Price'
) ENGINE = MergeTree()
ORDER BY (prod_id, type, name)
PARTITION BY prod_id;

-- Create the materialized view table
CREATE MATERIALIZED VIEW product_total
ENGINE = AggregatingMergeTree()
PARTITION BY prod_id
ORDER BY (prod_id, type, name)
AS
SELECT prod_id, type, name, sumState(price) AS price
FROM products_orders
GROUP BY prod_id, type, name;

-- Insert data
INSERT INTO products_orders VALUES 
(1,1,'Roller Coaster Toy', 20000),
(2,2,'Rocket',10000);

-- Query the result
SELECT prod_id,type,name,sumMerge(price)
FROM product_total
GROUP BY prod_id, type, name;

When data is inserted into the source table, data parts are generated and the view is triggered. The view aggregates each inserted batch according to the aggregate functions specified when it was created. Each batch produces its own aggregated partial result, which is then written to disk.

When we query statistical data, ClickHouse aggregates and summarizes this data again to retrieve the final results for external presentation. This approach allows for metric statistics and is quite unique, aligning well with the engine philosophy of ClickHouse.
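The two-stage idea, partial results per inserted batch and a final merge at query time, can be modeled in plain Python. This is only an analogy for what sumState and sumMerge do in the example above. The functions aggregate_batch and merge_states are hypothetical names, and the second batch is invented sample data.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Iterable, List, Tuple

Key = Tuple[int, int, str]              # (prod_id, type, name)
Row = Tuple[int, int, str, Decimal]     # key fields plus price


def aggregate_batch(rows: Iterable[Row]) -> Dict[Key, Decimal]:
    """Insert time: reduce ONE batch to a partial sum per key."""
    state: Dict[Key, Decimal] = defaultdict(Decimal)
    for prod_id, type_, name, price in rows:
        state[(prod_id, type_, name)] += price
    return dict(state)


def merge_states(states: Iterable[Dict[Key, Decimal]]) -> Dict[Key, Decimal]:
    """Query time: combine the partial sums of all batches."""
    total: Dict[Key, Decimal] = defaultdict(Decimal)
    for state in states:
        for key, partial in state.items():
            total[key] += partial
    return dict(total)


batch_1: List[Row] = [
    (1, 1, "Roller Coaster Toy", Decimal("20000")),
    (2, 2, "Rocket", Decimal("10000")),
]
batch_2: List[Row] = [(1, 1, "Roller Coaster Toy", Decimal("500"))]

# Each insert leaves behind its own partial result, like one part of the view.
view_parts = [aggregate_batch(batch_1), aggregate_batch(batch_2)]
totals = merge_states(view_parts)
```

The query never rereads the raw rows. It only merges two small partial results, which is why the SELECT against the view still needs GROUP BY together with sumMerge.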

Distributed Tables #

Finally, I would like to share an additional feature of ClickHouse. However, this feature is still immature, so let’s focus on what functionalities it supports.

Unlike Elasticsearch, ClickHouse’s distributed tables do not handle sharding for us automatically. We need to set them up and create them by hand. ClickHouse does provide syntax for creating and managing distributed tables automatically, but I do not recommend using it at the moment. Resource allocation still depends on manual planning, and ClickHouse does not take care of it for us: such commands can end up creating 100 shards on 100 servers, which is quite wasteful.

When using distributed tables, we need to manually create sharded tables with the same structure on different servers, and then create a mapping for the distributed table on each server. This allows us to access the distributed table from any server.

In general, we understand sharding as storing multiple shards on a single server. However, ClickHouse operates differently. It specifies that only one shard of a table can exist on a single server.

There are generally two ways to insert data into ClickHouse’s distributed tables.

One way is to directly insert data into the distributed table. In this case, the data will first be stored locally and then asynchronously forwarded to the corresponding shard, achieving data distribution and storage.

The second way is for the client to push shard data to the corresponding server based on different rules, such as random or hash-based rules. This method generally provides better performance, but it requires the client to know the IP addresses of all shard nodes. Clearly, this approach is not conducive to failure recovery.
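A minimal sketch of the client-side approach with a hash-based rule is shown below. The host names are made up, and pick_shard and route_rows are hypothetical helpers. CRC32 is used only because it is stable across processes; the actual rule is up to the client.

```python
import zlib
from typing import Dict, List


def pick_shard(sharding_key: str, shard_hosts: List[str]) -> str:
    """Map a key to one shard with a hash that is stable across processes."""
    bucket = zlib.crc32(sharding_key.encode("utf-8")) % len(shard_hosts)
    return shard_hosts[bucket]


def route_rows(rows: List[dict], key_field: str, shard_hosts: List[str]) -> Dict[str, List[dict]]:
    """Group rows into one insert batch per shard host."""
    batches: Dict[str, List[dict]] = {host: [] for host in shard_hosts}
    for row in rows:
        batches[pick_shard(str(row[key_field]), shard_hosts)].append(row)
    return batches


# Hypothetical shard addresses that the client has to know in advance.
hosts = ["ch-shard-0:9000", "ch-shard-1:9000", "ch-shard-2:9000"]
rows = [{"user_id": uid, "action": "click"} for uid in range(10)]
batches = route_rows(rows, "user_id", hosts)
routed_count = sum(len(batch) for batch in batches.values())
```

The hosts list is exactly the weak point mentioned above: when a node fails or a shard is added, every client holding this list has to be updated, and the modulo changes where existing keys land.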

To achieve a better balance between high availability and performance, I recommend choosing the first method. However, because the receiving node first stores a local copy for safety and then synchronizes the data to the target shard, this method is resource-intensive. To address this, I recommend putting a proxy service in front to forward requests, which takes care of node changes and the problems of direct distribution.

Now let’s talk about master-slave sharding. In ClickHouse, replicas (master-slave synchronization) can be configured per table. Replication is implemented by having the tables register under the same path in ZooKeeper. This allows many combinations of sharding and replication, such as multiple sub-clusters within one cluster, multiple shards across the cluster as a whole, client-managed sharding for writes, and distributed tables acting as a write proxy.

In simple terms, ClickHouse supports multi-tenant data services in which resource sharing is managed by the users themselves. When adding servers, we need to manually add the new shards to the cluster configuration and create the distributed and local tables on them before the data can scale out. Even then, existing data is not rebalanced automatically and has to be migrated by hand.

Summary #

ClickHouse, as a representative OLAP database, has many unique designs. It has shaken up the OLAP database field and has prompted many cloud providers to think about building HTAP services based on its ideas.

After today’s explanation, I believe you also have a clear understanding of the key features of ClickHouse.

Let’s review. ClickHouse improves write concurrency through sharding and by buffering data in memory and periodically flushing it to disk in sequential order. It improves query efficiency by periodically merging data part files in the background. For indexing, ClickHouse uses a sparse index to narrow the range of data to retrieve. For queries that do not filter on the primary key, it uses skip indexes to reduce the amount of data to traverse. In addition, ClickHouse reads and filters data in parallel with multiple threads.

These features collectively enable ClickHouse to achieve high-throughput data retrieval.

As for the recent debate on whether Elasticsearch or ClickHouse is better, it is still a hot topic and there is no clear winner at the moment.

Personally, if you have abundant hardware resources and fewer developers, I would recommend choosing Elasticsearch. If you have limited hardware resources and more developers, you can consider trying ClickHouse. If both hardware and personnel are limited, I would recommend using cloud-based distributed databases. The specific decision should be made based on the specific situation of the team.

I have also prepared an evaluation table for you, which is included in the document.

Image

Thought Question #

ClickHouse cannot easily modify or delete data. So how can we clean up historical data?

I look forward to interacting and exchanging ideas with you in the comments section! If you find this lesson valuable, please recommend and share it with more friends.