This article is a step-by-step guide to the Kafka stream processing Python operation using Apache Kafka and Python. Apache Kafka (or simply Kafka) is an open-source distributed real-time event streaming platform employed by a wide variety of businesses worldwide.
Kafka event stream processing with Python is widely adopted because Python is simple and user-friendly. Many companies use Apache Kafka with Python for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. In this guide, we provide a comprehensive overview of the Kafka stream processing Python operation.
What is Apache Kafka?
Kafka is an open-source distributed stream processing platform maintained by the Apache Software Foundation. It is also called Apache Kafka and is written in Java and Scala. Originally developed at LinkedIn, Kafka was open-sourced in early 2011 and later became a top-level Apache project.
Kafka aims to provide a unified, high-throughput, and low-latency platform for real-time handling of data feeds. It offers Kafka stream libraries for smooth data processing in applications. In addition, it can connect to external systems for data import and export via Kafka Connect. Thousands of companies use Apache Kafka for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Many industrial verticals like manufacturing, banking, insurance, and telecom use Apache Kafka for efficient data processing. Almost 80% of Fortune 100 companies trust and use Kafka.
You can connect to the Apache Kafka GitHub repository from here. More information about Kafka event streaming can be found in this guide – Kafka Event Streaming Methodology & Working: A Comprehensive Guide 101. To know more about building a streaming data pipeline using Kafka, visit this helpful guide – How to Build a Streaming Kafka Data Pipeline: 7 Easy Steps.
Kafka Core Capabilities
Apache Kafka’s core capabilities are
- High Throughput: Kafka delivers messages with high throughput using a cluster of machines with latencies as low as 2ms.
- Scalable: Kafka is highly scalable up to a thousand brokers, trillions of messages per day, petabytes of data, and hundreds of thousands of partitions.
- Permanent Storage: Kafka can store streams of data safely in distributed, durable, fault-tolerant cluster machines.
- High Availability: Kafka is highly available; clusters can be stretched efficiently across availability zones or connected across geographic regions.
- Built-in Stream Processing: Kafka has built-in stream processing features that process streams of events using event-time, with joins, aggregations, filters, transformations, and more.
- Connection Interface: Kafka offers an out-of-the-box connect interface that integrates with hundreds of event sources and event sinks like AWS S3, Postgres, JMS, Elasticsearch, etc.
- Client Libraries: Kafka provides client libraries for reading, writing, and processing streams of events in various programming languages like Python, Java, C/C++, Scala, etc.
Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources straight into your Data Warehouse or any Databases. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!
Get Started with Hevo for Free
Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!
Kafka Basic Concepts
Before proceeding further, let’s understand a few important concepts of Kafka, a message-passing system.
- Topics: Topics refer to a group or collection of messages or events. All messages are organized and passed through topics in Kafka. There can be multiple topics, each with a unique name.
- Producers: The producer in Kafka is responsible for writing messages and sending these messages to particular topics.
- Consumers: The consumers in Kafka connect to a particular topic to read its messages.
- Consumer Groups: A consumer group is a set of consumers that subscribe to the same topics and share the work of reading them. Kafka tracks, per group, which messages have already been processed, so each message is handled by only one consumer in the group (see the sketch after this list).
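To make consumer groups concrete, here is a minimal sketch using the kafka-python client (installed later in this guide) against a local broker; the group name 'quickstart-group' is hypothetical. If you start two copies of this script with the same group_id, Kafka splits the topic's partitions between them.

from kafka import KafkaConsumer

# Consumers created with the same group_id form one consumer group.
# Kafka assigns each partition to exactly one member of the group and
# remembers, per group, the offsets that have already been processed.
consumer = KafkaConsumer(
    'quickstart-events',
    bootstrap_servers=['localhost:9092'],
    group_id='quickstart-group',   # hypothetical group name
    auto_offset_reset='earliest',
)
for message in consumer:
    print(message.value)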
Kafka Stream Processing Python Mechanism
Let’s learn the Kafka stream processing Python mechanism step-by-step:
Python is an object-oriented and high-level programming language. It is a simple and user-friendly language that offers many modules and packages for various programming applications.
To get started with the Kafka stream processing Python operation, we first need to install and set up Python on our system. Follow the link to install and set up Python 3 on a Windows system.
Kafka Environment Setup
Now, we need to set up a Kafka environment for seamless Kafka stream processing with Python. Here, we implement these steps on a Windows system.
Prerequisite
Your local system must have Java 8+ installed, with the Java environment variable set. To download and install Java, follow the link.
Install Kafka on Windows
The first step is to download the latest Kafka from the link and extract it.
Run Kafka Environment
Step 1: Open a command prompt window.
Step 2: Go to the Kafka root folder and then run the following commands to start the required services.
Step 3: Run the below command to go to the required folder.
$ cd bin\windows
Step 4: Start the Zookeeper server using the below command.
$ zookeeper-server-start.bat ..\..\config\zookeeper.properties
Step 5: Open another command prompt session and start the Kafka server using the below command.
$ kafka-server-start.bat ..\..\config\server.properties
Once both services have successfully launched, the basic Kafka environment will be running and ready to use.
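Optionally, you can check from another command prompt that the broker is reachable. Assuming the default listener on localhost:9092, the bundled script below prints the API versions the broker supports, which confirms it is accepting connections.
$ kafka-broker-api-versions.bat --bootstrap-server localhost:9092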
Create Kafka Topic to Store Events
Now, Kafka is ready to read, write, store, and process events across many machines. These events are stored in topics. We will create the topic ‘quickstart-events’ in a new command prompt window by running the below command.
$ kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
If the topic is created successfully, the command prints a confirmation such as 'Created topic quickstart-events.'
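You can also inspect the new topic, for example its partition count and replication factor, with the --describe flag of the same tool:
$ kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092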
Writing Events into the Topic
The next part in our Kafka stream processing Python guide is to write a few events into the topic ‘quickstart-events’ using the Kafka console producer by running the below command.
$ kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092
The console producer waits with a '>' prompt; type a message and press Enter to send it to the topic as an event. Press Ctrl-C to stop the producer when you are done.
Read the Events from the Topic
Next, read the events written to the ‘quickstart-events’ topic in the previous step. Run the console consumer client in a new command prompt window using the below command.
$ kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
The consumer prints the events written in the previous step, confirming that they were read from the topic successfully.
Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo Data‘s automated, No-code platform empowers you with everything you need for a smooth data replication experience.
Check out what makes Hevo amazing:
- Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
- Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
- Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making.
- Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
- Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
- Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-day free trial!
Kafka Stream Processing Python Package Installation
Now, our Kafka environment is up and running, and it is able to send and receive events. Next, we need to install the kafka-python package to work with Kafka stream processing in Python. To do this, install the kafka-python package using pip as shown below.
$ python -m pip install kafka-python
With this, the Kafka stream processing Python package is installed on your system.
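As a quick sanity check, you can confirm that the package imports correctly and print its version from a Python shell (the exact version number depends on your installation):

import kafka
print(kafka.__version__)   # e.g. '2.0.2', depending on the installed release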
Writing a Kafka Producer in Python
Now, it’s time to create Kafka Producer in Python. The Kafka producer needs to know where Kafka is running and the topic name where events need to be streamed. Here, Kafka runs on ‘localhost:9092’, and the topic is ‘quickstart-events’.
First, we will write some messages in a text file named ‘inputfile.txt’, and these messages will be sent by KafkaProducer to the Kafka topic.
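For reference, ‘inputfile.txt’ simply contains one message per line; based on the consumer output shown later in this guide, it looks roughly like this:

Send Message one
Send Message two
Send Message three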
Python code:
from kafka import KafkaProducer
import json

# Connect to the local Kafka broker
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send each line of the input file to the topic as a UTF-8 encoded JSON string
with open('inputfile.txt') as f:
    lines = f.readlines()
for line in lines:
    producer.send('quickstart-events', json.dumps(line).encode('utf-8'))

# Block until all buffered messages have actually been delivered
producer.flush()
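If you prefer to keep serialization out of the send calls, kafka-python also accepts a value_serializer when the producer is constructed; here is a minimal sketch under the same broker and topic assumptions:

from kafka import KafkaProducer
import json

# The serializer is applied automatically to every value passed to send()
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('quickstart-events', 'Hello from Python')
producer.flush()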
Writing a Kafka Consumer in Python
The Kafka Consumer reads the messages sent by the Kafka Producer from the given broker and topic. The auto_offset_reset='earliest' parameter makes the consumer start from the oldest available messages.
Python code:
from kafka import KafkaConsumer

# Connect to the local broker and start from the oldest available messages
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
consumer.subscribe(['quickstart-events'])

# Iterate over incoming messages; this loop blocks and waits for new events
for event in consumer:
    print(event)
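If every message on the topic is JSON-encoded (as produced by the code above), you can have kafka-python decode it automatically with a value_deserializer, and stop iterating after a period of inactivity with consumer_timeout_ms; a minimal sketch under those assumptions:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'quickstart-events',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    # Assumes all messages on the topic are valid JSON; raw text messages
    # from the console producer would make json.loads raise an error.
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    consumer_timeout_ms=10000,   # give up after 10 seconds with no new messages
)
for event in consumer:
    print(event.value)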
Final Testing
Kafka stream processing with Python is now running successfully and can be tested from the Python code output as well as in the command prompt window where the kafka-console-consumer command is running.
Here's the output of the Python code:
The below output shows all the messages sent to the topic 'quickstart-events' on port 9092.
ConsumerRecord(topic='quickstart-events', partition=0, offset=0, timestamp=1650078325182, timestamp_type=0, key=None, value=b'This is my first event', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=22, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=1, timestamp=1650078332088, timestamp_type=0, key=None, value=b'This is my second event', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=23, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=2, timestamp=1650078334261, timestamp_type=0, key=None, value=b'Hello', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=5, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=3, timestamp=1650078449481, timestamp_type=0, key=None, value=b'Yes!! Kafka is running successfully!!', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=37, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=4, timestamp=1650079106092, timestamp_type=0, key=None, value=b'"Send Message one\\n"', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=5, timestamp=1650079106093, timestamp_type=0, key=None, value=b'"Send Message two\\n"', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=6, timestamp=1650079106093, timestamp_type=0, key=None, value=b'"Send Message three"', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
The same messages also appear in the command prompt window running the kafka-console-consumer command. Together, these two outputs confirm that Kafka events/messages are streaming successfully through Python.
Conclusion
We now have our first Python-based Producer and Consumer code for Apache Kafka using the Kafka stream processing Python operation, and messages are streaming successfully in the dummy example shown above. The same principle applies to continuous stream processing with Kafka and Python.
Now that the base is ready, you can extend the given code as per your needs: follow the procedure step by step, modify it to fit your requirements, and get Kafka stream processing with Python custom-built for your own use case.
An in-depth and complete analysis of your business requires the consolidation of information from different sources like Apache Kafka into a single central data repository like a Data Warehouse. To extract this complex data with ever-changing Data Connectors, you need seamless, easy-to-use & economical ETL solutions like Hevo Data.
Visit our Website to Explore Hevo
Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources including Apache Kafka to a Data Warehouse or a Destination of your choice. It is a reliable, completely automated, and secure service that doesn’t require you to write any code! Hevo, with its strong integration with 100+ sources (Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.
Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. You can also check out our unbeatable pricing page to understand which plan fulfills all your business needs.
Tell us about your experience of learning about Kafka stream processing Python operation in the comment box below.