12 Sharding Engines Elasticsearch How to Implement Large Data Searches

12 Engine Sharding: How Does Elasticsearch Achieve Big Data Retrieval? #

Hello, I am Xu Changlong.

In the last lesson, we saw the strong support of ELK for the log system. It would be very difficult for us to implement distributed tracing without its help.

Why is ELK so powerful? To understand this, we need to have a clear understanding of the architecture and implementation of important technologies such as storage and indexing in ELK. I believe that after learning the content today, you will have a deeper understanding of the core implementation of big data distribution and the big data distributed statistical service.

12 Sharding Engines Elasticsearch How to Implement Large Data Searches #

So how does ELK work? Why is it able to handle such a large volume of logs?

Let’s first analyze the architecture of ELK. In fact, it is very different from the implementation of OLAP and OLTP. Let’s take a look together. The architecture of Elasticsearch is as follows:

Image

By referring to the architecture diagram, let’s summarize the overall data flow. We can see that the logs generated by our project will be collected and pushed to Kafka through Filebeat or Rsyslog. Then Logstash consumes the logs in Kafka, organizes them, and pushes them to the Elasticsearch cluster.

Next, the logs are tokenized, and after calculating the weight in the document, they are placed in the index for query and retrieval. Elasticsearch will push this information to different shards. Each shard will have multiple replicas. When data is written, only when most replicas have successfully written, the primary shard will index the data (recall distributed write consistency knowledge).

The Elasticsearch cluster has multiple roles, and I will give you a brief introduction:

  • Master node: Responsible for cluster scheduling decisions, cluster status, node information, index mapping, shard information, and routing information. The master node is actually elected through an election process. Generally, there should be at least three master candidates in a cluster to prevent the main node from being damaged (recall the Raft knowledge learned previously, but Elasticsearch did not have the Raft standard when it was first released).
  • Data storage node: Used for storing data and calculations, primary and replica shards, hot nodes, cold data nodes.
  • Client coordination node: Coordinates multiple replica data query services, aggregates the results returned by each replica, and returns them to the client.
  • Kibana computing node: Its role is real-time statistical analysis, aggregate analysis of statistical data, and graphical presentation.

In actual production installations, Elasticsearch requires a minimum of three servers. Among the three servers, one will become the master node responsible for allocating cluster indexes and resource distribution. The other two nodes will be used for data storage and retrieval calculations. When the master node fails, one of the sub-nodes will select a replacement master node to replace the failed one (recall the election process in distributed consensus algorithms).

If our hardware resources are abundant, we can add another server to deploy Kibana separately, which will provide better performance for data statistics and analysis. If our log writing speed is too slow, we can add another server for Logstash tokenization to help speed up the overall indexing speed of ELK.

It is worth noting that in recent years, most cloud vendors’ log services are implemented based on ELK, and Elasticsearch has gone public, indicating its market value.

Elasticsearch Write Storage Mechanism #

The following diagram shows the specific structure of the index storage in Elasticsearch. It may seem complex, but don’t worry, we only need to focus on the shard and index parts:

Image

Let’s dig deeper into how Elasticsearch implements the write storage mechanism for distributed full-text search. Elasticsearch uses the Lucene engine for underlying full-text search, which is actually a single machine embedded engine that does not support distribution. The distributed functionality is achieved through shard-based indexing.

In order to improve write efficiency, common distributed systems usually write data to a cache first. When the data accumulates to a certain extent, it is then flushed to disk in sequential order. Lucene also uses a similar mechanism by storing the written data in an index buffer and periodically flushing this data onto segment files.

Now let’s talk about storage. In order to facilitate faster data retrieval, Lucene generates a segment file approximately every second. This results in a large number of files and a scattered index. When performing searches, multiple segments need to be traversed. If there are too many segments, it can affect query efficiency. To address this, Lucene periodically merges multiple segments in the background.

I will provide more details about indexing later, but it’s worth mentioning that Elasticsearch is an I/O-intensive service. Placing new data on SSD can improve its performance. However, SSDs can be expensive, so Elasticsearch implements hot-cold data separation. We can store hot data on high-performance SSDs and cold data on large-capacity disks.

Additionally, it is recommended by Elasticsearch to create indexes by day. When the stored data reaches a certain size, Elasticsearch will move less frequently accessed indexes to the cold data area, thereby improving the cost-effectiveness of data storage. I also suggest that you create indexes by day so that when querying, we can reduce the amount of data scanned by specifying a time range.

Image

Furthermore, to ensure scalability of read and write performance, Elasticsearch shards the data. By default, the routing rule for shards is to use the hash of the document ID to ensure even distribution of data. It is common for distributed systems to achieve linear improvement in read and write performance through sharding.

You can think of it this way: when a single node reaches its performance limit, you need to increase the number of data server nodes and the number of replicas to reduce the write pressure. However, when the number of replicas increases, the write performance may actually decrease due to write consistency issues. How many replicas is ideal? This depends on testing with production logs to determine the specific value.

Two Types of Queries in Elasticsearch #

It was mentioned earlier that having multiple nodes and shards can improve the write performance of the system. However, this also means that the data is scattered across multiple data nodes, and Elasticsearch doesn’t know in which shard and segment file the document we are looking for is stored.

Therefore, in order to balance the performance load on each data node, Elasticsearch sends a query request to all data nodes that contain the index. The coordinating node, when receiving the query request, randomly selects one node from multiple replicas of the same data shard. This achieves load balancing.

The replica that receives the request first sorts the results based on the keyword weight. When the coordinating node receives the document ID lists from all replicas, it performs another round of result aggregation and sorting. Finally, it uses the DocID to fetch the specific document data from each replica and returns the results.

It can be said that Elasticsearch achieves full-text queries across all shards of the large dataset through this process. However, this approach also increases the query response time for Elasticsearch. The following diagram illustrates the communication process between the coordinating node and the replica:

Image

In addition to the increased response time, there are many drawbacks to this approach. For example, it has a low query QPS, low network throughput performance, and the coordinating node needs to handle pagination for each query result. When paginating, if we want to query a page that is further down, we have to wait for each node to search and sort all the data before that page can be responded to. The larger the pagination range, the slower the query…

To address these issues, Elasticsearch limits the maximum number of results returned to 10,000 by default. This restriction also serves as a reminder that Elasticsearch’s service should not be used as a database.

There is also a practical consideration: this approach can result in a small probability of not being able to find certain logs due to their low weight. To handle such scenarios, Elasticsearch provides the search_type=dfs_query_then_fetch parameter. However, this approach consumes system resources heavily, so it is not recommended to enable it unless necessary.

In addition to these, Elasticsearch has three types of query implementations: query and fetch, dfs query and fetch, and dfs query then fetch. However, they are not directly related to the main topic of this lesson. If you are interested, you can explore them on your own after the lesson.

Inverted Index in Elasticsearch #

Let’s talk about the inverted index used in full-text search in Elasticsearch.

Elasticsearch supports various types of queries, not just full-text search. For example, it uses BKD Tree for numeric queries. The full-text search in Elasticsearch is implemented using Lucene. The implementation principle of the index is completely different from LSM in OLAP and B+Tree in OLTP. Elasticsearch uses an inverted index.

Image

Image

Generally, an inverted index is commonly used for full-text search in search engines, which is different from B+Tree and B-Tree in relational databases. B+Tree and B-Tree indexes decrease the query range by traversing the tree from the root according to the left prefix. On the other hand, the process of an inverted index can be roughly divided into four steps: tokenization, retrieving relevant DocId, calculating weights and rearranging, and displaying records with high relevance.

First, the user input is tokenized to find the keywords. Then, by using the inverted index for each keyword, all the relevant DocIds are retrieved. After that, the index IDs designed for multiple keywords are intersected, and based on the occurrence frequency of each keyword in each document, the weights of each result are calculated. This is used to sort the list and implement scoring based on query relevance. Finally, the results are sorted in descending order based on the matching score, showing the records with high relevance.

Now, let’s briefly look at the specific implementation in Lucene.

Image

As shown in the above figure, the index of an Elasticsearch cluster is saved in Lucene’s segment files. For information on the segment file format, you can refer to segment format, which includes row storage, column storage, and inverted index.

To save space and improve query efficiency, Lucene has made many optimizations to the inverted index. The segment mainly saves three types of indexes:

  • Term Index: Used for fast searching of keywords (terms). The term index is implemented as a secondary index using FST (Finite State Transducer, which occupies less memory) as an improvement on the basic Trie structure. Normally, this tree is kept in memory to reduce disk I/O and accelerate term lookup. During retrieval, the FST is used to quickly find the corresponding Term Dictionary.
  • Term Dictionary: The term dictionary index stores the relationship between terms and posting lists. The term dictionary data is sorted and compressed on disk in blocks, which saves more space compared to B-Tree. The term dictionary includes suffixes and prefixes of words, which can be used for approximate and similar word queries. With this dictionary, the related inverted index list positions can be found.
  • Posting List: The posting list records the document IDs where the keyword term appears, as well as its position, offset, and term frequency in the document. This is the final document list we look for, and it can be sorted and merged based on this information.

When a log entry is stored, its actual content is not saved in the inverted index.

Before storing a log entry, it goes through tokenization and filtering to remove useless symbols and split the document into keywords (terms), along with their positions and weight frequencies. These keywords are then saved in the Term Index and Term Dictionary. Finally, the document IDs, weights, positions, and other information corresponding to each keyword are sorted and merged into the Posting List for storage. The combination of these three structures achieves an inverted index that optimizes disk I/O.

During a query, Elasticsearch parses the user-inputted keywords, finds the index of the corresponding Term Dictionary block on disk using the Term Index stored in memory, and then retrieves all the relevant document IDs and weights associated with the keyword. Based on the saved information and weight algorithm, the query results are sorted and returned.

Conclusion #

I cannot help but marvel at how Elasticsearch achieves full-text search for large-scale distributed data by combining small Lucene services. Whether in the past or now, this is truly incredible. It can be said that Elasticsearch has almost monopolized the market for real-time log analysis, monitoring, storage, retrieval, and statistics, utilizing many noteworthy technologies.

Although there are many emerging open-source projects in the market today, it is still rare to find open-source projects that are truly comprehensive and diverse enough to form a platform. Elasticsearch itself is a very large distributed search and analysis system, and it has made extensive optimizations for data writing and querying.

What I hope you pay attention to is that Elasticsearch incorporates a large number of distributed design strategies and interesting algorithms, such as distributed consensus algorithms (such as Raft, which was not yet available at that time), inverted indexing, term weighting, match weighting, tokenization, asynchronous synchronization, and data consistency checking. These outstanding designs in the industry are worth exploring and expanding upon. I recommend further exploration on your own after the course.

Reflection Questions #

If you were to implement an Elasticsearch, what do you think would be the core functionality that needs to be addressed first?

Feel free to leave comments and discuss with me in the comment section. See you in the next class!