
16 Detailed Explanation of ES Principles - Document Read Process #

Before looking at how ES (Elasticsearch) reads a document, some background is needed. ES is a distributed, real-time search and analytics engine characterized by high performance, scalability, and ease of use.

When ES reads a document, the process involves the following steps:

  1. Index selection: ES first reads the document from the specified index. An index is a logical grouping of documents, similar to a table in a relational database. By specifying the index, ES can quickly locate where the document lives.

  2. Shard selection: Each index can be split into multiple shards, which spread the storage and query load. When reading a document, ES selects one or more of these shards to handle the read request.

  3. Primary shard lookup: If the read request targets the primary shard, ES routes it directly to the node holding that primary shard. The primary shard stores the original document data and is the main handler for such reads.

  4. Replica shard lookup: If the read request is served by a replica shard, ES locates the node holding that replica and forwards the request to it. A replica is a copy of the primary shard and is used to increase the concurrency available for read requests.

  5. Document read: Once the request reaches the node holding the shard, ES reads the corresponding document data from disk and returns it to the client. If the request involves aggregation, filtering, or sorting, ES processes the document data in memory.

Through this process, ES can read documents quickly and concurrently while providing search and analytics capabilities, which is what makes it a powerful search and analytics engine widely used in large-scale data processing scenarios.

Document Query Steps Sequence #

First, let’s look at the overall query process.

Single Document #

The following is the sequence of steps to retrieve a document from the primary shard or replica shards:

[Figure: retrieving a single document from a primary or replica shard]

  1. The client sends a get request to Node 1.
  2. The node uses the document’s _id to determine that it belongs to shard 0. Copies of shard 0 exist on all three nodes, so in this case it forwards the request to Node 2.
  3. Node 2 returns the document to Node 1, and then Node 1 returns the document to the client.

When handling read requests, the coordinating node load-balances by round-robining through all the shard copies on each request.

While a document is being retrieved, it is possible that the document has already been indexed on the primary shard but not yet copied to the replica shards. In that case a replica might report that the document does not exist while the primary returns it successfully. Once the indexing request has been acknowledged to the user, the document is available on both the primary and all replica shards.
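As a concrete illustration of this single-document path, here is a minimal sketch that issues a get-by-_id request with the JDK HttpClient. The cluster address (localhost:9200), the index name my-index, and the document ID 1 are assumptions made for the example; the coordinating node that receives the request performs the shard routing described above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetDocumentExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // GET <index>/_doc/<id>: the coordinating node hashes the _id to find
        // the owning shard and forwards the request to one copy of that shard.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_doc/1"))
                .GET()
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body()); // the document's _source plus metadata
    }
}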

Multiple Documents #

The steps to retrieve multiple documents using mget:

[Figure: retrieving multiple documents with a single mget request]

The following is the sequence of steps required to retrieve multiple documents using a single mget request:

  1. The client sends an mget request to Node 1.
  2. Node 1 builds a multi-document get request for each shard and forwards these requests in parallel to the nodes hosting the required primary or replica shards. Once all replies are received, Node 1 assembles the response and returns it to the client (a request sketch follows this list).
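A minimal sketch of such an mget request, again using the JDK HttpClient and assuming a local cluster with an index named my-index containing documents with IDs 1 and 2:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MultiGetExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // _mget groups the requested IDs by shard; the coordinating node sends
        // one multi-get per shard in parallel and merges the replies.
        String body = """
                { "ids": ["1", "2"] }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_mget"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // a "docs" array, one entry per requested ID
    }
}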

Explanation of the Document Retrieval Process #

Most search systems use a two-stage query process. In the first stage, the query retrieves the matching doc IDs, and in the second stage, it retrieves the complete documents corresponding to those IDs. In Elasticsearch this is called query_then_fetch (other search types are described later in this section).

[Figure: the query_then_fetch process — query phase followed by fetch phase]

  1. In the initial query phase, the query is broadcast to a shard copy (a primary or replica shard) of every shard in the index. Each shard executes the query locally and builds a priority queue of matching documents of size from + size. Note: the search at this stage runs against segments in the filesystem cache, while some recently indexed data may still sit in the in-memory indexing buffer and is not yet searchable, which is why search is near real-time.
  2. Each shard returns the IDs and sort values of the documents in its priority queue to the coordinating node, which merges them into its own priority queue to produce a globally sorted result list.
  3. Next comes the fetch stage. The coordinating node determines which documents actually need to be fetched and sends multiple GET requests to the relevant shards. Each shard loads and, if required, enriches the documents, then returns them to the coordinating node. Once all documents have been fetched, the coordinating node returns the results to the client. (A minimal request sketch follows this list.)
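Here is a minimal search request that exercises this two-phase flow, assuming a local cluster and a hypothetical my-index with a title field; the from and size values determine the size (from + size) of each shard's priority queue described in step 1.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SearchFromSizeExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Each shard builds a priority queue of size from + size (here 10 + 10),
        // returns doc IDs and sort values, and the coordinating node merges them
        // before fetching only the winning documents.
        String body = """
                {
                  "from": 10,
                  "size": 10,
                  "query": { "match": { "title": "elasticsearch" } }
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}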

Implementation Mechanism of Reading Documents in Elasticsearch #

TIP

This is optional reading material.

Read Operations #

Consistency refers to the requirement that after a successful write operation, the next read operation must be able to read the latest data. For search operations, this requirement can be relaxed to allow for some delay. However, for NoSQL databases, strong consistency is generally preferred.

In terms of result matching, NoSQL databases have only two outcomes, match or no match, while search also has the notion of relevance. In NoSQL a result is either 0 or 1, whereas in search a document can match to a degree, with relevance scores such as 0.1, 0.5, or 0.9.

In terms of result retrieval, search operations typically only need to retrieve the top N results that satisfy the conditions, while NoSQL databases usually need to return all results that meet the criteria.

Search systems generally involve a two-phase query process: the first phase retrieves the corresponding document IDs, while the second phase retrieves the complete documents based on the document IDs. In contrast, NoSQL databases typically return results in a single phase. Both these approaches are supported in Elasticsearch.

At present, NoSQL databases are also weaker than search systems when it comes to querying, aggregation, analysis, statistics, and similar capabilities.

Reading in Lucene #

Elasticsearch uses Lucene as its search engine library to implement specific functionalities, such as searching for specific fields. In Lucene, these functionalities are implemented through the following interfaces in the IndexSearcher class:

public TopDocs search(Query query, int n);   // run a query and return the top n hits
public Document doc(int docID);              // load the stored fields of a document by its internal doc ID
public int count(Query query);               // count the documents matching a query
// ... (others)

The first search interface performs search operations and returns the top N results that satisfy the query. The second doc interface retrieves the document content based on the document ID. The third count interface retrieves the number of matches based on the query.

These three functionalities are the most basic in search operations. However, for most queries in Elasticsearch, simply using these interfaces is not sufficient to meet the requirements, especially for distributed queries. Elasticsearch addresses these issues, and now let’s explore the analysis of the related read functionalities in Elasticsearch.
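The following self-contained sketch exercises those three IndexSearcher calls directly against a small in-memory Lucene index (the index contents and field names are made up for the example; Elasticsearch drives equivalent calls internally on each shard):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.ByteBuffersDirectory;

public class LuceneReadSketch {
    public static void main(String[] args) throws Exception {
        ByteBuffersDirectory dir = new ByteBuffersDirectory();

        // Index a single document so there is something to search.
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new TextField("title", "elasticsearch document read process", Field.Store.YES));
            writer.addDocument(doc);
        }

        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            TermQuery query = new TermQuery(new Term("title", "read"));

            TopDocs top = searcher.search(query, 10);                 // top-N matching doc IDs + scores
            for (ScoreDoc sd : top.scoreDocs) {
                Document hit = searcher.doc(sd.doc);                  // load stored fields by doc ID
                System.out.println(hit.get("title") + "  score=" + sd.score);
            }
            System.out.println("matches: " + searcher.count(query));  // match count only
        }
    }
}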

Reading in Elasticsearch #

In Elasticsearch, each shard has multiple replicas, primarily to ensure data reliability, but replicas also increase read capacity. A write has to be applied to the replica shards as well, whereas a query only needs to go to the primary shard or any one of the replicas.

[Figure: a shard with one primary (P) and two replicas (R1, R2)]

In the diagram above, the shard has one primary (P) and two replicas (R1, R2). For a query, one of the three copies is selected based on the preference parameter in the request, which can be set to _local, _primary, _replica, or other values. If _primary is chosen, every query goes directly to the primary shard, ensuring that each query reads the latest data. With other settings, the query may land on R1 or R2 and may not see the latest data.

Note: the routing logic described above can be found in the searchShards method of OperationRouting.java.
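A sketch of setting the preference parameter on a search request is shown below. Note that recent Elasticsearch versions removed _primary and _replica, so the example uses _local (a custom string such as a session ID is another common choice to pin a user to the same shard copies); index name and address are assumptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PreferenceExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // preference=_local asks the coordinating node to prefer shard copies on
        // the local node; a custom string (e.g. a user/session ID) keeps repeated
        // searches on the same copies for consistent scoring and caching.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search?preference=_local"))
                .GET()
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}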

Next, let’s take a look at how queries in Elasticsearch support distribution.

[Figure: a search request fanned out to one copy of every shard in the index]

Elasticsearch achieves distribution through sharding. When data is written, the _routing rule determines which shard it goes to, which allows massive data sets to be spread across many shards and machines. This also means that at query time the matching data may live on any shard of the index, so Elasticsearch must query every shard, selecting one copy (primary or replica) per shard. The query request is fanned out to all of these shards, each of which acts as an independent query engine; for example, if the top 10 results are required, each shard computes and returns its own top 10. The coordinating (client) node then collects the results from all shards, re-sorts them with a priority queue, and returns the global top 10 to the user.

One issue here is request amplification: a single user search request becomes several internal shard-level requests. The fan-out does not necessarily have to cover every shard of the index, only the shards relevant to the current query, and this optimization has to be driven by the business model and the request content (for example, custom routing) to reduce the number of amplified requests. A routing-restricted search is sketched below.
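For example, if documents were indexed with a custom _routing value, passing the same value at search time restricts the fan-out to the shard(s) that routing value maps to. A hedged sketch, in which the index name, routing value, and user_id field are assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RoutingSearchExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // If documents were indexed with routing=user_42, searching with the same
        // routing value limits the fan-out to the shard(s) that value hashes to,
        // instead of querying every shard of the index.
        String body = """
                { "query": { "match": { "user_id": "user_42" } } }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search?routing=user_42"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}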

Queries in Elasticsearch can be broadly classified into two types: Get Requests - which retrieve specific documents by ID, and Search Requests - which use a query to match documents.

[Figure: Get and Search request paths across the in-memory buffer/translog and on-disk segments]

Note: In the above diagram, the term “Segment” in memory refers to a newly-refreshed segment that has not been persisted to disk, rather than a segment loaded from disk into memory. For Search-class requests, the query is performed by searching both the memory and disk segments. The results are then merged and returned. This type of query is considered near real-time because it takes some time for the index data in memory to be refreshed into segments.

For Get-class requests, the query is first performed by searching the transaction log in memory. If a match is found, the result is immediately returned. If no match is found, the transaction log on disk is searched. If there is still no match, the segment on disk is searched. This type of query is considered real-time. This query order ensures that the retrieved document is the latest version of the document, which is necessary to meet the real-time requirements in NoSQL scenarios.
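The real-time behavior of Get-class requests can be observed through the realtime URL parameter on the get API, which defaults to true; setting it to false makes the get see only data that is already visible to search. A minimal sketch against an assumed local cluster and index:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RealtimeGetExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // realtime defaults to true: the get sees the latest version even if it
        // has not been refreshed into a searchable segment yet. With
        // realtime=false the get only sees what search can already see.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_doc/1?realtime=false"))
                .GET()
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}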

[Figure: query_then_fetch versus query_and_fetch]

Most search systems typically use a two-stage query process. In the first stage, the matching document IDs are retrieved, and in the second stage, the complete documents corresponding to those document IDs are retrieved. In Elasticsearch, this is referred to as “query_then_fetch”. There is also another approach called “query_and_fetch”, where the complete document is returned in the first stage query. This second approach is generally used for requests that only need to query one shard.

In addition to one-stage and two-stage queries, there is also a three-stage query scenario. Search has a scoring step that computes a base score from TF (term frequency) and DF (document frequency). In Elasticsearch, however, the query executes independently on every shard, so each shard has its own local TF and DF. Although _routing spreads documents evenly across shards, it does not guarantee that TF and DF are evenly distributed, so the local TF and DF values may be inaccurate, which in turn distorts TF/DF-based scoring. To address this, Elasticsearch introduced DFS (Distributed Frequency Scoring) queries. For example, dfs_query_then_fetch first collects the TF and DF values from all shards, attaches them to the request, and then executes an ordinary query_then_fetch, which makes the TF and DF used in scoring accurate. Similarly, there is dfs_query_and_fetch. These query types score more accurately but are less efficient. Another option is to use BM25 instead of the classic TF/IDF model.

In newer versions of Elasticsearch, users can no longer specify dfs_query_and_fetch or query_and_fetch; these two types are only produced internally when Elasticsearch rewrites a request.
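The DFS variant can still be requested explicitly through the search_type URL parameter, as in this sketch (index and field names are assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DfsSearchTypeExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // dfs_query_then_fetch adds a pre-query round trip that gathers global
        // term and document frequencies so every shard scores with the same stats.
        String body = """
                { "query": { "match": { "title": "elasticsearch" } } }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search?search_type=dfs_query_then_fetch"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}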

Elasticsearch Query Process #

The majority of queries and core functionalities in Elasticsearch are of the Search type. As we mentioned earlier, queries can be divided into one-stage, two-stage, and three-stage queries. Here, we will use the most common two-stage query as an example to introduce the query process.

[Figure: overall search flow across the Client Node, Query Phase, and Fetch Phase]

Client Node #

On the Client Node, the request first goes through the Parse Request step mentioned earlier, which we will not repeat here; let's look at the remaining parts.

  1. Get Remote Cluster Shard

Check if cross-cluster access is required. If so, obtain the list of shards to be accessed.

  2. Get Search Shard Iterator

Get the shards to be accessed in the current cluster and combine them with the shards obtained in the previous step (Get Remote Cluster Shard) to build the final, complete list of shards to access.

In this step, for each shard, one copy to access is chosen from the primary and its replicas, based on the preference parameters in the request.

  3. For Every Shard: Perform

Iterate over each shard and perform the following logic for each shard.

  4. Send Request To Query Shard

Send the query phase request to the corresponding shard.

  5. Merge Docs

After sending the request to multiple shards, this step is to asynchronously wait for the results to be returned and then merge them. The merge strategy here is to maintain a priority queue of size “Top N”. Whenever a shard’s result is received, it is added to the priority queue and sorted. This process continues until all shards have returned their results.

The pagination logic is also implemented here. For example, to retrieve results 30 to 40, meaning positions 30 to 40 in the globally sorted result set, no single shard can know in advance which of its documents will survive the global sort, so every shard has to return its own top 40. During Merge Docs on the Client Node, the global top 40 is computed, the first 30 are discarded, and the remaining 10 results are the ones requested. This paging scheme has an obvious flaw: every shard response includes all of the results that have already been paged past. With deep paging, a huge number of docs must be sorted on the coordinating node; for example, with 1,000 shards and a request for results 9,990 to 10,000, each shard returns its top 10,000 docs, so 1,000 × 10,000 = 10,000,000 docs have to be collected and sorted. This situation is prone to OOM (out-of-memory) errors.

Another way to page is search_after, which is more lightweight. If only 10 results are needed per page, each shard only returns the 10 results after the search_after cursor, so the total amount of data returned depends only on the number of shards and the page size, not on how many results have already been read. This approach is safer and is the recommended one; a request sketch follows.
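A sketch of search_after paging, assuming documents have a numeric created_at field and a keyword id field used as a tie-breaker (both hypothetical); the sort values of the last hit on the previous page are fed into search_after for the next page:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SearchAfterExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // The search_after values are the sort values of the last hit on the
        // previous page; each shard only returns the next 10 docs after that
        // cursor, regardless of how many pages have already been read.
        String body = """
                {
                  "size": 10,
                  "sort": [ { "created_at": "desc" }, { "id": "asc" } ],
                  "search_after": [ 1700000000000, "doc-123" ],
                  "query": { "match_all": {} }
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}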

If the request contains aggregations, they are merged here as well, but the merge strategy differs by aggregation type and will be introduced in a later article on aggregations.

  6. Send Request To Fetch Shard

Select the top N Doc IDs and send them to the Shards where these Doc IDs are located for Fetch Phase execution. Finally, the contents of the top N Docs will be returned.

Query Phase #

Next, let’s look at the steps of the first phase of the query:

  1. Create Search Context

A Search Context is created, and all intermediate state of the search is stored in it. There are more than 50 such state fields in total, which can be seen in DefaultSearchContext and the other subclasses of SearchContext.

  2. Parse Query

Parse the query source and store the result in the Search Context. Different Query objects are created here depending on the query type in the request, such as TermQuery, FuzzyQuery, and so on; the actual execution semantics of these queries are implemented in Lucene.

This step also runs the preProcess part of the three phases (dfsPhase, queryPhase, and fetchPhase). Only queryPhase's preProcess contains real logic; the other two are empty. Once preProcess completes, all required parameters have been set.

Since some requests in Elasticsearch are interrelated and not independent, such as scroll requests, the lifecycle of the Context will also be set here.

The lowLevelCancellation setting is also applied here; it is a cluster-level configuration that can be enabled or disabled dynamically. When enabled, more checks are performed during execution to decide whether to abort the remaining work and return early.

  3. Get From Cache

Check whether the request is allowed to be cached. If it is, check whether results already exist in the cache and, if so, read them directly; otherwise continue with the following steps and add the results to the cache afterwards.
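Whether the shard request cache is used can also be controlled per request via the request_cache URL parameter (by default only size=0, aggregation-style results are cached). A hedged sketch, in which the index name and the status keyword field are assumptions:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RequestCacheExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // request_cache=true asks the shards to cache this request's result;
        // by default only size=0 (aggregation-style) results are cached.
        String body = """
                {
                  "size": 0,
                  "aggs": { "by_status": { "terms": { "field": "status" } } }
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search?request_cache=true"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}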

  4. Add Collectors

Collectors are responsible for collecting query results, implementing sorting, and filtering or collecting custom result sets. In this step multiple Collectors are added and chained together into a list.

  • FilteredCollector: First, check if there is a Post Filter in the request. The Post Filter is used to filter the results after Search, Agg, etc., hoping that the filter does not affect the Agg results. If there is a Post Filter, create a FilteredCollector and add it to the Collector List.
  • PluginInMultiCollector: Check if any custom Collectors are specified in the request. If yes, create and add them to the Collector List.
  • MinimumScoreCollector: Check if the minimum score threshold is specified in the request. If specified, create a MinimumScoreCollector and add it to the Collector List. When collecting results later, it will filter out Docs with scores less than the minimum score.
  • EarlyTerminatingCollector: Check if the request specifies early termination of Doc seeking. If yes, create an EarlyTerminatingCollector and add it to the Collector List. During the subsequent seeking and collecting of Docs, when the number of Docs sought reaches the Early Terminating count, seeking of the subsequent inverted list will stop.
  • CancellableCollector: Checks whether the current operation should be interrupted and terminated, for example because it has timed out; if so, a TaskCancelledException is thrown. This is generally used to cut off long-running query requests early in order to protect the system.
  • EarlyTerminatingSortingCollector: If the index is sorted, traversal of the postings lists can be terminated early. This is analogous to taking the largest N values from a list already sorted in descending order: the first N entries can be returned directly. This Collector is added to the head of the Collector List. The difference from EarlyTerminatingCollector is that EarlyTerminatingSorting is a lossless optimization that does not change the results, while EarlyTerminating is a lossy optimization that artificially cuts off execution.
  • TopDocsCollector: This is the core collector for selecting the top N results and is added to the head of the Collector List. TopScoreDocCollector and TopFieldCollector are both subclasses of TopDocsCollector. TopScoreDocCollector scores in a fixed way and sorts by score and then by doc ID, so if several docs have the same score, the one with the smaller doc ID comes first. TopFieldCollector sorts by the value of a field specified by the user.

  5. lucene::search

This step calls the search method of Lucene's IndexSearcher to execute the actual search logic. Each shard contains multiple segments, each segment corresponds to a LeafReaderContext, and the search iterates through the segments, collecting results and computing scores in each one.

Scoring is generally done in two stages. The first stage happens here: a score is computed for every document the query advances to, and to limit CPU cost usually only a basic score is calculated. After this stage the results are sorted, and the second stage then rescores only the top results, taking more factors into account. The second-stage scoring is performed in a later step.

The specific logic for query types such as TermQuery and WildcardQuery is implemented in Lucene, and there will be a dedicated article explaining it later.

  6. rescore

Whether to perform a two-stage sorting is determined by whether the request contains a rescore configuration. If it does, the two-stage scoring logic is executed, considering more scoring factors. Two-stage scoring is a common multi-level design in computer science, striking a balance between resource consumption and efficiency.

Elasticsearch supports configuring multiple rescores, which are executed sequentially. Each rescore first selects the top window_size documents according to the request parameter, re-sorts them, and then merges the re-sorted results back into the overall top results, as in the sketch below.
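A sketch of a rescore request: the window_size documents from each shard's query-phase results are rescored with a more expensive match_phrase query, and the two scores are combined using the configured weights (the index name and title field are assumptions).

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RescoreExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // First-stage scoring uses the cheap match query; the top window_size
        // docs per shard are then rescored with the phrase query and the scores
        // are combined with the given weights.
        String body = """
                {
                  "query": { "match": { "title": "elasticsearch read" } },
                  "rescore": {
                    "window_size": 50,
                    "query": {
                      "rescore_query": { "match_phrase": { "title": "read process" } },
                      "query_weight": 0.7,
                      "rescore_query_weight": 1.2
                    }
                  }
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}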

  7. suggest::execute()

If the request contains a suggest section, it is executed here. If the request consists only of the suggest part, many optimizations can be applied. Since suggestions are not the focus today, they are not covered here; we may discuss them later.

  8. aggregation::execute()

If there is an aggregation (statistical) request, it is executed here. The handling logic for aggregation in Elasticsearch is similar to that of search, using multiple collectors. Aggregation also needs to be merged in the client node. The logic for aggregation is more complex, so it will not be elaborated here. We will cover it in a separate article when needed.

After the above logic is executed, if the current query request only needs to query one shard, the Fetch Phase will be directly executed on the current node.

Fetch Phase #

In Elasticsearch, as in any search system, there is a Fetch Phase in addition to the Query Phase. This phase does not exist in database systems; it is an extra stage introduced by the way data is distributed in search systems. When documents are routed to shards, the routing can only be based on a single primary field value. A query, however, may filter on other, non-primary fields, and documents matching such a field value may exist on every shard, so all shards have to be queried to avoid missing results. If the query is on the routing (primary) field, the situation resembles a database: the target shard can be located directly and only that shard needs to be queried. Secondary indexes in databases are a different case, but they behave similarly to querying the primary field, so they are not discussed further here.

For these reasons, during the first (query) stage it is not known which shards will hold the final results, so each shard has to compute the full result over its own data. For example, if the top 10 is needed, each shard must search all of its data, find its local top 10, and return those to the client node; with 100 shards, 100 × 10 = 1,000 candidate results are returned. Fetching full document content is relatively expensive in IO and CPU, so fetching documents in the first stage would be very costly. It is therefore usually cheaper to fetch the document content only after the client node has selected the final top N: a little extra network overhead buys a large reduction in IO and CPU work, which is a very worthwhile trade-off.

The purpose of the Fetch Phase is to retrieve the complete document content that the user needs based on the document ID (DocID). This content includes DocValues, Store, Source, Script, and Highlight, etc. The specific functionality is registered in the SearchModule. The system default registration includes:

  • ExplainFetchSubPhase
  • DocValueFieldsFetchSubPhase
  • ScriptFieldsFetchSubPhase
  • FetchSourceSubPhase
  • VersionFetchSubPhase
  • MatchedQueriesFetchSubPhase
  • HighlightPhase
  • ParentFieldSubFetchPhase

In addition to the default 8 types, custom functionality can also be registered through plugins. The most important subphases among these are Source and Highlight. Source loads the original text, while Highlight calculates the highlighted fragments.
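Several of these subphases can be triggered from a single search request. The sketch below asks for a filtered _source, a doc-values field, the document version, and a highlight on the title field (the index and field names, including published_at, are assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchSubPhasesExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Each option below is served by a fetch subphase: _source filtering,
        // docvalue_fields, version, and highlighting.
        String body = """
                {
                  "query": { "match": { "title": "elasticsearch" } },
                  "_source": [ "title" ],
                  "docvalue_fields": [ "published_at" ],
                  "version": true,
                  "highlight": { "fields": { "title": {} } }
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/my-index/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        System.out.println(client.send(request, HttpResponse.BodyHandlers.ofString()).body());
    }
}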

The aforementioned subphases are executed sequentially for each document, which may result in multiple random IO operations. There are some optimization schemes here, but they are specific to certain scenarios and not universal.

After the Fetch Phase is executed, the entire query process is complete.

Reference Documents #