Kafka for Data Ingestion Simplified 101

on Apache Kafka, Data Ingestion, Data Integration, Data Streaming, Kafka CLI, Kafka Producers • May 10th, 2022


Organizations today are overflowing with data, and the amount produced every day is staggering. A Forbes study from a few years ago found that nearly 2.5 quintillion bytes of data are generated every day. This Data Explosion has made it difficult to capture, process, and store large or complex datasets. Having a large amount of data isn't the problem; extracting useful information from it is.

Thankfully, there are systems and technologies that can not only store this data efficiently but also increase its value. One such technology is Apache Kafka.

Apache Kafka, a popular Data Streaming platform, is used by over 30% of Fortune 500 companies to build real-time data feeds. Kafka is a fault-tolerant, distributed event store that is highly resilient and simplifies Data Ingestion. Before getting started with Kafka for Data Ingestion, let's briefly discuss this robust platform.


What is Kafka?

Image: Kafka (Source: www.kafka.apache.org)

Apache Kafka is a popular distributed Data Streaming platform for building real-time, event-driven applications. Kafka is open source, so you can use it to store, read, and analyze streams of data free of charge. Because Kafka is distributed, it runs as a Cluster that spans multiple servers. This lets it deliver high throughput and low latency and handle large volumes of data without a noticeable drop in performance.

Written in Java and Scala, Kafka ingests data from a large number of external Data Sources and stores it as events in "Topics". Kafka relies on two kinds of client applications, "Producers" and "Consumers", to write, read, and process events. Producers publish data from Data Sources to Topics, and Consumers read the data stored in Kafka and pass it on to other systems. Kafka's fault-tolerant architecture is highly scalable and can handle billions of events. In addition, Kafka stores records durably and preserves their order within each partition.
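To make the Producer/Topic/Consumer relationship more concrete, here is a toy, in-memory model in Python. It is not the Kafka client API. ToyTopic and ToyConsumer are hypothetical names, and the key-to-partition mapping uses zlib.crc32 purely for illustration, whereas Kafka's default partitioner uses a murmur2 hash. The model shows three things: a topic is an append-only log split into partitions, records with the same key land in the same partition in order, and reading does not delete data, so each consumer tracks its own offsets.

```python
import zlib


class ToyTopic:
    """Hypothetical in-memory stand-in for a Kafka topic: one append-only log per partition."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions = [[] for _ in range(num_partitions)]

    def produce(self, key: str, value: str) -> tuple[int, int]:
        """Append a record and return its (partition, offset)."""
        # Same key -> same partition, so per-key order is preserved.
        # (Illustrative hash only; Kafka's default partitioner uses murmur2.)
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        log = self.partitions[partition]
        log.append((key, value))
        return partition, len(log) - 1


class ToyConsumer:
    """Reads records without removing them; each consumer keeps its own offsets."""

    def __init__(self, topic: ToyTopic) -> None:
        self.topic = topic
        self.next_offsets = [0] * len(topic.partitions)

    def poll(self) -> list[tuple[str, str]]:
        """Return every record this consumer has not seen yet, partition by partition."""
        records = []
        for partition, log in enumerate(self.topic.partitions):
            records.extend(log[self.next_offsets[partition]:])
            self.next_offsets[partition] = len(log)
        return records


topic = ToyTopic(num_partitions=3)
topic.produce("user-1", "login")
topic.produce("user-1", "click")
topic.produce("user-2", "login")

first = ToyConsumer(topic)
print(len(first.poll()))               # 3: all records so far
print(first.poll())                    # []: nothing new since the last poll
print(len(ToyConsumer(topic).poll()))  # 3: a second consumer still sees everything
```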

Key Features of Kafka

Take a look at the prominent features responsible for the immense popularity of Kafka.

  • Fault-Tolerant: Kafka replicates data across the brokers of a cluster, which keeps the organization's data safe in a distributed and durable store. Kafka is highly reliable, and it also lets you create custom connectors to suit your needs.
  • Scalability: Kafka can readily handle large volumes of data streams and trillions of messages per day. Kafka’s high scalability allows organizations to easily scale production clusters up to a thousand brokers.
  • High Availability: Kafka keeps your data available by replicating it across multiple brokers. With an appropriate replication factor, the failure of a single broker causes neither downtime nor data loss.
  • Integrations: Kafka comes with a set of connectors that simplify moving data in and out of Kafka. Kafka Connect allows Developers to easily connect to 100s of event sources and event sinks such as AWS S3, PostgreSQL, MySQL, Elasticsearch, etc.
  • Ease of Use: Kafka is a user-friendly platform and doesn’t require extensive programming knowledge to get started. Kafka has extensive resources in terms of documentation, tutorials, videos, projects, etc, to help Developers learn and develop applications using Kafka CLI.

Simplify Your Kafka ETL with Hevo’s No-code Data Pipeline

Hevo Data is a No-code Data Pipeline that offers a fully managed solution to set up data integration from Apache Kafka and 100+ Data Sources (including 40+ Free Data Sources) and will let you directly load data to a Data Warehouse. It will automate your data flow in minutes without writing any line of code. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data.


Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold.

Experience an entirely automated hassle-free ETL. Try our 14–day full access free trial today!

What is Data Ingestion?

Image: Data Ingestion (Source: www.irisidea.com)

A tremendous amount of data comes from disparate sources: your Website, your Mobile Application, REST Services, External Queues, and even your own Business Systems. This data needs to be collected and stored securely, without loss, and with the lowest possible latency. This is where Data Ingestion comes in.

Data Ingestion is the process of collecting data, often unstructured, from multiple Data Sources and storing it for further analysis. Data can be ingested in real time or in batches. Real-time data is ingested as soon as it arrives, whereas batch data is ingested in chunks at regular intervals. A Data Ingestion pipeline typically has three layers:

  • Data Collection Layer: This layer determines how data is collected from the sources that feed the Data Pipeline.
  • Data Processing Layer: This layer determines how the collected data is processed before it moves further along the Data Pipeline.
  • Data Storage Layer: This layer determines how and where the data is stored. It typically holds large volumes of data that have already passed through the Data Processing Layer.
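As a rough illustration of how the three layers fit together, the Python sketch below chains one generator function per layer. The function names, the "id" check, and the batch_size parameter are hypothetical choices made for this example and are not part of any particular product. A batch_size of 1 approximates real-time ingestion, where each event is stored on arrival, while a larger value models batch ingestion.

```python
from typing import Iterable, Iterator


def collect(sources: Iterable[Iterable[dict]]) -> Iterator[dict]:
    """Data Collection Layer: pull raw events from every source in turn."""
    for source in sources:
        yield from source


def process(events: Iterable[dict]) -> Iterator[dict]:
    """Data Processing Layer: normalise field names and drop events without an id."""
    for event in events:
        normalised = {key.lower(): value for key, value in event.items()}
        if "id" in normalised:
            yield normalised


def store(events: Iterable[dict], sink: list, batch_size: int = 1) -> None:
    """Data Storage Layer: append processed events to the sink in batches."""
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == batch_size:
            sink.append(batch)
            batch = []
    if batch:  # flush the final, partially filled batch
        sink.append(batch)


website = [{"ID": 1, "Page": "/home"}, {"Page": "/broken"}]
mobile_app = [{"id": 2, "Screen": "cart"}, {"ID": 3, "Screen": "checkout"}]

storage = []
store(process(collect([website, mobile_app])), storage, batch_size=2)
print(storage)
```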

Now that you’re familiar with Kafka and Data Ingestion, let’s dive straight into Kafka for Data Ingestion.

Steps to Use Kafka for Data Ingestion

Data Ingestion requires a reliable, event-based system that can handle large volumes of data with low latency, scalability, and fault tolerance. This is where Kafka for Data Ingestion comes in. Kafka decouples the many Producers that write data from real-time sources from the Consumers that ingest it.

In the infrastructure described here, Amazon S3 object storage is used to centralize the data stores, harmonize data definitions, and ensure good governance. S3 is highly scalable and provides fault-tolerant storage for your Data Pipelines, which simplifies Data Ingestion.

Follow the below-mentioned steps to use Kafka for Data Ingestion.

Producing Data to Kafka for Data Ingestion

The first step in Kafka for Data Ingestion is producing data to Kafka. Several components read from external sources such as Queues, WebSockets, or REST Services, so multiple Kafka Producers are deployed. Each Producer writes to its own topic, which holds that source's raw data.

Using the same message structure for every raw topic lets the downstream ingestion processes handle all the raw topics in the same way. All messages are produced as gzip-compressed JSON (.json.gzip) and contain these common fields:

  • raw_data: This represents the source data exactly as the Kafka Producer received it.
  • metadata: This represents the Kafka Producer metadata required to track the message source.
  • ingestion_timestamp: This represents the timestamp when the message was produced. This is later used for Data Partitioning.

Here’s an example of an empty record:

{
"raw_data": {},
"metadata":{"thread_id":0,"host_name":"","process_start_time":""},
"ingestion_timestamp":0
}
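A producer could build this envelope along the lines of the Python sketch below. The helper names and the use of epoch milliseconds for ingestion_timestamp are assumptions; the source only specifies the three fields. The sketch serializes plain JSON and leaves compression to the Kafka producer. Setting the producer property compression.type=gzip is one way to get gzip-compressed messages that consumers such as Kafka Connect decompress transparently, although the original setup may have compressed its messages differently.

```python
import json
import socket
import threading
import time
from datetime import datetime, timezone

# Recorded once when the producer process starts (an assumed convention).
PROCESS_START_TIME = datetime.now(timezone.utc).isoformat()


def make_envelope(raw_data: dict) -> dict:
    """Wrap a source payload in the common raw_data / metadata / ingestion_timestamp structure."""
    return {
        "raw_data": raw_data,
        "metadata": {
            "thread_id": threading.get_ident(),
            "host_name": socket.gethostname(),
            "process_start_time": PROCESS_START_TIME,
        },
        # Epoch milliseconds (assumed unit), later used to partition the data by date.
        "ingestion_timestamp": int(time.time() * 1000),
    }


def serialize(envelope: dict) -> bytes:
    """Encode the envelope as compact UTF-8 JSON, ready to hand to a Kafka producer."""
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")


message = serialize(make_envelope({"event": "rsvp", "meetup_id": 42}))
print(json.loads(message)["raw_data"])
```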

Using Kafka Connect to Store Raw Data

The first layer, the raw data layer, is written to the Data Lake. Keeping the raw data gives both technical processes and business definitions a lot of flexibility, because the information is available for analysis from the start. You can use Kafka Connect to build this raw-layer ETL without writing code: the S3 sink connector reads data from the raw topics and writes it to S3.

Image: Kafka for Data Ingestion (Source: www.medium.com)


Kafka Connect deserializes incoming records with org.apache.kafka.connect.json.JsonConverter and uses io.confluent.connect.storage.partitioner.TimeBasedPartitioner to decide where in S3 each record is written. You can follow this example to understand how to configure the connector.
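To show what the TimeBasedPartitioner does with each record, here is a rough Python approximation of how the S3 sink connector builds an object key from a record timestamp, using the hourly path.format ('date'=YYYY-MM-dd/'hour'=HH) from the configuration below. It relies on assumed connector defaults: a top-level "topics" directory, file names of the form topic+partition+start offset with the offset zero-padded to 10 digits, and a .json extension for JsonFormat. Check the connector documentation for your version before relying on the exact layout. The connector itself uses Joda-Time patterns, so strftime codes stand in for them here.

```python
from datetime import datetime, timezone


def s3_object_key(topic: str, kafka_partition: int, start_offset: int, timestamp_ms: int,
                  topics_dir: str = "topics", extension: str = ".json", pad_width: int = 10) -> str:
    """Approximate the S3 key of the object holding a record with this timestamp."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)  # timezone = UTC
    # strftime equivalent of path.format = 'date'=YYYY-MM-dd/'hour'=HH
    encoded_partition = ts.strftime("date=%Y-%m-%d/hour=%H")
    # The file name carries the offset of the first record written to the object.
    filename = f"{topic}+{kafka_partition}+{start_offset:0{pad_width}d}{extension}"
    return f"{topics_dir}/{topic}/{encoded_partition}/{filename}"


print(s3_object_key("meetups", 0, 42, 1652184000000))
# topics/meetups/date=2022-05-10/hour=12/meetups+0+0000000042.json
```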

What makes Hevo’s ETL Process Best-In-Class

Providing a high-quality ETL solution can be a cumbersome task when you have lots of data. Hevo's automated, No-code platform empowers you with everything you need to have a smooth ETL experience. Our platform has the following in store for you!

Check out what makes Hevo amazing:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis in a BI tool such as Power BI.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has built-in integrations for 100s of sources that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-day free trial!

Configure and Start the S3 Sink Connector

To finish setting up Kafka for Data Ingestion, configure the S3 sink connector by writing its properties in JSON format to a file called meetups-to-s3.json:

{
  "name": "meetups-to-s3",
  "config": {
    "_comment": "The S3 sink connector class",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",

    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max": "1",

    "_comment": "Which topics to export to S3",
    "topics": "meetups",

    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name": "meetups",

    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region": "us-west-2",

    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size": "5242880",

    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
    "flush.size": "100000",

    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",

    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "_comment": "The type of storage for this storage cloud connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",

    "_comment": "The storage format of the objects uploaded to S3",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",

    "_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",
    "schema.compatibility": "NONE",

    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",

    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale": "en",

    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone": "UTC",

    "_comment": "The date-based part of the S3 object key",
    "path.format": "'date'=YYYY-MM-dd/'hour'=HH",

    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms": "3600000",

    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",
    "rotate.interval.ms": "60000",

    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor": "Record"
  }
}
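The file above repeats the "_comment" key. Python's json module, like many parsers, keeps only the last value for a repeated key. The Python sketch below uses hypothetical helper names. It drops those comment keys while parsing and converts the size and time settings into readable units, which gives you a quick check that partition.duration.ms (3,600,000 ms, or one hour) matches the hourly path.format. The 5,242,880-byte s3.part.size is 5 MiB, the minimum part size S3 accepts for all but the last part of a multipart upload. The sample string is an excerpt of the file.

```python
import json


def load_connector_config(text: str) -> dict:
    """Parse a connector file, discarding the repeated "_comment" entries."""
    def drop_comments(pairs: list) -> dict:
        return {key: value for key, value in pairs if key != "_comment"}

    return json.loads(text, object_pairs_hook=drop_comments)


def describe_timing(config: dict) -> dict:
    """Convert selected S3 sink settings (stored as strings) into readable units."""
    return {
        "partition_hours": int(config["partition.duration.ms"]) / 3_600_000,
        "rotate_minutes": int(config["rotate.interval.ms"]) / 60_000,
        "part_size_mib": int(config["s3.part.size"]) / (1024 * 1024),
    }


sample = """{"name": "meetups-to-s3", "config": {
  "_comment": "part size", "s3.part.size": "5242880",
  "_comment": "hourly partitions", "partition.duration.ms": "3600000",
  "_comment": "rotation", "rotate.interval.ms": "60000"}}"""

connector = load_connector_config(sample)
print(describe_timing(connector["config"]))
# {'partition_hours': 1.0, 'rotate_minutes': 1.0, 'part_size_mib': 5.0}
```

To check the real file, pass the contents of meetups-to-s3.json to load_connector_config instead of the sample string.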

A few additional parameters adapt this configuration to the raw-data ingestion setup described earlier:

  • topics.regex: All raw topics share the suffix _raw, so a single connector configuration template can be used to create the connectors for multiple topics. This setting replaces topics, because a sink connector accepts only one of the two:
    "topics.regex": ".*_raw$"
  • flush.size and rotation: Tiny messages can produce many small files in S3. Raising flush.size (the maximum number of records per S3 object) prevents this. To bound how long records wait before an object is committed, also set rotate.interval.ms (measured on record timestamps) and rotate.schedule.interval.ms (measured on wall-clock time). For example:
    "flush.size": "180000",
    "rotate.interval.ms": "300000",
    "rotate.schedule.interval.ms": "300000"
  • path.format: The data is partitioned by the ingestion_timestamp field of each message, which requires the RecordField timestamp extractor. A daily path format is used, with partition.duration.ms set to one day to match:
    "timestamp.extractor": "RecordField",
    "timestamp.field": "ingestion_timestamp",
    "path.format": "'date'=YYYY-MM-dd",
    "partition.duration.ms": "86400000"

Partitioning the data by date keeps each day's files together. This simplifies the ingestion process and lets future partitioned queries, for example over Parquet data, read only the dates they need.
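A sink connector configured with topics.regex subscribes to every topic whose whole name matches the pattern. Kafka checks the pattern with Java's Matcher.matches(), and Python's re.fullmatch behaves the same way for simple patterns like these. The sketch below uses made-up topic names to show why anchoring on the underscore matters. The pattern .*raw$ also picks up a name such as draw, while .*_raw$ keeps only the _raw topics.

```python
import re


def matching_topics(pattern: str, topics: list[str]) -> list[str]:
    """Return the topics whose entire name matches the pattern, as a regex-subscribed sink would."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


topics = ["meetups_raw", "rsvps_raw", "meetups_clean", "draw"]
print(matching_topics(".*raw$", topics))   # ['meetups_raw', 'rsvps_raw', 'draw']
print(matching_topics(".*_raw$", topics))  # ['meetups_raw', 'rsvps_raw']
```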

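Then start the S3 connector with the Confluent CLI, which issues the REST API call to the Connect worker for you. The command below uses the syntax of older Confluent CLI releases; newer releases use different subcommands.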

confluent local load meetups-to-s3 -- -d ./meetups-to-s3.json
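The CLI submits the connector definition to the Kafka Connect REST API. If you prefer to call that API directly, the Python sketch below builds the equivalent POST /connectors request with the standard library. It assumes a Connect worker listening on its default REST port at http://localhost:8083. The helper name is hypothetical, and the request is only sent when you pass it to urllib.request.urlopen. The definition shown is a shortened version of meetups-to-s3.json.

```python
import json
import urllib.request


def create_connector_request(definition: dict,
                             connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build a POST /connectors request whose body is {"name": ..., "config": {...}}."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


definition = {
    "name": "meetups-to-s3",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "meetups",
    },
}
request = create_connector_request(definition)
print(request.get_method(), request.full_url)  # POST http://localhost:8083/connectors
# urllib.request.urlopen(request) would submit it to a running Connect worker.
```

To submit the full file, load meetups-to-s3.json with json.load and pass the result. Python keeps only the last "_comment" value, which is harmless here.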

You can confirm that the S3 connector has started correctly by inspecting the logs of the Connect worker:

INFO Starting connectors and tasks using config offset 9 
INFO Starting connector meetups-to-s3 
INFO Starting task meetups-to-s3-0 
INFO Creating task meetups-to-s3-0
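You can also ask the Connect worker for the connector's state through the REST endpoint GET /connectors/<name>/status, which reports a state for the connector and for each of its tasks. The Python sketch below assumes the default worker address http://localhost:8083. The helper names are hypothetical, and the sample response is illustrative rather than captured from a real worker.

```python
import json
import urllib.request


def fetch_status(name: str, connect_url: str = "http://localhost:8083") -> dict:
    """Fetch the status document for one connector from a running Connect worker."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as response:
        return json.load(response)


def connector_is_running(status: dict) -> bool:
    """True only if the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Illustrative response shape (not captured from a real worker):
sample_status = {
    "name": "meetups-to-s3",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}
print(connector_is_running(sample_status))  # True
# Against a live worker: connector_is_running(fetch_status("meetups-to-s3"))
```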

The raw data is now being stored in S3. That's it: this is how you can use Kafka for Data Ingestion.

Conclusion

Kafka is a distributed event store and stream-processing platform developed by the Apache Software Foundation and written in Java and Scala. With Kafka Connect, you can ingest data from external sources into Kafka and move it to destinations such as S3 without writing custom code.

This article helped you use Kafka for Data Ingestion. However, in businesses, extracting complex data from a diverse set of Data Sources can be a challenging task and this is where Hevo saves the day!


Hevo Data, with its strong integration with 100+ Sources (such as Apache Kafka) and BI tools, lets you not only export data from multiple sources and load it into destinations, but also transform and enrich your data and make it analysis-ready. That way, you can focus on your key business needs and perform insightful analysis using BI tools.

Give Hevo Data a try and sign up for a 14-day free trial today. Hevo offers plans & pricing for different use cases and business needs, check them out!

Share your experience of understanding Kafka for Data Ingestion in the comments section below.
