Elasticsearch Ingest Pipeline 101: Usage & Setup Made Easy

April 19th, 2022

Extracting data from data sources has always been a challenge for organizations worldwide. One of the popular free and open-source tools for creating Ingestion pipelines is ElasticSearch. It acts as a search and analytics engine that supports several data types such as textual, numerical, geospatial, structured, and unstructured. 

ElasticSearch Ingestion Pipelines allow you to manipulate data by applying basic transformations before adding a searchable reference to the document in the cluster’s index. After the indexing is done, you can simply search and get the document using the ElasticSearch API.

In this article, you will learn how to effectively set up, test, & manage your ElasticSearch Ingest Pipelines.

What is ElasticSearch?

Introduced in 2010, ElasticSearch is a distributed search and analytics engine, developed by Elastic, that works with various types of data such as textual, numerical, geospatial, structured, and unstructured. ElasticSearch is an integral part of the ELK Stack (Elasticsearch, Logstash, and Kibana). Through data ingestion in Logstash, raw data from logs, system metrics, and web applications is parsed, normalized, and enriched before it is indexed in Elasticsearch. You can also send data to ElasticSearch as JSON documents using an API. After the indexing is done, you can query your data and use aggregations to get complex summaries of it. Finally, using Kibana, you can create powerful visualizations.

Under the SSPL or the Elastic License, ElasticSearch offers many features that are free to use. You can also opt for paid subscriptions to get additional advanced features. ElasticSearch is widely used for application search, website search, enterprise search, log analytics, geospatial data analysis, security analytics, and business analytics.

Ingest ElasticSearch Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed No-Code Data Pipeline, can help you automate, simplify & enrich your data ingestion and integration process in a few clicks. With Hevo’s out-of-the-box connectors and blazing-fast Data Pipelines, you can ingest data in real-time from 100+ Data Sources (including 40+ free data sources) like ElasticSearch and load it straight into your Data Warehouse, Database, or any destination. Adding to its flexibility, Hevo provides several Data Ingestion Modes such as Change Tracking, Table, Binary Logging, Custom SQL, Oplog, etc. To further streamline and prepare your data for analysis, you can process and enrich Raw Granular Data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

Key Features of ElasticSearch

ElasticSearch has the following salient features:

  • Easy-to-use: Elasticsearch provides a simple REST API over HTTP and uses schema-free JSON documents, so you can quickly start building applications for different use cases. To simplify application development, it also offers clients for a wide range of languages and runtimes such as Java, Python, PHP, JavaScript, Node.js, and Ruby.
  • High performance: Elasticsearch is distributed by design. Documents stored in Elasticsearch are spread across containers called shards, which are replicated so that additional copies of the data are available in the event of a hardware failure. This distributed architecture lets you scale to hundreds (or thousands) of servers to process large amounts of data in parallel and quickly find the best matches for your query.
  • Free Tools and Plugins: As part of the ELK stack, Elasticsearch is connected with Kibana, a popular visualization and reporting tool. Kibana provides real-time visualization of your ElasticSearch data via a user interface for quick access to application performance monitoring (APM), logs, and infrastructure metric data. Elasticsearch also integrates with Beats and Logstash to easily transform source data for loading into Elasticsearch clusters. You can add further functionality to your application using several open-source Elasticsearch plugins, such as language analyzers and suggestion engines.
  • Near Real-Time Operation: Documents typically become searchable within about 1 second of being indexed, which makes ElasticSearch well suited to time-sensitive use cases such as application monitoring, anomaly detection, security analysis, and infrastructure monitoring.
  • Abundant Features: Elasticsearch has many advanced built-in features that make storing and retrieving data highly efficient, including data rollup and index lifecycle management. 

What are Ingest Pipelines?

An ingest pipeline allows you to perform common transformations on your data before indexing it (adding a searchable reference to the document). For instance, you can use an Elasticsearch pipeline for dropping fields, extracting values from text, and enriching your data. An ElasticSearch Ingest pipeline consists of a series of configurable tasks called processors. Each processor runs in turn, making specific modifications to the incoming document. After the processors have run, Elasticsearch adds the transformed document to the data stream or index. You can use Kibana’s Ingest Pipelines feature or the Ingest API to create and manage ElasticSearch Ingest pipelines. Elasticsearch stores pipelines in the cluster state.

What are Ingest Pipelines used for?

Want to make some tweaks to your data without setting up Logstash or another data processing tool? Then ElasticSearch Ingest Pipelines are a good alternative. They allow you to tailor your data to your needs with little overhead. An ElasticSearch Ingest pipeline runs inside Elasticsearch itself, on the nodes that have the ingest role, and applies a series of changes to the incoming documents.

To understand the usage of the ElasticSearch Ingest Pipelines, you can consider the following data structure:

{
  "id": 5,
  "first_name": "Karl",
  "last_name": "Max",
  "email": "kmax@sample.com",
  "ip_address": "100.1.193.2",
  "activities": "Cricket, Dancing, Reading"
}, 
{
  "id": 6,
  "first_name": "Jim",
  "last_name": "Sanders",
  "email": "jsanders@example.com",
  "ip_address": "122.100.4.22",
  "activities": "Driving, Running, Cycling"
}

Given the two sample documents above, you can use the ElasticSearch Ingest Pipelines for the following operations:

  • Rename fields: Changing “first_name” to “firstName”.
  • Remove fields: Removing the field `email`.
  • Split fields: You can turn a delimited string into an array using a separator. For example, turn `activities` from `"Cricket, Dancing, Reading"` into `["Cricket", "Dancing", "Reading"]`.
  • GeoIP lookup: Adding geographical information based on a field such as `ip_address`.
  • Running a script for more flexibility: Merging two fields and splitting the result into another, or encoding sensitive data.
  • Convert fields: Modifying a field’s type from a string to an integer.
  • Enrich documents: You can perform a lookup to append extra information to each event.
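
To make the first three operations concrete, here is a minimal Python sketch. It is not Elasticsearch itself: the `processors` list uses the `rename`, `remove`, and `split` processor names with the option names they are assumed to take (`field`, `target_field`, `separator`), and `apply_locally` is a hypothetical helper that only imitates their effect on a plain dictionary so you can see the shape of the result.

```python
import json

# Pipeline body as it might be sent to the create pipeline API.
# Processor and option names are assumptions based on the operations listed above.
pipeline = {
    "description": "Tidy up the sample user documents",
    "processors": [
        {"rename": {"field": "first_name", "target_field": "firstName"}},
        {"remove": {"field": "email"}},
        {"split": {"field": "activities", "separator": ", "}},
    ],
}


def apply_locally(doc, processors):
    """Imitate rename/remove/split on a dict. A toy stand-in, not Elasticsearch."""
    result = dict(doc)  # shallow copy so the input document stays untouched
    for processor in processors:
        kind, options = next(iter(processor.items()))
        field = options["field"]
        if kind == "rename":
            result[options["target_field"]] = result.pop(field)
        elif kind == "remove":
            result.pop(field)
        elif kind == "split":
            result[field] = result[field].split(options["separator"])
        else:
            raise ValueError(f"unsupported processor in this sketch: {kind}")
    return result


sample = {
    "id": 5,
    "first_name": "Karl",
    "last_name": "Max",
    "email": "kmax@sample.com",
    "ip_address": "100.1.193.2",
    "activities": "Cricket, Dancing, Reading",
}

result = apply_locally(sample, pipeline["processors"])
print(json.dumps(result, indent=2))
```

In a real cluster, the `pipeline` dictionary would be the request body and Elasticsearch would do the work that `apply_locally` fakes here.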

You can also use the ElasticSearch Ingest Pipelines when importing data in its initial raw form as shown below:

2022-04-20T18:17:11.003Z new.sample.com There seems to be a problem

2021-05-11T18:17:11.003Z,new.sample.com, There seems to be a problem

To manipulate this data you can perform the following operations:

  • Parsing out fields with grok or dissect:  Saving “2022-04-20…” in a field called “date”; “new.sample.com” in a field called “origin”; and “There seems to be a problem” in “raw_message”
  • Parsing out a csv into fields: For instance, in the second sample, use a comma as a separator and name the first value “date”; the second “origin”; the third “raw_message”
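
The two parsing approaches can be sketched in plain Python. This is only an analogy for what the dissect and csv processors would produce, not their implementation; the helper names `parse_space_delimited` and `parse_csv_line` are made up for this example.

```python
import csv

FIELD_NAMES = ["date", "origin", "raw_message"]


def parse_space_delimited(line):
    # Split on the first two spaces only, so the message keeps its own spaces.
    return dict(zip(FIELD_NAMES, line.split(" ", 2)))


def parse_csv_line(line):
    # skipinitialspace drops the blank that follows a comma in the sample.
    row = next(csv.reader([line], skipinitialspace=True))
    return dict(zip(FIELD_NAMES, row))


first = parse_space_delimited(
    "2022-04-20T18:17:11.003Z new.sample.com There seems to be a problem"
)
second = parse_csv_line(
    "2021-05-11T18:17:11.003Z,new.sample.com, There seems to be a problem"
)

print(first)
print(second)
```

Both calls yield a dictionary with `date`, `origin`, and `raw_message` keys, which is the structure the two bullets above describe.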

To learn more about other operations, check out the comprehensive list of Ingest Processors in the official ElasticSearch documentation.

What Makes Hevo’s Data Ingestion Process Unique

Ingesting, aggregating & loading data can be a mammoth task without the right set of tools. Hevo’s automated platform empowers you with everything you need to have a smooth Data Collection, Processing, and Aggregation experience. Our platform has the following in store for you!

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.

Understanding Ingest Pipeline Operations

To get started with the ElasticSearch Ingest Pipelines, you can go through the following aspects:

Creating and Managing Pipelines

To create an ElasticSearch Ingest Pipeline, you can choose from the following two methods:

Kibana’s Graphical User Interface

Follow the simple steps to build an ElasticSearch Ingest Pipeline via a user-friendly interface:

  • Step 1: Go to Kibana and open up the main menu.
  • Step 2: Navigate to Stack Management > Ingest Pipelines. You will be able to see a list view that allows you to:
    • View a list of your ElasticSearch Ingest pipelines and drill down into details
    • Edit or clone existing ElasticSearch Ingest pipelines
    • Delete ElasticSearch Ingest pipelines
  • Step 3: To create your new ElasticSearch Ingest Pipeline, go to Create pipeline > New pipeline.
  • Step 4: Enter the name and suitable description for the ElasticSearch Ingest pipeline. For this article, an example is considered to parse server logs in the Common Log Format before indexing. The sample logs look as follows:
212.87.37.154 - - [30/May/2099:16:21:15 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36"
  • Step 5: Click on the Add Processor option and choose Grok as the processor type.
  • Step 6: Configure the Field to message and the Pattern to the Grok pattern as shown below:
%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}
  • Step 7: Optionally, set the processor description to Extract fields from 'message'. Then click on the Add option to save the processor.
  • Step 8: Now, add processors for the timestamp, IP address, and user agent fields (for example, Date, GeoIP, and User agent processors).
  • Step 9: Click on the Add documents option and provide a sample document for testing in the Documents tab. Then, click on the Run Pipeline option.
  • Step 10: After a successful test, you can close the panel and click on the Create Pipeline option. 
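
If you want a feel for what the Grok step extracts before opening Kibana, the rough Python sketch below parses the same sample log line with a regular expression. It is an approximation written for this article, not a translation of the Grok pattern: the group names are simplified (Python group names cannot contain dots), and unlike Grok's QS pattern the quoted values are captured without their surrounding quotes.

```python
import re
from datetime import datetime

# Hand-written approximation of the log layout shown in Step 4.
LOG_PATTERN = re.compile(
    r'(?P<source_ip>\S+) (?P<user_id>\S+) (?P<user_name>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<url>\S+) HTTP/(?P<http_version>[\d.]+)" '
    r'(?P<status_code>\d+) (?:-|(?P<body_bytes>\d+)) '
    r'"(?P<referrer>[^"]*)" "(?P<user_agent>[^"]*)"'
)


def parse_log_line(line):
    match = LOG_PATTERN.match(line)
    if match is None:
        raise ValueError("line is not in the expected log format")
    fields = match.groupdict()
    # Convert the numeric fields, similar to the ':int' suffix in the Grok pattern.
    fields["status_code"] = int(fields["status_code"])
    if fields["body_bytes"] is not None:
        fields["body_bytes"] = int(fields["body_bytes"])
    # Parse the timestamp, roughly what a date processor would do afterwards.
    fields["timestamp"] = datetime.strptime(
        fields["timestamp"], "%d/%b/%Y:%H:%M:%S %z"
    )
    return fields


sample = (
    '212.87.37.154 - - [30/May/2099:16:21:15 +0000] "GET /favicon.ico HTTP/1.1" '
    '200 3638 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36"'
)

parsed = parse_log_line(sample)
for name, value in parsed.items():
    print(f"{name}: {value}")
```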

For a more detailed guide, you can visit the official ElasticSearch documentation page.

Ingest API

By sending the create pipeline API request, you can also create ElasticSearch Ingest Pipelines. Consider the example below where an ElasticSearch Ingest pipeline is being created containing two set processors followed by a lowercase processor. These processors are executed sequentially in the order mentioned.

PUT _ingest/pipeline/sample-pipeline
{
  "description": "Optional ElasticSearch Ingest Pipeline Description",
  "processors": [
    {
      "set": {
        "description": " Optional New Processor description",
        "field": "sample-long-field",
        "value": 12
      }
    },
    {
      "set": {
        "description": "Set 'sample-boolean-field' to true",
        "field": "sample-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "sample-keyword-field"
      }
    }
  ]
}
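
Outside the Kibana console, the same request is an ordinary HTTP PUT with a JSON body. The sketch below builds it with Python's standard library without sending it. The address `http://localhost:9200` and the absence of authentication are assumptions made for illustration, and `build_put_pipeline_request` is a hypothetical helper name.

```python
import json
import urllib.request


def build_put_pipeline_request(base_url, pipeline_id, body):
    """Build (but do not send) a create pipeline request."""
    return urllib.request.Request(
        url=f"{base_url}/_ingest/pipeline/{pipeline_id}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


body = {
    "description": "Optional ElasticSearch Ingest Pipeline Description",
    "processors": [
        {"set": {"field": "sample-long-field", "value": 12}},
        {"set": {"field": "sample-boolean-field", "value": True}},
        {"lowercase": {"field": "sample-keyword-field"}},
    ],
}

# Assumed local, unsecured cluster address; adjust for your environment.
request = build_put_pipeline_request("http://localhost:9200", "sample-pipeline", body)
print(request.get_method(), request.full_url)

# Sending is left out on purpose; with a reachable cluster it would be:
# urllib.request.urlopen(request)
```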

Managing Pipeline Versions

It is a good practice to specify the optional version number when creating or updating an ElasticSearch Ingest pipeline. Together with the if_version parameter, this version number allows you to update your pipeline conditionally. When the if_version parameter is specified, a successful update increments the pipeline's version. To unset the version number using the API, replace or update the ElasticSearch Ingest pipeline without specifying the version parameter.

PUT _ingest/pipeline/sample-pipeline-id
{
  "version": 2,
  "processors": [ ... ]
}
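
The snippet above does not show where if_version goes. As a small sketch, and assuming it is passed as a query parameter on the same create pipeline endpoint, the request path could be assembled like this (`conditional_update_path` is a hypothetical helper):

```python
from urllib.parse import quote, urlencode


def conditional_update_path(pipeline_id, if_version):
    # Assumption: if_version is sent as a query parameter of the PUT request.
    query = urlencode({"if_version": if_version})
    return f"_ingest/pipeline/{quote(pipeline_id, safe='')}?{query}"


path = conditional_update_path("sample-pipeline-id", 2)
print("PUT", path)
```

The idea is that the update only goes through when the stored pipeline is still at the version you expect, which protects against overwriting someone else's change.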

Testing a Pipeline

It is always recommended that you test your ElasticSearch Ingest pipeline against sample documents before using it in a production environment. If you have created or edited a pipeline in Kibana, click Add documents for testing. On the Documents tab, provide a sample document and click Run Pipeline.

To test your ElasticSearch Ingest Pipelines, you can also use the simulate pipeline API. The example below specifies an already configured pipeline, sample-pipeline, in the request path:

POST _ingest/pipeline/sample-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "sample-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "sample-keyword-field": "BAR"
      }
    }
  ]
}

Alternatively, you can define the ElasticSearch Ingest pipeline and its processors inside the request body:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "sample-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "sample-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "sample-keyword-field": "BAR"
      }
    }
  ]
}

The API returns the transformed documents:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "sample-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.000Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "sample-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:04.000Z"
        }
      }
    }
  ]
}
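
When you script such tests, you usually only care about the transformed `_source` of each document. The following sketch pulls those out of a simulate response and compares them with what you expect. The response is a trimmed copy of the one above, embedded as a string so the example runs without a cluster; `simulated_sources` is a hypothetical helper.

```python
import json

# Trimmed copy of the simulate response shown above.
response_text = """
{
  "docs": [
    {"doc": {"_index": "_index", "_id": "_id",
             "_source": {"sample-keyword-field": "foo"},
             "_ingest": {"timestamp": "2099-03-07T11:04:03.000Z"}}},
    {"doc": {"_index": "_index", "_id": "_id",
             "_source": {"sample-keyword-field": "bar"},
             "_ingest": {"timestamp": "2099-03-07T11:04:04.000Z"}}}
  ]
}
"""


def simulated_sources(response):
    """Return the transformed _source of every simulated document."""
    return [entry["doc"]["_source"] for entry in response["docs"]]


sources = simulated_sources(json.loads(response_text))
expected = [{"sample-keyword-field": "foo"}, {"sample-keyword-field": "bar"}]

print(sources)
print("pipeline behaves as expected:", sources == expected)
```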

Conditionally running a Processor and Applying Pipelines

Using if condition for a processor

  • You can use an if condition for running a processor. Once applied, the processor only runs when the if condition is true.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}
  • If you enable the script.painless.regex.enabled cluster setting, you can also use regular expressions in your if condition scripts. However, expensive regular expressions may slow down the indexing process.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http$/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}
  • You must specify if conditions as valid JSON on a single line. However, you can use the Kibana console’s triple quote syntax to write and debug larger scripts.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}
  • You can also specify a stored script as the if condition.
PUT _scripts/sample-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "sample-prod-tag-script" }
      }
    }
  ]
}
  • In many cases, the ingested documents contain object fields. When a processor script tries to access a field whose parent object does not exist, Elasticsearch returns a NullPointerException. To avoid this exception, use null safe operators such as ?. and write your scripts to be null safe. For instance, you can rewrite the if condition ctx.network?.name.equalsIgnoreCase('Guest') as 'Guest'.equalsIgnoreCase(ctx.network?.name), which is null safe because the literal 'Guest' is always non-null.
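
The null safety idea carries over to any language. As a loose Python analogy (Painless is a different language, and `get_path` and `is_guest` are names invented for this sketch), the check below walks the nested document and gives up quietly when a parent object is missing instead of raising an error:

```python
def get_path(ctx, *keys):
    """Walk nested dicts, returning None as soon as a level is missing.

    Plays the role that the ?. operator plays in a Painless condition.
    """
    current = ctx
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


def is_guest(ctx):
    # Same spirit as 'Guest'.equalsIgnoreCase(ctx.network?.name):
    # a missing value simply makes the comparison false.
    name = get_path(ctx, "network", "name")
    return isinstance(name, str) and name.lower() == "guest"


print(is_guest({"network": {"name": "GUEST"}}))
print(is_guest({"network": {"name": "Staff"}}))
print(is_guest({"network": None}))
print(is_guest({}))
```

Only the first document is treated as a guest; the last two have no usable `network.name` and are handled without an exception.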

Applying pipelines with Conditions

You can also combine an if condition with the pipeline processor to apply other ElasticSearch Ingest pipelines to documents based on your criteria. You can use such a pipeline as the default pipeline in an index template that configures multiple data streams or indices.

PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
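
The routing logic of this pipeline boils down to a lookup with a failure case. The following Python sketch mirrors that decision locally; it is a toy model rather than Elasticsearch behavior, and `choose_pipeline` and `PipelineFailure` are hypothetical names.

```python
class PipelineFailure(Exception):
    """Stands in for the error raised by the fail processor."""


# service.name value -> pipeline to delegate to, as in the request above.
ROUTES = {
    "apache_httpd": "httpd_pipeline",
    "syslog": "syslog_pipeline",
}


def choose_pipeline(doc):
    # 'or {}' keeps the lookup null safe when 'service' is missing or null.
    service = doc.get("service") or {}
    name = service.get("name")
    if name in ROUTES:
        return ROUTES[name]
    raise PipelineFailure(
        "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
    )


print(choose_pipeline({"service": {"name": "syslog"}}))
print(choose_pipeline({"service": {"name": "apache_httpd"}}))

try:
    choose_pipeline({"service": {"name": "nginx"}})
except PipelineFailure as error:
    print("rejected:", error)
```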

Handling Ingest Pipeline Failures

To effectively handle the ElasticSearch Ingest Pipeline errors or failures, you can follow the settings given below:

  • A pipeline’s processors run sequentially, and by default Elasticsearch stops processing the document when one of them fails or encounters an error. You can set ignore_failure to true to ignore a processor failure and execute the pipeline’s remaining processors.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}
  • The on_failure parameter specifies a list of processors to run immediately after a processor fails. If on_failure is specified, Elasticsearch afterward runs the rest of the processors in the ElasticSearch Ingest pipeline, even if the on_failure configuration is empty.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
  • To activate nested error handling, you can nest a list of on_failure processors.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
  • Apart from the processor level, you can also specify on_failure for the pipeline as a whole. If a processor fails and has no on_failure value of its own, Elasticsearch uses this pipeline-level parameter as a fallback. In that case, Elasticsearch does not attempt to run the remaining processors in the ElasticSearch Ingest pipeline.
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}
  • You can extract detailed information about the ElasticSearch Ingest pipeline failure from the document metadata fields on_failure_message, on_failure_processor_type, on_failure_processor_tag, and on_failure_pipeline. You can access these fields only from within an on_failure block. For instance, the code below uses the metadata fields to include information about ElasticSearch Ingest pipeline failures in documents.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
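
The three failure-handling modes differ mainly in what happens after the failure. The toy Python model below follows the rules described in this section; it is a simplified illustration and not how Elasticsearch is implemented. Processors are plain functions here, and all names (`run_pipeline`, `rename_provider`, `set_error`, `add_marker`) are invented for the sketch.

```python
def run_pipeline(doc, processors, pipeline_on_failure=None):
    """Toy model of ignore_failure and on_failure.

    Each processor is a dict with a 'run' callable plus optional
    'ignore_failure' and 'on_failure' entries.
    """
    for processor in processors:
        try:
            processor["run"](doc)
        except Exception as error:
            if processor.get("ignore_failure"):
                continue  # swallow the error, keep going
            if "on_failure" in processor:
                for handler in processor["on_failure"]:
                    handler(doc, error)
                continue  # processor-level handler: remaining processors still run
            if pipeline_on_failure is not None:
                for handler in pipeline_on_failure:
                    handler(doc, error)
                return doc  # pipeline-level handler: remaining processors are skipped
            raise  # no handler at all: the failure propagates
    return doc


def rename_provider(doc):
    # Raises KeyError when 'provider' is missing, like the rename example.
    doc["cloud.provider"] = doc.pop("provider")


def set_error(doc, error):
    # setdefault mirrors "override": false.
    doc.setdefault(
        "error.message",
        "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
    )


def add_marker(doc):
    doc["seen"] = True


ignored = run_pipeline(
    {}, [{"run": rename_provider, "ignore_failure": True}, {"run": add_marker}]
)
handled = run_pipeline(
    {}, [{"run": rename_provider, "on_failure": [set_error]}, {"run": add_marker}]
)
fallback = run_pipeline(
    {}, [{"run": rename_provider}, {"run": add_marker}],
    pipeline_on_failure=[set_error],
)

print("ignore_failure:", ignored)
print("processor on_failure:", handled)
print("pipeline on_failure:", fallback)
```

Note how only the pipeline-level fallback leaves the `seen` marker unset, because the processors after the failing one are never executed.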

Conclusion

In this article, you have learned how to effectively create, manage & test ElasticSearch Ingest Pipelines. You can either use Kibana, a powerful Data Visualization tool, to create a pipeline via its user-friendly interface, or opt for a more technical option by sending a create pipeline API request. After configuring your ElasticSearch Ingest Pipeline, you can run the pipeline with a sample document to test it. Using if conditions, you can control when individual processors run and which ElasticSearch Ingest pipelines are applied. You can also use parameters such as on_failure and ignore_failure to effectively handle processor errors.

As you collect and manage your data across several applications and databases in your business, it is important to consolidate it for a complete performance analysis of your business. However, it is a time-consuming and resource-intensive task to continuously monitor the Data Connectors. To achieve this efficiently, you need to assign a portion of your engineering bandwidth to Integrate data from all sources, Clean & Transform it, and finally, Load it to a Cloud Data Warehouse or a destination of your choice for further Business Analytics. All of these challenges can be comfortably solved by a Cloud-based ETL tool such as Hevo Data.   

Hevo Data, a No-code Data Pipeline can Ingest Data from a vast sea of 100+ sources such as ElasticSearch to a Data Warehouse or a Destination of your choice. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

If you are using ElasticSearch as your Search & Analytics Engine and searching for a no-fuss alternative to Manual Data Integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources(Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.
