12 Detailed Explanation of Aggregation: Pipeline Aggregations


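In Elasticsearch, aggregations are operations that extract summary data from an index. A pipeline aggregation is a special type of aggregation that performs further processing and computation on the results of other aggregations.

Pipeline aggregations are typically used to aggregate across multiple levels and can apply a series of operations to the results. Other aggregations work on documents. Pipeline aggregations instead work on the output of other aggregations, which makes them very useful for complex data analysis and processing.

To use pipeline aggregations, you define the computation as a sequence of aggregations. They are evaluated in order, and each one can take the output of a previous one as its input. That input is referenced through the buckets_path parameter.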
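The following Python sketch shows the buckets_path idea. It is not Elasticsearch code, and resolve_buckets_path is a hypothetical helper. It runs on a dictionary shaped like the aggregation results: the path picks one value out of every bucket, and that list is what the next step consumes. It deliberately handles only the simple two-level form used in this article ("parent>metric", optionally "parent>metric.key").

```python
from typing import Any, Dict, List


def resolve_buckets_path(aggs: Dict[str, Any], path: str) -> List[float]:
    """Collect one value per bucket by following a buckets_path-style string.

    Simplified (hypothetical) helper: supports only "<bucket_agg>><metric>"
    or "<bucket_agg>><metric>.<key>", e.g. "sales_per_month>sales".
    """
    bucket_agg, metric = path.split(">", 1)
    metric_name, _, key = metric.partition(".")
    key = key or "value"  # single-value metrics expose their result as "value"
    return [bucket[metric_name][key] for bucket in aggs[bucket_agg]["buckets"]]


# A results-shaped dict holding the monthly sums used later in this article.
aggregations = {
    "sales_per_month": {
        "buckets": [
            {"key_as_string": "2015/01/01 00:00:00", "sales": {"value": 550.0}},
            {"key_as_string": "2015/02/01 00:00:00", "sales": {"value": 60.0}},
            {"key_as_string": "2015/03/01 00:00:00", "sales": {"value": 375.0}},
        ]
    }
}

monthly = resolve_buckets_path(aggregations, "sales_per_month>sales")
# The output of the first step becomes the input of the next step:
average = sum(monthly) / len(monthly)
```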

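Some common pipeline aggregations:

  1. Bucket Selector: runs a script to decide which buckets of the parent multi-bucket aggregation are kept.
  2. Serial Differencing: subtracts from each value the value a given number of buckets (the lag) earlier.
  3. Moving Average: computes the average over a sliding window of a given size (newer Elasticsearch versions replace it with the Moving Function aggregation, moving_fn).
  4. Derivative: computes the derivative of a specified metric across the buckets of a parent histogram or date_histogram aggregation.
  5. Bucket Script: runs a script that computes a new value for each bucket from that bucket's metrics.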
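The following Python sketch imitates on plain lists and dicts what each aggregation in the list computes. It is not Elasticsearch code, and every function name and signature here is hypothetical. Several simplifications apply:

  • A gap is None, and any gap yields None.
  • The derivative ignores Elasticsearch's unit option.
  • The moving average uses a trailing window that excludes the current bucket. This is our assumption about moving_fn's default behavior.

The data reuses the monthly sales values from the examples later in this article.

```python
from typing import Callable, Dict, List, Optional

Series = List[Optional[float]]  # one value per histogram bucket; None = gap


def derivative(values: Series) -> Series:
    """Change from the previous bucket; the first bucket has no derivative."""
    if not values:
        return []
    out: Series = [None]
    for prev, cur in zip(values, values[1:]):
        out.append(None if prev is None or cur is None else cur - prev)
    return out


def serial_diff(values: Series, lag: int = 1) -> Series:
    """Difference between each value and the value `lag` buckets earlier."""
    out: Series = []
    for i, cur in enumerate(values):
        prev = values[i - lag] if i >= lag else None
        out.append(None if prev is None or cur is None else cur - prev)
    return out


def moving_avg(values: Series, window: int) -> Series:
    """Unweighted mean of the `window` buckets preceding each bucket."""
    out: Series = []
    for i in range(len(values)):
        recent = [v for v in values[max(0, i - window):i] if v is not None]
        out.append(sum(recent) / len(recent) if recent else None)
    return out


def bucket_script(buckets: List[Dict[str, float]],
                  script: Callable[..., float]) -> List[float]:
    """Compute a new value for each bucket from that bucket's own metrics."""
    return [script(**bucket) for bucket in buckets]


def bucket_selector(buckets: List[Dict[str, float]],
                    predicate: Callable[..., bool]) -> List[Dict[str, float]]:
    """Keep only the buckets whose metrics satisfy the predicate."""
    return [bucket for bucket in buckets if predicate(**bucket)]


monthly_sales = [550.0, 60.0, 375.0]
buckets = [
    {"sales": 550.0, "doc_count": 3},
    {"sales": 60.0, "doc_count": 2},
    {"sales": 375.0, "doc_count": 2},
]
```

With lag 1, serial_diff gives the same result as this simplified derivative. A larger lag is useful, for example, for comparing each month with the same month a year earlier.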

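These pipeline aggregations can be combined freely, depending on your requirements and the characteristics of your data. They let you dig further into the data and extract more valuable information from it.

Overall, pipeline aggregations are a powerful Elasticsearch feature for multi-level processing of aggregation results. Used well, they help you better understand and exploit the data in your indices.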

Understanding Pipeline Aggregation #

To understand pipeline aggregation, it is important to approach this feature from the perspective of its purpose: allowing the aggregation results from the previous step to become the input for the next step. This is the essence of a pipeline.

Common Scenarios for Pipeline Mechanism #

First, let's review some common scenarios where the pipeline mechanism is used, such as Tomcat's pipeline design.

Chain of Responsibility Pattern #

The pipeline mechanism is an application of the Chain of Responsibility design pattern. If you are not familiar with it, here is a brief definition:

Chain of Responsibility pattern: the pattern creates a chain of receiver objects for a request. Each object examines the request in turn and either handles it or passes it to the next object in the chain.
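A minimal Python sketch of the pattern follows, with hypothetical handler names. Each handler either returns a result or returns None to pass the request along. run_chain offers the request to each handler in order until one accepts it.

```python
from typing import Callable, List, Optional

# A handler returns a result if it can handle the request, or None to pass it on.
Handler = Callable[[str], Optional[str]]


def run_chain(handlers: List[Handler], request: str) -> Optional[str]:
    """Offer the request to each handler in turn; the first non-None answer wins."""
    for handler in handlers:
        result = handler(request)
        if result is not None:
            return result
    return None  # no handler in the chain accepted the request


def static_files(request: str) -> Optional[str]:
    return f"static:{request}" if request.endswith(".css") else None


def api(request: str) -> Optional[str]:
    return f"api:{request}" if request.startswith("/api/") else None


def not_found(request: str) -> Optional[str]:
    return f"404:{request}"  # catch-all at the end of the chain
```

Adding, removing, or reordering handlers changes only the list passed to run_chain, not the handlers themselves. This is the low coupling the pattern is known for.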

FilterChain #

In software development, a common application of the Chain of Responsibility pattern is the FilterChain, which shows up in many frameworks:

  • The Spring Security framework, whose security filters are arranged as a chain:

(Figure: the Spring Security filter chain)

  • The servlet filters applied while an HttpServletRequest is being processed.

When a request comes in, it must pass through a series of processing steps. The Chain of Responsibility pattern turns each step into a separate module, which reduces coupling. The pattern can also find the right handler for a request: a handler that is not suitable passes the request on to the next one, which then tries to process it.
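The servlet-style variant differs slightly: every filter gets a turn, and each one decides whether to call the next. The sketch below is loosely modeled on the idea behind javax.servlet.FilterChain. The FilterChain class, do_filter, and both filters are hypothetical Python, not the Servlet API.

```python
from typing import Callable, Dict, List

Request = Dict[str, str]
Filter = Callable[[Request, "FilterChain"], str]


class FilterChain:
    """Invokes filters in order; each filter decides whether to call the next."""

    def __init__(self, filters: List[Filter], servlet: Callable[[Request], str]):
        self._filters = filters
        self._servlet = servlet
        self._pos = 0  # per-request state, so use one chain instance per request

    def do_filter(self, request: Request) -> str:
        if self._pos < len(self._filters):
            current = self._filters[self._pos]
            self._pos += 1
            return current(request, self)
        return self._servlet(request)  # end of the chain: the actual handler


def auth_filter(request: Request, chain: FilterChain) -> str:
    if request.get("user") is None:
        return "401 Unauthorized"  # stop here: the request is not passed on
    return chain.do_filter(request)


def logging_filter(request: Request, chain: FilterChain) -> str:
    response = chain.do_filter(request)  # let the rest of the chain run first
    return f"[logged] {response}"


def hello_servlet(request: Request) -> str:
    return f"200 hello {request['user']}"
```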

The following diagram, found online, illustrates the idea with Tomcat's request processing.

(Figure: Tomcat's request-processing pipeline)

Designing Pipeline Mechanism in Elasticsearch #

To put it simply: the pipeline mechanism allows the aggregation results from the previous step to become the input for the next step.
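To make "the output of one step is the input of the next" concrete, here is a Python sketch of the mechanism. It illustrates the concept only and does not reflect Elasticsearch internals. run_pipeline folds a list of stages over some data. The hypothetical stages mimic a bucket aggregation, a metric sub-aggregation, and a sibling pipeline, and they run over hypothetical documents.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Stage = Callable[[Any], Any]


def run_pipeline(stages: List[Stage], data: Any) -> Any:
    """Feed the output of each stage into the next one."""
    return reduce(lambda acc, stage: stage(acc), stages, data)


# Hypothetical stages mimicking: bucket agg -> metric sub-agg -> sibling pipeline.
def bucket_by_month(docs: List[Dict[str, Any]]) -> Dict[str, List[float]]:
    buckets: Dict[str, List[float]] = {}
    for doc in docs:
        buckets.setdefault(doc["month"], []).append(doc["price"])
    return buckets


def sum_per_bucket(buckets: Dict[str, List[float]]) -> Dict[str, float]:
    return {month: sum(prices) for month, prices in buckets.items()}


def avg_over_buckets(sums: Dict[str, float]) -> float:
    return sum(sums.values()) / len(sums)


# Hypothetical documents; only "month" and "price" matter here.
docs = [
    {"month": "2015-01", "price": 100.0},
    {"month": "2015-01", "price": 200.0},
    {"month": "2015-02", "price": 60.0},
    {"month": "2015-03", "price": 40.0},
    {"month": "2015-03", "price": 80.0},
]

monthly_average = run_pipeline([bucket_by_month, sum_per_bucket, avg_over_buckets], docs)
```

Each stage only needs to understand the shape of its predecessor's output. That is the property that lets Elasticsearch stack a pipeline aggregation on top of any aggregation whose results it can address.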

Next, we need interfaces to support different types of aggregations, such as:

(Figure: interfaces for the different pipeline aggregation types)

The first dimension: there are many types of pipeline aggregations, each computing different information from the output of other aggregations. They fall into two families:

  • Parent: pipeline aggregations that take the output of their parent aggregation as input and compute new buckets, or new aggregations to add to the existing buckets.
  • Sibling: pipeline aggregations that take the output of a sibling aggregation as input and compute a new aggregation at the same level as that sibling.
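The difference between the two families is where the result lands. cumulative_sum (a parent pipeline in Elasticsearch) writes a value into every bucket. max_bucket (a sibling pipeline) produces one result next to the bucket aggregation. The Python functions below are hypothetical imitations of those two aggregation types, and the {"value", "keys"} result shape only loosely follows max_bucket's response.

```python
from typing import Any, Dict, List

Bucket = Dict[str, Any]


def cumulative_sum(buckets: List[Bucket], metric: str, name: str) -> List[Bucket]:
    """Parent-style: adds a new value to every existing bucket."""
    running = 0.0
    out: List[Bucket] = []
    for bucket in buckets:
        running += bucket[metric]
        out.append({**bucket, name: running})  # copy; the input is left untouched
    return out


def max_bucket(buckets: List[Bucket], metric: str) -> Dict[str, Any]:
    """Sibling-style: one new result next to the bucket aggregation."""
    top = max(bucket[metric] for bucket in buckets)
    return {"value": top, "keys": [b["key"] for b in buckets if b[metric] == top]}


monthly = [
    {"key": "2015-01", "sales": 550.0},
    {"key": "2015-02", "sales": 60.0},
    {"key": "2015-03", "sales": 375.0},
]
with_running_total = cumulative_sum(monthly, "sales", "cumulative_sales")
best_month = max_bucket(monthly, "sales")
```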

The second dimension: classification by functional intent.

For example, the preceding aggregation might be a bucket aggregation and the subsequent one a metric aggregation. Such a combination is one kind of pipeline.

This is where the "xxx bucket" naming (avg_bucket, max_bucket, and so on) comes from. It should now be easier to understand. @pdai

  • Bucket Aggregation -> Metric Aggregation

This means that the result of the bucket aggregation becomes the input for the metric aggregation in the next step.

  • Average bucket
  • Min bucket
  • Max bucket
  • Sum bucket
  • Stats bucket
  • Extended stats bucket
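To see what this bucket-to-metric family computes, the following Python sketch evaluates each *_bucket aggregation over one list of per-bucket values. bucket_metrics is a hypothetical helper, not an Elasticsearch API. Its extended statistics cover only a subset of what Elasticsearch returns. We also assume that "variance" is the population variance, as in the extended_stats metric.

```python
import math
from typing import Any, Dict, List


def bucket_metrics(values: List[float]) -> Dict[str, Any]:
    """What each *_bucket sibling pipeline would compute over the same values."""
    if not values:
        raise ValueError("need at least one bucket value")
    count = len(values)
    total = sum(values)
    avg = total / count
    variance = sum((v - avg) ** 2 for v in values) / count  # population variance
    stats = {"count": count, "min": min(values), "max": max(values), "avg": avg, "sum": total}
    extended = {
        **stats,
        "sum_of_squares": sum(v * v for v in values),
        "variance": variance,
        "std_deviation": math.sqrt(variance),
    }
    return {
        "avg_bucket": avg,
        "min_bucket": min(values),
        "max_bucket": max(values),
        "sum_bucket": total,
        "stats_bucket": stats,
        "extended_stats_bucket": extended,
    }


# Monthly sales values, as in the examples below.
result = bucket_metrics([550.0, 60.0, 375.0])
```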

For the purpose of understanding the overall system, it is sufficient to understand the above types. The other types are just icing on the cake.

Examples #

Let's look at a few simple examples here. For more details, refer to the official documentation. @pdai

Average bucket aggregation #

POST _search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        }
      }
    },
    "avg_monthly_sales": {
      "avg_bucket": {
        "buckets_path": "sales_per_month>sales",
        "gap_policy": "skip",
        "format": "#,##0.00;(#,##0.00)"
      }
    }
  }
}
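
  • Bucket aggregation: a date_histogram (sales_per_month) buckets documents by month, and its nested sum metric (sales) totals the price field in each bucket.
  • Pipeline aggregation: avg_bucket (avg_monthly_sales) is a sibling aggregation that computes the average of the monthly sales values.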

Parameters:

  • buckets_path: the path to the values to aggregate. Aggregation names are separated by ">", so sales_per_month>sales points to the sales metric inside each sales_per_month bucket. Nested aggregations are supported.
  • gap_policy: what to do when a value is missing in a bucket, similar to the missing setting of a terms aggregation. The options are skip (the default) and insert_zeros.
  • skip: treats missing data as if the bucket did not exist. The bucket is skipped, and the calculation continues with the next available value.
  • insert_zeros: replaces the missing value with zero and continues the calculation as usual.
  • format: a DecimalFormat pattern applied to the output value. The formatted result is returned as value_as_string.

The output is as follows:

{
  "took": 11,
  "timed_out": false,
  "_shards": ...,
  "hits": ...,
  "aggregations": {
    "sales_per_month": {
      "buckets": [
        {
          "key_as_string": "2015/01/01 00:00:00",
          "key": 1420070400000,
          "doc_count": 3,
          "sales": {
            "value": 550.0
          }
        },
        {
          "key_as_string": "2015/02/01 00:00:00",
          "key": 1422748800000,
          "doc_count": 2,
          "sales": {
            "value": 60.0
          }
        },
        {
          "key_as_string": "2015/03/01 00:00:00",
          "key": 1425168000000,
          "doc_count": 2,
          "sales": {
            "value": 375.0
          }
        }
      ]
    },
    "avg_monthly_sales": {
      "value": 328.33333333333333,
      "value_as_string": "328.33"
    }
  }
}
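The following Python sketch reproduces this response locally and shows what gap_policy changes when a month is missing. avg_bucket, is_gap, and format_decimal are hypothetical helpers, not Elasticsearch APIs. format_decimal only approximates the Java DecimalFormat pattern "#,##0.00;(#,##0.00)" (thousands separators, two decimals, negatives in parentheses), and rounding of borderline values may differ from Java.

```python
import math
from typing import List, Optional


def is_gap(value: Optional[float]) -> bool:
    """A bucket value counts as missing when it is None or NaN."""
    return value is None or math.isnan(value)


def avg_bucket(values: List[Optional[float]], gap_policy: str = "skip") -> Optional[float]:
    """Average of per-bucket values, applying a gap_policy to missing ones."""
    if gap_policy == "skip":
        kept = [v for v in values if not is_gap(v)]  # drop the bucket entirely
    elif gap_policy == "insert_zeros":
        kept = [0.0 if is_gap(v) else v for v in values]  # count it as zero
    else:
        raise ValueError(f"unsupported gap_policy: {gap_policy}")
    return sum(kept) / len(kept) if kept else None


def format_decimal(value: float) -> str:
    """Approximate the DecimalFormat pattern "#,##0.00;(#,##0.00)"."""
    text = f"{abs(value):,.2f}"  # thousands separators, two decimals
    return f"({text})" if value < 0 else text


monthly_sales = [550.0, 60.0, 375.0]  # the "sales" values in the response above
value = avg_bucket(monthly_sales)
value_as_string = format_decimal(value)
```

With February missing, skip averages the two remaining months (462.5), while insert_zeros divides the same total by three (about 308.33).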

Stats bucket aggregation #

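The stats bucket aggregation is just as easy to understand. It uses the same monthly buckets, but instead of a single average it returns count, min, max, avg, and sum across all of them: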

POST /sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        }
      }
    },
    "stats_monthly_sales": {
      "stats_bucket": {
        "buckets_path": "sales_per_month>sales" 
      }
    }
  }
}

Returns:

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               }
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               }
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2,
               "sales": {
                  "value": 375.0
               }
            }
         ]
      },
      "stats_monthly_sales": {
         "count": 3,
         "min": 60.0,
         "max": 550.0,
         "avg": 328.3333333333333,
         "sum": 985.0
      }
   }
}
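The avg_bucket and stats_bucket requests in this section are identical except for the pipeline's name and type, plus avg_bucket's optional gap_policy and format. The hypothetical Python helper monthly_sales_request below makes that explicit by building either search body as a dict and serializing it to JSON. Sending the body (for example, to POST /sales/_search with any HTTP client) is deliberately left out.

```python
import json
from typing import Any, Dict


def monthly_sales_request(pipeline_name: str, pipeline_type: str, **options: Any) -> Dict[str, Any]:
    """Build the monthly-sales search body with one sibling *_bucket pipeline."""
    return {
        "size": 0,
        "aggs": {
            "sales_per_month": {
                "date_histogram": {"field": "date", "calendar_interval": "month"},
                "aggs": {"sales": {"sum": {"field": "price"}}},
            },
            pipeline_name: {
                pipeline_type: {"buckets_path": "sales_per_month>sales", **options}
            },
        },
    }


stats_body = monthly_sales_request("stats_monthly_sales", "stats_bucket")
avg_body = monthly_sales_request(
    "avg_monthly_sales", "avg_bucket", gap_policy="skip", format="#,##0.00;(#,##0.00)"
)
payload = json.dumps(stats_body)  # the JSON text to send as the request body
```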

Reference Article #

[https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline.html]