Kafka Stream Processing Python Operation: An Easy Guide

Data Integration, Data Streaming, Kafka, Python, Tutorials • April 21st, 2022

This article is a step-by-step guide to stream processing with Apache Kafka and Python. Apache Kafka, or simply Kafka, is an open-source distributed real-time event streaming platform used by a wide variety of businesses worldwide.

Kafka event stream processing with Python is popular because Python is easy to use and approachable. Many companies use Apache Kafka with Python for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. This guide gives an overview of Kafka stream processing in Python.

What is Apache Kafka?

Kafka is an open-source distributed stream processing platform maintained by the Apache Software Foundation. It is also called Apache Kafka and is written in Java and Scala. Kafka was originally developed at LinkedIn and was open-sourced in early 2011.

Kafka aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. It offers the Kafka Streams library for stream processing in applications. In addition, it can connect to external systems for data import and export via Kafka Connect. Thousands of companies use Apache Kafka for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Many industry verticals such as manufacturing, banking, insurance, and telecom use Apache Kafka for efficient data processing. Almost 80% of Fortune 100 companies trust and use Kafka.

You can connect to the Apache Kafka GitHub repository from here. More information about Kafka event streaming can be found in this guide – Kafka Event Streaming Methodology & Working: A Comprehensive Guide 101. To know more about building a streaming data pipeline using Kafka, visit this helpful guide – How to Build a Streaming Kafka Data Pipeline: 7 Easy Steps.

Kafka Core Capabilities

Apache Kafka’s core capabilities are:

  • High Throughput: Kafka delivers messages with high throughput using a cluster of machines with latencies as low as 2ms.
  • Scalable: Kafka is highly scalable up to a thousand brokers, trillions of messages per day, petabytes of data, and hundreds of thousands of partitions.
  • Permanent Storage: Kafka can store streams of data safely in distributed, durable, fault-tolerant cluster machines.
  • High Availability: Kafka is highly available, with many clusters spread across geographic regions and efficiently connected.
  • Built-in Stream Processing: Kafka has built-in stream processing features that process streams of events using event-time with joins, aggregations, filters, transformations, and more.
  • Connection Interface: Kafka offers an out-of-the-box connect interface that integrates with hundreds of event sources and event sinks like AWS S3, Postgres, JMS, Elasticsearch, etc.
  • Client Libraries: Kafka can read, write, and process streams of events in various programming languages like Python, Java, C/C++, Scala, etc.

Turbocharge Your Apache Kafka ETL Using the Fastest ETL on Cloud

Hevo Data, a fully managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources straight into your Data Warehouse or any Database. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust, built-in Transformation Layer without writing a single line of code!

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

Kafka Basic Concepts

Before proceeding further, let’s understand a few important concepts of Kafka, a message-passing system.

  • Topics: Topics refer to a group or collection of messages or events. All messages are organized and passed through topics in Kafka. There can be multiple topics, each with a unique name.
  • Producers: The producer in Kafka is responsible for writing messages and sending these messages to particular topics.
  • Consumers: The consumers in Kafka connect to a particular topic to read its messages.
  • Consumer Groups: A consumer group is a set of consumers that work together to read the same topics. Kafka divides a topic's partitions among the consumers in the group and tracks, per group, which messages have already been processed, so each message is handled by only one consumer in the group.
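
To make the consumer group idea more concrete, here is a small pure-Python sketch of how a topic's partitions might be shared among the consumers of one group. It is a simplified round-robin illustration only: the function name, the partition numbers, and the consumer names are hypothetical, and Kafka's real assignment strategies are more involved.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over the consumers of one group, round-robin.

    Simplified illustration; not Kafka's actual assignment algorithm.
    """
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Hypothetical topic with four partitions, read by a group of two consumers.
print(assign_partitions([0, 1, 2, 3], ["consumer-a", "consumer-b"]))

# With a single partition, the second consumer in the group stays idle.
print(assign_partitions([0], ["consumer-a", "consumer-b"]))
```

Each partition ends up with exactly one owner inside the group, which is why adding more consumers than partitions does not increase throughput.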

Kafka Stream Processing Python Mechanism

Let’s learn the Kafka stream processing Python mechanism step-by-step:

Python is an object-oriented and high-level programming language. It is a simple and user-friendly language that offers many modules and packages for various programming applications. 

To get started with Kafka stream processing in Python, we first need to install and set up Python on our system. Follow the link to install and set up Python 3 on Windows.

Kafka Environment Setup

Next, we need to set up a Kafka environment for stream processing with Python. Here, we implement these steps on a Windows system.

Prerequisite

The local system must have Java 8 or later installed, with the Java environment variable (typically JAVA_HOME) set. To download and install Java, follow the link.

Install Kafka on Windows

The first step is to download the latest Kafka from the link and extract it.

[Image: Install Kafka on Windows. Image Source: Self]

Run Kafka Environment

Step 1: Open a command prompt window.

Step 2: Go to the Kafka root folder. The commands in the following steps start the required services.

Step 3: Run the below command to go to the folder that contains the Windows scripts.

$ cd bin\windows

Step 4: Start the Zookeeper server using the below command. 

$ zookeeper-server-start.bat ..\..\config\zookeeper.properties

Step 5: Open another command prompt session and start the Kafka server using the below command.

$ kafka-server-start.bat ..\..\config\server.properties

Once both services have successfully launched, the basic Kafka environment will be running and ready to use.
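
As a quick sanity check that both services are listening, a short Python sketch like the following can probe their ports. It assumes the default ports from the stock configuration files (2181 for ZooKeeper, 9092 for the Kafka broker); the helper name is ours. An open port only shows that something is listening there, not that the service is fully healthy.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


# Assumed default ports: 2181 for ZooKeeper, 9092 for the Kafka broker.
for name, port in (("ZooKeeper", 2181), ("Kafka broker", 9092)):
    state = "reachable" if port_is_open("localhost", port) else "not reachable"
    print(f"{name} on localhost:{port}: {state}")
```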

Create Kafka Topic to Store Events

Now, Kafka is ready to read, write, store, and process events across many machines. These events are stored in topics. We will create the topic ‘quickstart-events’ in a new command prompt window by running the below command.

$ kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092

You should see output similar to this:

[Image: Create Kafka Topic to Store Events. Image Source: Self]

From the above logs, we can see the ‘quickstart-events’ topic is created successfully.

Writing Events into the Topic

The next part in our Kafka stream processing Python guide is to write a few events into the topic ‘quickstart-events’ using the Kafka console producer by running the below command.

$ kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092

Each line you type at the producer prompt is written to the topic as a separate event. The snapshot below shows events being written.

[Image: Writing Events Into the Topic. Image Source: Self]

Read the Events from the Topic

Next, read the events written to the ‘quickstart-events’ topic in the previous step. Run the console consumer client in a new command prompt window using the below command.

$ kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

[Image: Read the Events from the Topic. Image Source: Self]

From the above logs, we can see events read from the topic successfully.

What Makes Your Apache Kafka ETL Experience With Hevo Best-in-Class?

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo Data’s automated, no-code platform gives you everything you need for a smooth data replication experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Kafka Stream Processing Python Package Installation

Now our Kafka environment is up and running and can send and receive events. To work with Kafka from Python, we need the kafka-python package. Install it using the pip command as shown below.

$ python -m pip install kafka-python

[Image: Kafka Stream Processing Python Package. Image Source: Self]

With this, the kafka-python package is installed on your system.
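
If you want to confirm the installation from Python itself, a minimal check along these lines can help. It relies only on the standard library (importlib.metadata, available in Python 3.8 and later); the helper name is ours.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a pip package, or None."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


found = installed_version("kafka-python")
if found is None:
    print("kafka-python is not installed; run: python -m pip install kafka-python")
else:
    print(f"kafka-python {found} is installed")
```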

Writing a Kafka Producer in Python

Now, it’s time to create a Kafka producer in Python. The producer needs to know where Kafka is running and the name of the topic to which events are streamed. Here, Kafka runs on ‘localhost:9092’, and the topic is ‘quickstart-events’.

First, we write some messages in a text file named ‘inputfile.txt’; the KafkaProducer then sends these messages to the Kafka topic.

[Image: Writing a Kafka Producer in Python. Image Source: Self]

Python code:

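from kafka import KafkaProducer
import json

# Connect to the Kafka broker started earlier
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Read the messages to send, one per line
with open('inputfile.txt') as f:
    lines = f.readlines()

# JSON-encode each line and send it to the topic as UTF-8 bytes
for line in lines:
    producer.send('quickstart-events', json.dumps(line).encode('utf-8'))

# send() is asynchronous; flush() blocks until the buffered messages are sent
producer.flush()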
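
The producer above JSON-encodes each line before sending it, which changes the bytes stored in Kafka. The short sketch below, which needs no broker, shows the effect on one sample line: the value gains surrounding quotes and an escaped newline, which is why such a message is 20 bytes long rather than 16. Stripping the newline and encoding the text directly is shown only as one possible alternative, not as what the producer above does.

```python
import json

# A line as returned by f.readlines(), including its trailing newline.
line = "Send Message one\n"

# What the producer code sends: the JSON string form, as UTF-8 bytes.
as_json = json.dumps(line).encode("utf-8")

# An alternative: drop the newline and send the plain text.
plain = line.rstrip("\n").encode("utf-8")

print(as_json)       # b'"Send Message one\\n"'
print(len(as_json))  # 20
print(plain)         # b'Send Message one'
print(len(plain))    # 16
```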

Writing a Kafka Consumer in Python

The Kafka consumer reads the messages that the producer sent to the given broker and topic. Setting the auto_offset_reset parameter to 'earliest' makes a consumer with no committed offsets start from the oldest available message.

Python code:

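from kafka import KafkaConsumer

# Connect to the broker; start from the oldest message when no offset is stored
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
consumer.subscribe(['quickstart-events'])

# Block and print each record as it arrives
for event in consumer:
    print(event)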
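
The consumer above prints whole ConsumerRecord objects whose values are raw bytes. As a hedged sketch, the values could instead be decoded on arrival by passing a function as value_deserializer. The helper names below are ours, and the JSON-first, plain-text-fallback rule is an assumption that suits a topic mixing console-producer text with JSON-encoded messages. Note that plain text that happens to be valid JSON, such as 123, would be parsed as JSON.

```python
import json


def decode_value(raw):
    """Turn a record's raw bytes into a Python object.

    Tries JSON first and falls back to plain UTF-8 text.
    """
    if raw is None:
        return None
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def print_events(topic="quickstart-events", servers=("localhost:9092",)):
    """Print offset and decoded value for every record (needs a running broker)."""
    # Imported here so decode_value can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=list(servers),
        auto_offset_reset="earliest",
        value_deserializer=decode_value,
    )
    for event in consumer:
        print(event.offset, event.value)


# The decoder can be tried without a broker:
print(decode_value(b"Hello"))
print(decode_value(b'"Send Message three"'))
```

With the Kafka environment running, calling print_events() would loop over the topic in the same way as the consumer code above.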

Final Testing

Kafka stream processing with Python is now running and can be verified both from the output of the Python consumer and from the command prompt window where the kafka-console-consumer command is running.

Here’s the output of Python Code:

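The output below shows all the messages sent to the topic ‘quickstart-events’ through the broker on port 9092. The first four were typed into the console producer, and the last three were sent by the Python producer.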

ConsumerRecord(topic='quickstart-events', partition=0, offset=0, timestamp=1650078325182, timestamp_type=0, key=None, value=b'This is my first event', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=22, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=1, timestamp=1650078332088, timestamp_type=0, key=None, value=b'This is my second event', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=23, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=2, timestamp=1650078334261, timestamp_type=0, key=None, value=b'Hello', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=5, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=3, timestamp=1650078449481, timestamp_type=0, key=None, value=b'Yes!! Kafka is running successfully!!', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=37, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=4, timestamp=1650079106092, timestamp_type=0, key=None, value=b'"Send Message one\\n"', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=5, timestamp=1650079106093, timestamp_type=0, key=None, value=b'"Send Message two\\n"', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
ConsumerRecord(topic='quickstart-events', partition=0, offset=6, timestamp=1650079106093, timestamp_type=0, key=None, value=b'"Send Message three"', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)

Snapshot of Command prompt window of Kafka consumer:

[Image: Snapshot of Command Prompt Window of Kafka. Image Source: Self]

From the above two logs, we can see that Kafka events are streaming successfully through Python.

Conclusion

We now have our first Python-based producer and consumer code for Apache Kafka. Messages stream successfully in the simple example shown. The principle stays the same for larger workloads, and users can build continuous stream processing on top of it.

With this base in place, you can extend the given code to suit your needs: follow the procedure step by step, modify it as required, and build Kafka stream processing in Python for your own use case.
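
As one possible way to extend the code, the sketch below adds a simple processing step between a consumer and a producer: it drops blank events and upper-cases the rest. The function names and the output topic ‘quickstart-events-upper’ are hypothetical, and the transformation is deliberately trivial; treat it as a starting point under those assumptions rather than a complete pipeline.

```python
def transform(values):
    """Drop blank events and upper-case the remaining ones."""
    for raw in values:
        text = raw.decode("utf-8").strip()
        if text:
            yield text.upper().encode("utf-8")


def run_pipeline(source="quickstart-events", sink="quickstart-events-upper"):
    """Consume, transform, and re-publish events (needs a running broker).

    The sink topic name is hypothetical; it must exist or the broker must
    allow automatic topic creation.
    """
    # Imported here so transform can be tried without kafka-python installed.
    from kafka import KafkaConsumer, KafkaProducer

    consumer = KafkaConsumer(
        source,
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset="earliest",
    )
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

    # The consumer iterator blocks and yields records as they arrive,
    # so this loop runs until the process is stopped.
    for out in transform(record.value for record in consumer):
        producer.send(sink, out)


# The processing step can be tried on sample data without a broker:
sample = [b"Hello", b"   ", b"This is my first event"]
print(list(transform(sample)))
```

Keeping the transformation in a plain generator function makes it easy to test on sample data before connecting it to Kafka.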

An in-depth and complete analysis of your business requires the consolidation of information from different sources like Apache Kafka into a single central data repository like a Data Warehouse. To extract this complex data with ever-changing Data Connectors, you need seamless, easy-to-use & economical ETL solutions like Hevo Data.

Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources including Apache Kafka to a Data Warehouse or a Destination of your choice. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  Hevo, with its strong integration with 100+ sources (Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. You can also check out our unbeatable pricing page to understand which plan fulfills all your business needs.

Tell us about your experience of learning about Kafka stream processing Python operation in the comment box below.
