Extracting data from diverse data sources has always been a challenge for organizations worldwide. One of the most popular free and open-source tools for building ingestion pipelines is ElasticSearch. It acts as a search and analytics engine that supports several data types, such as textual, numerical, geospatial, structured, and unstructured data.

ElasticSearch Ingest Pipelines let you manipulate data by applying basic transformations before a searchable reference to the document is added to the cluster's index. Once indexing is done, you can search for and retrieve the document using the ElasticSearch API.

In this article, you will learn how to effectively set up, test, & manage your ElasticSearch Ingest Pipelines.

What is ElasticSearch?


Introduced in 2010, ElasticSearch (also known as Elastic) is a distributed, modern search and analytics engine that can easily work with various types of data, such as textual, numerical, geospatial, structured, and unstructured data. ElasticSearch is an integral part of the ELK Stack (Elasticsearch, Logstash, and Kibana). Via the process of Data Ingestion in Logstash, raw data from logs, system metrics, and web applications is parsed, normalized, and enriched before it is indexed in Elasticsearch. You can also send data to ElasticSearch in the form of JSON documents using an API. After the indexing is done, you can query your data and use aggregations to retrieve complex summaries of your data. Finally, using Kibana, you can create powerful visualizations.

ElasticSearch offers a large set of features that are free to use under the SSPL or the Elastic License, and you can opt for paid subscriptions to get additional advanced features. ElasticSearch is popularly used for application search, website search, enterprise search, log analytics, geospatial data analysis, security analytics, and business analytics.

Streamline Your Elasticsearch Migration with Hevo!

Ready to upgrade your search and analytics capabilities? Migrating to Elasticsearch has never been easier with Hevo! Whether you’re moving from another data source or updating your existing Elasticsearch setup, Hevo ensures a smooth and efficient transition with its user-friendly, no-code platform. Here’s how we simplify the process:

  1. Seamlessly pull data from Elasticsearch and 150+ other sources with ease.
  2. Utilize drag-and-drop and custom Python script features to transform your data.
  3. Efficiently migrate data to a data warehouse, ensuring it’s ready for insightful analysis.

Experience the simplicity of data integration with Hevo and see how Hevo helped fuel FlexClub’s drive for accurate analytics and unified data. 

Get Started with Hevo for Free

Key Features of ElasticSearch

ElasticSearch has the following salient features:

  • Full-Text Search: Powerful full-text search capabilities with relevance scoring, enabling fast and accurate search results.
  • Distributed Architecture: Designed for horizontal scalability, allowing seamless distribution of data across multiple nodes.
  • Real-Time Data Ingestion: Supports near real-time indexing and searching, providing up-to-date search results.
  • RESTful API: Easy integration with applications through a simple RESTful API, making it accessible for developers.
  • Schema-Free: Allows dynamic mapping of data, enabling flexibility in handling diverse data types without predefined schemas.

What are Ingest Pipelines?

An ingest pipeline allows you to perform common transformations on your data before indexing it (that is, before adding a searchable reference to the document). For instance, you can use an Elasticsearch pipeline to drop fields, extract values from text, and enrich your data. An ElasticSearch Ingest pipeline consists of a set of configurable tasks called processors. Each processor runs in turn, making specific modifications to the incoming document. After the processors run, Elasticsearch places the transformed document in the data stream or index. You can use Kibana’s Ingest Pipelines feature or the Ingest API to create and manage ElasticSearch Ingest pipelines. Elasticsearch saves the pipeline in the cluster state.

What are Ingest Pipelines used for?

Want to make some tweaks to your data, but prefer an easier method than Logstash or another data processing tool? ElasticSearch Ingest Pipelines are a good alternative. They allow you to tailor your data to your needs with little overhead. The ElasticSearch Ingest pipeline resides on the Elasticsearch node (or the ingest node, if one is defined) and applies a series of changes to the specified data.

To understand the usage of ElasticSearch Ingest Pipelines, consider the following two sample documents:

{
  "id": 5,
  "first_name": "Karl",
  "last_name": "Max",
  "email": "kmax@sample.com",
  "ip_address": "100.1.193.2",
  "activities": "Cricket, Dancing, Reading"
}, 
{
  "id": 6,
  "first_name": "Jim",
  "last_name": "Sanders",
  "email": "jsanders@example.com",
  "ip_address": "122.100.4.22",
  "activities": "Driving, Running, Cycling"
}

Considering the above two documents, you can use ElasticSearch Ingest Pipelines for the following operations (a sample pipeline sketch follows this list):

  • Rename fields: Changing “first_name” to “firstName”.
  • Remove fields: Removing the field `email`.
  • Split fields: You can turn a string value into an array by splitting it on a separator. For example, turn `activities` from `"Cricket, Dancing, Reading"` into `["Cricket", "Dancing", "Reading"]`.
  • GeoIP lookup on a field.
  • Running a script for more flexibility: For example, merging two fields into a new one, or encoding sensitive data.
  • Convert fields: Modifying a field’s type from a string to an integer.
  • Enrich documents: You can perform a lookup to append extra information to each event.
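
For instance, here is a minimal sketch of a pipeline that combines several of these operations on the sample documents above. The pipeline name users-pipeline and the target field names are illustrative assumptions, not something prescribed by the data:

PUT _ingest/pipeline/users-pipeline
{
  "description": "Illustrative pipeline for the sample user documents",
  "processors": [
    {
      "rename": {
        "description": "Rename 'first_name' to 'firstName'",
        "field": "first_name",
        "target_field": "firstName"
      }
    },
    {
      "remove": {
        "description": "Remove the 'email' field",
        "field": "email"
      }
    },
    {
      "split": {
        "description": "Turn 'activities' into an array using ', ' as the separator",
        "field": "activities",
        "separator": ", "
      }
    }
  ]
}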

You can also use the ElasticSearch Ingest Pipelines when importing data in its initial raw form as shown below:

2022-04-20T18:17:11.003Z new.sample.com There seems to be a problem

2021-05-11T18:17:11.003Z,new.sample.com, There seems to be a problem

To manipulate this data, you can perform the following operations (see the sketch after this list):

  • Parsing out fields with grok or dissect: Saving "2022-04-20…" in a field called "date", "new.sample.com" in a field called "origin", and "There seems to be a problem" in "raw_message".
  • Parsing out a CSV into fields: For instance, in the second sample, use a comma as the separator and name the first value "date", the second "origin", and the third "raw_message".
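
As a rough sketch, the first, space-separated sample could be handled by a dissect processor, tested here through the simulate API. The source field name message is an assumption; the second, comma-separated sample could instead be parsed with a csv processor using "target_fields": ["date", "origin", "raw_message"]:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "description": "Split the log line into 'date', 'origin', and 'raw_message'",
          "field": "message",
          "pattern": "%{date} %{origin} %{raw_message}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2022-04-20T18:17:11.003Z new.sample.com There seems to be a problem"
      }
    }
  ]
}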

To know more about other operations, check out the official ElasticSearch comprehensive list of Ingest Processors.


Understanding Ingest Pipeline Operations

To get started with the ElasticSearch Ingest Pipelines, you can go through the following aspects:

Creating and Managing Pipelines

To create an ElasticSearch Ingest Pipeline, you can choose from the following two methods:

Kibana’s Graphical User Interface

Follow these simple steps to build an ElasticSearch Ingest Pipeline via a user-friendly interface:

  • Step 1: Go to Kibana and open up the main menu.
  • Step 2: Navigate to Stack Management > Ingest Pipelines. You will be able to see a list view that allows you to:
    • View a list of your ElasticSearch Ingest pipelines and drill down into details
    • Edit or clone existing ElasticSearch Ingest pipelines
    • Delete ElasticSearch Ingest pipelines
  • Step 3: To create your new ElasticSearch Ingest Pipeline, go to Create pipeline > New pipeline.
ElasticSearch Ingest Pipeline - Create Pipeline Button
  • Step 4: Enter a name and a suitable description for the ElasticSearch Ingest pipeline. This article considers an example that parses server logs in the Common Log Format before indexing. The sample log entry looks as follows:
212.87.37.154 - - [30/May/2099:16:21:15 +0000] "GET /favicon.ico HTTP/1.1"
200 3638 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36"
  • Step 5: Click on the Add Processor option and choose Grok as the processor type.
  • Step 6: Configure the Field to message and the Pattern to the Grok pattern as shown below:
%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}
  • Step 7: Finally, click on the Add option to save the processor. You can also configure the processor description as Extract fields from ‘message’.
  • Step 8: Now, add processors for the timestamp, IP address, and user agent fields.
ElasticSearch Ingest Pipeline - Processors List
  • Step 9: Click on Add documents, provide a sample document for testing in the Documents tab, and then click on the Run Pipeline option.
  • Step 10: After a successful test, you can close the panel and click on the Create pipeline option. (An equivalent Ingest API request for this example is sketched below.)
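
For reference, a roughly equivalent pipeline could also be created through the Ingest API, as sketched below. The pipeline name server-logs-pipeline is an assumption, and the date format shown matches the Common Log Format timestamp; adjust both to your data:

PUT _ingest/pipeline/server-logs-pipeline
{
  "description": "Parse server logs in Common Log Format",
  "processors": [
    {
      "grok": {
        "description": "Extract fields from 'message'",
        "field": "message",
        "patterns": ["%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \\[%{HTTPDATE:@timestamp}\\] \"%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"]
      }
    },
    {
      "date": {
        "description": "Parse '@timestamp' using the Common Log Format date pattern",
        "field": "@timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    },
    {
      "geoip": {
        "description": "Add geolocation info based on 'source.ip'",
        "field": "source.ip",
        "target_field": "source.geo"
      }
    },
    {
      "user_agent": {
        "description": "Extract browser and OS details from 'user_agent'",
        "field": "user_agent"
      }
    }
  ]
}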

For a more detailed guide, you can visit the Official ElasticSearch Documentation page.

Ingest API

By sending the create pipeline API request, you can also create ElasticSearch Ingest Pipelines. Consider the example below, where an ElasticSearch Ingest pipeline is created containing two set processors followed by a lowercase processor. The processors execute sequentially in the order listed.

PUT _ingest/pipeline/sample-pipeline
{
  "description": "Optional ElasticSearch Ingest Pipeline Description",
  "processors": [
    {
      "set": {
        "description": " Optional New Processor description",
        "field": "sample-long-field",
        "value": 12
      }
    },
    {
      "set": {
        "description": "Set 'sample-boolean-field' to true",
        "field": "sample-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "sample-keyword-field"
      }
    }
  ]
}
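
Once the pipeline exists, you can apply it at indexing time with the pipeline query parameter, as in the sketch below (the index name sample-index is just a placeholder). Alternatively, the index.default_pipeline index setting makes a pipeline the default for every document written to that index.

POST sample-index/_doc?pipeline=sample-pipeline
{
  "sample-keyword-field": "SOME VALUE"
}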

Managing Pipeline Versions

It is a good practice to specify an optional version number when creating or updating an ElasticSearch Ingest pipeline. Combined with the if_version request parameter, this version number allows you to conditionally update your pipeline: if the if_version check succeeds, the update is applied and the version is incremented. To unset the version number, replace or update the ElasticSearch Ingest pipeline without specifying a version parameter.

PUT _ingest/pipeline/sample-pipeline-id
{
  "version": 2,
  "processors": [ ... ]
}
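
As a small sketch of such a conditional update, the request below succeeds only if the stored pipeline's version is currently 2, and the successful update then increments the version:

PUT _ingest/pipeline/sample-pipeline-id?if_version=2
{
  "processors": [ ... ]
}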

Testing a Pipeline

It is always recommended that you test your ElasticSearch Ingest pipeline against sample documents before using it in a production environment. When creating or editing a pipeline in Kibana, click Add documents, provide a sample document on the Documents tab, and click Run Pipeline.

ElasticSearch Ingest Pipeline - Test Pipeline Window

For testing your ElasticSearch Ingest Pipelines, you can also use the simulate pipeline API. To do that, follow the example shown below, which specifies a configured pipeline, sample-pipeline, in the request path:

POST _ingest/pipeline/sample-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "sample-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "sample-keyword-field": "BAR"
      }
    }
  ]
}

You can also specify the ElasticSearch Ingest pipeline and its processors inside the request body:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "sample-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "sample-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "sample-keyword-field": "BAR"
      }
    }
  ]
}

Output:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "sample-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.000Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "sample-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:04.000Z"
        }
      }
    }
  ]
}
Solve your data replication problems with Hevo’s reliable, no-code, automated pipelines with 150+ connectors.
Get your free trial right away!

Conditionally running a Processor and Applying Pipelines

Using an if condition for a processor

  • You can use an if condition to run a processor conditionally. When set, the processor runs only when the if condition evaluates to true.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}
  • You can also enable the script.painless.regex.enabled cluster setting to use regular expressions in your if condition scripts. However, expensive regular expressions may slow down the indexing process.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}
  • Though if conditions must be specified as valid, single-line JSON, you can use the Kibana console's triple-quote syntax to write and debug larger scripts.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}
  • Referencing a stored script as the if condition is also a good alternative.
PUT _scripts/sample-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "sample-prod-tag-script" }
      }
    }
  ]
}
  • In many cases, ingested documents contain object fields. When a processor script tries to access a field whose parent object does not exist, Elasticsearch returns a NullPointerException. To avoid this exception, use null safe operators. For instance, you can rewrite the if condition ctx.network?.name.equalsIgnoreCase('Guest') as 'Guest'.equalsIgnoreCase(ctx.network?.name), which is null safe because 'Guest' is always non-null.
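A minimal sketch of this null-safe check, reusing the drop processor from the earlier example:
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents from the 'Guest' network, written null-safe",
        "if": "'Guest'.equalsIgnoreCase(ctx.network?.name)"
      }
    }
  ]
}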

Applying pipelines with Conditions

You can also combine the if condition with the pipeline processor to apply other pipelines to a document based on your criteria. Such a pipeline can then be used as the default pipeline in an index template that configures multiple data streams or indices (a template sketch follows the example below).

PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
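
As a rough sketch, such a pipeline can be made the default for matching indices through an index template like the one below; the template name service-logs-template and the index pattern are placeholders:

PUT _index_template/service-logs-template
{
  "index_patterns": ["service-logs-*"],
  "template": {
    "settings": {
      "index.default_pipeline": "one-pipeline-to-rule-them-all"
    }
  }
}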

Handling Ingest Pipeline Failures

To effectively handle ElasticSearch Ingest Pipeline errors or failures, you can use the settings described below:

  • Processors run in the specified order, and the pipeline stops when a processor encounters an error. To ignore a processor failure and execute the pipeline’s remaining processors, set ignore_failure to true.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}
  • The on_failure parameter specifies a list of processors to run immediately after a processor fails. If you have specified on_failure, Elasticsearch afterwards runs the rest of the processors in the ElasticSearch Ingest pipeline, even if the on_failure configuration is empty.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
  • For nested error handling, you can also nest a list of processors inside an on_failure block.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
  • Apart from the processor level, you can also specify on_failure at the pipeline level. If a processor without its own on_failure value fails, Elasticsearch uses this pipeline-level parameter as a fallback and does not attempt to run the pipeline’s remaining processors.
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}
  • You can extract detailed information about the ElasticSearch Ingest pipeline failure from the document metadata fields on_failure_message, on_failure_processor_type, on_failure_processor_tag, and on_failure_pipeline. You can access these fields only from within an on_failure block. For instance, the code below uses the metadata fields to include information about ElasticSearch Ingest pipeline failures in documents.
PUT _ingest/pipeline/sample-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Conclusion

In summary, an Elasticsearch Ingest Pipeline is essential for automating and optimizing data ingestion, ensuring that your data is pre-processed and enriched before indexing. This boosts the quality of your search and analytics capabilities, enabling real-time insights for better decision-making.

Hevo seamlessly connects with Elasticsearch, simplifying the entire data integration process. With Hevo’s no-code platform, you can effortlessly set up and manage ingest pipelines, transforming and loading data from various sources into Elasticsearch without any hassle. This allows you to focus on extracting valuable insights rather than worrying about data management.

Ready to enhance your data processes? Start your journey with Hevo today by signing up for the 14-day free trial and experience seamless integration with Elasticsearch—unlocking the full potential of your data has never been easier!

If you are using ElasticSearch as your Search & Analytics Engine and searching for a no-fuss alternative to Manual Data Integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources (including 40+ free sources), allows you to not only export and load data but also transform and enrich it, making it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of learning about the ElasticSearch Ingest Pipelines! Share your thoughts with us in the comments section below.

Sanchit Agarwal
Research Analyst, Hevo Data

Sanchit Agarwal is an Engineer turned Data Analyst with a passion for data, software architecture and AI. He leverages his diverse technical background and 2+ years of experience to write content. He has penned over 200 articles on data integration and infrastructures, driven by a desire to empower data practitioners with practical solutions for their everyday challenges.