Organizations today are overflowing with data, and the amount produced every day is staggering. A study by Forbes[1] a few years ago confirmed this Data Explosion.
The study found that nearly 2.5 quintillion bytes of data are generated every day!
Because of this Data Explosion, capturing, processing, and storing big or complex datasets has become increasingly difficult. Having a large amount of data isn’t the problem; extracting useful information out of it is.
Thankfully, there are systems and technologies capable not only of storing this data efficiently but also of elevating its value. One such technology is Kafka for Data Ingestion.
Organizations today have access to a wide range of data streams. Apache Kafka, a popular Data Streaming platform, is used by over 30% of Fortune 500 companies to develop real-time data feeds.
Kafka is a fault-tolerant, distributed event store that is highly resilient and eases the process of data ingestion. But before getting started with Kafka for Data Ingestion, let’s discuss this robust platform in brief.
What is Kafka?
Apache Kafka is a popular Distributed Data Streaming software that allows for the development of real-time event-driven applications. Kafka is an open-source application that allows you to store, read, and analyze streams of data free of cost.
Kafka is distributed, which means that it can run as a Cluster that spans multiple servers. Leveraging its distributed nature, users can achieve high throughput, minimal latency, high computation power, etc., and can handle large volumes of data without any perceptible lag in performance.
Written in Java and Scala, Kafka supports data from a large number of external Data Sources and stores it as “Topics”. Kafka employs two roles, “Producers” and “Consumers”, to write, read, and process events.
Producers act as an interface between Data Sources and Topics, while Consumers allow users to read and move the data stored in Kafka downstream.
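To make these two roles concrete, here is a minimal sketch using the confluent-kafka Python client; the broker address (localhost:9092) and the topic name page_views are assumptions made purely for illustration.

from confluent_kafka import Producer, Consumer

# Producer: the interface between a data source and a Kafka topic.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("page_views", key="user-42", value='{"page": "/pricing"}')
producer.flush()  # block until the message has been delivered

# Consumer: reads the events stored in the topic.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "demo-readers",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["page_views"])
msg = consumer.poll(10.0)  # wait up to 10 seconds for a record
if msg is not None and msg.error() is None:
    print(msg.value().decode("utf-8"))
consumer.close()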
The fault-tolerant architecture of Kafka is highly scalable and can handle billions of events with ease. In addition to that, Kafka is super fast and is highly accurate with data records.
Key Features of Kafka
Take a look at the prominent features responsible for the immense popularity of Kafka:
- Fault-Tolerant: Kafka keeps an organization’s data safe and secure in distributed, durable clusters. Kafka is exceptionally reliable, and it also allows you to create custom connectors as per your needs.
- Scalability: Kafka can readily handle large volumes of data streams and trillions of messages per day. Its high scalability allows organizations to easily scale production clusters up to a thousand brokers.
- High Availability: Kafka is extremely fast and ensures zero downtime, making sure your data is available anytime. Kafka replicates your data across multiple brokers efficiently and without data loss.
- Integrations: Kafka comes with a set of connectors that simplify moving data in and out of Kafka. Kafka Connect allows Developers to easily connect to 100s of event sources and event sinks such as AWS S3, PostgreSQL, MySQL, Elasticsearch, etc.
- Ease of Use: Kafka is a user-friendly platform and doesn’t require extensive programming knowledge to get started. Kafka has extensive resources in terms of documentation, tutorials, videos, projects, etc., to help Developers learn and build applications using the Kafka CLI.
Take the complexity out of Kafka data integration with Hevo. Easily move your Kafka streams to your destination for real-time analytics.
Why Consider Hevo?
- No-Code Setup: Get your data pipelines up and running in minutes without any coding.
- Fault-Tolerant Architecture: Ensure data reliability with Hevo’s built-in fault-tolerant system.
- Real-Time Data Sync: Enjoy continuous, real-time data flow from Kafka to your preferred destination.
Start focusing on insights, not data movement—try Hevo today for a seamless data-loading experience.
Get Started with Hevo for Free
What is Data Ingestion?
There’s a tremendous amount of data coming from disparate sources: your Website, your Mobile Application, REST Services, External Queues, and even your own Business Systems.
This data needs to be collected and stored securely, without losses and with the lowest possible latency. This is where Data Ingestion comes in.
Data Ingestion refers to the process of collecting and storing mostly unstructured sets of data from multiple Data Sources for further analysis.
This data can be real-time or integrated into batches. Real-time data is ingested on arrival, whereas batch data is ingested in chunks at regular intervals.
There are 3 different layers of Data Ingestion:
- Data Collection Layer: This layer of the Data Ingestion process decides how the data is collected from resources to build the Data Pipeline.
- Data Processing Layer: This layer of the Data Ingestion process decides how the data is processed, which further helps in building a complete Data Pipeline.
- Data Storage Layer: The primary focus of the Data Storage Layer is on how to store the data. This layer is mainly used to store huge amounts of real-time data that has already been processed by the Data Processing Layer.
Now that you’re familiar with Kafka and Data Ingestion, let’s dive straight into Kafka for Data Ingestion.
Steps to Use Kafka for Data Ingestion
It is important to have a reliable event-based system that can handle large volumes of data with low latency, scalability, and fault tolerance.
This is where Kafka for Data Ingestion comes in. Kafka is a framework that allows multiple Producers from real-time sources to feed data to the Consumers that ingest it.
In this infrastructure, S3 Object Storage is used to centralize the data stores, harmonize data definitions, and ensure good governance.
S3 is highly scalable and provides fault-tolerant storage for your Data Pipelines, easing the process of Data Ingestion using Kafka.
Follow the below-mentioned steps to use Kafka for Data Ingestion.
Producing Data to Kafka for Data Ingestion
The first step in Kafka for Data Ingestion requires producing data in Kafka. Multiple components read from external sources such as Queues, WebSockets, or REST Services.
Consequently, multiple Kafka Producers are deployed, each delivering data to a distinct topic, which will comprise the source’s raw data.
A homogeneous data structure allows the Kafka for Data Ingestion processes to run transparently while writing messages to the multiple Kafka raw topics.
All the messages are produced as gzip-compressed JSON (.json.gzip) and contain these general data fields:
- raw_data: This represents the data as it comes from the Kafka Producer.
- metadata: This represents the Kafka Producer metadata required to track the message source.
- ingestion_timestamp: This represents the timestamp when the message was produced. This is later used for Data Partitioning.
Here’s an example of an empty record:
{
  "raw_data": {},
  "metadata": {"thread_id": 0, "host_name": "", "process_start_time": ""},
  "ingestion_timestamp": 0
}
- raw_data is a placeholder for the main data (currently empty).
- metadata stores additional info like thread_id, host_name, and process_start_time (all placeholders).
- ingestion_timestamp marks the time of ingestion (currently set to 0).
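As an illustration only, here is a minimal Python producer sketch that wraps a payload in this envelope and publishes it as gzip-compressed JSON. It uses the confluent-kafka client; the broker address, the topic name meetups_raw, and the sample payload are assumptions, not part of the original setup.

import json
import socket
import threading
import time
from datetime import datetime, timezone

from confluent_kafka import Producer

# compression.type=gzip stores the records gzip-compressed, matching the .json.gzip format above.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "compression.type": "gzip",
})

process_start_time = datetime.now(timezone.utc).isoformat()

def build_record(raw_data: dict) -> dict:
    # Wrap the source payload in the general envelope used by the raw topics.
    return {
        "raw_data": raw_data,
        "metadata": {
            "thread_id": threading.get_ident(),
            "host_name": socket.gethostname(),
            "process_start_time": process_start_time,
        },
        # Epoch milliseconds; later used by the time-based partitioner.
        "ingestion_timestamp": int(time.time() * 1000),
    }

record = build_record({"event": "rsvp", "meetup_id": 12345})  # hypothetical payload
producer.produce("meetups_raw", value=json.dumps(record).encode("utf-8"))
producer.flush()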
Using Kafka Connect to Store Raw Data
The first layer written to the Data Lake in Kafka Data Ingestion is the raw data layer.
This layer provides immense flexibility to the technical processes and business definitions as the information available is ready for analysis from the beginning.
Then, you can use Kafka Connect to perform this raw data layer ETL without writing a single line of code.
The S3 Sink connector is used to read data from the raw topics and write it to S3.
Kafka Connect collects information as it comes in using the org.apache.kafka.connect.json.JsonConverter and writes data using the io.confluent.connect.storage.partitioner.TimeBasedPartitioner. You can follow the example below to understand how to configure the connector.
Configure and Start the S3 Sink Connector
To finish up the process of Kafka for Data Ingestion, you need to configure the S3 Sink connector by adding its properties in JSON format and storing them in a file called meetups-to-s3.json:
{
  "name": "meetups-to-s3",
  "config": {
    "_comment": "The S3 sink connector class",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max": "1",
    "_comment": "Which topics to export to S3",
    "topics": "meetups",
    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name": "meetups",
    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region": "us-west-2",
    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size": "5242880",
    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
    "flush.size": "100000",
    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "_comment": "The type of storage for this storage cloud connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "_comment": "The storage format of the objects uploaded to S3",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",
    "schema.compatibility": "NONE",
    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale": "en",
    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone": "UTC",
    "_comment": "The date-based part of the S3 object key",
    "path.format": "'date'=YYYY-MM-dd/'hour'=HH",
    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms": "3600000",
    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",
    "rotate.interval.ms": "60000",
    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor": "Record"
  }
}
- Connector Class: Specifies the S3SinkConnector class used to export data to S3.
- Tasks: Limits the connector to 1 task with "tasks.max":"1", dictating parallelism.
- Topics: Targets the Kafka topic meetups for export to S3.
- S3 Bucket Configuration: Sets s3.bucket.name to "meetups" and the region to "us-west-2".
- Multipart Upload: Defines the part size with "s3.part.size":"5242880" (5 MB).
- Flush Size: Determines that a file is created after at most 100,000 records.
- Data Format: Uses JSON format, with JsonConverter for key and value deserialization.
- Storage and Format Classes: Configures S3 storage using S3Storage and the JSON file format.
- Partitioning: Uses TimeBasedPartitioner to organize files by date and hour.
- Time-based Settings: Sets the locale to en and the timezone to UTC, formats the path as 'date'=YYYY-MM-dd/'hour'=HH, and uses a rotate.interval.ms of 60000 (1 minute) for frequent uploads.
- Timestamp Extraction: Extracts the timestamp directly from each Kafka record (Record).
Some additional parameters are needed to adapt this configuration for Kafka Data Ingestion:
- topics.regex: A uniform name for all the raw topics with the suffix _raw is kept. Therefore, a single connector configuration template file is used to create the connectors for multiple topics.
"topics.regex": ".*raw$"
- flush.size: Receiving tiny messages could result in many small files being written to S3. This can be prevented by configuring a bigger flush size, together with the rotation intervals rotate.interval.ms and rotate.schedule.interval.ms.
Example:
"flush.size": "180000",
"rotate.interval.ms": "300000",
"rotate.schedule.interval.ms": "300000"
- path.format: The data is partitioned on the ingestion timestamp carried in each record (which requires setting timestamp.extractor to RecordField):
"timestamp.field": "ingestion_timestamp"
Then, the following path format is defined:
"path.format": "'date'=YYYY-MM-dd"
Partitioning the data by date speeds up the ingestion process and the future implementation of partitioned Parquet queries.
You can then issue the REST API call using the Confluent CLI to start the S3 Connector:
confluent local load meetups-to-s3 -- -d ./meetups-to-s3.json
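If you are not using the Confluent CLI, the same connector can be registered against the Kafka Connect REST API directly. The sketch below assumes a Connect worker on localhost with the default REST port 8083 and uses the Python requests library:

import json

import requests

# Load the connector definition stored in meetups-to-s3.json.
with open("./meetups-to-s3.json") as f:
    connector = json.load(f)

# POST /connectors creates the connector on the Connect worker.
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()

# GET /connectors/<name>/status reports the connector and task states.
status = requests.get("http://localhost:8083/connectors/meetups-to-s3/status").json()
print(status["connector"]["state"], [task["state"] for task in status["tasks"]])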
You can confirm if the S3 connector has started correctly or not by inspecting the logs of the Connect Worker.
INFO Starting connectors and tasks using config offset 9
INFO Starting connector meetups-to-s3
INFO Starting task meetups-to-s3-0
INFO Creating task meetups-to-s3-0
- Uses config offset 9 to begin loading connectors and tasks.
- Starts the connector meetups-to-s3.
- Launches task meetups-to-s3-0 for data processing.
- Confirms creation of task meetups-to-s3-0.
The raw data is now successfully stored in S3. That’s it! This is how you can use Kafka for Data Ingestion.
Read More About: Connect Kafka to S3
Conclusion
Kafka is a distributed event store and stream-processing platform developed by the Apache Software Foundation and written in Java and Scala.
Without the need for additional resources, you can use Kafka and Kafka Connect to ingest data from external sources.
This article helped you use Kafka for Data Ingestion. However, in businesses, extracting complex data from a diverse set of Data Sources can be a challenging task and this is where Hevo saves the day!
Give Hevo Data a try with its 14-day free trial today. Hevo offers plans & pricing for different use cases and business needs. Check them out!
Share your experience of understanding Kafka for Data Ingestion in the comments section below.
FAQs
1. Is Kafka used for data ingestion?
Yes, Kafka is great for bringing in large amounts of real-time data from different sources and delivering it to various destinations. It’s a go-to tool for streaming data into systems quickly and efficiently.
2. Is Kafka used for data processing?
Yes, you can use Kafka to process data in real-time. By using Kafka Streams or tools like Apache Flink or Spark, you can analyze and transform data as needed.
3. What is the ingestion time in Kafka?
Kafka processes data really fast, usually with minimal delays. The exact time it takes to ingest data depends on things like the size of the messages, how your Kafka brokers are set up, and the speed of your network. But generally, it’s super quick—usually happening in just a few milliseconds.
Raj, a data analyst with a knack for storytelling, empowers businesses with actionable insights. His experience, from Research Analyst at Hevo to Senior Executive at Disney+ Hotstar, translates complex marketing data into strategies that drive growth. Raj's Master's degree in Design Engineering fuels his problem-solving approach to data analysis.