How to Build a Streaming Kafka Data Pipeline: 7 Easy Steps

on Confluent, Data Integration, Data Pipeline, Data Streaming, Kafka, MySQL, PubSub, SQL • April 7th, 2022

To stay ahead of their competitors, organizations use data-driven approaches to run their business operations efficiently. Applications or websites with lagging services can hinder business growth. To deliver high performance, scalability, and flexibility to business applications, it is essential to power them with advanced Data Pipelining and Data Streaming microservices.

Apache Kafka is an event streaming platform that organizations use to distribute events at high throughput. It works as a pub-sub system that lets users read and write data conveniently. A Kafka Data Pipeline helps Developers stream data in real-time from source to target with high throughput.

Kafka Data Pipelines can help companies offload their transactional data from Databases and other sources for Analytical purposes. In this article, you will learn about Apache Kafka and how to build a streaming Kafka Data Pipeline.

Prerequisites

  • Confluent Cloud account.
  • A brief knowledge of Kafka and SQL.
  • MySQL Database.
  • Elasticsearch.

What is Apache Kafka?

Apache Kafka is a distributed event store and stream processing platform widely used by companies to build and manage streaming Kafka Data Pipelines, Data Integration, and Analytics. It was originally developed at LinkedIn and aims to deliver a high-throughput, low-latency platform that can handle trillions of events daily and keep modern applications running smoothly.

Apache Kafka is a publish-subscribe messaging platform that delivers data feeds in real-time to Data Pipelines and Streaming applications and can replay those feeds. Clients communicate with Kafka over a TCP-based protocol, Kafka Connect links Kafka to external systems, and Kafka Streams provides stream processing.
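
To make the publish-subscribe and replay ideas concrete, here is a minimal in-memory sketch. It is not the Kafka client API: the `TopicLog` class and its `publish`, `poll`, and `seek` methods are hypothetical names chosen for illustration, and the sketch models only a single partition.

```python
from collections import defaultdict


class TopicLog:
    """Toy in-memory stand-in for a single-partition topic (not the Kafka API)."""

    def __init__(self):
        self._records = []                # append-only list of messages
        self._offsets = defaultdict(int)  # next offset to read, per consumer group

    def publish(self, message):
        """Append a message and return the offset it was stored at."""
        self._records.append(message)
        return len(self._records) - 1

    def poll(self, group):
        """Return the messages this group has not read yet and advance its offset."""
        start = self._offsets[group]
        batch = self._records[start:]
        self._offsets[group] = len(self._records)
        return batch

    def seek(self, group, offset):
        """Move a group's position so that it reads again from `offset`."""
        self._offsets[group] = offset


log = TopicLog()
for event in ["signup", "rating", "purchase"]:
    log.publish(event)

first_read = log.poll("analytics")  # reads all three events
log.seek("analytics", 0)            # rewind to replay from the beginning
replayed = log.poll("analytics")    # the same three events again
other_group = log.poll("search")    # an independent subscriber sees them too
```

Because messages stay in the log after they are read, each subscriber group keeps its own position and can rewind without affecting the others.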

Key Features of Apache Kafka

Some of the main features of Apache Kafka are listed below:

  • Scalability: Apache Kafka is capable of handling trillions of messages per day and petabytes of data with thousands of partitions. This allows organizations to easily scale the production clusters up to a thousand brokers.
  • Durability: Kafka Partitions are distributed and replicated across many servers that write all the data to the disk. It protects against any data loss during a server failure.
  • Performance: Kafka offers a high throughput rate for both publishing and subscribing to messages. It remains stable even when many terabytes of messages are stored.
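
As a rough illustration of how partitions let a topic scale, the sketch below maps message keys to partitions with a hash. The `choose_partition` helper is hypothetical, and CRC32 is only a stand-in: Kafka's own partitioner uses a different hash function.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a key to a partition; CRC32 is a stand-in for Kafka's real hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical keys; records with the same key always land on the same partition.
keys = ["customer-1", "customer-2", "customer-1"]
assignments = [choose_partition(key, 6) for key in keys]
```

Keeping a key on one partition preserves the order of that key's messages while the partitions themselves are spread across brokers.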

To know more about Kafka, click here.

Replicate Kafka Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources such as Kafka straight into your Data Warehouse or any Databases. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

Get Started with Hevo for Free

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

Steps to Build a Streaming Kafka Data Pipeline

Now that you have understood Apache Kafka, this section walks through the steps to build a streaming Kafka Data Pipeline. It uses Confluent Cloud, a MySQL Database, Elasticsearch, and customer ratings data. The steps to build a streaming Kafka Data Pipeline are listed below:

Step 1: Setting Up the Environment

  • You will use the Confluent Cloud to get access to managed Kafka services, connectors, and stream processing for streaming Kafka Data Pipeline.
  • For this, go to your Confluent Cloud account here.
  • Now from the Confluent Cloud, create a new cluster for Kafka Data Pipeline. 
  • Provide the name of the cluster as “pipelines_quickstart” and set the type as “Basic”, as shown in the image below.
[Image: Creating Cluster]
  • After creating the cluster successfully for your Kafka Data Pipeline, navigate to the Kafka Topics page and wait for the “Create topic” button to become active.
  • Once it is active, your cluster is ready. Click on the “Create topic” button.
  • Create the Confluent Schema Registry for your environment. If you already have a Confluent Schema Registry for your Kafka Data Pipeline, you can skip this step.
  • Navigate to the environment’s homepage and go to the “Schema Registry” option. There, click on the “Set up on my own” option.
  • Now, select the cloud platform and region, as shown in the image below.
[Image: Creating Schema Registry]
  • Then, click on the “Continue” button.
  • Navigate to the ksqlDB page and click on the “Add application” option.
  • Select the “Create application myself” option.
  • Set the application name as “pipelines-quickstart-ksqldb” and leave all the other details unchanged.
  • Run the following command in a shell that has curl and the mysql CLI installed to create the table and data used in this tutorial on your Amazon RDS managed MySQL instance.
curl https://raw.githubusercontent.com/confluentinc/learn-kafka-courses/main/data-pipelines/customers.sql | 
  mysql -u admin -h $MYSQL_HOST -p

Step 2: Integrate Kafka with External Systems

  • Kafka Connect is used to create streaming Kafka Data Pipeline integrations with many other different technologies such as Data Warehouses, Databases, Message Queues, Document stores, etc.
  • Confluent Cloud provides fully managed connectors that you only need to configure.
  • Use the Confluent UI to configure Kafka Connect, or use the Kafka Connect REST API to submit its configuration as JSON.
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url" : "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
  […]
}
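
For a self-managed Kafka Connect worker, a configuration like this is typically wrapped in a payload with `name` and `config` fields and sent with a POST to the `/connectors` endpoint. The sketch below only builds that request. The worker address and the connector name are assumptions made for illustration, and the settings that the snippet above elides are left out here as well.

```python
import json
import urllib.request

# Hypothetical values: the Connect worker address and the connector name
# are assumptions, not taken from the article.
CONNECT_URL = "http://localhost:8083/connectors"
CONNECTOR_NAME = "jdbc-source-demo"

payload = {
    "name": CONNECTOR_NAME,
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://asgard:3306/demo",
        "table.whitelist": "sales,orders,customers",
        # The remaining settings are elided in the article and omitted here too.
    },
}

# Build the POST request without sending it.
request = urllib.request.Request(
    CONNECT_URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)


def submit(req):
    """Send the request; only call this against a reachable Connect worker."""
    with urllib.request.urlopen(req) as response:
        return json.load(response)
```

Calling `submit(request)` would create the connector on the worker. Confluent Cloud's fully managed connectors are configured through its own UI and APIs instead.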

What Makes Hevo’s ETL Process Best-In-Class

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo’s automated, No-code platform empowers you with everything you need to have for a smooth data replication experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-Day Free Trial!

Step 3: Creating a Data Generator

  • First, create a topic for your streaming Kafka Data Pipeline by navigating to the topics page of the Confluent Cloud cluster.
  • Then, click on the “Add topic” button.
  • Let’s name the new topic “ratings” and set the “Number of partitions” to “6”, as shown in the image below.
[Image: Creating New Topic]
  • Click on the “Create with defaults” button to add your first topic for Kafka Data Pipeline.
  • Now, the Data Generator connector for Kafka Connect will be used.
  • From the Confluent Cloud, navigate to the “Connector” page of the cluster.
  • Here, type “datagen” in the search box and search, as shown in the image below.
[Image: Datagen Connector]
  • Select the “Datagen Source” connector for streaming Kafka Data Pipeline.
  • Click on the “Generate Kafka API key & secret” option under the “Kafka Cluster credentials” section, as shown in the image below.
[Image: Generating Kafka API Key & Secret]
  • Here, provide the “Description” of the API key and copy the credentials to a safe place for future use.
  • Fill in the remaining details as shown below.
      Topic name: ratings (as created in the step above)
      Output messages
        Output message format: AVRO
      Datagen Details
        Quickstart: RATINGS
        Max interval between messages (ms): 1000
      Number of tasks for this connector
        Tasks: 1
  • Then, click on the “Next” button.
  • The JSON on the confirmation screen should look similar to the one shown below.
{
  "name": "DatagenConnector1",
  "config": {
    "connector.class": "DatagenSource",
    "name": "DatagenConnector1",
    "kafka.api.key": "****************",
    "kafka.api.secret": "***********************",
    "kafka.topic": "ratings",
    "output.data.format": "AVRO",
    "quickstart": "RATINGS",
    "max.interval": "1000",
    "tasks.max": "1"
  }
}
  • Click on the “Launch” button to start the connector.
  • After some time, the new connector appears on the “Connectors” page with its status as “Running”.
  • On the “Topics” page of the cluster, choose the “ratings” topic and select the “Messages” tab, as shown in the image below.
[Image: Messages in Topic]
  • Here, you can see a stream of new messages through Kafka Data Pipeline.
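
To get a feel for what a data generator does, the following sketch produces rating-like records locally. The field names mirror the columns queried later in this article. The sample channel and message values, and the `generate_ratings` helper itself, are assumptions and not the actual contents of the Datagen quickstart.

```python
import itertools
import json
import random

# Assumed sample values; the real Datagen quickstart has its own value lists.
CHANNELS = ["web", "android", "ios", "iOS-test"]
MESSAGES = ["Great service", "Flight was delayed", "Friendly crew"]


def generate_ratings(seed=42):
    """Yield an endless stream of rating-like dictionaries."""
    rng = random.Random(seed)
    for rating_id in itertools.count(1):
        yield {
            "rating_id": rating_id,
            "user_id": rng.randint(1, 20),
            "stars": rng.randint(1, 5),
            "channel": rng.choice(CHANNELS),
            "message": rng.choice(MESSAGES),
        }


# Take the first five generated ratings and print them as JSON lines.
sample = list(itertools.islice(generate_ratings(), 5))
for rating in sample:
    print(json.dumps(rating))
```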

Step 4: Loading Data from MySQL into Kafka with Kafka Connect

  • Now let’s ingest the customer data from the MySQL Database.
  • Connect to the MySQL Database and check the customer data by running the query given below.
mysql> SELECT first_name, last_name, email, club_status 
         FROM demo.CUSTOMERS 
         LIMIT 5;
+-------------+------------+------------------------+-------------+
| first_name  | last_name  | email                  | club_status |
+-------------+------------+------------------------+-------------+
| Rica        | Blaisdell  | rblaisdell0@rambler.ru | bronze      |
| Ruthie      | Brockherst | rbrockherst1@ow.ly     | platinum    |
| Mariejeanne | Cocci      | mcocci2@techcrunch.com | bronze      |
| Hashim      | Rumke      | hrumke3@sohu.com       | platinum    |
| Hansiain    | Coda       | hcoda4@senate.gov      | platinum    |
+-------------+------------+------------------------+-------------+
5 rows in set (0.24 sec)
  • Before adding the MySQL connector for streaming Kafka Data Pipeline, create the topic that it will write to.
  • Click on the “Create topic” button from the Topics page of the Confluent Cloud cluster.
  • Name the topic “mysql01.demo.CUSTOMERS” and set the “Number of partitions” to 6, as shown in the image below.
[Image: Creating New Topic]
  • Select the “Customize settings” option and then set the “Cleanup policy” to the “Compact” option under the “Storage” section.
  • Click on the “Save & create” button.
  • Open the Connectors page and click on the “Add connector” button and type “MySQL CDC Source” in the search box.
  • Then, add the connector for Kafka Data Pipeline.
  • Configure the connector’s remaining settings as given below:
      Database hostname, Database port, Database username, Database password: These values will depend on where your database is and how you have configured it.
      Database server name: mysql01
      SSL mode: preferred
      Database details
        Tables included: demo.CUSTOMERS
        Snapshot mode: when_needed
      Output messages
        Output message format: AVRO
        After-state only: true
      Number of tasks for this connector
        Tasks: 1
  • Click on the “Next” button.
  • After a successful connection with the Database, you will get a JSON summary of the configuration as given below.
{
  "name": "MySqlConnector1",
  "config": {
    "connector.class": "MySqlCdcSource",
    "name": "MySqlConnector1",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**************************",
    "database.hostname": "kafka-data-pipelines.xxxxx.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "admin",
    "database.password": "********************",
    "database.server.name": "mysql01",
    "database.ssl.mode": "preferred",
    "table.include.list": "demo.CUSTOMERS",
    "snapshot.mode": "when_needed",
    "output.data.format": "AVRO",
    "after.state.only": "true",
    "tasks.max": "1"
  }
}
  • Click on the “Launch” button.
  • From the list of Topics, click on the “mysql01.demo.CUSTOMERS” and select the “Messages” tab. 
  • Click on the “offset” and set it to 0, then select the first option of the list, as shown in the image below.
[Image: Setting Offset to 0]
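
The “Compact” cleanup policy chosen for this topic means that Kafka eventually retains only the most recent record for each key. A minimal sketch of that idea is shown below. The `compact` helper, the customer ids, and the “gold” status are hypothetical values for illustration, and real compaction runs in the background rather than on every write.

```python
def compact(records):
    """Keep only the most recent value for each key, as a compacted topic eventually does."""
    latest = {}
    for key, value in records:
        latest[key] = value  # later records overwrite earlier ones
    return latest


# Hypothetical change events keyed by customer id (values shortened for clarity).
change_events = [
    (1, {"first_name": "Rica", "club_status": "bronze"}),
    (2, {"first_name": "Ruthie", "club_status": "platinum"}),
    (1, {"first_name": "Rica", "club_status": "gold"}),  # customer 1 was updated
]

current_state = compact(change_events)
```

This suits change data from a database table, where a consumer usually needs the current row for each customer rather than its full history.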

Step 5: Filtering Streams of Data with ksqlDB

  • Each rating message carries a “CHANNEL” value describing where it came from, and some of these values contain the word “test”.
  • Now let’s use ksqlDB to create a new stream for Kafka Data Pipeline that consists of only data from live devices.
  • For this, you should have a ksqlDB application on Confluent Cloud.
  • Check that the application shows the “Up” status on the ksqlDB page.
  • Open up the editor by clicking on the ksqlDB application.
  • First, declare a ksqlDB stream on the topic that holds the rating messages.
  • You can do this by running the following query given below:
CREATE STREAM RATINGS 
  WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');
[Image: Creating ksqlDB Streams]
  • You can also view the messages running through the Kafka Data pipeline by running a SELECT command for the Kafka Topic stream, given below:
SELECT USER_ID, STARS, CHANNEL, MESSAGE 
  FROM RATINGS EMIT CHANGES;
  • You can filter out the messages whose “CHANNEL” value contains the word “test” from the messages streaming through Kafka Data Pipeline using the SQL query given below.
SELECT USER_ID, STARS, CHANNEL, MESSAGE
  FROM RATINGS
  WHERE LCASE(CHANNEL) NOT LIKE '%test%'
  EMIT CHANGES;
  • For ksqlDB to process all the previous or existing and new messages in the topic, you need to set the “auto.offset.reset” parameter to the “earliest” by changing the drop-down from “Latest” to the “Earliest” option, as shown in the image below.
[Image: Setting Auto Offset to Earliest]
  • Because a ksqlDB stream is always backed by a Kafka topic, you can write all messages matching the criteria into a new ksqlDB stream of Kafka Data Pipeline using the SQL query given below.
CREATE STREAM RATINGS_LIVE AS
SELECT * 
  FROM RATINGS
 WHERE LCASE(CHANNEL) NOT LIKE '%test%'
 EMIT CHANGES;
  • Once again query the new stream and ensure that there are no “CHANNEL” values with “test” in streaming Kafka Data Pipeline, using the query given below.
SELECT USER_ID, STARS, CHANNEL, MESSAGE
  FROM RATINGS_LIVE
  EMIT CHANGES;
[Image: test in Channel Values]
  • Navigate to the Topics page, where you can see a new Kafka topic whose name includes “RATINGS_LIVE” after a prefix.
  • If Data Lineage is enabled then you can view the flow of data by clicking on the “Topic” option.
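
The filter applied in this step can be sketched in plain Python as follows. The `is_live` and `ratings_live` helpers and the sample records are hypothetical. They mirror the `LCASE(CHANNEL) NOT LIKE '%test%'` condition rather than show how ksqlDB evaluates it internally.

```python
def is_live(rating):
    """Mirror of: WHERE LCASE(CHANNEL) NOT LIKE '%test%'."""
    return "test" not in rating["CHANNEL"].lower()


def ratings_live(ratings):
    """Lazily pass through only the ratings that did not come from a test channel."""
    return (rating for rating in ratings if is_live(rating))


# Hypothetical sample records with the columns used in the queries above.
sample = [
    {"USER_ID": 1, "STARS": 4, "CHANNEL": "web", "MESSAGE": "Great service"},
    {"USER_ID": 2, "STARS": 1, "CHANNEL": "iOS-test", "MESSAGE": "Ignore me"},
    {"USER_ID": 3, "STARS": 5, "CHANNEL": "android", "MESSAGE": "Friendly crew"},
    {"USER_ID": 4, "STARS": 2, "CHANNEL": "TEST-web", "MESSAGE": "Ignore me too"},
]

live = list(ratings_live(sample))
```

Lowercasing the channel first is what makes the match case-insensitive, so both “iOS-test” and “TEST-web” are dropped.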

Step 6: Joining Data Streams with ksqlDB

  • Now let’s enrich each rating as it arrives with customer information pulled from the external MySQL Database. To do this, first declare a stream on the customer topic by running the query given below:
CREATE STREAM CUSTOMERS_S
WITH (KAFKA_TOPIC ='mysql01.demo.CUSTOMERS',
      KEY_FORMAT ='JSON',
      VALUE_FORMAT='AVRO');
  • Next, create a ksqlDB table on the customer data, built on the stream, that will return the latest value for a given key. Run the command given below:
CREATE TABLE CUSTOMERS WITH (FORMAT='AVRO') AS
    SELECT id AS customer_id,
           LATEST_BY_OFFSET(first_name) AS first_name,
           LATEST_BY_OFFSET(last_name) AS last_name,
           LATEST_BY_OFFSET(email) AS email,
           LATEST_BY_OFFSET(club_status) AS club_status
    FROM CUSTOMERS_S
    GROUP BY id;
  • Now, you can use the primary or foreign key to enrich the rating events with information about the customer.
  • Join the stream of ratings and the table of customer details by running the following command given below.
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA
        WITH (KAFKA_TOPIC='changed-ratings') AS
  SELECT C.CUSTOMER_ID,
         C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
         C.CLUB_STATUS,
         C.EMAIL,
         R.RATING_ID,
         R.MESSAGE,
         R.STARS,
         R.CHANNEL,
         TIMESTAMPTOSTRING(R.ROWTIME,'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS RATING_TS
FROM RATINGS_LIVE R
        INNER JOIN CUSTOMERS C
          ON R.USER_ID = C.CUSTOMER_ID
EMIT CHANGES;
  • Query the newly created stream by running the following command given below.
SELECT * 
  FROM RATINGS_WITH_CUSTOMER_DATA 
  EMIT CHANGES;
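
The table and join built in this step can be approximated in plain Python as below. This is a simplified sketch under stated assumptions: the `build_customer_table` and `enrich` helpers, the customer ids, and the sample values are hypothetical, and the table is built once up front, whereas ksqlDB keeps it updated continuously while ratings arrive.

```python
def build_customer_table(customer_events):
    """Fold change events into a table holding the latest row per customer id."""
    table = {}
    for event in customer_events:  # events are processed in offset order
        table[event["id"]] = {
            "first_name": event["first_name"],
            "last_name": event["last_name"],
            "club_status": event["club_status"],
        }
    return table


def enrich(ratings, customers):
    """Inner join: ratings without a matching customer are dropped."""
    for rating in ratings:
        customer = customers.get(rating["USER_ID"])
        if customer is None:
            continue
        yield {
            "CUSTOMER_ID": rating["USER_ID"],
            "FULL_NAME": f'{customer["first_name"]} {customer["last_name"]}',
            "CLUB_STATUS": customer["club_status"],
            "STARS": rating["STARS"],
            "MESSAGE": rating["MESSAGE"],
        }


# Hypothetical events: customer 1 changes club status in a later event.
customer_events = [
    {"id": 1, "first_name": "Rica", "last_name": "Blaisdell", "club_status": "bronze"},
    {"id": 2, "first_name": "Ruthie", "last_name": "Brockherst", "club_status": "platinum"},
    {"id": 1, "first_name": "Rica", "last_name": "Blaisdell", "club_status": "gold"},
]
ratings = [
    {"USER_ID": 1, "STARS": 4, "MESSAGE": "Smooth flight"},
    {"USER_ID": 99, "STARS": 2, "MESSAGE": "Unknown customer"},
]

enriched = list(enrich(ratings, build_customer_table(customer_events)))
```

The rating from user 99 is dropped because an inner join emits a row only when the customer id exists in the table.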

Step 7: Streaming Data from Kafka to Elasticsearch with Kafka Connect

  • Add a connector by navigating to the Connectors page. Here, search for the “Elasticsearch Service Sink” connector and click on the “Add Connector” button.
  • Now configure the “Elasticsearch Service Sink” connector with the following information given below.
[Image: Elasticsearch Connector]
  • Click on the “Next” button and check the connection of Kafka Data Pipeline with Elasticsearch. 
  • You will get the JSON output similar to the one given below.
{
  "name": "ElasticsearchSinkConnector_0",
  "config": {
    "topics": "changed-ratings",
    "input.data.format": "AVRO",
    "connector.class": "ElasticsearchSink",
    "name": "ElasticsearchSinkConnector_0",
    "kafka.api.key": "****************",
    "kafka.api.secret": "***************************",
    "connection.url": "https://es-host:port",
    "connection.username": "elastic",
    "connection.password": "************************",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "batch.size": "5",
    "tasks.max": "1"
  }
}
  • Then, click on the “Launch” button.
  • After some time, the status of the connector changes to “Running”.
  • Now, you can check if the data is streaming in your Elasticsearch.
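
One way to check that documents are arriving is to ask Elasticsearch for a document count. The sketch below only builds such a request against the `_count` endpoint. The host, the password, and the `count_request` helper are hypothetical, and it assumes the sink writes to an index named after the topic, “changed-ratings”, which may differ in your setup.

```python
import base64
import urllib.request


def count_request(base_url, index, username, password):
    """Build a GET request for the document count of an index, using basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/{index}/_count",
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )


# Hypothetical host and credentials; replace them with your own cluster details.
request = count_request(
    "https://localhost:9200", "changed-ratings", "elastic", "example-password"
)
```

Sending the request with `urllib.request.urlopen(request)` against a reachable cluster should return a JSON body containing a count, which should grow while the connector is running.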

That’s it! You have successfully built a Streaming Kafka Data Pipeline.

Conclusion

In this article, you learned about Apache Kafka and the steps to build a streaming Kafka Data Pipeline. You took streams of data, filtered them, and enriched them with data from the Database. Kafka Data Pipelines deliver high throughput data from source to target applications or systems to keep various business activities running.

Visit our Website to Explore Hevo

Kafka streams thousands of events that include valuable business data. It is essential to store this data in Data Warehouses and run Analytics on it to generate insights. Hevo Data is a No-code Data Pipeline solution that helps to transfer data from Kafka and 100+ data sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.

Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.

Share your experience of learning about working with Kafka Data Pipeline in the comments section below!
