18 Industrial Practice Tencent’s Billion-Level Elasticsearch Technical Practice #

1. Background of Elasticsearch at Tencent #

img

Let’s first look at the main application scenarios of Elasticsearch (ES) at Tencent. ES is a real-time distributed search and analytics engine. Many users still think of ES as near real-time, but the official documentation has described it as real-time since version 6.8: “Elasticsearch provides real-time search and analytics for all types of data.” Before newly written data is refreshed, ES can already return a document in real time through a get-by-ID request. Search is a different matter: until the refresh happens, the FST (Finite State Transducer) for the new data has not been built, so the data cannot yet be searched. Currently, ES is mainly applied at Tencent in three areas:

  • Search service: For example, Tencent Docs uses ES for full-text search, and our e-commerce clients such as Pinduoduo and Mogujie rely on ES for a large number of product searches.
  • Log analysis: This is the most widely used area for ES applications, supporting full-stack log analysis, including various application logs, database logs, user behavior logs, network data, security data, etc. ES has a complete log solution, enabling real-time analysis from collection to display within seconds.
  • Time series analysis: The typical scenario is monitoring data analysis, such as cloud monitoring; all of Tencent Cloud monitoring is built on ES. The IoT (Internet of Things) field also produces a large amount of time series data. Time series data is characterized by high write throughput, and ES provides rich multidimensional statistical analysis operators to support it.

Of course, in addition to the above scenarios, ES is widely used in internal search, security, APM (Application Performance Monitoring), and other fields.

Currently, ES is available in Tencent Public Cloud, Private Cloud, and Internal Cloud, providing services that can meet the business needs of both internal and external customers. The usage scenarios in the public cloud are very diverse, while the private cloud mainly focuses on standardized delivery and automated operation and maintenance. ES on the internal cloud at Tencent consists of ultra-large-scale clusters with petabytes of data.

2. Pain Points and Challenges #

With such diverse application scenarios and such massive scale, we have run into many pain points and challenges. They fall mainly into four areas: availability, performance, cost, and scalability.

img

  • Availability: The most common problems are nodes running out of memory (OOM) under high load and entire clusters collapsing under high load. These failures make it hard to guarantee the SLA, especially in search scenarios, which require four nines of availability or more.
  • Performance: Search scenarios generally require an average response time below 20 milliseconds and query spikes below 100 milliseconds. In analysis scenarios with massive data, real-time requirements are lower, but request response time still determines user experience, and resource consumption determines the performance boundary.
  • Cost: Many users are concerned about the storage cost of ES. ES does keep many data types, has relatively low compression ratios, and therefore has higher storage costs, but there is still a lot of room for optimization. Memory is the other cost: ES has a large amount of index data that must be loaded into memory to provide high-performance search. For massive scenarios such as logging and monitoring, the cost challenge is therefore even greater.
  • Scalability: In scenarios such as logging and time series, indexes are often rolled over by period, and long retention periods generate a large number of indexes and shards. Very large clusters may need millions of shards and thousands of nodes, whereas the current native version of ES supports only up to tens of thousands of shards and hundreds of nodes. As the big data field develops rapidly, ES deployments will move from the terabyte level to the petabyte level, and scalability becomes the main bottleneck and challenge.

3. Tencent ES Kernel Optimization Analysis #

The community offers many cases and much experience on ES usage and parameter tuning that we can learn from. However, many pain points and challenges cannot be solved by tuning alone. At that point, ES has to be optimized at the kernel level in order to keep improving this excellent open source product. The core of this talk is how Tencent optimizes ES at the kernel level.

img

Availability Optimization #

First, let’s introduce the availability optimization part. In general, there are three problems at the availability level in the native version:

  • Insufficient system robustness: Clusters avalanche under high pressure, mainly because memory resources run short, and uneven load leads to overloaded nodes and node OOM. Our solutions at this level focus on service rate limiting and node balancing strategies.
  • Lack of a disaster recovery solution: ES provides a replica mechanism to improve data safety, but multi-availability-zone disaster recovery on a cloud platform needs additional work. Moreover, neither replicas nor cross-cluster replication (CCR) can prevent data from being deleted by user error, so a low-cost backup and rollback capability is also needed.
  • Kernel bugs: We have fixed a series of availability-related kernel issues, such as master task blocking, distributed deadlock, and slow rolling restarts, and we provide new versions promptly so that users can upgrade.

Next, let’s analyze the two types of problems that users commonly encounter at the availability level. One is when the cluster is overloaded by high concurrent requests, and the other is when a single large query causes a node to crash.

Overloading the Cluster with High Concurrent Requests #

img

Let’s look at the first scenario, a cluster overloaded by high concurrent requests. In one internal log cluster, the write volume suddenly increased fivefold within a day. Multiple nodes became stuck in Old GC and dropped out of the cluster, the cluster turned red, and writes stopped. This kind of failure is very painful. We analyzed the memory of the crashed nodes and found that most of it was occupied by write requests, both before and after deserialization. Let’s see where these write requests accumulate.

img

This is the high-level write flow of ES. The user’s write request first arrives at one of the data nodes, which we call the coordinating node. The coordinating node forwards the request to the node that holds the primary shard. After the primary shard finishes writing, the request is forwarded to the replica shards, and finally the write result is returned to the client. The right side shows the write process in more detail. From the stack, we can see that the write requests accumulate at the position marked by the red box, the access layer of the coordinating node. The root cause of the node crash is that the access layer memory of the coordinating node is overwhelmed. With the cause identified, let’s turn to our optimization solution.

img

For this high-concurrency scenario, our optimization solution is to implement service rate limiting. In addition to controlling the concurrency of requests, it is also important to control memory resources precisely, as insufficient memory is the main issue. Furthermore, the solution should be versatile and applicable at various levels to achieve full-chain rate limiting.

In many database usage scenarios, rate limiting is achieved by configuring relevant business rules from the business end or through an independent proxy layer, which involves resource estimation. This approach has weak adaptability, high operation and maintenance costs, and it is difficult for the business end to accurately estimate resource consumption.

The native version has its own rate limiting strategy based on the number of requests, a leaky bucket implemented with queues and thread pools. The thread pool size determines the concurrency level. Requests beyond the processing capacity are placed in a queue, and once the queue is full, further requests are rejected. However, rate limiting based solely on the number of requests cannot control resource usage, and it only applies to shard-level sub-requests at the transport layer. It cannot protect the access layer that we analyzed earlier. The native version also has a memory circuit breaker strategy, but it is not enforced at the access layer of the coordinating node.

Our optimization solution is based on a leaky bucket strategy that uses memory resources. We use the JVM memory of each node as the resource for the leaky bucket. When there is sufficient memory, requests can be processed normally. When the memory usage reaches a certain threshold, we implement stepped rate limiting between different memory usage ranges. In the example image, the light yellow range restricts writes, the deep yellow range restricts queries, and the bottom red area is reserved as a buffer for processing requests, merging operations, and other operations to ensure the safety of node memory.
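
The stepped ranges described above can be sketched as a simple admission check. This is a minimal illustration, not Tencent's implementation: the two threshold values and the names `RequestKind`, `WRITE_LIMIT_START`, `QUERY_LIMIT_START`, and `admit` are assumptions, since the talk does not state the actual boundaries.

```python
from enum import Enum


class RequestKind(Enum):
    WRITE = "write"
    QUERY = "query"


# Hypothetical thresholds, as fractions of JVM heap in use.
WRITE_LIMIT_START = 0.70  # "light yellow" range begins: writes are restricted
QUERY_LIMIT_START = 0.80  # "deep yellow" range begins: queries are restricted too
# Everything above the deep yellow range is the reserved buffer for in-flight work.


def admit(kind: RequestKind, heap_used_fraction: float) -> bool:
    """Return True if a new request of this kind may enter the node."""
    if heap_used_fraction >= QUERY_LIMIT_START:
        return False  # both writes and queries are held back
    if heap_used_fraction >= WRITE_LIMIT_START:
        return kind is RequestKind.QUERY  # only writes are held back
    return True  # enough memory: everything passes


if __name__ == "__main__":
    for used in (0.50, 0.75, 0.85):
        print(used, admit(RequestKind.WRITE, used), admit(RequestKind.QUERY, used))
```

A hard cut-off like this is the simplest form of the idea; writes are restricted first because, in the incident described earlier, write requests were what filled the heap.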

img

One challenge in implementing rate limiting is how to achieve smooth rate limiting. Using a single threshold for rate limiting can easily result in request jitter. For example, when the requests suddenly increase, the memory usage quickly reaches the threshold and triggers rate limiting, but when we release a small number of requests, more requests come in and increase the memory usage again. Our solution is to set a range of high and low rate limiting thresholds. Within this range, we use the cosine transform to achieve smooth rate limiting between the number of requests and memory resources. When there is sufficient memory, the request pass rate is 100%. As the memory gradually reaches the rate limiting range, the request pass rate gradually decreases. When the memory usage decreases, the request pass rate gradually increases. This ensures a controlled and gradual increase in the pass rate rather than an abrupt change. Through practical testing, the smooth rate limiting within the range can maintain stable write performance under high pressure.
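
To make the smoothing concrete, here is a small sketch of a pass rate that falls along a half-cosine curve between a low and a high memory threshold. The talk only says that a cosine transform is used; the exact formula, the default thresholds, and the names `pass_rate` and `should_admit` are assumptions made for illustration.

```python
import math
import random
from typing import Callable


def pass_rate(heap_used: float, low: float = 0.70, high: float = 0.90) -> float:
    """Fraction of requests to let through for a given heap usage (0.0 to 1.0)."""
    if heap_used <= low:
        return 1.0  # plenty of memory: no limiting
    if heap_used >= high:
        return 0.0  # above the range: reject everything new
    # Map usage to t in (0, 1), then follow half a cosine wave from 1 down to 0.
    t = (heap_used - low) / (high - low)
    return 0.5 * (1.0 + math.cos(math.pi * t))


def should_admit(heap_used: float, rng: Callable[[], float] = random.random) -> bool:
    """Admit a request with probability equal to the current pass rate."""
    return rng() < pass_rate(heap_used)


if __name__ == "__main__":
    for used in (0.65, 0.72, 0.80, 0.88, 0.95):
        print(f"heap={used:.2f} pass_rate={pass_rate(used):.3f}")
```

The curve is flat near both thresholds and steepest in the middle, so the pass rate changes gently when memory usage first enters or is about to leave the limiting range, which is what avoids the jitter of a single threshold.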

Our range-based smooth rate limiting strategy, built on memory resources, is an effective supplement to the native leaky bucket strategy built on the number of requests. It does not replace the native rate limiting, and it applies more widely, covering both the access layer and the transport layer on coordinating nodes and data nodes.

img

Node Overload Caused by Single Large Query #

Next, let’s discuss the scenario where a single large query causes node overload. For example, in an analysis scenario with multiple nested aggregations, sometimes the size of the result set returned by a request is very large, which can cause the entire node to become overloaded. Analyzing the aggregation query process, after the request reaches the coordinating node, it is split into sub-queries at the shard level and sent to the data nodes where the target shards are located for sub-aggregation. Finally, the coordinating node collects the complete shard results and performs merging, aggregation, sorting, and other operations. The main problem here is that the coordinating node’s memory inflates significantly after deserializing a large amount of aggregated results, and the new result set generated by the second round of aggregation can exceed the memory limit.

img

Now for our optimization for single large queries. Its key points are memory inflation estimation and streaming checks. Let’s first look at the native solution. The native version directly limits the maximum number of result buckets returned, ten thousand by default, and a request that exceeds the limit returns an exception. The problem is that in analysis scenarios, results with hundreds of thousands or millions of buckets are common, so the default of ten thousand is often not enough. Adjusting the limit is not flexible either: raising it can still crash nodes when memory runs out, while lowering it cannot meet business needs. Our optimization is divided into two stages:

  • Stage 1: Before the coordinating node deserializes the responses returned by the data nodes, it estimates the memory inflation from the size of the received network byte stream. If the current JVM memory usage plus the estimated usage of the response exceeds the threshold, the request is circuit-broken immediately.
  • Stage 2: During the reduce process at the coordinating node, we check the buckets in a streaming fashion. Every time a fixed number of buckets has been added (1024 by default), we check the memory, and if the limit is exceeded, the request is circuit-broken immediately. The same streaming check also applies to the sub-aggregation process on data nodes.
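
The two stages can be sketched as follows. This is an illustrative model only: the inflation factor of 4, the function names, and the `heap_used` callback are assumptions, and the real checks run inside the Elasticsearch JVM against its circuit breakers. Only the 1024-bucket check interval comes from the text.

```python
from typing import Callable, Dict, Iterable

CHECK_INTERVAL = 1024  # buckets between memory checks (default quoted in the text)


class CircuitBreakingError(RuntimeError):
    """Raised when continuing would exceed the memory budget."""


def check_before_deserialize(heap_used: int, wire_bytes: int, heap_limit: int,
                             inflation_factor: float = 4.0) -> int:
    """Stage 1: estimate the deserialized size from the wire size and fail fast."""
    estimated = int(wire_bytes * inflation_factor)  # hypothetical inflation factor
    if heap_used + estimated > heap_limit:
        raise CircuitBreakingError("estimated response size would exceed the limit")
    return estimated


def streaming_reduce(shard_results: Iterable[Dict[str, int]],
                     heap_used: Callable[[], int],
                     heap_limit: int) -> Dict[str, int]:
    """Stage 2: merge bucket counts, checking memory every CHECK_INTERVAL new buckets."""
    merged: Dict[str, int] = {}
    for result in shard_results:
        for key, doc_count in result.items():
            if key not in merged:
                merged[key] = 0
                # Check only occasionally so the check itself stays cheap.
                if len(merged) % CHECK_INTERVAL == 0 and heap_used() > heap_limit:
                    raise CircuitBreakingError("memory limit exceeded during reduce")
            merged[key] += doc_count
    return merged


if __name__ == "__main__":
    shards = [{"a": 1, "b": 2}, {"a": 3}]
    print(streaming_reduce(shards, heap_used=lambda: 0, heap_limit=1000))
```

The limit is expressed in memory rather than in a bucket count, so a query with a million small buckets can succeed on a lightly loaded node while the same query is rejected when the heap is already under pressure.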

This way, users no longer need to worry about the maximum number of buckets. As long as memory is sufficient, business requirements are met as far as possible. The downside is that requests that are too large are still rejected, at the expense of the user’s query experience. We can alleviate this with the batch reduce method that the official version already provides: when there are, say, 100 shard-level results, we reduce them in batches as they arrive instead of all at once, which lowers the memory overhead of each reduce. The streaming aggregation plan described above has been submitted to the official project and merged, and will be released in the upcoming version 7.7.0.
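
A rough sketch of the batch reduce idea follows. It only models the memory behavior: shard results are folded into a running partial result every `batch_size` arrivals instead of being held until all of them are in. The function name and the batch size of 5 are illustrative assumptions; in Elasticsearch the corresponding knob is the `batched_reduce_size` search parameter.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def batched_reduce(shard_results: Iterable[Dict[str, int]],
                   batch_size: int = 5) -> Tuple[Dict[str, int], int]:
    """Merge shard results in batches.

    Returns the merged bucket counts and the largest number of unmerged
    shard results that were buffered at any one time.
    """
    partial: Counter = Counter()
    buffer: List[Dict[str, int]] = []
    max_buffered = 0
    for result in shard_results:
        buffer.append(result)
        max_buffered = max(max_buffered, len(buffer))
        if len(buffer) == batch_size:
            for buffered in buffer:
                partial.update(buffered)  # adds counts key by key
            buffer.clear()  # the raw shard results can now be released
    for buffered in buffer:  # fold in the final, possibly smaller, batch
        partial.update(buffered)
    return dict(partial), max_buffered


if __name__ == "__main__":
    results = [{"error": 1, "ok": 2} for _ in range(100)]
    merged, peak = batched_reduce(results, batch_size=5)
    print(merged, "peak buffered shard results:", peak)
```

With 100 shard results and a batch size of 5, at most 5 raw results are held at once, at the price of running the reduce step 20 times.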

Earlier, we introduced two typical availability problems that users often encounter. Next, we will summarize the entire availability optimization.

img

First, we combined our self-developed optimizations with the native mechanisms to implement systematic end-to-end flow control. In the diagram on the left, the yellow parts are the self-developed optimizations and the rest are native. Together they cover the execution engine layer, the transport layer, and the access layer.

In addition, we have made several memory-related optimizations:

  • Memory utilization: This mainly targets write scenarios. For example, during a write each field value is allocated a fixed-size buffer, so a single document with too many fields wastes a great deal of memory. The optimization mainly implements an elastic memory buffer.
  • Memory recycling strategy: This does not refer to the GC strategy. It means promptly reclaiming the memory held by requests that hit read or write exceptions.
  • JVM GC debt management: We measure the health of the JVM by the ratio of Old GC time to normal working time. In special cases, the JVM is restarted to prevent it from hanging for a long time.

The effect of availability optimization is that we have improved the overall availability of ES clusters in public clouds to 4 nines (99.99%), increased memory utilization by 30%, significantly improved stability under high-pressure scenarios, and basically ensured that nodes will not experience out of memory errors and clusters will not cascade fail.

The following are the pull requests related to our availability optimization. Besides the streaming check at the coordinating node and the memory inflation estimation described above, they include a memory limit for individual queries. This is also very useful, because a single query that is too large affects all other requests. We have also optimized the speed of rolling restarts: the restart time of a single node in a large cluster has dropped from 10 minutes to less than 1 minute. This optimization was merged in version 7.5, so it is worth a look if you have run into slow rolling restarts in large clusters.

Performance Optimization #

Next, let’s talk about performance optimization.

img

Performance optimization scenarios divide mainly into writing and querying. Representative write scenarios are logging, monitoring, and other massive time-series workloads, which generally reach write throughput in the millions. Write performance drops by half when documents are written with explicit IDs, because each write must first query whether the record already exists. Querying covers search scenarios and analysis scenarios. Search services are mainly high-concurrency and low-latency, while aggregation analysis mainly involves large queries with high memory and CPU overhead.

Let’s look at the factors that affect performance. The left half covers hardware resources and system tuning, which users can control directly, for example by scaling resources and tuning parameters. The right half covers the storage model and execution plans, which users can hardly adjust directly. We will focus on optimizing these two parts.

img

Storage Model Optimization #

img

First, let’s discuss storage model optimization. ES’s underlying Lucene is based on an LSM-tree-like structure. The default merge strategy merges files of similar size, typically 10 files at a time, in a roughly tiered manner. The biggest advantage of this approach is efficiency, since it quickly reduces the number of files. The main problem is that the data is not contiguous in time, which weakens the ability to prune files during queries. For example, when querying data from the last hour, the files from that hour may well have been merged with files from several days ago, which increases the number of files that must be traversed.

Typical industry solutions for data contiguity in merge strategies include the time-window-based strategies found in Cassandra and HBase. Their advantage is that data is merged in chronological order, which makes queries efficient and supports Time-To-Live (TTL) within a table. However, they are limited to time-series scenarios, and file sizes may vary, which affects merge efficiency. Another type is leveled merge, represented by LevelDB and RocksDB. Each level is ordered, and each merge extracts a portion of the data and merges it into the next level down. This approach is efficient for queries, but it causes significant write amplification, because the same data may be merged multiple times, which hurts write throughput.

Finally, let’s discuss our optimized merge strategy. Our goal is to improve data contiguity, reduce the number of files, and enhance file trimming capabilities to improve query performance. Our strategy mainly involves merging layers in chronological order and sorting the files within each layer by creation time. Apart from the first layer, files are merged based on time sequence and target size, with no fixed number of files being merged each time. This ensures efficient merging. For a small number of unmerged files and cold shard files, we adopt a continuous merge strategy. We continuously merge shards that have not been written to for more than the default five minutes, controlling the merging concurrency and range to reduce the merge overhead.
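
The effect of merging in time order can be illustrated with a toy model. The sketch below is not Lucene's merge policy: the `Segment` fields, the greedy grouping by target size, and all names are assumptions chosen to show why time-contiguous segments are easier to prune.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Segment:
    min_ts: int   # earliest timestamp stored in the file
    max_ts: int   # latest timestamp stored in the file
    size_mb: int


def merge_in_time_order(segments: List[Segment], target_mb: int) -> List[Segment]:
    """Merge time-adjacent segments until each merged file reaches the target size."""
    merged: List[Segment] = []
    current = None
    for seg in sorted(segments, key=lambda s: s.min_ts):
        if current is None:
            current = seg
        elif current.size_mb + seg.size_mb <= target_mb:
            # No fixed file count per merge: keep absorbing neighbours while they fit.
            current = Segment(current.min_ts, max(current.max_ts, seg.max_ts),
                              current.size_mb + seg.size_mb)
        else:
            merged.append(current)
            current = seg
    if current is not None:
        merged.append(current)
    return merged


def segments_to_scan(segments: List[Segment], start: int, end: int) -> List[Segment]:
    """Prune files whose time range cannot overlap the query range."""
    return [s for s in segments if s.max_ts >= start and s.min_ts <= end]


if __name__ == "__main__":
    hourly = [Segment(h * 3600, h * 3600 + 3599, 100) for h in range(6)]
    merged = merge_in_time_order(hourly, target_mb=300)
    last_hour = segments_to_scan(merged, start=5 * 3600, end=6 * 3600 - 1)
    print(len(merged), "files after merge;", len(last_hour), "scanned for the last hour")
```

Because each merged file covers one contiguous time span, a query over the last hour overlaps a single file. Under a size-based merge, the same hour could end up spread over files that also contain data from days earlier.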

Through optimization of the merge strategy, we have improved the query performance of search scenarios by 40%.

Execution Engine Optimization #

Having discussed the optimization of the underlying storage model, let’s now move up to the execution engine.

img

Let’s consider a typical scenario for analysis. There is a type of aggregation in ES called the Composite Aggregation, which many of you may be familiar with. This feature was officially released in version 6.5. Its purpose is to support nested aggregation on multiple fields, similar to MySQL’s group by with multiple fields. It also supports streaming aggregation, allowing the results to be aggregated in batches through pagination. For usage, you can specify the composite keyword under the aggregation operation in the query and specify the length of each pagination and the list of fields to group by, as shown in the example on the left. Each time, the retrieved aggregation result is accompanied by an after key. The next query can use this after key to fetch the next page of results.
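
Since the slide with the example request is not reproduced here, the following sketch builds an equivalent request body. The `composite`, `sources`, `size`, `after`, and `after_key` keys are part of the Elasticsearch composite aggregation API; the aggregation name `by_fields`, the helper names, and the field names are placeholders.

```python
import json
from typing import Any, Dict, List, Optional


def composite_request(fields: List[str], page_size: int,
                      after_key: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a search body that groups by several fields, one page at a time."""
    composite: Dict[str, Any] = {
        "size": page_size,  # number of buckets per page
        "sources": [{name: {"terms": {"field": name}}} for name in fields],
    }
    if after_key is not None:
        composite["after"] = after_key  # resume after the last bucket of the previous page
    return {"size": 0, "aggs": {"by_fields": {"composite": composite}}}


def next_after_key(response: Dict[str, Any],
                   agg_name: str = "by_fields") -> Optional[Dict[str, Any]]:
    """Extract the after key from a response, or None when there are no more pages."""
    return response["aggregations"][agg_name].get("after_key")


if __name__ == "__main__":
    first_page = composite_request(["field1", "field2"], page_size=3)
    print(json.dumps(first_page, indent=2))
    # A trimmed-down response shape, written by hand for illustration.
    response = {"aggregations": {"by_fields": {
        "after_key": {"field1": "b", "field2": "y"}, "buckets": []}}}
    second_page = composite_request(["field1", "field2"], 3, next_after_key(response))
    print(json.dumps(second_page["aggs"]["by_fields"]["composite"]["after"]))
```

Paging continues by feeding each response's `after_key` into the next request until the response no longer contains one.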

So, what is the implementation principle behind it? Let’s first look at the native solution. Suppose the documents have two fields (field1 and field2), and the first column is the document ID. We run a composite aggregation on these two fields with a page size of 3. The implementation uses a fixed-size max heap whose size equals the page size. We iterate over all the documents to build the aggregated result in this heap, as shown at number 1 in the right image. Finally, we return the contents of the heap and use the top of the heap as the after key. For the second page, we iterate over all the documents again, this time with a filter that skips every document whose key is not beyond the after key, as shown at number 2 in the right image.
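
The native behavior can be modelled in a few lines. This is a simplified simulation, not the Elasticsearch source: documents are plain dicts, the bucket key is the tuple of both field values, and `_MaxFirst` is a helper (an assumption) that turns Python's min-heap into a max heap.

```python
import heapq
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str]


class _MaxFirst:
    """Wrapper that reverses ordering so heapq keeps the largest key on top."""
    __slots__ = ("key",)

    def __init__(self, key: Key) -> None:
        self.key = key

    def __lt__(self, other: "_MaxFirst") -> bool:
        return self.key > other.key


def composite_page_native(docs: List[Dict[str, str]], page_size: int,
                          after_key: Optional[Key] = None):
    """Return (buckets, after_key, docs_visited) for one page, scanning every document."""
    counts: Dict[Key, int] = {}
    heap: List[_MaxFirst] = []  # holds the page_size smallest keys seen so far
    visited = 0
    for doc in docs:
        visited += 1
        key = (doc["field1"], doc["field2"])
        if after_key is not None and key <= after_key:
            continue  # already returned on an earlier page
        if key in counts:
            counts[key] += 1
        elif len(heap) < page_size:
            heapq.heappush(heap, _MaxFirst(key))
            counts[key] = 1
        elif key < heap[0].key:
            evicted = heapq.heapreplace(heap, _MaxFirst(key))  # drop the largest key
            del counts[evicted.key]
            counts[key] = 1
    new_after = heap[0].key if heap else None  # top of the heap is the after key
    return sorted(counts.items()), new_after, visited


if __name__ == "__main__":
    docs = [{"field1": a, "field2": b} for a, b in
            [("a", "x"), ("b", "y"), ("a", "x"), ("c", "z"), ("a", "y"), ("d", "x")]]
    page1, after, seen = composite_page_native(docs, 3)
    print(page1, after, seen)
    print(composite_page_native(docs, 3, after))
```

Both pages report that all six documents were visited, which is the cost the paragraph above describes: each page repeats the full scan.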

Clearly, this approach has performance issues, because each page requires iterating over all the documents, so it does not truly implement pagination. Next, we propose an optimization solution.

img

Our optimization mainly uses index sorting to achieve after-key skipping and early termination. Data must be sorted to achieve true streaming aggregation. Index sorting, introduced in version 6.0, sorts documents by specified fields at index time. Regrettably, aggregation queries did not take advantage of this order. Our optimization keeps the max heap but, because the data is ordered, only needs to collect the specified number of buckets in the order the documents appear and can then return immediately. On the next request, we jump directly to the position indicated by the after key in the request and continue traversing from there until the page is full. This avoids a full traversal on every page turn and greatly improves query performance.

One challenge remains: does the optimization still work if the data order differs from the order requested by the user’s query? It does, in part. After-key skipping cannot be used in the reverse-order case, because the underlying Lucene does not support traversing documents in reverse, but early termination still works and still improves efficiency significantly.

We developed this optimization together with the official Elasticsearch development team, who were working on the same problem at the time. Our plan covered more of the cases where the data order and the request order are inconsistent, and in the end the two plans were integrated. The optimization was merged in version 7.6, and everyone can try it out.
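
A small model shows how ordered data enables both tricks. It is a sketch under simplifying assumptions: the index is represented as an already sorted list of bucket keys, and `bisect` stands in for whatever structure Lucene uses to jump to a position. The function name is hypothetical.

```python
import bisect
from typing import List, Optional, Tuple

Key = Tuple[str, str]


def composite_page_sorted(sorted_keys: List[Key], page_size: int,
                          after_key: Optional[Key] = None):
    """Return (buckets, after_key, docs_visited) for one page over index-sorted data."""
    # After-key skipping: jump straight past everything already returned.
    i = 0 if after_key is None else bisect.bisect_right(sorted_keys, after_key)
    page: List[List] = []  # [key, count] pairs in index order
    visited = 0
    while i < len(sorted_keys):
        key = sorted_keys[i]
        visited += 1
        if page and page[-1][0] == key:
            page[-1][1] += 1  # same bucket as the previous document
        elif len(page) == page_size:
            break  # early termination: a new bucket appeared and the page is full
        else:
            page.append([key, 1])
        i += 1
    buckets = [(key, count) for key, count in page]
    new_after = buckets[-1][0] if buckets else None
    return buckets, new_after, visited


if __name__ == "__main__":
    keys = sorted([("a", "x"), ("b", "y"), ("a", "x"), ("c", "z"), ("a", "y"), ("d", "x")])
    page1, after, seen = composite_page_sorted(keys, 3)
    print(page1, after, "visited", seen)
    print(composite_page_sorted(keys, 3, after))
```

On this six-document example the first page stops after examining five documents and the second page examines only two. The gap to a full scan per page grows with index size, because each page touches roughly one page's worth of documents.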

Earlier, we gave examples of optimizations from the storage model at the bottom to the execution engine at the top. In fact, we have made many optimizations in terms of performance. We covered the storage model at the bottom, the execution engine, the optimizer, and the cache strategy at the top. In the image below, the left side represents optimization items, the middle shows the optimization effects, and the right side lists notable optimization PRs.

img

Let me briefly introduce the other PRs. Optimizing the coarse-grained lock in the translog refresh path improves overall write performance by 20%. The file pruning optimization at the Lucene level doubles performance in write scenarios that use IDs. Before a segment file is traversed, the ID is compared against the minimum and maximum values recorded for that segment. If the ID falls outside the range, the segment is skipped immediately, which avoids traversing its documents. The PR at the bottom optimizes the cache strategy: it avoids caching in cases where the caching overhead is relatively high, which greatly reduces query spikes.
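
The min/max pruning for ID lookups can be sketched like this. The `Segment` class and `id_exists` function are illustrative stand-ins, not Lucene types, and IDs are compared as plain strings.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Set, Tuple


@dataclass
class Segment:
    ids: Set[str]
    min_id: str = field(init=False)
    max_id: str = field(init=False)

    def __post_init__(self) -> None:
        # Recorded once when the segment is written, so lookups can consult them cheaply.
        self.min_id = min(self.ids)
        self.max_id = max(self.ids)


def make_segment(ids: Iterable[str]) -> Segment:
    return Segment(set(ids))


def id_exists(segments: List[Segment], doc_id: str) -> Tuple[bool, int]:
    """Return (found, segments_searched); out-of-range segments are skipped."""
    searched = 0
    for seg in segments:
        if doc_id < seg.min_id or doc_id > seg.max_id:
            continue  # pruned: the ID cannot be in this segment
        searched += 1
        if doc_id in seg.ids:
            return True, searched
    return False, searched


if __name__ == "__main__":
    segs = [make_segment(f"a{i:03d}" for i in range(1, 101)),
            make_segment(f"b{i:03d}" for i in range(1, 101))]
    print(id_exists(segs, "b050"))
    print(id_exists(segs, "c001"))
```

The benefit depends on how the IDs are distributed. Monotonic or time-prefixed IDs give each segment a narrow range, while fully random IDs make every segment's range cover almost everything, so little is pruned.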

All of these performance optimization items have been incorporated into our Tencent Cloud ES version, and everyone can try them out.

Cost Optimization #

Next, let’s look at cost optimization. In scenarios such as logs and time series data, the cost ratio of CPU, memory, and disk in a cluster is roughly 1:4:8. A typical node with 16 cores, 64 GB of memory, and 2 to 5 TB of disk has approximately this ratio. The main cost bottlenecks are therefore disk and memory.

img

Cost optimization therefore targets mainly storage cost and memory cost.

Storage Cost #

Let us first look at storage cost.

img

Let’s start with a scenario. All of Tencent Cloud Monitoring is built on ES. The average write throughput of a single cluster is on the order of ten million records per second, and the business needs to keep at least six months of data available for querying. We can calculate the cost from this throughput: 10 million records per second, multiplied by the retention time, by the average size of a single document, and by two copies of the data, comes to approximately 14 PB of storage, which requires about 1,500 machines of the hot node type. This far exceeds the business cost budget. So how can we meet the business requirements at low cost?
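
The talk does not state the average document size, but the quoted figures allow a rough back-calculation. The sketch below assumes that six months means 180 days and that 14 PB is a decimal figure; both are assumptions, and the derived numbers are only indicative.

```python
WRITES_PER_SECOND = 10_000_000  # from the text
RETENTION_DAYS = 180            # assumption: "six months" taken as 180 days
COPIES = 2                      # from the text: two copies of the data
TOTAL_BYTES = 14 * 10**15       # from the text, assuming decimal petabytes
MACHINES = 1500                 # from the text

SECONDS_PER_DAY = 86_400

# Number of documents written over the whole retention period.
docs = WRITES_PER_SECOND * RETENTION_DAYS * SECONDS_PER_DAY

# Storage each stored copy of a document would need for the total to come out at 14 PB.
bytes_per_doc_copy = TOTAL_BYTES / (docs * COPIES)

# Disk each machine would have to hold if the data were spread evenly.
tb_per_machine = TOTAL_BYTES / MACHINES / 10**12

if __name__ == "__main__":
    print(f"documents retained: {docs:.3e}")
    print(f"implied bytes per stored document copy: {bytes_per_doc_copy:.1f}")
    print(f"implied TB per machine: {tb_per_machine:.1f}")
```

Under these assumptions the figures imply a few tens of bytes on disk per document copy and on the order of 10 TB per machine, which is plausible for compact, compressed metric points.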

Let’s discuss our optimization plan. First, we conducted a survey on the frequency of data access, which revealed that the most recent data had a high access frequency, while data from an hour ago, a day ago, and several days ago had a relatively low frequency. Data older than a month had an even lower frequency and tended to be used for statistical analysis.

First, we can perform hot-cold separation by storing cold data on HDDs to reduce costs. At the same time, we can use the index lifecycle management feature provided by ES to migrate the data. Since the disks storing cold data are usually large, we can also utilize a multi-disk strategy to improve throughput and data resilience. Finally, we can cold-backup the super-cold data to Tencent Cloud’s object storage (COS), which has a very low cost - only a few cents per 1 GB per month.

We can optimize these aspects from an architectural perspective. Are there any other optimization points? Based on the analysis of data access characteristics, we found that historical data is mainly used for statistical analysis. Therefore, we proposed the Rollup solution. The purpose of Rollup is to reduce the precision of historical data in order to significantly reduce storage costs. We use precomputing to convert raw data with fine granularity, such as data at the second level, into data aggregated at the hour or day level. This makes it easier to display reports with longer query time spans, as it is not meaningful to display data at the second level for a multi-day query. Additionally, it can greatly reduce storage costs while improving query performance.
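
As an illustration of the precomputation, the sketch below rolls second-level points up into hourly buckets that keep count, sum, min, and max. The choice of statistics and the function name are assumptions; the talk does not list which aggregates Tencent's Rollup stores.

```python
import math
from typing import Dict, Iterable, Tuple


def rollup(points: Iterable[Tuple[int, float]],
           bucket_seconds: int) -> Dict[int, Dict[str, float]]:
    """Aggregate (timestamp, value) points into fixed-width time buckets."""
    buckets: Dict[int, Dict[str, float]] = {}
    for ts, value in points:
        start = ts - ts % bucket_seconds  # align to the start of the bucket
        b = buckets.get(start)
        if b is None:
            b = buckets[start] = {"count": 0, "sum": 0.0,
                                  "min": math.inf, "max": -math.inf}
        b["count"] += 1
        b["sum"] += value
        b["min"] = min(b["min"], value)
        b["max"] = max(b["max"], value)
    return dict(sorted(buckets.items()))


if __name__ == "__main__":
    # Two hours of one-second samples collapse into two hourly records.
    raw = [(t, float(t % 10)) for t in range(7200)]
    hourly = rollup(raw, bucket_seconds=3600)
    for start, stats in hourly.items():
        print(start, stats, "avg", stats["sum"] / stats["count"])
```

Storing sum and count rather than a precomputed average keeps the records mergeable: hourly buckets can later be rolled up again into daily ones without going back to the raw data.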

We implemented the Rollup solution in Tencent Cloud Monitoring in 2017, and now the official version is also available for preview.

Now, let’s highlight the key points of our latest Rollup solution.

img

In general, the Rollup optimization relies on streaming aggregation, query pruning, and shard-level concurrency for its efficiency. Streaming aggregation and query pruning were introduced in the performance optimization section above; our new Rollup uses them as well, so we will not repeat the details here. Next, we introduce shard-level concurrency and the automatic concurrency control strategy.

For a normal aggregation query, the request is sent to each shard for sub-aggregation, and the results are then merged at the coordinating node, so aggregation and merging happen at two levels. By adding routing to the data, we make sure that the same objects are stored on the same shard, which allows single-level aggregation. Since the data in each shard is independent, multiple data objects can be processed with shard-level concurrency. We also control the concurrency automatically by estimating the resources that Rollup tasks require and monitoring the load on the cluster, which keeps the overall impact on the cluster within a bounded range.

The image on the right shows the results. For one statistical indicator, 30 days of data take only 13 GB at the daily level and 250 GB at the hourly level. Although the finer-grained levels require more storage, the overall storage volume is reduced by nearly 10 times. A single cluster needs only about 150 servers, a tenfold cost reduction. Rollup adds less than 10% to the overall write overhead.
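
The routing idea can be sketched as follows. This is an analogy, not Elasticsearch code: `zlib.crc32` stands in for the real routing hash, the thread pool's fixed `max_workers` stands in for the load-based concurrency control, and all function names are hypothetical.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

Doc = Tuple[str, int]  # (object id used as routing key, metric value)


def shard_for(routing_key: str, num_shards: int) -> int:
    """Pick a shard deterministically so one object always lands on one shard."""
    return zlib.crc32(routing_key.encode("utf-8")) % num_shards


def route(docs: List[Doc], num_shards: int) -> List[List[Doc]]:
    shards: List[List[Doc]] = [[] for _ in range(num_shards)]
    for obj, value in docs:
        shards[shard_for(obj, num_shards)].append((obj, value))
    return shards


def aggregate_shard(shard_docs: List[Doc]) -> Dict[str, int]:
    """Complete aggregation for every object on this shard; no later merge needed."""
    totals: Dict[str, int] = {}
    for obj, value in shard_docs:
        totals[obj] = totals.get(obj, 0) + value
    return totals


def rollup_all(docs: List[Doc], num_shards: int, max_workers: int) -> Dict[str, int]:
    shards = route(docs, num_shards)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:  # caps the concurrency
        partials = list(pool.map(aggregate_shard, shards))
    result: Dict[str, int] = {}
    for partial in partials:
        if not result.keys().isdisjoint(partial.keys()):
            raise ValueError("an object appeared on more than one shard")
        result.update(partial)  # plain union, not a second aggregation level
    return result


if __name__ == "__main__":
    docs = [(f"host-{i % 7}", 1) for i in range(70)]
    print(rollup_all(docs, num_shards=4, max_workers=2))
```

Because no object spans shards, the per-shard results are already final and are simply concatenated, which is what removes the second aggregation level at the coordinating node.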

Memory Cost Optimization #

We have discussed storage cost optimization. Now let’s go over memory cost optimization.

img

After analyzing our online clusters, we found that in many scenarios heap memory usage was high while disk usage was relatively low. Why is heap usage so high? The FST (finite state transducer), which serves as the term index of the inverted index, accounts for most of the heap usage and stays resident in memory. For every 10 TB of disk, the FST consumes approximately 10 GB to 15 GB of memory.

Can we optimize the heap memory usage of FST? Our idea is to move it off the heap (off-heap) and load it on-demand, in order to improve heap memory utilization, reduce GC overhead, and enhance the disk management capability of each node.

img

Let’s look at the existing off-heap solutions. The native version currently implements off-heap by managing FST objects through MMAP. This approach is simple to implement, and we used it in the early stages. However, MMAP memory belongs to the page cache and may be reclaimed by the system. When that happens, the FST has to be read from disk, performance drops several-fold, and query spikes follow.

In HBase version 2.0, off-heap has also been implemented by establishing a cache outside the heap, decoupling it from the system cache. However, only data is stored outside the heap, and the index remains in the heap, and the eviction strategy depends entirely on the LRU strategy, resulting in the inability to promptly clean up cold data.

Our optimization plan also consists of establishing a cache outside the heap to ensure that the space occupied by FST is not affected by the system. In addition, we will implement a more precise eviction strategy to improve memory utilization, coupled with a multi-level cache management mode to improve performance. This approach is more complex to implement but the benefits are quite substantial. Now let’s take a look at the detailed implementation.

img

Our solution combines an LRU cache, zero-copy, and a two-level cache. First, the LRU cache is established outside the heap. When code inside the heap needs to access an FST, the FST is loaded from disk into this cache. Lucene’s default way of accessing an FST is through a buffer in the heap, so our first implementation copied the data directly from the off-heap cache into the in-heap buffer. Performance testing showed a 20% loss in query performance, most of it caused by the copy from off-heap memory into the heap.

We therefore moved to a second stage of optimization and changed how Lucene accesses the FST. Instead of storing the FST itself in the buffer, we store a pointer to the off-heap object, which gives zero-copy access between the heap and off-heap memory. (This is not the same as Linux zero-copy, which avoids copies between user space and kernel space.) With this change, performance testing still showed a 7% loss compared with keeping the FST in the heap. Could we close the remaining gap?
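The difference between the first two stages can be illustrated with a small sketch. It assumes the off-heap FST bytes live in a direct `ByteBuffer`; the class and method names are hypothetical, and this is not Lucene's or Tencent's actual code.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration: copying off-heap bytes into the heap versus
// handing out a view that shares the same off-heap memory.
class FstAccessSketch {

    // Stage 1: every access copies the off-heap bytes into a heap array.
    static byte[] copyToHeap(ByteBuffer offHeap) {
        ByteBuffer src = offHeap.duplicate(); // own position/limit, same memory
        src.clear();                          // cover the full capacity
        byte[] onHeap = new byte[src.remaining()];
        src.get(onHeap);                      // bulk copy: cost grows with FST size
        return onHeap;
    }

    // Stage 2: the heap only holds a small view object pointing at off-heap memory.
    static ByteBuffer zeroCopyView(ByteBuffer offHeap) {
        ByteBuffer view = offHeap.asReadOnlyBuffer(); // shares content, no copy
        view.clear();
        return view;
    }

    public static void main(String[] args) {
        ByteBuffer offHeap = ByteBuffer.allocateDirect(4);
        offHeap.put(0, (byte) 7);

        byte[] copy = copyToHeap(offHeap);
        ByteBuffer view = zeroCopyView(offHeap);

        // Change the off-heap bytes after handing them out.
        offHeap.put(0, (byte) 9);
        System.out.println("copy sees " + copy[0] + ", view sees " + view.get(0));
    }
}
```

The copy is a snapshot whose cost is paid on every access, while the view costs only a small on-heap object regardless of the FST size.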

Analysis showed that this 7% loss comes mainly from looking up off-heap objects by key, which involves computing a hash, validating data, and so on. In the third stage we therefore used Java weak references to build a lightweight second-level cache on the heap, whose entries point to the off-heap addresses. As long as a request is still using an entry, it stays reachable and will not be reclaimed, so it can be reused without another lookup. Once no request uses it, the garbage collector (GC) reclaims the entry and, with it, the on-heap pointer to the off-heap object.

This raises a question: how do we clean up the off-heap memory once the on-heap pointer has been reclaimed? Relying on LRU eviction alone would leave dead data occupying memory until it ages out. The best approach is to free the off-heap object at the moment its on-heap pointer is reclaimed, but Java has no destructors. We used the ReferenceQueue of weak references instead: when the GC reclaims an on-heap object, its weak reference is placed on the queue, and we then free the off-heap memory it pointed to. This solves the deallocation problem and makes off-heap eviction precise, which improves memory utilization. In the final performance tests, the result was comparable to the native approach with FSTs in the heap.
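The weak-reference level can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual implementation: the names `WeakFstCache`, `FstHandle`, and `HandleRef` are hypothetical, the first-level LRU cache is omitted, and "freeing" off-heap memory is modeled by dropping the last strong reference to a direct `ByteBuffer`.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

class WeakFstCache {

    // On-heap handle that a query holds while it uses an FST.
    static final class FstHandle {
        final ByteBuffer offHeap;
        FstHandle(ByteBuffer offHeap) { this.offHeap = offHeap; }
    }

    // Weak reference that remembers which cache entry it belongs to.
    static final class HandleRef extends WeakReference<FstHandle> {
        final String key;
        HandleRef(String key, FstHandle handle, ReferenceQueue<FstHandle> queue) {
            super(handle, queue);
            this.key = key;
        }
    }

    // Package-private so a demo or test can inspect it.
    final Map<String, HandleRef> handles = new HashMap<>();
    private final Map<String, ByteBuffer> offHeapBlocks = new HashMap<>();
    private final ReferenceQueue<FstHandle> queue = new ReferenceQueue<>();

    // Returns the live handle if some request still holds it; otherwise creates one.
    synchronized FstHandle get(String key, int sizeIfAbsent) {
        drainReclaimed();
        HandleRef ref = handles.get(key);
        FstHandle handle = (ref == null) ? null : ref.get();
        if (handle == null) {
            ByteBuffer block = offHeapBlocks.computeIfAbsent(
                    key, k -> ByteBuffer.allocateDirect(sizeIfAbsent));
            handle = new FstHandle(block);
            handles.put(key, new HandleRef(key, handle, queue));
        }
        return handle;
    }

    // Plays the role of a destructor: releases off-heap blocks whose handles were collected.
    synchronized int drainReclaimed() {
        int released = 0;
        Reference<? extends FstHandle> ref;
        while ((ref = queue.poll()) != null) {
            HandleRef stale = (HandleRef) ref;
            // Only release if this reference is still the current entry for its key.
            if (handles.remove(stale.key, stale)) {
                offHeapBlocks.remove(stale.key); // last strong reference to the block
                released++;
            }
        }
        return released;
    }

    synchronized int blockCount() { return offHeapBlocks.size(); }

    public static void main(String[] args) throws InterruptedException {
        WeakFstCache cache = new WeakFstCache();
        FstHandle h = cache.get("segment_1.fst", 1024);
        System.out.println("handle reused: " + (h == cache.get("segment_1.fst", 1024)));
        h = null; // no query uses the FST any more
        for (int i = 0; i < 10 && cache.blockCount() > 0; i++) {
            System.gc(); // only a hint; collection timing is up to the JVM
            Thread.sleep(50);
            cache.drainReclaimed();
        }
        System.out.println("off-heap blocks left: " + cache.blockCount());
    }
}
```

Because `System.gc()` is only a hint, the second printed value depends on the JVM. What matters is that off-heap blocks are released exactly when their on-heap handles are collected, rather than waiting to age out of an LRU list.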

The following are the effects and benefits of memory optimization:

img

After the memory optimization, memory overhead, data management capacity, and GC behavior all improve significantly, while performance stays roughly on par or slightly better. A single FST now needs only about 100 bytes of heap, the size of its cache key. With a 32 GB heap, a single node can manage around 50 TB of data on disk, compared with 5-10 TB for the native version (even with deep tuning), an increase of up to 10-fold. Benchmarks with esrally, the official benchmarking tool, showed a significant decrease in heap usage and GC time, with performance at a similar level.

Scalability Optimization #

Next is the final section of kernel optimization, scalability optimization.

img

Let’s look at the scenario first. In Elasticsearch’s metadata management model, the master node manages the metadata and all other nodes synchronize it. Take index creation as an example of the synchronization process. First, the master node allocates shards and generates a metadata diff, which it sends to the other nodes. It waits for a majority of master-eligible nodes to respond and then sends a request to apply the metadata. The other nodes apply the metadata and derive their own shard creation tasks by comparing the old and new metadata.
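The diff-and-apply idea can be sketched in a few lines. This is a simplified illustration, not Elasticsearch's cluster state code: routing is reduced to a map from shard name to node name, shard removal is ignored, and `MetadataApplySketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MetadataApplySketch {

    // Master side: keep only the routing entries that changed (the diff).
    static Map<String, String> diff(Map<String, String> oldRouting,
                                    Map<String, String> newRouting) {
        Map<String, String> changed = new TreeMap<>();
        for (Map.Entry<String, String> e : newRouting.entrySet()) {
            if (!e.getValue().equals(oldRouting.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    // Node side: rebuild the new metadata from the old metadata plus the diff.
    static Map<String, String> apply(Map<String, String> oldRouting,
                                     Map<String, String> diff) {
        Map<String, String> merged = new TreeMap<>(oldRouting);
        merged.putAll(diff);
        return merged;
    }

    // Node side: shards newly assigned to this node are the ones it must create.
    static List<String> shardsToCreate(Map<String, String> oldRouting,
                                       Map<String, String> newRouting,
                                       String localNode) {
        List<String> tasks = new ArrayList<>();
        for (Map.Entry<String, String> e : newRouting.entrySet()) {
            boolean assignedHere = localNode.equals(e.getValue());
            boolean alreadyHere = localNode.equals(oldRouting.get(e.getKey()));
            if (assignedHere && !alreadyHere) {
                tasks.add(e.getKey());
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, String> oldRouting = new TreeMap<>();
        oldRouting.put("logs[0]", "node-1");

        Map<String, String> newRouting = new TreeMap<>(oldRouting);
        newRouting.put("metrics[0]", "node-2");
        newRouting.put("metrics[1]", "node-1");

        Map<String, String> d = diff(oldRouting, newRouting);
        Map<String, String> applied = apply(oldRouting, d);
        System.out.println("diff: " + d);
        System.out.println("node-1 creates: " + shardsToCreate(oldRouting, applied, "node-1"));
    }
}
```

Every node receives the same diff and runs the same comparison, which is why each index creation costs a cluster-wide synchronization in this model.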

There are several bottleneck points in this process:

  • When the master node assigns shards, it needs to perform a forward and backward transformation of the metadata. We know that routing information is a mapping from shards to nodes, but during shard allocation, we need a mapping from nodes to shards. After shard allocation is complete, the mapping needs to be transformed back because we only publish the mapping from shards to nodes in the metadata. This transformation process involves multiple full traversals, causing performance bottlenecks in large-scale shard allocation.
  • During each index creation, metadata is synchronized multiple times. With a large number of nodes this becomes a bottleneck: in clusters with thousands of nodes, even a slight network fluctuation or an old-generation garbage collection (GC) can cause synchronization to fail.
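The forward and backward transformation in the first bottleneck can be pictured with the sketch below. It assumes routing is a plain map from shard name to node name, and `RoutingInversionSketch` is a hypothetical name; the real routing structures are richer. The point is that each direction walks every shard in the cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RoutingInversionSketch {

    // Forward transformation: shard -> node becomes node -> shards (full traversal).
    static Map<String, List<String>> toNodeView(Map<String, String> shardToNode) {
        Map<String, List<String>> nodeToShards = new TreeMap<>();
        for (Map.Entry<String, String> e : shardToNode.entrySet()) {
            nodeToShards.computeIfAbsent(e.getValue(), n -> new ArrayList<>())
                        .add(e.getKey());
        }
        return nodeToShards;
    }

    // Backward transformation: node -> shards becomes shard -> node (full traversal again).
    static Map<String, String> toShardView(Map<String, List<String>> nodeToShards) {
        Map<String, String> shardToNode = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : nodeToShards.entrySet()) {
            for (String shard : e.getValue()) {
                shardToNode.put(shard, e.getKey());
            }
        }
        return shardToNode;
    }

    public static void main(String[] args) {
        Map<String, String> routing = new TreeMap<>();
        routing.put("logs[0]", "node-1");
        routing.put("logs[1]", "node-2");
        routing.put("metrics[0]", "node-1");

        Map<String, List<String>> byNode = toNodeView(routing);
        System.out.println(byNode);
        System.out.println("round trip preserved: " + toShardView(byNode).equals(routing));
    }
}
```

With tens of thousands of shards, repeating both traversals for every allocation round is what makes index creation slow.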

Based on the above bottlenecks, the native version of Elasticsearch can only support approximately 30,000 to 50,000 shards, and the performance has reached its limit. Index creation typically takes 30 seconds or even minutes. The node count is limited to around 500 nodes.

To address this, we propose scalability optimization solutions.

img

The main optimization measures include:

  • Task-based targeted shard creation distribution: To address the metadata synchronization bottlenecks caused by shard creation, we use task distribution to distribute shard creation tasks in a targeted way, avoiding multiple full-node metadata synchronizations.
  • Incremental metadata maintenance: Instead of performing multiple full traversals during shard allocation, we use incremental data structure maintenance to avoid full traversals.
  • Cached statistics strategy: To improve the performance of statistics interfaces, we use a caching strategy to avoid repeated statistical calculations, significantly reducing resource consumption.

As a result, we can expand the number of shards in a cluster to the millions and the number of nodes to thousands. New indexes can generally be created within 5 seconds, and the response time of the statistics interface is on the order of seconds.
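The incremental maintenance measure can be illustrated with a small sketch. It assumes routing is modeled as two plain maps that are updated together on every change; `IncrementalRouting` and its methods are hypothetical names, and this is not the actual Tencent or Elasticsearch data structure.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: keep both routing views in sync on every change
// instead of rebuilding one from the other with a full traversal.
class IncrementalRouting {
    private final Map<String, String> shardToNode = new HashMap<>();
    private final Map<String, Set<String>> nodeToShards = new HashMap<>();

    // Assign or move one shard; the cost does not depend on the total shard count.
    void assign(String shard, String node) {
        String previous = shardToNode.put(shard, node);
        if (previous != null) {
            Set<String> shards = nodeToShards.get(previous);
            shards.remove(shard);
            if (shards.isEmpty()) {
                nodeToShards.remove(previous);
            }
        }
        nodeToShards.computeIfAbsent(node, n -> new HashSet<>()).add(shard);
    }

    String nodeOf(String shard) {
        return shardToNode.get(shard);
    }

    Set<String> shardsOn(String node) {
        Set<String> shards = nodeToShards.get(node);
        return shards == null ? Collections.<String>emptySet()
                              : Collections.unmodifiableSet(shards);
    }

    // Read from the maintained view rather than recomputed by scanning all shards.
    int shardCount(String node) {
        return shardsOn(node).size();
    }

    public static void main(String[] args) {
        IncrementalRouting routing = new IncrementalRouting();
        routing.assign("logs[0]", "node-1");
        routing.assign("logs[1]", "node-1");
        routing.assign("logs[1]", "node-2"); // move one shard
        System.out.println("node-1 shards: " + routing.shardCount("node-1"));
        System.out.println("logs[1] on: " + routing.nodeOf("logs[1]"));
    }
}
```

Each update touches only the entries for the affected shard and nodes, so both the shard-to-node view that is published and the node-to-shard view needed during allocation stay available without a conversion pass.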

The above content summarizes all the optimizations made to the ES kernel. ES is an excellent open-source big data product, and we will continue its development. We have provided a complete hosting platform internally and externally, and have made systematic enhancements and optimizations to various aspects of the ES kernel. This will help Elastic Stack cover more scenarios in the big data ecosystem and achieve better development.

img

4. Open Source Contributions and Future Plans #

img

Within Tencent, we develop ES as a collaborative internal open source project, so that teams optimize and improve it together instead of stumbling over the same issues separately. We also actively contribute our best solutions to the community, working with the official team and community ES enthusiasts to advance ES. As the team responsible for Tencent’s ES kernel development, we have so far submitted more than twenty PRs, of which 70% have been merged, and the team includes 6 ES/Lucene contributors.

In the future, we will continue to optimize ES in terms of usability, performance, and cost. In terms of usability, we will strengthen ES’s fault self-healing capabilities, aiming towards the goal of autonomous operation. In terms of performance, ES is the absolute champion in search scenarios, but there are still many areas in multidimensional analysis that can be optimized, and we hope to further expand the application scenarios of ES in the analysis field. In terms of cost, we are currently developing storage and computation separation. Based on Tencent’s self-developed shared file system CFS, this will further reduce costs and improve performance.

Link to online PPT: https://elasticsearch.cn/slides/259

Q&A for related questions during the presentation: https://elasticsearch.cn/articl

Analysis of the kernel optimization behind Tencent Elasticsearch’s massive scale: https://zhuanlan.zhihu.com/p/139725905

Decryption of the memory efficiency improvement techniques behind Tencent’s trillion-level Elasticsearch: https://zhuanlan.zhihu.com/p/146083622

Decrypting the technology behind Tencent’s trillion-level Elasticsearch: https://zhuanlan.zhihu.com/p/99184436