15 Detailed Explanation of ES Principles - Indexing Document Process #

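In Elasticsearch, an index is a way of storing, retrieving, and analyzing data. This article explains the process and principles of indexing documents in Elasticsearch.

Basic Concepts of Indexing Documents #

In Elasticsearch, a document is the smallest unit of data that can be indexed and stored. Each document has a unique identifier (ID), which is used for retrieval and update operations. A document is expressed as JSON; its fields can hold structured values as well as unstructured text. An index is a collection of documents, loosely comparable to a table in a relational database.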

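The Document Indexing Workflow #

In Elasticsearch, working with indexed documents involves the following steps:

  1. Create the index: First, create an index, specifying its name and mapping. The mapping defines the structure of the documents and the types of their fields, similar to a table schema in a relational database. With the mapping, Elasticsearch can correctly parse and index the fields of each document.

  2. Add documents: Once the index exists, documents can be added to it. A document is represented as JSON and carried in the body of an index request. Elasticsearch parses the fields of the document and indexes them.

  3. Update documents: To update an indexed document, specify its ID together with the new content. Elasticsearch locates the document by its ID and replaces it with the new content.

  4. Delete documents: To delete an indexed document, specify its ID. Elasticsearch locates the document by its ID and removes it from the index.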
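The four steps above can be sketched as the REST requests a client would send. This is a minimal illustration that only builds the requests and sends nothing over the network. It assumes the typeless endpoints of Elasticsearch 7 and later; the index name `articles` and its fields are hypothetical.

```python
import json

INDEX = "articles"  # hypothetical index name used only for illustration


def create_index_request():
    """Step 1: create the index with an explicit mapping."""
    body = {
        "mappings": {
            "properties": {
                "title": {"type": "text"},
                "views": {"type": "integer"},
            }
        }
    }
    return ("PUT", f"/{INDEX}", body)


def index_document_request(doc_id, doc):
    """Step 2: add (or fully replace) a document under a given ID."""
    return ("PUT", f"/{INDEX}/_doc/{doc_id}", doc)


def update_document_request(doc_id, partial_doc):
    """Step 3: update an existing document by ID."""
    return ("POST", f"/{INDEX}/_update/{doc_id}", {"doc": partial_doc})


def delete_document_request(doc_id):
    """Step 4: delete a document by ID."""
    return ("DELETE", f"/{INDEX}/_doc/{doc_id}", None)


requests_to_send = [
    create_index_request(),
    index_document_request("1", {"title": "Indexing in Elasticsearch", "views": 1}),
    update_document_request("1", {"views": 2}),
    delete_document_request("1"),
]

for method, path, body in requests_to_send:
    print(method, path, json.dumps(body) if body is not None else "")
```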

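How Document Indexing Works #

While indexing a document, Elasticsearch performs the following operations:

  1. Tokenization: Elasticsearch splits the text in the document into terms. A tokenizer is responsible for this splitting; commonly used tokenizers include the standard tokenizer and the letter tokenizer.

  2. Building the inverted index: Elasticsearch associates each term with the IDs of the documents that contain it, forming an inverted index. The inverted index makes it fast to locate the documents that contain a given term.

  3. Document storage: Elasticsearch also stores the field values of the document alongside the inverted index (by default, the original JSON is kept in the _source field) so that they are available to later retrieval operations.

Through these steps, Elasticsearch can index and search large volumes of documents efficiently.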
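To make tokenization and the inverted index concrete, here is a toy sketch. The `tokenize` function is a crude stand-in for a real analyzer (it only lowercases and splits on non-alphanumeric characters), and the in-memory dictionary is an assumption of this example, not how Lucene lays out its index on disk.

```python
import re
from collections import defaultdict


def tokenize(text):
    """Lowercase the text and split it on non-alphanumeric characters."""
    return [t for t in re.split(r"[^a-z0-9]+", text.lower()) if t]


def build_inverted_index(docs):
    """Map each term to the set of document IDs that contain it."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for term in tokenize(text):
            index[term].add(doc_id)
    return index


docs = {
    1: "Elasticsearch indexes documents",
    2: "Lucene indexes segments",
    3: "Documents are JSON",
}
index = build_inverted_index(docs)

print(sorted(index["indexes"]))    # [1, 2]
print(sorted(index["documents"]))  # [1, 3]
```

Looking up a term is a single dictionary access, which is the property that makes term queries fast regardless of how many documents do not contain the term.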

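Summary #

This article has described the process and principles of indexing documents in Elasticsearch. Indexing documents is one of the core functions of Elasticsearch, and understanding how it works makes it easier to use Elasticsearch well for data storage and retrieval.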

Document Indexing Steps #

Single Document #

The sequence of steps for creating a single document:

  1. The client sends a request to Node 1 for creating, indexing, or deleting the document.
  2. The node uses the document’s _id to determine that the document belongs to shard 0. The request is forwarded to Node 3 because the primary shard for shard 0 is currently allocated on Node 3.
  3. Node 3 executes the request on the primary shard. If successful, it forwards the request in parallel to the replica shards on Node 1 and Node 2. Once all replica shards report success, Node 3 reports success to the coordinating node, which in turn reports success to the client.

Multiple Documents #

The sequence of steps for modifying multiple documents using bulk:

  1. The client sends a bulk request to Node 1.
  2. Node 1 creates a bulk request for each node and forwards these requests in parallel to each node hosting a primary shard.
  3. The primary shard executes each operation one by one in sequential order. After each operation is successful, the primary shard forwards the new document (or deletion) in parallel to the replica shards and then proceeds to the next operation. Once all replica shards report success for all operations, the node reports success to the coordinating node, which collects and organizes these responses and returns them to the client.

Explanation of Document Indexing Process #

Overall Indexing Process #

Let’s first take a look at the overall indexing process:

  • The coordinating node uses the document ID (or the routing value, if one is specified) to calculate which shard the document belongs to, and routes the request to that shard:
    shard = hash(document_id) % num_of_primary_shards
  • When the node holding the shard receives the request from the coordinating node, it writes the document to the Memory Buffer. Periodically (every 1 second by default), the buffer contents are written to the Filesystem Cache. This process is called a refresh.
  • Data in the Memory Buffer and Filesystem Cache can be lost in certain failure cases. Elasticsearch ensures reliability through the transaction log (translog): each request is also written to the translog when it is received. The translog is cleared only after the data in the Filesystem Cache has been written to disk. This process is called a flush.
  • During a flush, the in-memory buffer is cleared and its content is written to a new segment. The segments are fsynced to disk and a new commit point is created. The old translog is deleted and a new one is started. A flush is triggered either on a timer (every 30 minutes by default) or when the translog becomes too large (512 MB by default).
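The routing formula in the first bullet can be sketched as follows. Elasticsearch uses a Murmur3 hash internally; this example substitutes `zlib.crc32` purely as a stable stand-in, so the shard numbers it produces will not match a real cluster. The function name and parameters are hypothetical.

```python
import zlib


def route_to_shard(doc_id, num_primary_shards, routing=None):
    """Return the primary shard number for a document.

    The routing value defaults to the document ID, as in the formula above.
    CRC32 is an assumption of this sketch, not the hash Elasticsearch uses.
    """
    key = routing if routing is not None else doc_id
    return zlib.crc32(key.encode("utf-8")) % num_primary_shards


# The same ID always maps to the same shard.
print(route_to_shard("doc-42", 5) == route_to_shard("doc-42", 5))

# Two documents sharing a custom routing value land on the same shard.
print(route_to_shard("a", 5, routing="user-1") == route_to_shard("b", 5, routing="user-1"))
```

Because the result depends on the number of primary shards, changing that number would send existing IDs to different shards, which is why the primary shard count is fixed when an index is created.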

Step-by-Step Data Persistence Process #

Examining the data persistence process step by step: write -> refresh -> flush -> merge

  • Write Process

When a new document arrives, it is stored in the in-memory buffer and recorded in the transaction log (translog). At this point the data has not yet reached a segment, so the new document cannot be searched. It becomes searchable only after a refresh.

  • Refresh Process

The refresh process runs every 1 second by default. This value can be changed through the index.refresh_interval setting. A refresh includes the following steps:

  1. Documents in the in-memory buffer are written to a new segment, which is stored in the Filesystem Cache. The documents can be searched at this point.
  2. The in-memory buffer is cleared. Note: the translog is not cleared at this point, because the segment data has not yet been written to disk.
  3. After the refresh, the segments are held in the Filesystem Cache, which avoids costly I/O operations while still making the documents searchable. Because the refresh interval is 1 second by default (and can be extended to, for example, 5 seconds), Elasticsearch is near real-time rather than truly real-time.
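The write and refresh behavior described above can be modeled with a toy shard. This is a simplified sketch: the class, its attributes, and the whitespace-based search are all hypothetical, and real segments are Lucene index files rather than Python lists.

```python
class ToyShard:
    """Models the in-memory buffer, the translog, and searchable segments."""

    def __init__(self):
        self.buffer = []    # newly indexed documents, not yet searchable
        self.translog = []  # every operation is also logged here
        self.segments = []  # segments in the filesystem cache, searchable

    def index(self, doc_id, text):
        # Write: the document goes to the buffer and the translog.
        self.buffer.append((doc_id, text))
        self.translog.append(("index", doc_id, text))

    def refresh(self):
        # Refresh: the buffer becomes a new segment and is cleared.
        # The translog is deliberately left untouched.
        if self.buffer:
            self.segments.append(list(self.buffer))
            self.buffer.clear()

    def search(self, word):
        # Only documents that have reached a segment can be found.
        return [
            doc_id
            for segment in self.segments
            for doc_id, text in segment
            if word in text.split()
        ]


shard = ToyShard()
shard.index("1", "hello world")
before = shard.search("hello")  # nothing yet: the document is only in the buffer
shard.refresh()
after = shard.search("hello")   # the document is now in a searchable segment
print(before, after)
```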

  • Flush Process

A flush occurs periodically or when the translog grows too large. It performs a full commit and starts a new translog.

In the previous process, segments are stored in the Filesystem Cache, which can lead to document loss in the event of unexpected failures. To prevent document loss, the documents need to be written to disk. This process of writing the documents from the file cache to disk is called a flush. After the write, the translog is cleared. The specific process is as follows:

  1. All documents in the memory buffer are written to a new segment.
  2. The buffer is cleared.
  3. A Commit Point is written to disk.
  4. The Filesystem Cache is flushed through fsync.
  5. The old translog is deleted.
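The flush steps, and the reason the translog exists, can be illustrated with a small simulation. It is a sketch under stated assumptions: the translog is treated as already durable on disk for every operation, the class and method names are hypothetical, and the "disk" is just a Python list.

```python
class DurableShard:
    """Toy model of refresh, flush, and recovery from the translog."""

    def __init__(self):
        self.buffer = []           # in-memory buffer
        self.cached_segments = []  # refreshed, but only in the filesystem cache
        self.disk_segments = []    # committed and fsynced
        self.translog = []         # assumed durable for this sketch

    def index(self, doc_id, source):
        self.buffer.append((doc_id, source))
        self.translog.append((doc_id, source))

    def refresh(self):
        if self.buffer:
            self.cached_segments.append(list(self.buffer))
            self.buffer.clear()

    def flush(self):
        self.refresh()                                   # steps 1 and 2
        self.disk_segments.extend(self.cached_segments)  # steps 3 and 4
        self.cached_segments.clear()
        self.translog.clear()                            # step 5

    def crash_and_recover(self):
        # Everything that was not fsynced is lost...
        self.buffer.clear()
        self.cached_segments.clear()
        # ...and then rebuilt by replaying the translog.
        for doc_id, source in self.translog:
            self.buffer.append((doc_id, source))
        self.refresh()

    def doc_ids(self):
        segments = self.disk_segments + self.cached_segments
        return sorted(doc_id for segment in segments for doc_id, _ in segment)


shard = DurableShard()
shard.index("a", {"n": 1})
shard.flush()               # "a" is now safely on disk, translog is empty
shard.index("b", {"n": 2})
shard.refresh()             # "b" is searchable but not yet on disk
shard.crash_and_recover()   # "b" is restored from the translog
print(shard.doc_ids())      # ['a', 'b']
```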

  • Merge Process

The automatic refresh process creates a new segment every second, resulting in a large number of segments in a short period. Having too many segments consumes file handles, memory, and CPU cycles. Additionally, each search request needs to check each segment, so more segments lead to slower searches.

To address this issue, Elasticsearch performs background segment merging. Small segments are merged into larger segments, and these larger segments are further merged into even larger segments.

When indexing, the refresh operation creates new segments and opens them for searching. The merge process selects a small number of similarly sized segments and merges them into larger segments in the background. This process does not interrupt indexing and searching.

Once the merge is complete, the old segments are deleted:

  1. The new segment is flushed to disk, and a new commit point is written that includes the new segment and excludes the old, smaller segments.
  2. The new segment is opened for searching.
  3. The old segments are deleted.

Merging large segments requires significant I/O and CPU resources. If left unchecked, it can impact search performance. Elasticsearch imposes resource limits on the merge process by default, ensuring that searches still have sufficient resources to perform well.
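A single merge round can be sketched as below. This is a deliberately naive policy (merge the few smallest segments) and not Lucene's actual merge policy. The segment layout, the `merge_factor` parameter, and the handling of deleted documents are assumptions of this example; dropping deleted documents during a merge is standard Lucene behavior, although the text above does not discuss it.

```python
def live_docs(segment):
    """Return the documents of a segment that are not marked as deleted."""
    return {
        doc_id: source
        for doc_id, source in segment["docs"].items()
        if doc_id not in segment["deleted"]
    }


def merge_smallest(segments, merge_factor=3):
    """Merge the `merge_factor` smallest segments into one new segment."""
    if len(segments) < merge_factor:
        return segments
    by_size = sorted(segments, key=lambda seg: len(seg["docs"]))
    to_merge, rest = by_size[:merge_factor], by_size[merge_factor:]
    merged_docs = {}
    for segment in to_merge:
        merged_docs.update(live_docs(segment))
    # The old segments are simply not carried over, which models their deletion.
    return rest + [{"docs": merged_docs, "deleted": set()}]


segments = [
    {"docs": {"1": "a"}, "deleted": set()},
    {"docs": {"2": "b"}, "deleted": {"2"}},
    {"docs": {"3": "c"}, "deleted": set()},
    {"docs": {str(i): "x" for i in range(10, 20)}, "deleted": set()},
]
result = merge_smallest(segments)
print(len(result))         # 2
print(result[-1]["docs"])  # {'1': 'a', '3': 'c'}
```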

Deep Dive into the Implementation Mechanism of Indexing Documents in Elasticsearch #

TIP

Optional reading material.

Key Points for Write Operations #

When considering or analyzing the write operations of a distributed system, several aspects need to be taken into account:

  • Reliability: Also known as durability, the data written into the system should not be rolled back or lost.
  • Consistency: After a successful write operation, querying the data should always return the latest version, and old data should not be retrieved.
  • Atomicity: A write or update operation should either fully succeed or fully fail, without allowing intermediate states.
  • Isolation: Multiple write operations should not affect each other.
  • Real-time: Whether the written data can be immediately queried.
  • Performance: What write performance and throughput the system achieves.

As a distributed system, Elasticsearch also needs to address these six characteristics during the write process. We will cover them in the following introduction to the write workflow.

Next, let’s dig into the internal write mechanism of Elasticsearch layer by layer.

Lucene Indexing #

As we all know, Elasticsearch internally utilizes Lucene for indexing and searching purposes. In Lucene, write operations are mainly implemented through the IndexWriter class, which provides three interfaces:

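// Abridged from org.apache.lucene.index.IndexWriter; each method also declares "throws IOException".
public long addDocument(Iterable<? extends IndexableField> doc);
public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs);
public long deleteDocuments(Term... terms);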

These three interfaces can be used to perform single document write, update, and delete operations, including processes such as tokenization, inverted index creation, forward index creation, and other search-related processes. Once a document is written through IndexWriter, it can be searched with IndexSearcher. It seems that the functionality is already well-implemented, but there are still some remaining issues to be addressed:

  • The above operations are performed on a single machine, but we need a distributed solution.
  • Documents written into Lucene are not immediately searchable; they need to be fully indexed into segments. How can we ensure real-time indexing?
  • Newly written Lucene segments initially live in memory. If a machine crashes or loses power, those in-memory segments are lost. How can we ensure data reliability?
  • Lucene does not support partial document updates, but this is a strong requirement. How can we support partial updates?

These issues are not solved in Lucene, so Elasticsearch has to address the problems in the Lucene layer.

Now let’s take a look at Elasticsearch’s write mechanism.

Write Mechanism in Elasticsearch #

Elasticsearch adopts the multi-shard approach, dividing data into multiple subsets based on routing rules and providing independent indexing and searching functionalities for each data subset. When writing a document, it is sent to the specific shard for indexing based on the routing rules, achieving distribution.

In addition, Elasticsearch adopts a primary-replica (master-slave) architecture.

Each index consists of multiple shards, and each shard has one primary copy and a configurable number of replica copies. A write request is first routed to a shard according to the routing rules. The index request can specify which value to use as the routing parameter. If none is specified, the routing configured in the mapping is used; if the mapping has none either, the _id is used. The shard is then selected based on the hash of the routing value (in the OperationRouting class), and the node that holds the shard’s primary is looked up in the cluster’s metadata.

The request is then sent to the primary shard. After successful execution on the primary shard, the request is sent to multiple replica shards from the primary shard. After successful execution on multiple replica shards, the write request is successful and the result is returned to the client.

In this mode, the write latency is latency = Latency(Primary Write) + Max(Replica Writes). As long as there is at least one replica, the minimum write latency is the sum of two single-shard write latencies. Write efficiency is lower, but the benefit is clear: data is not lost to a machine or disk failure after the write. When weighing data safety against performance, data safety is generally given priority, except in special scenarios that tolerate data loss.
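As a worked example of the latency formula, the following sketch computes the end-to-end write latency from hypothetical per-shard timings. The function name and the millisecond values are made up for illustration.

```python
def write_latency_ms(primary_ms, replica_ms):
    """latency = primary write + slowest replica write (replicas run in parallel)."""
    slowest_replica = max(replica_ms) if replica_ms else 0
    return primary_ms + slowest_replica


print(write_latency_ms(4, [3, 7, 5]))  # 11
print(write_latency_ms(4, []))         # 4
```

Adding more replicas does not add their latencies together; only the slowest replica matters, because the primary forwards the request to all replicas in parallel.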

Multiple replicas protect persisted data against machine or disk failures. However, to reduce disk I/O and preserve read and write performance, Elasticsearch writes Lucene’s segments to disk only periodically (e.g., every 5 minutes). If a machine crashes or loses power, Lucene data that has been written to memory but not yet flushed to disk is lost. How can this loss be prevented?

To address this issue, Elasticsearch has learned from the way databases handle it by adding a CommitLog module, called TransLog in Elasticsearch.

In each shard, the write process is divided into two parts: writing to Lucene first and then to TransLog.

When a write request arrives at the shard, it is first written to Lucene, creating the index. At this point, the index is still in memory. Then, it goes on to write to TransLog. After writing to TransLog, TransLog data is flushed to disk. Once the write to disk is successful, the request is returned to the user. Here are a few key points:

  • The first difference between Elasticsearch and a database is that the former writes to memory first and then writes to the transaction log, while the latter writes to the commit log first and then to memory. One possible reason for this is that writing to Lucene’s memory has complex logic, including tokenization and checking for field length limitations, which can easily fail. To avoid invalid records in the transaction log and to reduce recovery complexity and improve speed, Lucene is written first.
  • After writing to Lucene’s memory, the data is not searchable yet. It needs to be refreshed to convert the memory objects into complete segments and then reopened before it can be searched. This refresh usually takes about 1 second, so documents written to Elasticsearch can be searched in a near real-time (NRT) manner.
  • When Elasticsearch is used as a NoSQL database and a GetById query is performed, the query can be directly executed from the transaction log, making it a real-time (RT) system.
  • Periodically, for example every 30 minutes, Lucene flushes the newly generated segments from memory to disk. Once the index files are persisted, the old transaction log becomes useless and is cleared.

The above describes two key modules in Elasticsearch for writing data: Replica and Transaction Log. Next, let’s take a look at the update process:

Since Lucene does not support partial field updates, Elasticsearch needs to implement this functionality. The specific process is as follows:

  • When an update request is received, the complete document with the same ID is read from the segment or transaction log, and the version number is recorded as V1.
  • The full document at version V1 and the partial document in the request are merged into a complete document, and the VersionMap in memory is updated. Once the complete document is obtained, the update request becomes an index request. A lock is acquired at this point.
  • The latest version number V2 for the ID is then read from the VersionMap. If the VersionMap has no entry for the ID, the version is retrieved from the segment or transaction log. In most cases, it is found in the VersionMap.
  • The versions are checked for a conflict (a conflict exists when V1 != V2). If there is a conflict, the process returns to the “Update doc” stage and is re-executed. If there is no conflict, the index request is executed.
  • In the “Index Doc” stage, the version is incremented by 1 to obtain V3. The document is then added to Lucene. In Lucene, the existing document with the same ID is first deleted, and then the new document is added. After successfully writing to Lucene, the current V3 is updated in the VersionMap.
  • The lock is released, and the partial update process is complete.
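The partial update flow above is essentially optimistic concurrency control with a retry. The sketch below models it with a plain dictionary standing in for the segment and transaction log. The class and method names are hypothetical, and as a simplification the lock covers only the version check and the write rather than the whole merge step.

```python
import threading


class VersionedStore:
    """Toy model of read, merge, version check, and write."""

    def __init__(self):
        self.docs = {}         # doc_id -> (version, source); stands in for segment/translog
        self.version_map = {}  # doc_id -> latest version, kept in memory
        self.lock = threading.Lock()

    def get(self, doc_id):
        return self.docs.get(doc_id, (0, {}))

    def index(self, doc_id, source, expected_version):
        with self.lock:
            # V2: prefer the VersionMap, fall back to the stored document.
            current = self.version_map.get(doc_id, self.get(doc_id)[0])
            if current != expected_version:
                return False  # conflict: V1 != V2
            new_version = current + 1  # V3
            self.docs[doc_id] = (new_version, source)
            self.version_map[doc_id] = new_version
            return True

    def partial_update(self, doc_id, partial, max_retries=3):
        for _ in range(max_retries):
            v1, full = self.get(doc_id)    # read the full document at V1
            merged = {**full, **partial}   # merge in the partial document
            if self.index(doc_id, merged, expected_version=v1):
                return self.version_map[doc_id]
        raise RuntimeError("version conflict after retries")


store = VersionedStore()
store.index("1", {"title": "a", "views": 1}, expected_version=0)
new_version = store.partial_update("1", {"views": 2})
print(new_version, store.get("1"))
```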

With the partial update process covered, you should now have a preliminary understanding of Elasticsearch’s overall write architecture. Next, let’s analyze the individual steps of a write in more detail.

Elasticsearch Write Request Types #

The write request types in Elasticsearch mainly include: Index (Create), Update, Delete, and Bulk. Among them, the first three are single-document operations, and the last Bulk is a multi-document operation, which can include Index (Create), Update, and Delete.

In versions 6.0.0 and later, the implementation of the first three single-document operations is almost identical to that of the Bulk operation, and some are even implemented by calling the Bulk interface. It is estimated that in the next few versions, Index (Create), Update, and Delete will all be treated as specializations of Bulk operations. This will make the code and logic clearer.

Next, let’s take the Bulk request as an example to introduce the write process.

In the accompanying flow diagram, the colors indicate the node roles:

  • Red: Client Node.
  • Green: Primary Node.
  • Blue: Replica Node.

Client Node #

The client node also performs the Parse Request step mentioned earlier, which we will not cover in detail here. Let’s take a look at the other parts.

  1. Ingest Pipeline

In this step, the original document can be processed, such as HTML parsing or custom processing. The specific processing logic can be implemented through plugins. In Elasticsearch, because Ingest Pipeline can be CPU-intensive and resource-consuming, a dedicated Ingest Node can be set up to handle Ingest Pipeline logic.

If the current node cannot execute the Ingest Pipeline, the request will be sent to another node that can execute it.

  2. Auto Create Index

Check whether the target index exists. If it does not, the index must be auto-created, which requires interaction with the master. Auto-creation of indices can also be disabled by configuration.

  3. Set Routing

Set the routing condition. If the request specifies a routing condition, use it. Otherwise, use the routing condition configured in the mapping. If the mapping has no routing configured, use the value of the _id field by default.

In this step, if no _id is specified, a unique _id is generated automatically. Currently, a UUID is used.

  4. Construct BulkShardRequest

A Bulk request contains multiple (Index/Update/Delete) requests, which may need to be executed on different shards depending on their routing. In this step, the individual write requests are split out and grouped by target shard, and one BulkShardRequest is constructed per shard.
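Grouping bulk items by target shard can be sketched as follows. The item layout, the function names, and the use of `zlib.crc32` as the hash are assumptions of this example (Elasticsearch uses Murmur3 and its own request classes). Each item keeps its original position so that responses could later be reassembled in request order.

```python
import zlib
from collections import defaultdict


def shard_for(routing_key, num_primary_shards):
    """Stand-in routing function; CRC32 is an assumption of this sketch."""
    return zlib.crc32(routing_key.encode("utf-8")) % num_primary_shards


def build_bulk_shard_requests(items, num_primary_shards):
    """Group bulk items into one list per target shard."""
    per_shard = defaultdict(list)
    for position, item in enumerate(items):
        key = item.get("routing") or item["_id"]
        per_shard[shard_for(key, num_primary_shards)].append((position, item))
    return dict(per_shard)


bulk_items = [
    {"op": "index", "_id": "1", "doc": {"title": "a"}},
    {"op": "update", "_id": "2", "doc": {"views": 3}},
    {"op": "delete", "_id": "3"},
    {"op": "index", "_id": "4", "routing": "user-1", "doc": {"title": "b"}},
]
shard_requests = build_bulk_shard_requests(bulk_items, num_primary_shards=3)

for shard_id, grouped in sorted(shard_requests.items()):
    print(shard_id, [item["_id"] for _, item in grouped])
```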

  5. Send Request To Primary

This step sends each BulkShardRequest to the node that holds the corresponding primary shard.

Primary Node #

The entry point for the Primary request is in PrimaryOperationTransportHandler’s messageReceived. Let’s take a look at the related logical flow.

  1. Index or Update or Delete

Loop through each Single Write Request and, for each request, choose a different processing logic based on the operation type (CREATE/INDEX/UPDATE/DELETE).

Among them, Create/Index directly adds a new Doc, Delete directly deletes a Doc based on _id, and Update is a bit more complex. We will use Update as an example to explain below.

  2. Translate Update To Index or Delete

This step is specific to the Update operation. Here, the Update request is converted into an Index or Delete request. First, a GetRequest is used to fetch the existing Doc with the same _id (if any), including its complete fields and values (this depends on the _source field). The existing Doc is then merged with the Doc in the request. The version number of the Doc that was read is recorded as V1.

  3. Parse Doc

Here, the fields in the Doc are parsed. A ParsedDocument object is generated, along with a uid Term. In Elasticsearch, _uid = type # _id. Users see _id, while Elasticsearch stores _uid. The ParsedDocument also contains Elasticsearch’s system fields. Most of them are filled in from the current content, and the ones that are not yet known are filled in later.

  4. Update Mapping

Elasticsearch has an automatic mapping update feature, which takes effect in this step. It first collects the new Fields that are not yet in the Mapping and then checks whether automatic mapping updates are allowed. If they are, the Mapping is updated.

  5. Get Sequence Id and Version

Because this is the Primary Shard, it obtains a SequenceID and a Version from the SequenceNumber Service. The SequenceID is incremented by 1 per operation at the Shard level, and it is used to advance the LocalCheckpoint once the Doc has been written successfully. The Version is the current maximum Version of the Doc plus 1.

  6. Add Doc To Lucene

At the beginning of this step, a lock on the specific _uid is acquired. Then the Version currently associated with that _uid is compared with the Version obtained in the earlier “Translate Update To Index or Delete” step. If they differ, the Doc was changed after it was read, which is a version conflict. A VersionConflict exception is thrown, caught at the start of the Primary Node logic, and execution resumes from the “Translate Update To Index or Delete” step.

If the Versions are equal, execution continues. If a Doc with the same _id already exists, Lucene’s UpdateDocument(uid, doc) interface is called, which first deletes the Doc by uid and then indexes the new Doc. If this is the first write, Lucene’s AddDocument interface is called directly. One issue in this step is how to keep Delete-Then-Add atomic so that a Refresh cannot observe the intermediate state. The answer is that a Refresh Lock is taken before the Delete to disable Refresh, and it is released only after the Add completes. This ensures the atomicity of Delete-Then-Add.

Lucene’s UpdateDocument interface handles the Doc as a set of Fields. It processes the Fields one by one, in the order of inverted index, stored fields, doc values, and point dimensions. A separate article will cover writing in Lucene in detail.

  7. Write Translog

After the Lucene Segment is written, the TransLog is written as key-value pairs, where the Key is the _id and the Value is the Doc content. When a GetDocByID request arrives, the Doc can be read directly from the TransLog by _id, which meets the real-time requirements of a NoSQL scenario.

Note that at this point the TransLog is only written in memory. Whether it is synced to disk is decided in a later step.

At the end of this step, the current SequenceID is marked as successfully executed, and the LocalCheckPoint of the current Shard is updated.
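The relationship between sequence IDs and the local checkpoint can be sketched as a small tracker. The checkpoint is the highest sequence ID such that every lower one has also completed, so it only advances when there are no gaps. The class and method names here are hypothetical, and the initial value of -1 (meaning no operation has completed) is an assumption of this sketch.

```python
class LocalCheckpointTracker:
    """Hands out sequence IDs and tracks the highest gap-free completed one."""

    def __init__(self):
        self.next_seq_no = 0
        self.checkpoint = -1    # no operation completed yet
        self.completed = set()  # completed IDs that are still above the checkpoint

    def generate_seq_no(self):
        seq_no = self.next_seq_no
        self.next_seq_no += 1
        return seq_no

    def mark_completed(self, seq_no):
        self.completed.add(seq_no)
        # Advance the checkpoint across every contiguous completed ID.
        while self.checkpoint + 1 in self.completed:
            self.checkpoint += 1
            self.completed.remove(self.checkpoint)


tracker = LocalCheckpointTracker()
first, second, third = (tracker.generate_seq_no() for _ in range(3))

tracker.mark_completed(first)
print(tracker.checkpoint)  # 0
tracker.mark_completed(third)
print(tracker.checkpoint)  # 0 (operation 1 is still in flight)
tracker.mark_completed(second)
print(tracker.checkpoint)  # 2
```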

  8. Renew Bulk Request

Here, the Bulk Request is reconstructed. The previous UpdateRequest has already been translated into an Index or Delete request, so the replicas only need to execute Index or Delete requests and do not need to repeat the Update logic. This keeps the replica logic simpler and faster, and it guarantees that the same request produces the same result on the Primary and on the replicas.

  9. Flush Translog

Here, the TransLog is handled according to the configured TransLog strategy: it is either flushed to disk immediately or flushed later. The more frequently it is flushed, the higher the reliability and the greater the impact on write performance.
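The two strategies correspond to index settings that control when the translog is fsynced. The sketch below builds a settings body, assuming the setting names `index.translog.durability` (with values `request` and `async`) and `index.translog.sync_interval` from the Elasticsearch translog documentation. The helper function itself is hypothetical and nothing is sent to a cluster.

```python
import json


def translog_settings(durability="request", sync_interval="5s"):
    """Build an index settings body for the chosen translog strategy.

    "request": fsync the translog before acknowledging each request.
    "async":   fsync in the background every `sync_interval`.
    """
    if durability not in ("request", "async"):
        raise ValueError(f"unsupported durability: {durability}")
    settings = {"index.translog.durability": durability}
    if durability == "async":
        settings["index.translog.sync_interval"] = sync_interval
    return settings


print(json.dumps(translog_settings()))
print(json.dumps(translog_settings("async", "10s")))
```

With the asynchronous strategy, operations acknowledged since the last sync can be lost if the node crashes, which is the reliability cost that the paragraph above refers to.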

  10. Send Requests To Replicas

Here, the newly constructed Bulk Request is sent in parallel to the replicas, and the primary waits for them to respond. The primary node returns to the user only after every replica has responded (some may succeed and some may fail). If a replica fails, the primary node sends a Remove Shard request to the master, asking it to remove that replica shard from the set of available copies.

The Sequence ID, Primary Term, Global CheckPoint, and related values are also passed to the replicas.

The request sent to a replica has an Action Name equal to the original Action Name plus [R], where R stands for Replica. The [R] suffix is used to locate the handler that processes replica requests.
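The parallel fan-out to replicas, including the handling of a failed replica, can be sketched with a thread pool. Each replica is modeled as a callable that raises on failure. The function name, the replica names, and the shape of the returned dictionary are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def replicate(operation, replicas):
    """Send `operation` to every replica in parallel and wait for all of them.

    `replicas` maps a replica name to a callable that applies the operation
    and raises an exception on failure.
    """

    def send(name):
        try:
            replicas[name](operation)
            return name, True
        except Exception:
            return name, False

    with ThreadPoolExecutor(max_workers=max(1, len(replicas))) as pool:
        results = dict(pool.map(send, replicas))

    return {
        "acked": sorted(name for name, ok in results.items() if ok),
        # In the real system the primary would ask the master to remove these.
        "report_to_master": sorted(name for name, ok in results.items() if not ok),
    }


def healthy_replica(operation):
    return None


def failing_replica(operation):
    raise IOError("simulated disk failure")


result = replicate(
    {"op": "index", "_id": "1", "seq_no": 7},
    {"replica-1": healthy_replica, "replica-2": failing_replica},
)
print(result)
```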

  11. Receive Response From Replicas

Once all the requests on the replicas have been processed, the Local CheckPoint of the primary node is updated.

Replica Node #

The entry point for replica requests is in the ReplicaOperationTransportHandler’s messageReceived method. Let’s take a look at the relevant logical flow.

  1. Index or Delete

Different execution logic is chosen based on whether the request type is Index or Delete. There is no Update here because it has already been converted to an Index or Delete request in the primary node.

  2. Parse Doc
  3. Update Mapping

These are consistent with the logic in the primary node.

  4. Get Sequence Id and Version

The Sequence ID and Version are generated on the primary node and added to the ReplicaRequest. Here, they are simply read from the request.

  5. Add Doc To Lucene

Because Update requests have already been converted to Index or Delete requests on the primary node, only Index and Delete requests need to be processed here. This makes the replica logic simpler than the primary’s.

  6. Write Translog
  7. Flush Translog

These are consistent with the logic in the primary node.

Finally #

The above detailed the writing process of Elasticsearch and the working mechanism of each process. Here, let’s summarize the six characteristics of distributed systems that were previously mentioned:

  • Reliability: Due to the design of Lucene, which does not consider reliability, Elasticsearch guarantees data reliability through the Replica and TransLog mechanisms.
  • Consistency: The refresh lock described earlier only ensures that no refresh occurs between the Delete and the Add of an update. Once the Add completes, a refresh may happen immediately and make the new segment readable. Because the primary and the replicas cannot be guaranteed to refresh at the same moment, query results can vary depending on which copy serves the request, so only eventual consistency is achieved.
  • Atomicity: Both Add and Delete call Lucene’s interfaces directly and are atomic. For partial updates, Version and locks are used to ensure atomicity.
  • Isolation: Version and local locks are again used to ensure that each update applies to a specific version of the data.
  • Real-time: Periodically refreshing the in-memory buffer into segments and reopening them makes documents searchable within a short time (e.g., 1 second). Data that has not yet been flushed is recorded in the TransLog, so uncommitted data can still be read in real time by ID.
  • Performance: Performance is a systems engineering problem, and its impact must be considered in every part of the design. Elasticsearch makes many design choices for performance. First, it is not necessary to wait for all replicas before returning to the user; only a specific number need to respond. Second, newly generated segments serve requests from memory and are flushed to disk only after a period of time; during that window, the TransLog guarantees the reliability of the in-memory segments. Third, the TransLog can be configured to flush periodically rather than on every request, although this reduces reliability. Fourth, each thread holds its own segment, so threads work independently without affecting each other, which is more efficient. Fifth, the write process depends heavily on versions and reads them frequently, so the VersionMap is used to avoid repeated disk I/O for hot data. Lucene itself also includes many performance optimizations.
