Kafka Enrichment Simplified 101

on Data Enrichment, Data Integration, Kafka • April 18th, 2022


Streaming Data and Apache Kafka are popular among businesses because they provide real-time insight into operations and customers. In practice, however, this data is dispersed throughout the business, across many systems, databases, and apps. Enriching data from numerous sources is therefore a key process in streaming data systems (and traditional databases).

This article talks about Kafka, its key features, Data Enrichment and its benefits, and various Kafka Enrichment methods.


What is Kafka?

Apache Kafka is an Open-Source Distributed Event Streaming platform, created at LinkedIn in 2010, that is used to build recommendation systems and event-driven applications. Kafka is made up of three key components: Kafka Producers, Kafka Servers (brokers), and Kafka Consumers. Producers publish real-time messages to Kafka Servers, and Consumers read those messages from them.
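To make this concrete, here is a minimal sketch of a producer publishing a message with Kafka's official Java client. The broker address and the "orders" topic are illustrative assumptions, not part of the original article:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed local broker; point this at your own Kafka server
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Publish one record to the (hypothetical) "orders" topic;
            // any consumer subscribed to "orders" receives it in real time
            producer.send(new ProducerRecord<>("orders", "order-1", "{\"amount\": 42}"));
        }
    }
}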

Because the Kafka ecosystem is distributed, you get strong fault tolerance and high throughput while streaming data into Kafka servers. Thanks to these efficient features and fault-tolerance capabilities, Kafka is used by over 20,300 businesses globally, including 80% of Fortune 500 corporations such as Walmart, Netflix, Spotify, and Airbnb.

Key Features of Kafka

Apache Kafka is widely popular because of features that maintain uptime, simplify scaling, and let it manage massive data volumes, among other things. Take a look at some of the powerful features it provides:

  • Scalable: Kafka’s partitioned log model distributes data over multiple servers, allowing it to extend beyond the capacity of a single server.
  • Fast: Because Kafka decouples data streams, it delivers very low latency and very high throughput.
  • Durability: Data is written to disk, and partitions are distributed and replicated across multiple servers. This protects data from server failure and makes it fault-tolerant and durable.
  • Fault-Tolerant: A Kafka cluster can handle the failure of individual servers and restart them on its own.
  • Extensibility: Since Kafka’s rise to prominence in recent years, many other software programs have built integrations with it. This enables the installation of extra features, such as integration with other programs, in a matter of seconds.
  • Log Aggregation: Because a modern system is typically distributed, data recorded by its various components must be centralized in a single place. Kafka frequently acts as a single source of truth by centralizing data from all sources, regardless of shape or volume.
  • Stream Processing: This is Kafka’s core capability: performing real-time computations on Event Streams. Kafka ingests, stores, and analyzes data streams in real time, from real-time Data Processing to Dataflow Programming, at any scale (a minimal sketch follows this list).
  • Metrics and Monitoring: Kafka is frequently used to track operational data in the form of metrics. This requires gathering data from several apps and consolidating it into centralized feeds with real-time measurements.
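As a concrete illustration of the Stream Processing capability above, here is a minimal Kafka Streams sketch that counts events per key in real time. The topic names, application ID, and serdes are illustrative assumptions:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class EventCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-count-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read raw events and count them per key as they arrive
        KStream<String, String> events = builder.stream("app-events");
        KTable<String, Long> counts = events.groupByKey().count();
        // Emit the continuously updating counts to an output topic
        counts.toStream().to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}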

Enrich Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your aggregation process in a few clicks. With Hevo’s out-of-the-box connectors and blazing-fast Data Pipelines, you can extract & aggregate data from 100+ Data Sources like Kafka straight into your Data Warehouse, Database, or any destination. To further streamline and prepare your data for analysis, you can process and enrich Raw Granular Data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

GET STARTED WITH HEVO FOR FREE

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

What is Data Enrichment?

Data Enrichment is the process of merging first-party data from internal sources with disparate data from other internal systems or third-party data from outside sources. Any organization can benefit from enriched data since it becomes more useful and insightful. The vast majority of brands enrich their raw data to make more informed decisions.

Benefits of Data Enrichment

Data Enrichment provides several advantages that make it a worthwhile practice for businesses.

  • Data Enrichment Cost Savings: According to the Global Databerg survey, an enterprise with one petabyte of data spends roughly $650,000 per year on data management, yet only a portion of that data delivers any real advantage. Data Enrichment saves you money because you don’t maintain information that isn’t useful to your business. Instead, you supplement internal data with data from external sources for the benefit of your company.
  • Data Enrichment strengthens customer interactions: Enriched data aids personalization, which raises the chance of meaningful client interactions and commercial prospects. Using relevant customer data, your organization can design communication strategies that fit client preferences and needs. When customers believe your organization understands their wants, they are more inclined to make a purchase.
  • Data Enrichment enhances customer nurturing: Data Enrichment maximizes customer nurturing by identifying segments of customers to nurture. A segment provides information that is valuable and has the potential to elicit a purchase.
  • Data Enrichment aids successful targeted marketing: Many companies are learning that a one-size-fits-all marketing strategy is ineffective, so they have turned to targeted marketing. Effective data segmentation, and therefore successful targeted marketing, requires Data Enrichment.

Kafka Enrichment Methods

Here are two methods to perform Kafka Enrichment:

Kafka Enrichment with Joins

Enriching Data from numerous sources via joins is a fundamental process in streaming data (and traditional databases). When a data system only supports primary-key joins, developers are left with unnecessary code complexity and inefficient resource utilization during application execution.

Kafka Enrichment with Joins: Example Use Case

This post takes the example of a marketplace application with two key datasets: the merchants selling on the marketplace and the products they offer. The example is built around the following schema:
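The schema diagram from the original post is not reproduced here; as a rough stand-in (all field names are assumptions), the two datasets might be shaped like this:

// Hypothetical shapes for the two marketplace datasets
public class Merchant {
    public Long merchantId;      // primary key
    public String name;
}

public class Product {
    public Long productId;       // primary key
    public Long merchantId;      // foreign key: the merchant selling this product
    public String description;
}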

Kafka Enrichment with Joins: Joining KTables

A KTable is an Event Stream Materialization, i.e., a table built from a stream. To turn a stream into a table, you apply all of the changes the stream contains; this is referred to as “materializing the stream”. To materialize a stream, you go over all of its events from start to finish, updating the state along the way. Each row in the KTable data structure is keyed by the event’s unique key. When a new record arrives for that key, the KTable’s value is overwritten, and the changed row is transmitted downstream to the subsequent operators.

// Materialize the input topic as a KTable, then double each value
KTable<String, Long> myTable = builder.table("input-topic");
KTable<String, Long> result = myTable.mapValues(x -> x * 2);

KTables are advantageous because they allow Kafka Streams applications to keep track of their state. KTables may be queried from outside the application via the Interactive Query (IQ) API, and they can be used for all kinds of relational-DB-style table operations.
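For example, a KTable can be materialized under an explicit store name and then queried through IQ from outside the topology. This is a minimal sketch; the topic name, store name, and configuration values are assumptions:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InteractiveQuerySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Materialize the topic as a KTable under a named state store so IQ can reach it
        KTable<String, String> merchants = builder.table(
            "merchants",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("merchants-store"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iq-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Query the table's state from outside the application via the IQ API
        ReadOnlyKeyValueStore<String, String> store = streams.store(
            StoreQueryParameters.fromNameAndType("merchants-store", QueryableStoreTypes.keyValueStore()));
        String merchant = store.get("merchant-1");
    }
}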

Kafka Enrichment with Joins: How do Primary Key Joins Work?

The most significant challenge preventing effective foreign-key joins via the GroupBy approach is the difficulty of preserving the aggregate data structure: all records pertaining to the same key must be grouped together into a single record. The Kafka Streams implementation of native foreign-key KTable joins solves this problem effectively and scalably.
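Since Apache Kafka 2.4 (KIP-213), this is exposed as an overload of KTable#join that takes a foreign-key extractor. Here is a sketch using the hypothetical Product and Merchant shapes from earlier (EnrichedProduct is likewise an assumed type):

// Products are keyed by productId; each value carries the merchantId it references.
KTable<Long, Product> products = …
KTable<Long, Merchant> merchants = …

// The extractor tells Kafka Streams which merchant row each product joins to
KTable<Long, EnrichedProduct> enriched = products.join(
    merchants,
    product -> product.merchantId,                          // foreign-key extractor
    (product, merchant) -> new EnrichedProduct(product, merchant));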

For example, in your marketplace app, billing may be handled by a different system from the display app. For merchants, you’d set up two separate tables:
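The table definitions appear as an image in the original post; roughly, and with the billing fields assumed for illustration, they might look like this:

// Both tables share the merchant ID as their primary key
public class BillableMerchant {
    public Long merchantId;       // primary key
    public String legalName;      // assumed billing fields
    public String billingAddress;
}

public class DisplayMerchant {
    public Long merchantId;       // primary key (same key space as BillableMerchant)
    public String displayName;
    public String profilePhoto;
}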

This could be a decent design because the main marketplace page requires only DisplayMerchant information, while the payment processing system only needs BillableMerchant fields. However, when generating an invoice, you may want to use the DisplayName and ProfilePhoto from DisplayMerchant along with the other fields from BillableMerchant. Because both tables have the same key, you can use a primary-key join for this.

KTable<Long, BillableMerchant> billableMerchantTable = …
KTable<Long, DisplayMerchant> displayMerchantTable = …

KTable<Long, Merchant> completeMerchant = 
  billableMerchantTable.join(displayMerchantTable, completeMerchantJoiner)
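Here, completeMerchantJoiner is a ValueJoiner that merges the two records into one; a plausible sketch (the Merchant constructor shown is an assumption):

// Combine billing fields with the public-facing display fields into one record
ValueJoiner<BillableMerchant, DisplayMerchant, Merchant> completeMerchantJoiner =
    (billable, display) -> new Merchant(billable, display.displayName, display.profilePhoto);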

Because it’s very easy to compute this join in a distributed stream processing system, Kafka Streams has typically confined joins to an equal-primary-key requirement. Because KTables are partitioned based on their record keys, you may calculate a primary-key join by simply assigning the same partitions of both tables to each compute node, which can then compute the join result locally for each record:

Kafka Enrichment: a primary-key join computed locally across co-partitioned tables

This works because the two tables are co-partitioned (identically partitioned), meaning that all records for a given key, say Merchant1, in both tables are in the same partition, say Partition2. Because the primary-key join result for Merchant1 is independent of any other key, you can assign Partition2 of both the BillableMerchant and the DisplayMerchant tables to the same node and compute the join correctly using only local data.

What Makes Hevo’s Data Enrichment Process Unique

Enriching data can be a mammoth task without the right set of tools. Hevo’s automated platform empowers you with everything you need for a smooth Data Collection, Processing, and Enrichment experience. Our platform has the following in store for you!

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Simplify your Data Analysis with Hevo today! SIGN UP HERE FOR A 14-DAY FREE TRIAL!

Kafka Enrichment with SQL

In this method, you’ll enrich customer call events with customer information.

A common situation for streaming SQL on Apache Kafka is enriching data streams with extra information by executing an efficient lookup.

The following topics are used:

  • customer_details: messages contain customer information.
  • customer_call_details: messages contain information about calls.

Streaming SQL Enrichment on Apache Kafka

Given below is a code snippet showing how to use SQL to perform Kafka Enrichment. Each incoming call event is joined against the latest customer record with the same key, and the enriched result is written to a new topic, customers_callInfo:

SET defaults.topic.autocreate=true;

INSERT INTO customers_callInfo
SELECT STREAM
    calls._value AS call 
    , customer._value AS customer
FROM  customer_call_details AS calls
        INNER JOIN  (SELECT TABLE * FROM customer_details) AS customer
            ON customer._key.customer.id = calls._key.customer.id

Kafka Enrichment SQL: Testing Data

To streamline your testing process and get the above example running in under 60 seconds, you’ll use SQL to create and populate the two Apache Kafka topics (the output topic, customers_callInfo, is auto-created thanks to the defaults.topic.autocreate setting):

  • Create topic customer_details:
CREATE TABLE customer_details(
        _key.customer.typeID string
        , _key.customer.id string
        , customer.name string
        , customer.middleName string null
        , customer.surname string
        , customer.nationality string
        , customer.passportNumber string
        , customer.phoneNumber string
        , customer.email string null
        , customer.address string
        , customer.country string
        , customer.driverLicense string null
        , package.typeID string
        , package.description string
        , active boolean
)
FORMAT(avro, avro)
PROPERTIES(partitions=5, replication=1, compacted=true);
  • Populate topic customer_details:
INSERT INTO customer_details(
        _key.customer.typeID
        , _key.customer.id
        , customer.name
        , customer.middleName
        , customer.surname
        , customer.nationality
        , customer.passportNumber
        , customer.phoneNumber
        , customer.email
        , customer.address
        , customer.country
        , customer.driverLicense
        , package.typeID
        , package.description
        , active
) VALUES
("userType1","5162258362252394","April","-","Paschall","GBR","APGBR...","1999153354","aprilP@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("internal","5290441401157247","Charisse","-","Daggett","USA","CDUSA...","6418577217","charisseD@mydomain.com","-","USA","-","TypeC","Desc.",true),
("internal","5397076989446422","Gibson","-","Chunn","USA","GCUSA...","8978860472","gibsonC@mydomain.com","-","USA","-","TypeC","Desc.",true),
("partner","5248189647994492","Hector","-","Swinson","NOR","HSNOR...","8207437436","hectorS@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5196864976665762","Booth","-","Spiess","CAN","BSCAN...","6220504387","hectorS@mydomain.com","-","CAN","-","TypeA","Desc.",true),
("userType2","5423023313257503","Hitendra","-","Sibert","SWZ","HSSWZ...","6731834082","hitendraS@mydomain.com","-","SWZ","-","TypeA","Desc.",true),
("userType2","5337899393425317","Larson","-","Asbell","SWE","LASWE...","2844252229","larsonA@mydomain.com","-","SWE","-","TypeA","Desc.",true),
("partner","5140590381876333","Zechariah","-","Schwarz","GER","ZSGER...","4936431929","ZechariahS@mydomain.com","-","GER","-","TypeB","Desc.",true),
("internal","5524874546065610","Shulamith","-","Earles","FRA","SEFRA...","2119087327","ShulamithE@mydomain.com","-","FRA","-","TypeC","Desc.",true),
("userType1","5204216758311612","Tangwyn","-","Gorden","GBR","TGGBR...","9172511192","TangwynG@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType1","5336077954566768","Miguel","-","Gonzales","ESP","MGESP...","5664871802","MiguelG@mydomain.com","-","ESP","-","TypeA","Desc.",true),
("userType3","5125835811760048","Randie","-","Ritz","NOR","RRNOR...","3245795477","RandieR@mydomain.com","-","NOR","-","TypeA","Desc.",true),
("userType1","5317812241111538","Michelle","-","Fleur","FRA","MFFRA...","7708177986","MichelleF@mydomain.com","-","FRA","-","TypeA","Desc.",true),
("userType1","5373595752176476","Thurborn","-","Asbell","GBR","TAGBR...","5927996719","ThurbornA@mydomain.com","-","GBR","-","TypeA","Desc.",true),
("userType3","5589753170506689","Noni","-","Gorden","AUT","NGAUT...","7288041910","NoniG@mydomain.com","-","AUT","-","TypeA","Desc.",true),
("userType2","5588152341005179","Vivian","-","Glowacki","POL","VGPOL...","9001088901","VivianG@mydomain.com","-","POL","-","TypeA","Desc.",true),
("partner","5390713494347532","Elward","-","Frady","USA","EFUSA...","2407143487","ElwardF@mydomain.com","-","USA","-","TypeB","Desc.",true),
("userType1","5322449980897580","Severina","-","Bracken","AUT","SBAUT...","7552231346","SeverinaB@mydomain.com","-","AUT","-","TypeA","Desc.",true);
  • Create topic customer_call_details:
CREATE TABLE customer_call_details(
    _key.customer.typeID string
    , _key.customer.id string
    , callInfoCustomerID string
    , callInfoType string
    , callInfoDuration int
    , callInfoInit int)
FORMAT(avro, avro)
PROPERTIES(partitions=1, replication=1, compacted=false);
  • Populate topic customer_call_details:
INSERT INTO customer_call_details(
    _key.customer.typeID
    , _key.customer.id
    , callInfoCustomerID
    , callInfoType
    , callInfoDuration
    , callInfoInit
) VALUES
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 470, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 67, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 377, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 209, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 209, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 887, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 203, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 1698, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 320, 1),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 89, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 355, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 65, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 43, 1),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 530, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 270, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1633, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 110, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 540, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 168, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 1200, 0),
("internal", "5290441401157247","5290441401157247", "CallTypeC", 1200, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 22, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 333, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 87, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 123, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 182, 1),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 844, 0),
("partner", "5390713494347532","5390713494347532", "CallTypeB", 56, 1),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 36, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 794, 0),
("userType3", "5589753170506689","5589753170506689", "CallTypeA", 440, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 52, 0),
("userType1", "5322449980897580","5322449980897580", "CallTypeA", 770, 0),
("internal", "5397076989446422","5397076989446422", "CallTypeC", 627, 0),
("partner", "5140590381876333","5140590381876333", "CallTypeB", 555, 0),
("userType2", "5337899393425317","5337899393425317", "CallTypeA", 55, 1);

Kafka Enrichment SQL: Validate Results

To validate the results of the Kafka Enrichment, run the following query:

SELECT
    p.callInfoCustomerID AS customerID
    , p.callInfoType
    , p.callInfoInit
FROM customer_call_details AS p
        INNER JOIN customer_details AS c
            ON p._key.customer.id = c._key.customer.id

Conclusion

In this article, you have learned about Kafka Enrichment methods and how to implement them. 

However, as a Developer, extracting complex data from a diverse set of data sources like Databases, CRMs, Project Management Tools, Streaming Services, and Marketing Platforms into your Database can seem quite challenging. If you are from a non-technical background or are new to the game of data warehousing and analytics, Hevo Data can help!

Visit our Website to Explore Hevo

Hevo Data will automate your data transfer process, allowing you to focus on other aspects of your business like Analytics, Customer Management, etc. The platform allows you to transfer data from 100+ sources like Kafka to Cloud-based Data Warehouses like Snowflake, Google BigQuery, Amazon Redshift, etc. It will provide you with a hassle-free experience and make your work life much easier.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand.

You can also have a look at our unbeatable pricing that will help you choose the right plan for your business needs!
