Inconsistent or incomplete data often leads to inaccurate results and missed opportunities. Ingesting data from several data sources can be challenging; however, free and open-source tools like Elasticsearch allow you to easily create custom ingestion pipelines. While ingesting data, you can also enrich it using documents from existing indices.

Using the enrich processor, you can effectively enrich Elasticsearch data. Simple APIs such as the create enrich policy API, create index API, and execute enrich policy API help you quickly get started with the data enrichment process.

In this article, you will learn how to easily use the enrich processor to enrich ElasticSearch data in 5 easy steps.

What is ElasticSearch?

Introduced in 2010, Elasticsearch (developed by Elastic) is a modern distributed search and analytics engine that works with many types of data, including textual, numerical, geospatial, structured, and unstructured. Elasticsearch is an integral part of the ELK Stack (Elasticsearch, Logstash, and Kibana). During data ingestion in Logstash, raw data from logs, system metrics, and web applications is parsed, normalized, and enriched before it is indexed in Elasticsearch. You can also send data to Elasticsearch as JSON documents using an API. Once the data is indexed, you can query it and use aggregations to get complex summaries. Finally, using Kibana, you can create powerful visualizations.

Under the SSPL or the Elastic License, Elasticsearch offers many open-source features that are free to use. You can also opt for a paid subscription to get additional advanced features. Elasticsearch is widely used for application search, website search, enterprise search, log analytics, geospatial data analysis, security analytics, and business analytics.

Key Features of ElasticSearch

ElasticSearch has the following salient features:

  • Easy-to-use: Elasticsearch provides a simple REST-based API over HTTP and uses schema-free JSON documents, so you can easily get started and build applications for different use cases. To simplify application development, it also supports a wide range of programming languages such as Java, Python, PHP, JavaScript, Node.js, and Ruby.
  • High performance: Elasticsearch is distributed by design. Documents stored in Elasticsearch are assigned to containers called shards. Shards are replicated so that additional copies of the data are available in the event of a hardware failure. Elasticsearch’s distributed design allows you to scale to hundreds (or thousands) of servers to process large amounts of data in parallel and quickly find the best matches for your query.
  • Free Tools and Plugins: As part of the ELK Stack, Elasticsearch is integrated with Kibana, a popular visualization and reporting tool. Kibana provides real-time visualization of your Elasticsearch data via a user interface for quick access to application performance monitoring (APM), logs, and infrastructure metric data. Elasticsearch also integrates with Beats and Logstash to easily transform source data for loading into Elasticsearch clusters. You can also add rich functionality to your application using several open-source Elasticsearch plugins, such as language analyzers and suggestion engines.
  • Near Real-Time Operation: Read and write operations typically complete in about a second or less, which makes Elasticsearch a good fit for time-sensitive use cases such as application monitoring, anomaly detection, security analysis, and infrastructure monitoring.
  • Abundant Features: Elasticsearch has many advanced built-in features that make storing and retrieving data highly efficient, including data rollup and index lifecycle management. 

Understanding Enrich Processor Components for ElasticSearch

To enrich ElasticSearch data, you can use the enrich processor that allows you to incorporate data from existing indices into your incoming data during the ElasticSearch Ingestion process.

For adding new data to the incoming documents, the enrich processor needs the following components:

1. Source Index

It is the index that holds the enrich data you want to add to incoming documents. You can create and manage these indices just like regular Elasticsearch indices. You can use multiple source indices in an enrich policy, and you can use the same source index in multiple enrich policies.

2. Enrich Policy

An enrich policy is a set of configuration options used to add the right enrich data to the right incoming documents. An enrich policy contains:

  • A list of one or more source indices that store enrich data as documents.
  • The policy type, which determines how the processor matches the enrich data to incoming documents.
  • A match field from the source indices used to match incoming documents.
  • Enrich fields containing the enrich data from the source indices that you want to add to incoming documents.

An enrich policy must be executed before it can be used with an enrich processor. When executed, the policy uses the enrich data from its source indices to create a streamlined system index called the enrich index. The processor uses this index to match and enrich incoming documents.
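
To make the parts of a policy concrete, here is a minimal Python sketch that assembles a policy body in the shape used by the create enrich policy API and checks that each part listed above is present. The helper name `build_enrich_policy` and the sample index and field names (`products`, `product_id`, `name`, `price`) are illustrative assumptions, not part of Elasticsearch. The sketch only builds the JSON body and does not contact a cluster.

```python
import json

# Policy types mentioned in this article; your Elasticsearch version may offer others.
KNOWN_POLICY_TYPES = {"match", "geo_match"}


def build_enrich_policy(policy_type, indices, match_field, enrich_fields):
    """Return a request body for the create enrich policy API (hypothetical helper)."""
    if policy_type not in KNOWN_POLICY_TYPES:
        raise ValueError(f"unsupported policy type: {policy_type}")
    if not indices:
        raise ValueError("at least one source index is required")
    if not match_field:
        raise ValueError("a match field is required")
    if not enrich_fields:
        raise ValueError("at least one enrich field is required")
    # The policy type is the single top-level key of the body.
    return {
        policy_type: {
            "indices": list(indices),
            "match_field": match_field,
            "enrich_fields": list(enrich_fields),
        }
    }


# Illustrative values only: a "products" source index matched on "product_id".
policy = build_enrich_policy("match", ["products"], "product_id", ["name", "price"])
print(json.dumps(policy, indent=2))
```

The printed body could then be sent with a request such as PUT /_enrich/policy/<policy-name>, where the policy name is yours to choose.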

3. Enrich Index

It is a special system index tied to a specific enrich policy. Directly matching incoming documents to documents in the source indices could be slow and resource-intensive. To speed things up, the enrich processor uses an enrich index. Enrich indices contain the enrich data from the source indices but have a few special properties that streamline them:

  • They are system indices, meaning they are managed internally by Elasticsearch and are intended only for use with enrich processors.
  • Their names always begin with .enrich-*.
  • They are read-only, i.e. they cannot be modified directly.
  • They are force merged for fast retrieval.
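
The relationship between a source index and its enrich index can be pictured with a small in-memory model. The Python sketch below is a deliberate simplification (a real enrich index is a managed Elasticsearch index, not a dictionary), and the function `execute_policy` and the product data are hypothetical. It illustrates one practical consequence of the read-only property: the enrich index is a snapshot, so changes to the source index only show up after the policy is executed again.

```python
def execute_policy(source_docs, match_field, enrich_fields):
    """Build a dictionary that stands in for an enrich index (simplified model)."""
    enrich_index = {}
    for doc in source_docs:
        if match_field not in doc:
            continue  # documents without the match field cannot be looked up
        # Keep only the match field and the enrich fields, as copies.
        kept = {match_field: doc[match_field]}
        for field in enrich_fields:
            if field in doc:
                kept[field] = doc[field]
        # Simplification: a later document with the same key overwrites an earlier one.
        enrich_index[doc[match_field]] = kept
    return enrich_index


# Hypothetical source index with one field ("internal_note") that is not an enrich field.
source_index = [
    {"product_id": "p1", "name": "kettle", "price": 25, "internal_note": "restock"},
]

enrich_index = execute_policy(source_index, "product_id", ["name", "price"])

# Updating the source index does not change the existing enrich index...
source_index[0]["price"] = 30
stale_price = enrich_index["p1"]["price"]

# ...until the policy is executed again.
enrich_index = execute_policy(source_index, "product_id", ["name", "price"])
fresh_price = enrich_index["p1"]["price"]

print(stale_price, fresh_price)  # 25 30
```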

What Makes Hevo’s Data Enrichment Process Unique

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo’s automated, No-code platform empowers you with everything you need to have for a smooth data replication experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

How does the Enrich Processor work for ElasticSearch? 

To understand the approach to enrich ElasticSearch data via the enrich processor, consider the following incoming data containing information about the various fruits.

{
  "_id": 7,
  "fruit_type": "apple",
  "cost": 30,
  "inventory": 2100
}, 
{
  "_id": 8,
  "fruit_type": "banana",
  "cost": 10,
  "inventory": 2200
}

Now, you may want to know the color of each fruit. You can do this by importing lookup data into a separate index, which lets you “join” on the `fruit` field and import related fields. You can enrich Elasticsearch data by following the simple steps given below:

Step 1: Adding Enrich Data

First, add documents to one or more source indices. These documents should contain the enrich data that you eventually want to add to incoming documents. You can use the document and index APIs to manage source indices just like regular Elasticsearch indices. You can also set up Beats, such as Filebeat, to automatically send documents to your source indices.

For example, the following bulk API request imports the lookup data into a separate index.

POST _bulk
{ "index" : { "_index" : "fruit_colors", "_id" : "7" } }
{ "color" : "red","fruit" : "apple"  }
{ "index" : { "_index" : "fruit_colors", "_id" : "8" } }
{ "color" : "yellow","fruit" : "banana"  }
{ "index" : { "_index" : "fruit_colors", "_id" : "9" } }
{ "color" : "orange","fruit" : "orange" }

Step 2: Creating an Enrich Policy

After adding the enrich data to your source indices, create an enrich policy using the create enrich policy API. Note that once you have created an enrich policy, you cannot update or modify it.

Step 3: Executing the Enrich Policy

After creating the enrich policy, use the execute enrich policy API to run it, which creates the enrich index. Because enrich indices are meant to be used only by the enrich processor, avoid using them for any other purpose.

Continuing the fruit color example, use the requests below to create the `color_lookup` enrich policy (Step 2) and then execute it:

PUT /_enrich/policy/color_lookup
{"match":{"indices":"fruit_colors","match_field":"fruit","enrich_fields":["color"]}}

PUT /_enrich/policy/color_lookup/_execute

Step 4: Adding an Enrich Processor to an Ingest Pipeline

After you have set up the source index, enrich policy, and associated enrich index, you can create an ingest pipeline that contains an enrich processor for the policy. To do so, define an enrich processor and add it to an ingest pipeline using the create or update pipeline API. When configuring the enrich processor, you have to specify at least the following:

  • The enrich policy to use.
  • The field used to match incoming documents to the documents in the enrich index.
  • The target field to add to incoming documents. This target field contains the match and enrich fields specified in the enrich policy.

You can also use the max_matches setting to configure the number of enrich documents that an incoming document can match. With the default value of 1, the data is added to the target field of the incoming document as a JSON object. If max_matches is greater than 1, the data is added as an array. You can also add other processors to your ingest pipeline.
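
The effect of max_matches on the shape of the target field can be illustrated with a short Python sketch. This is a simplified model of the behavior described above, not Elasticsearch code. The function `apply_enrich` is hypothetical, and the second "green apple" entry is made-up data added only so that more than one match exists.

```python
def apply_enrich(doc, matches, target_field, max_matches=1):
    """Return a copy of doc with enrich data added (simplified model)."""
    enriched = dict(doc)
    if not matches:
        return enriched  # no match: the document is left unchanged
    if max_matches == 1:
        # Default: the target field holds a single JSON object.
        enriched[target_field] = matches[0]
    else:
        # More than one match allowed: the target field holds an array.
        enriched[target_field] = matches[:max_matches]
    return enriched


# Hypothetical enrich documents that both match "apple".
matches = [
    {"fruit": "apple", "color": "red"},
    {"fruit": "apple", "color": "green"},
]
incoming = {"fruit_type": "apple", "cost": 30}

single = apply_enrich(incoming, matches, "additional_info")
multiple = apply_enrich(incoming, matches, "additional_info", max_matches=2)

print(single["additional_info"])    # one object
print(multiple["additional_info"])  # a list of objects
```

Downstream code that reads the target field therefore has to know which max_matches value the pipeline uses, since the field changes from an object to a list.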

For the fruit color example, the following request uses the `color_lookup` enrich policy via an enrich processor in a pipeline named `sample`. Setting `ignore_missing` to true lets the processor skip any documents that do not have the lookup field (`fruit_type`).

PUT _ingest/pipeline/sample
{
  "description": "Sample pipeline",
  "processors": [
    {
      "enrich": {
        "ignore_missing": true,
        "policy_name": "color_lookup",
        "field": "fruit_type",
        "target_field": "additional_info"
      }
    }
  ]
}

Step 5: Ingesting and Enriching Documents

Finally, you can start using your ingest pipeline to enrich and index documents. Before moving to a production environment, it is recommended to index a few sample documents first and then use the get API to verify that the enrich data was added correctly.
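
As a rough sketch of that check for the fruit example, the Python snippet below builds the two requests involved: one that indexes a sample document through the `sample` pipeline and one that fetches it back. The helper `build_sample_requests` and the destination index name `fruits` are assumptions made for illustration. The snippet only prints the requests and does not send them to a cluster.

```python
import json


def build_sample_requests(index, doc_id, doc, pipeline):
    """Return (method, path, body) tuples for indexing a sample and fetching it back."""
    index_request = ("PUT", f"/{index}/_doc/{doc_id}?pipeline={pipeline}", doc)
    get_request = ("GET", f"/{index}/_doc/{doc_id}", None)
    return index_request, get_request


# "fruits" is a hypothetical destination index; "sample" is the pipeline from Step 4.
sample_doc = {"fruit_type": "apple", "cost": 30, "inventory": 2100}
index_request, get_request = build_sample_requests("fruits", 7, sample_doc, "sample")

for method, path, body in (index_request, get_request):
    print(method, path)
    if body is not None:
        print(json.dumps(body, indent=2))
```

If the enrichment worked, the `_source` returned by the get request should contain an `additional_info` object holding the matched `fruit` and `color` values.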

Enrich ElasticSearch Examples

You can understand more about the Enrich ElasticSearch process via the following 2 popular examples:

Enriching Data based on Exact Values

In this example, you will learn how to create a match enrich policy that adds user name and contact details to incoming documents based on their email addresses. Match enrich policies match enrich data to incoming documents based on an exact value, such as an ID or email address, using a term query.

Here, you will also add the match enrich policy to a processor in an ingest pipeline.

  • Step 1: First, create a source index using the create index API or the index API. The following index API request creates the source index and indexes a new document to it.
PUT /users/_doc/1?refresh=wait_for
{
  "email": "karl.max@sample.com",
  "first_name": "Karl",
  "last_name": "Max",
  "city": "New Orleans",
  "county": "Orleans",
  "state": "LA",
  "zip": 70116,
  "web": "karl.sample.com"
}
  • Step 2: Now, use the create enrich policy API to create an enrich policy with the match policy type.
PUT /_enrich/policy/users-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}
  • Step 3: After designing the policy, you can run it via the execute enrich policy API.
POST /_enrich/policy/users-policy/_execute
  • Step 4: According to the basic requirements discussed in the above sections, add an enrich processor & use the create or update pipeline API to build an ingest pipeline.
PUT /_ingest/pipeline/user_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add 'user' data based on 'email'",
        "policy_name": "users-policy",
        "field" : "email",
        "target_field": "user",
        "max_matches": "1"
      }
    }
  ]
}
  • Step 5: Finally, use the ingest pipeline to index a document. Note that the incoming document should contain the field specified in your enrich processor.
PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
  "email": "karl.max@sample.com"
}
  • Step 6: To check if the enrich ElasticSearch processor correctly matched and added the specified field, you can use the get API to verify the indexed document.
GET /my-index-000001/_doc/my_id

Output: 

{
  "found": true,
  "_index": "my-index-000001",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 55,
  "_primary_term": 1,
  "_source": {
    "user": {
      "email": "karl.max@sample.com",
      "first_name": "Karl",
      "last_name": "Max",
      "zip": 70116,
      "city": "New Orleans",
      "state": "LA"
    },
    "email": "karl.max@sample.com"
  }
}

Enriching Data based on Geolocation

In this second example, you will learn how to build a geo_match enrich policy that adds postal codes to incoming documents based on a set of coordinates.

  • Step 1: Firstly, build a source index containing at least one geo_shape field by sending a create index API request.
PUT /postal_codes
{
  "mappings": {
    "properties": {
      "location": {
        "type": "geo_shape"
      },
      "postal_code": {
        "type": "keyword"
      }
    }
  }
}
  • Step 2: Now, use the index API to index enrich data to this source index.
PUT /postal_codes/_doc/1?refresh=wait_for
{
  "location": {
    "type": "envelope",
    "coordinates": [ [ 13.0, 53.0 ], [ 14.0, 52.0 ] ]
  },
  "postal_code": "96598"
}
  • Step 3: Next, use the create enrich policy API to create an enrich policy with the geo_match policy type. The policy lists the source index, the geo_shape match field, and the enrich fields.
PUT /_enrich/policy/postal_policy
{
  "geo_match": {
    "indices": "postal_codes",
    "match_field": "location",
    "enrich_fields": [ "location", "postal_code" ]
  }
}
  • Step 4: After creating the enrich policy, use the execute enrich policy API to create an enrich index for the policy.
POST /_enrich/policy/postal_policy/_execute
  • Step 5: Finally, add an enrich processor and create an ingest pipeline using the create or update pipeline API. In the enrich processor, include the shape_relation setting, which indicates how the processor matches geoshapes in incoming documents to geoshapes in documents from the enrich index.
PUT /_ingest/pipeline/postal_lookup
{
  "processors": [
    {
      "enrich": {
        "description": "Add 'geo_data' based on 'geo_location'",
        "policy_name": "postal_policy",
        "field": "geo_location",
        "target_field": "geo_data",
        "shape_relation": "INTERSECTS"
      }
    }
  ]
}
  • Step 6: You can now begin with indexing a document using the ingest pipeline. 
PUT /users/_doc/0?pipeline=postal_lookup
{
  "first_name": "Karl",
  "last_name": "Max",
  "geo_location": "POINT (13.5 52.5)"
}
  • Step 7: Similar to the previous example, you can check if the enrich processor worked properly or not via the GET API.
GET /users/_doc/0

Output:

{
  "found": true,
  "_index": "users",
  "_id": "0",
  "_version": 1,
  "_seq_no": 55,
  "_primary_term": 1,
  "_source": {
    "geo_data": {
      "location": {
        "type": "envelope",
        "coordinates": [[13.0, 53.0], [14.0, 52.0]]
      },
      "postal_code": "96598"
    },
    "first_name": "Karl",
    "last_name": "Max",
    "geo_location": "POINT (13.5 52.5)"
  }
}
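
To see why the point in this example matches the postal code envelope, the following Python sketch performs a simple containment check. It is a planar approximation under stated assumptions: the envelope is given as its upper-left and lower-right corners in [longitude, latitude] order, as in the source document above, and it does not cross the date line. The function `envelope_contains_point` is hypothetical and is not how Elasticsearch evaluates shape relations internally.

```python
def envelope_contains_point(envelope, point):
    """Check whether a (lon, lat) point lies inside an envelope.

    envelope: [[min_lon, max_lat], [max_lon, min_lat]] (upper-left, lower-right)
    point: (lon, lat)
    """
    (min_lon, max_lat), (max_lon, min_lat) = envelope
    lon, lat = point
    return min_lon <= lon <= max_lon and min_lat <= lat <= max_lat


# Envelope from the postal_codes document and the point from the incoming document.
postal_envelope = [[13.0, 53.0], [14.0, 52.0]]
inside = envelope_contains_point(postal_envelope, (13.5, 52.5))

# A made-up point east of the envelope, for contrast.
outside = envelope_contains_point(postal_envelope, (15.0, 52.5))

print(inside, outside)  # True False
```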

Understanding Applications of Enrich Processor for ElasticSearch

You can use the enrich processor for several purposes, such as:

  • Identifying web services or vendors based on known IP addresses
  • Adding product details to retail orders based on product IDs
  • Supplementing contact details based on an email address
  • Adding postal codes based on user coordinates

Conclusion

In this article, you have learned how to enrich Elasticsearch data. Elasticsearch uses the enrich processor to carry out data enrichment. This processor relies on 3 components: the source index, the enrich policy, and the enrich index. In a simple 5-step process, you can enrich Elasticsearch data using several APIs such as the create enrich policy API, create index API, and execute enrich policy API. The enrich processor is used in various cases such as supplementing contact details, adding product details, and identifying zip codes based on user coordinates.

As you collect and manage your data across several applications and databases in your business, it is important to consolidate it for a complete performance analysis of your business. However, continuously monitoring the data connectors is a time-consuming and resource-intensive task. To do this efficiently, you need to assign a portion of your engineering bandwidth to integrate data from all sources, clean and transform it, and finally load it to a cloud data warehouse or a destination of your choice for further business analytics. All of these challenges can be comfortably solved by a cloud-based ETL tool such as Hevo Data.

Hevo Data, a No-code Data Pipeline can Ingest Data from a vast sea of 100+ sources such as ElasticSearch to a Data Warehouse or a Destination of your choice. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

If you are using Elasticsearch as your search and analytics engine and are looking for a no-fuss alternative to manual data integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources (including 40+ free sources), allows you to not only export and load data but also transform and enrich your data and make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of learning about the Enrich ElasticSearch data process! Share your thoughts with us in the comments section below.

Sanchit Agarwal
Research Analyst, Hevo Data

Sanchit Agarwal is an Engineer turned Data Analyst with a passion for data, software architecture and AI. He leverages his diverse technical background and 2+ years of experience to write content. He has penned over 200 articles on data integration and infrastructures, driven by a desire to empower data practitioners with practical solutions for their everyday challenges.
