Are you trying to integrate Kafka and Cassandra? Have you looked all over the internet to find a solution for it?
If so, this post walks through one way to do it. Kafka and Cassandra are often integrated in microservice architectures.
What is Kafka?
- Kafka is open-source software that provides a framework for storing, reading and analysing streaming data.
- Kafka is designed to be run in a “distributed” environment, which means that rather than sitting on one user’s computer, it runs across several (or many) servers, leveraging the additional processing power and storage capacity that it brings.
What is Cassandra?
- Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance.
- Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data.
The Kafka Connect API and the Cassandra Sink
- The Connect API in Kafka is a scalable and robust framework for streaming data into and out of Apache Kafka, the engine powering modern streaming platforms.
- When building data pipelines, a significant amount of time goes into loading data and making the pipelines reliable and robust.
- It is not a straightforward process and requires a lot of effort and technical knowledge.
- Fortunately, the Kafka Connect API takes care of most of the difficult parts.
- The Connect API Sources and Sinks act as sensors on the edge of your analytics platform, loading and unloading events as they happen in real-time.
- Under the hood, they are Kafka consumers and producers with a simple and elegant API that allows developers to focus on moving data to and from Kafka.
Kafka Connect has the following features:
- A Common Framework for Kafka Connectors
- Distributed and Standalone Modes
- REST Interface
- Automatic Offset Management
- Streaming/Batch Integration
The sink supports the following features:
- Field Selection and Topic Routing: Kafka Topic Payload field selection is supported, allowing you to write the selected fields or all fields to Cassandra.
- Error Policies for Handling Errors: In case of an error, the Sink will throw a RetriableException, telling Connect to redeliver the messages on the next poll.
- Secure Connections via SSL: The connection between Kafka and Cassandra is made secure using standard Secure Sockets Layer (SSL).
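To make the redelivery behaviour concrete, here is a minimal Python simulation of what "throw a retriable error, get the same batch again" means. It is not the connector's code: `RetriableError`, `deliver_with_retries`, and `max_retries` are hypothetical names standing in for Connect's handling of `RetriableException`, and the retry limit is an assumption of the sketch.

```python
class RetriableError(Exception):
    """Hypothetical stand-in for Kafka Connect's RetriableException."""


def deliver_with_retries(batch, write, max_retries=3):
    """Call write(batch) until it succeeds and return the number of attempts.

    A RetriableError means "offer the same batch again", which mirrors
    Connect redelivering messages on the next poll. After max_retries
    redeliveries the error is re-raised.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            write(batch)
            return attempts
        except RetriableError:
            if attempts > max_retries:
                raise


# Demo with a fake writer that fails twice before succeeding.
failures_left = [2]
written = []


def flaky_write(batch):
    if failures_left[0] > 0:
        failures_left[0] -= 1
        raise RetriableError("Cassandra temporarily unavailable")
    written.extend(batch)


attempts = deliver_with_retries([{"id": 1}], flaky_write)
print(attempts, written)
```

The point of the sketch is that the batch is never dropped on a retriable failure: the same messages are offered to the writer again until the write succeeds or the retry budget runs out.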
The Cassandra Sink supports the following model for saving data from the Kafka topic:
INSERT INTO tableA SELECT * FROM topicA
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB
Field selection and topic routing are handled by the Kafka Connect Query Language (KCQL). Details can be found in the KCQL documentation.
KCQL helps keep complex routing simple. Imagine a Sink that reads from several topics, where for each topic you want to cherry-pick the payload fields or rename them.
Furthermore, you might want the storage structure to be created automatically and to evolve over time, or you might later need support for features such as bucketing.
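As a rough illustration of what a routing statement expresses, the Python sketch below parses the two statement shapes shown above (`SELECT *` and comma-separated `field AS column` pairs) and applies the result to one message payload. It is a toy, not the real KCQL parser: `parse_route` and `project` are hypothetical helper names, and the regular expression covers only these simple cases.

```python
import re

# Toy grammar covering only the two statement shapes shown above;
# the real KCQL grammar supports much more.
_ROUTE = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<table>\S+)\s+SELECT\s+(?P<fields>.+?)"
    r"\s+FROM\s+(?P<topic>\S+)\s*$",
    re.IGNORECASE,
)


def parse_route(statement):
    """Return (table, topic, mapping); mapping is None for SELECT *."""
    match = _ROUTE.match(statement)
    if match is None:
        raise ValueError(f"unsupported statement: {statement!r}")
    fields = match.group("fields").strip()
    mapping = None
    if fields != "*":
        mapping = {}
        for part in fields.split(","):
            tokens = part.split()
            if len(tokens) == 1:
                # A plain field keeps its own name as the column name.
                mapping[tokens[0]] = tokens[0]
            elif len(tokens) == 3 and tokens[1].upper() == "AS":
                # "field AS column" renames the field.
                mapping[tokens[0]] = tokens[2]
            else:
                raise ValueError(f"unsupported field clause: {part!r}")
    return match.group("table"), match.group("topic"), mapping


def project(record, mapping):
    """Apply a mapping to one message payload (a dict)."""
    if mapping is None:
        return dict(record)
    return {column: record[field] for field, column in mapping.items()}


table, topic, mapping = parse_route(
    "INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB"
)
row = project({"x": 1, "y": 2, "z": 3, "ignored": 4}, mapping)
print(table, topic, row)
```

Here the payload field `ignored` is dropped and `x`, `y`, `z` are written as columns `a`, `b`, `c`, which is the cherry-picking and renaming described above.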
Integrate Kafka and Cassandra Using the Cassandra Sink
Kafka Connect sources and sinks require configuration. For the Cassandra Sink, a typical configuration looks like the following:
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=draft-topics
connect.cassandra.export.route.query=INSERT INTO Data SELECT * FROM draft-topics
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=demo-draft
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
You also need to start Kafka Connect in distributed mode:
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
You can then use Kafka Connect's REST API to confirm that your Sink is available:
curl http://localhost:8083/connector-plugins
[{"class":"io.confluent.connect.hdfs.tools.SchemaSourceConnector"},
{"class":"com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector"},
{"class":"io.confluent.connect.jdbc.JdbcSourceConnector"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"},
{"class":"io.confluent.connect.hdfs.HdfsSinkConnector"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"},
{"class":"com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"}]
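The same check can be scripted. The sketch below, which assumes the response is a JSON array of objects with a `class` key as in the output above, looks for the Cassandra Sink class used in the configuration. `plugin_available` and `fetch_plugins` are hypothetical helper names; `fetch_plugins` needs a running Connect worker and is not called here.

```python
import json
import urllib.request

# Connector class taken from the sink configuration shown earlier.
SINK_CLASS = (
    "com.datamountaineer.streamreactor.connect.cassandra.sink."
    "CassandraSinkConnector"
)


def plugin_available(response_body, class_name=SINK_CLASS):
    """Return True if class_name appears in a /connector-plugins response."""
    plugins = json.loads(response_body)
    return any(plugin.get("class") == class_name for plugin in plugins)


def fetch_plugins(base_url="http://localhost:8083"):
    """Fetch the raw plugin list from a running Connect worker."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as response:
        return response.read().decode("utf-8")


# Offline demo using a shortened copy of the response format.
sample = json.dumps([
    {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector"},
    {"class": SINK_CLASS},
])
print(plugin_available(sample))
```

Against a live worker, `plugin_available(fetch_plugins())` would perform the same check as reading the `curl` output by eye.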
The following steps show how to download and start Cassandra, which you will need next:
- Make a folder for Cassandra.
mkdir cassandra
- Download Cassandra using the following command.
wget http://apache.cs.uu.nl/cassandra/3.5/apache-cassandra-3.5-bin.tar.gz
- Extract the archive into the cassandra folder.
tar -xvf apache-cassandra-3.5-bin.tar.gz -C cassandra
- Set up environment variables. The archive unpacks into a directory named apache-cassandra-3.5.
export CASSANDRA_HOME=~/cassandra/apache-cassandra-3.5
export PATH=$PATH:$CASSANDRA_HOME/bin
- Start Cassandra using the following command.
sudo sh $CASSANDRA_HOME/bin/cassandra
Start the CQL shell and set up the keyspace and table. The `cqlsh` tool can be found in the `bin` folder of the Cassandra install.
CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
use demo;
create table orders (id int, created varchar, product varchar, qty int, price float, PRIMARY KEY (id, created))
WITH CLUSTERING ORDER BY (created asc);
Steps to Write Data from Apache Kafka to Cassandra
Now you are ready to post your Cassandra Sink configuration. To submit tasks to Kafka Connect in distributed mode, you need to post your config as JSON to the REST endpoint, which Kafka Connect exposes on port 8083 by default. The following command submits the configuration with the kafka-connect-cli tool:
java -jar kafka-connect-cli-0.5-all.jar create cassandra-sink-orders < cassandra-sink-distributed-orders.properties
#Connector `cassandra-sink-orders`:
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
#task ids: 0
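For readers who want to see what "post your config as JSON" amounts to without the CLI, here is a minimal Python sketch. It assumes the standard Kafka Connect REST call `POST /connectors` with a body of the form `{"name": ..., "config": {...}}`. The helper names `parse_properties`, `build_payload`, and `create_connector` are hypothetical, and `create_connector` is defined but not called because it needs a running Connect worker.

```python
import json
import urllib.request


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def build_payload(config):
    """Wrap a flat config in the {"name", "config"} shape Connect expects."""
    config = dict(config)
    name = config.pop("name")
    return {"name": name, "config": config}


def create_connector(payload, base_url="http://localhost:8083"):
    """POST the payload to a running Connect worker and return its reply."""
    request = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Offline demo: build the payload for part of the orders sink config.
PROPERTIES = """\
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
"""
payload = build_payload(parse_properties(PROPERTIES))
print(json.dumps(payload, indent=2))
```

All configuration values stay strings in the payload, including `tasks.max`, since the properties file carries no type information.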
Once Kafka is running, start the Avro console producer:
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders-topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'
Now, you can enter records like the following, one per line. Replace each placeholder with your own value; `price` must be an unquoted number because the schema declares it as a double:
{"id": 1, "created": "<creation date>", "product": "<product name>", "price": <price>}
{"id": 2, "created": "<creation date>", "product": "<product name>", "price": <price>}
{"id": 3, "created": "<creation date>", "product": "<product name>", "price": <price>}
{"id": 4, "created": "<creation date>", "product": "<product name>", "price": <price>}
That completes the steps to write data from Apache Kafka to Cassandra. You can also check the logs to verify that the data has been written successfully.
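Before typing records into the producer, it can help to check them against the value schema. The sketch below is a simplified check written for this post, not the Avro library: `schema_problems` is a hypothetical helper, the mapping from Avro primitive types to Python types is an assumption, and the sample values (`widget`, `94.2`, the date) are made up for illustration.

```python
import json

# Value schema copied from the producer command above.
ORDER_SCHEMA = json.loads(
    '{"type":"record","name":"myrecord","fields":['
    '{"name":"id","type":"int"},{"name":"created","type":"string"},'
    '{"name":"product","type":"string"},{"name":"price","type":"double"}]}'
)

# Assumption of this sketch: the Python types accepted for each
# primitive Avro type that appears in the schema.
_ACCEPTED = {"int": (int,), "string": (str,), "double": (int, float)}


def schema_problems(line, schema=ORDER_SCHEMA):
    """Return a list of human-readable problems; empty means the line looks fine."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(record, dict):
        return ["record must be a JSON object"]
    problems = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, _ACCEPTED[avro_type]):
            problems.append(
                f"{name}: expected {avro_type}, got {type(value).__name__}"
            )
    return problems


good = '{"id": 1, "created": "2016-05-06", "product": "widget", "price": 94.2}'
quoted_price = '{"id": 1, "created": "2016-05-06", "product": "widget", "price": "94.2"}'
print(schema_problems(good))
print(schema_problems(quoted_price))
```

A quoted price is reported as a type mismatch, and a line containing typographic quotes instead of straight ones is reported as invalid JSON.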
How Can Kafka and Cassandra Be Useful Together?
Kafka and Cassandra can work together in the following ways:
- Kafka as an Event Fabric between Microservices: In this method, you can use data stored in Cassandra as part of event processing. A service consumes events from a Kafka stream and performs computations on them. In the end, the results are written to Cassandra.
- Cassandra as a Sink for Kafka: In this method, data is ingested into Cassandra from Kafka. It uses the DataStax Kafka Connector. This method is particularly useful when there is a need to use Cassandra’s multi-DC replication or querying among multiple events.
- Using Kafka for Change Data Capture from Cassandra: This methodology makes use of the Kafka Connect framework to perform CDC (Change Data Capture) from Cassandra or other databases via plugins.
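The first pattern (consume events, compute, write the result) can be sketched in a few lines of Python. Everything here is a stand-in: a plain list plays the role of the Kafka stream, `rows.append` plays the role of a Cassandra insert, and `revenue_by_product` and `run_service` are hypothetical names chosen for this illustration, as are the sample events.

```python
from collections import defaultdict


def revenue_by_product(events):
    """The computation step: sum price per product over a batch of events."""
    totals = defaultdict(float)
    for event in events:
        totals[event["product"]] += event["price"]
    return dict(totals)


def run_service(events, write_row):
    """Consume events, compute, then hand each result row to write_row."""
    for product, revenue in sorted(revenue_by_product(events).items()):
        write_row({"product": product, "revenue": revenue})


# Stand-ins: a list instead of a Kafka consumer, and list.append
# instead of an INSERT into Cassandra.
events = [
    {"id": 1, "product": "widget", "price": 10.0},
    {"id": 2, "product": "gadget", "price": 5.5},
    {"id": 3, "product": "widget", "price": 2.5},
]
rows = []
run_service(events, rows.append)
print(rows)
```

Passing the writer in as a function keeps the computation independent of the storage layer, so the same service logic could be pointed at a real Cassandra session in place of the list.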
Conclusion
- Connecting Kafka and Cassandra has many benefits and is especially useful for providing data streams to Machine Learning Engineers or Data Scientists.
- The manual method can be intimidating as well as challenging to deal with. So, if you want to consolidate Kafka and Cassandra data into a data warehouse, then try Hevo.
Muhammad Faraz is an AI/ML and MLOps expert with extensive experience in cloud platforms and new technologies. With a Master's degree in Data Science, he excels in data science, machine learning, DevOps, and tech management. As an AI/ML and tech project manager, he leads projects in machine learning and IoT, contributing extensively researched technical content to solve complex problems.