AWS Big Data Blog

Normalize data with Amazon Elasticsearch Service ingest pipelines

September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service.


Amazon OpenSearch Service is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch cost-effectively at scale. Search and log analytics are the two most popular use cases for Amazon OpenSearch Service. In log analytics at scale, a common pattern is to create indexes from multiple sources. In these use cases, how can you ensure that all the incoming data follows a specific, predefined format if it’s operationally not feasible to apply checks in each data source? You can use Elasticsearch ingest pipelines to normalize all the incoming data and create indexes with the predefined format.

What’s an ingest pipeline?

An ingest pipeline lets you use some of your Amazon OpenSearch Service domain's processing power to apply a set of processors to documents during indexing. The pipeline applies the processors in order, with the output of one processor becoming the input of the next. You define a pipeline with the Elasticsearch _ingest API. The following diagram illustrates this architecture.

To see the ingest pipelines that are already defined in your Amazon OpenSearch Service domain, enter the following code:

GET _ingest/pipeline/
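
If you want to experiment before building the full solution, you can define a small pipeline and test it with the simulate API. The following sketch is purely illustrative; the pipeline name my_test_pipeline and the sample document are hypothetical and not part of the solution in this post.

PUT _ingest/pipeline/my_test_pipeline
{
   "description":"minimal example: stamp each document with its ingest time",
   "processors":[
      {
         "set":{
            "field":"received_at",
            "value":"{{_ingest.timestamp}}"
         }
      }
   ]
}

POST _ingest/pipeline/my_test_pipeline/_simulate
{
   "docs":[
      {
         "_index":"index-2020.05.01.10",
         "_source":{
            "message":"sample log line"
         }
      }
   ]
}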

Solution overview

In this post, we discuss three log analytics use cases where data normalization is a common technique.

We create three pipelines and normalize the data for each use case. The following diagram illustrates this architecture.

Use case 1

In this first use case, your Amazon OpenSearch Service domain has three sources: Logstash, Fluentd, and AWS Lambda. Your Logstash source sends data to an index with the name index-YYYY.MM.DD.HH (hours at the end). When the Fluentd source has an error, it creates an index named index-YYYY.MM.DD (missing the hours). Your domain creates indexes for both formats, which is not what you intended.

One way to correct the index name is to calculate the hour from the ingested data and append it to the index name. If you can’t identify any pattern, or you find further issues with the index name, you need to segregate the data to a different index (for example, format_error) for further analysis.

Use case 2

If your application uses time-series data and analyzes data from fixed time windows, your data sources can sometimes send data from a prior time window. In this use case, you need to check for the incoming data and discard data that doesn’t fit in the current time window.
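
Pipeline 1 below implements this check; the core of the idea is the drop processor, which discards a document when its condition evaluates to true. The following is a minimal sketch, using the index_purge flag that the script processor sets later in this post:

{
   "drop":{
      "if":"ctx.index_purge == 'Y'"
   }
}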

Use case 3

In some use cases, the value for a key can contain large strings with common prefixes. End users typically use wildcard characters (*) with the prefix to search on these fields. If your application or Kibana dashboards contain several wildcard queries, this can increase CPU utilization and overall search latency. You can address this by identifying the prefixes in the values and creating a new field with the keyword data type. You can then use term queries on the keyword field and improve search performance.
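
For example, after Pipeline 2 below adds an application_group field, and assuming that field is mapped as a keyword in your index template, a term query can replace a leading-wildcard search. The index name and field value here are only illustrative:

GET index-2020.05.01.10/_search
{
   "query":{
      "term":{
         "application_group":"billing"
      }
   }
}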

Pipeline 1: pipeline_normalize_index

The default pipeline for incoming data is pipeline_normalize_index. This pipeline performs the following actions:

  • Checks if the incoming data belongs to the current date.
  • Checks if the data has any errors in the index name.
  • Segregates the data:
    • If it doesn’t find any errors, it pushes the data to pipeline_normalize_data.
    • If it finds errors, it routes the data to pipeline_fix_index_name.

Checking the index date

In this step, you create the ingest pipeline using a script processor, which lets you write a script and execute it within the pipeline.

Use the set processor to add the _ingest.timestamp to doc_received_date so you can compare the index date to the document received date. The script processor lets you write scripts in the Painless scripting language; you can create a script that checks whether the index date matches doc_received_date. The script processor lets you access the ingest document through the ctx variable. See the following code:

       "set":{
            "field":"doc_received_date",
            "value":"{{_ingest.timestamp}}"
         }
      },
      {
         "script":{
            "lang":"painless",
            "source": """
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd");
                    String dateandhour = ctx._index.substring(ctx._index.indexOf('-') + 1);
                    LocalDate indexdate = LocalDate.parse(dateandhour.substring(0, 10), formatter);
                    ZonedDateTime zonedDateTime = ZonedDateTime.parse(ctx.doc_received_date, DateTimeFormatter.ISO_DATE_TIME);
                    LocalDate doc_received_date = zonedDateTime.toLocalDate();
                    if (doc_received_date.isEqual(indexdate)) {
                        ctx.index_purge = "N";
                    } else {
                        ctx.index_purge = "Y";
                    }
                    if (dateandhour.length() > 10) {
                        ctx.indexformat_error = "N";
                    } else {
                        ctx.indexformat_error = "Y";
                    }
        """,

Checking for index name errors

You can use the same script processor from the previous step to check if the index name matches the format index-YYYY.MM.DD.HH or index-YYYY.MM.DD. See the following code:

if (dateandhour.length() > 10) {
    ctx.indexformat_error = "N";
} else {
    ctx.indexformat_error = "Y";
}

Segregating the data

If the index date doesn’t match the _ingest.timestamp, you can drop the request using the drop processor. If the index name is missing the hours and doesn’t match the format index-YYYY.MM.DD.HH, you can segregate the data to the pipeline pipeline_fix_index_name. If the script processor fails, the on_failure handlers assign the default index indexing_errors. If no issues are found, you proceed to the pipeline pipeline_normalize_data. See the following code:

 "pipeline":{
            "if":"ctx.indexformat_error == 'Y'",
            "name":"pipeline_fix_index_name"
         }
      },
      {
         "remove":{
            "field":[
               "doc_received_date",
               "index_purge",
               "indexformat_error"
            ],
            "ignore_missing":true
         }
      },
      {
         "pipeline":{
            "name":"pipeline_normalize_data"

The following code is an example pipeline:

PUT _ingest/pipeline/pipeline_normalize_index
{
   "description":"pipeline_normalize_index",
   "processors":[
      {
         "set":{
            "field":"doc_received_date",
            "value":"{{_ingest.timestamp}}"
         }
      },
      {
         "script":{
            "lang":"painless",
            "source": """
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd");
                    String dateandhour = ctx._index.substring(ctx._index.indexOf('-') + 1);
                    LocalDate indexdate = LocalDate.parse(dateandhour.substring(0, 10), formatter);
                    ZonedDateTime zonedDateTime = ZonedDateTime.parse(ctx.doc_received_date, DateTimeFormatter.ISO_DATE_TIME);
                    LocalDate doc_received_date = zonedDateTime.toLocalDate();
                    if (doc_received_date.isEqual(indexdate)) {
                        ctx.index_purge = "N";
                    } else {
                        ctx.index_purge = "Y";
                    }
                    if (dateandhour.length() > 10) {
                        ctx.indexformat_error = "N";
                    } else {
                        ctx.indexformat_error = "Y";
                    }
        """,
         "on_failure":[
            {
               "set":{
                  "field":"Amazon_es_PipelineError",
                  "value":"at Script processor - Purge older Index or Index date error"
               }
            },
            {
               "set":{
                  "field":"_index",
                  "value":"indexing_errors"
               }
            }
         ]
      },
      {
         "drop":{
            "if":"ctx.index_purge == 'Y'"
         }
      },
      {
         "pipeline":{
            "if":"ctx.indexformat_error == 'Y'",
            "name":"pipeline_fix_index_name"
         }
      },
      {
         "remove":{
            "field":[
               "doc_received_date",
               "index_purge",
               "indexformat_error"
            ],
            "ignore_missing":true
         }
      },
      {
         "pipeline":{
            "name":"pipeline_normalize_data"
         }
      }
   ]
}
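
Before wiring this pipeline into your index template, you can check its behavior with the simulate API. The sample document below is hypothetical, and the simulation assumes that pipeline_fix_index_name and pipeline_normalize_data (defined later in this post) already exist, because the pipeline processors reference them:

POST _ingest/pipeline/pipeline_normalize_index/_simulate
{
   "docs":[
      {
         "_index":"index-2020.05.01",
         "_source":{
            "application_type":"billing-service",
            "message":"sample log line"
         }
      }
   ]
}

Because the sample index date is in the past, the simulation shows the document being dropped; replace the date with the current date (still without the hour) to see pipeline_fix_index_name append the hour instead.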

Pipeline 2: pipeline_normalize_data

The pipeline pipeline_normalize_data normalizes the data before it’s indexed. It extracts the prefix from the defined field and creates a new field. You can use the new field for term queries.

In this step, you use a grok processor to extract the prefix from the existing field and create a new field that you can use for term queries. The output of this pipeline is what gets indexed. See the following code of an example pipeline:

PUT _ingest/pipeline/pipeline_normalize_data
{
  "description":"pipeline_normalize_data",
  "version":1,
  "processors":[
     {
        "grok":{
           "field":"application_type",
           "patterns":[
              "%{WORD:application_group}"
           ],
           "ignore_missing":true,
           "on_failure":[
              {
                 "set":{
                    "field":"Amazon_es_PipelineError",
                    "value":"application_type error"
                 }
              }
           ]
        }
     }
  ]
}
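
To see what the grok processor extracts, you can simulate the pipeline with a sample document; the application_type value below is hypothetical. Because the %{WORD} pattern matches up to the first non-word character, the simulated output contains application_group set to billing:

POST _ingest/pipeline/pipeline_normalize_data/_simulate
{
   "docs":[
      {
         "_source":{
            "application_type":"billing-service-prod"
         }
      }
   ]
}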

Pipeline 3: pipeline_fix_index_name

This pipeline fixes the index name. The index name errors identified in pipeline_normalize_index are the incoming data for this pipeline. pipeline_fix_index_name extracts the hours from the _ingest.timestamp and appends them to the index name.

You can use the script processor to write a Painless script. The script extracts the hours (HH) from the _ingest.timestamp and appends them to the _index. See the following code of the example pipeline:

PUT _ingest/pipeline/pipeline_fix_index_name
    {
      "description":"pipeline_fix_index_name",
      "processors":[
         {
            "set":{
               "field":"doc_received_date",
               "value":"{{_ingest.timestamp}}"
            }
         },
         {
            "script":{
               "lang":"painless",
               "source": 
               """
               ZonedDateTime zonedDateTime = ZonedDateTime.parse(ctx.doc_received_date, DateTimeFormatter.ISO_DATE_TIME);
               LocalDate doc_received_date = zonedDateTime.toLocalDate();
               // extract the hour from the ingest timestamp and pad single digits with a leading zero
               int receivedhour = zonedDateTime.getHour();
               String receiveddatehour = String.valueOf(receivedhour);
               if (receivedhour < 10) {
                    receiveddatehour = "0" + receivedhour;
               }
               ctx._index = ctx._index + "." + receiveddatehour;
               """,
               "on_failure":[
                  {
                     "set":{
                        "field":"Amazon_es_PipelineError",
                        "value":"at Script processor - Purge older Index or Index date error"
                     }
                  },
                  {
                     "set":{
                        "field":"_index",
                        "value":"indexformat_errors"
                     }
                  }
               ]
            }
         },
         {
            "remove":{
               "field":[
                  "doc_received_date"
               ],
               "ignore_missing":true
            }
         },
         {
            "pipeline":{
               "name":"pipeline_normalize_data"
            }
         }
      ]
   }

Adding the default pipeline to the index template

After creating all the pipelines, add the default pipeline to the index template. See the following code:

"default_pipeline" : "pipeline_normalize_index"

Summary

You can normalize data, fix indexing errors, and segregate operational data and anomalies by using ingest pipelines. Although you can use one pipeline with several processors (depending on the use case), ingest pipelines provide an efficient way to use compute and operational resources by eliminating unwanted indexes.


About the Authors

Vijay Injam is a Data Architect with Amazon Web Services.

Kevin Fallis is an AWS specialist search solutions architect. His passion at AWS is to help customers leverage the correct mix of AWS services to achieve success for their business goals. His after-work activities include family, DIY projects, carpentry, playing drums, and all things music.