Integrating Kafka and Cassandra: A Comprehensive Guide

Data Integration, Tutorials • November 17th, 2020

Are you trying to integrate Kafka and Cassandra? Have you looked all over the internet for a solution? If so, this blog will answer your questions. Kafka and Cassandra are often integrated in microservice architectures. In this blog, you will learn about Kafka, Cassandra, and the steps to integrate the two in detail.

Let’s see how this blog is structured for you:

  1. What is Kafka?
  2. What is Cassandra?
  3. How Kafka and Cassandra can be Useful Together?
  4. Connect API in Kafka and Cassandra Sink
  5. Integrate Kafka and Cassandra Using the Cassandra Sink
  6. Steps to Write Data from Apache Kafka to Cassandra
  7. Conclusion

What is Kafka?

Kafka is open-source software that provides a framework for storing, reading, and analysing streaming data. Kafka is designed to run in a “distributed” environment, which means that rather than sitting on one user’s computer, it runs across several (or many) servers, leveraging the additional processing power and storage capacity that they bring.

Kafka takes information from a large number of data sources and organises it into “topics”. For example, one of these data sources could be a transactional log in which a grocery store records every sale. Kafka would process this stream of information and publish it to topics such as “number of apples sold” or “number of sales between 1 pm and 2 pm”, which can then be analysed by anyone needing insights into the data.

What is Cassandra?

Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data. Cassandra’s support for replicating across multiple datacenters is best-in-class, providing lower latency for your users and the peace of mind of knowing that you can survive regional outages.

Hevo Data: Integrate your Data Seamlessly

Hevo is a No-code Data Pipeline. It supports pre-built data integrations from 100+ data sources, including Kafka and Cassandra. Hevo offers a fully managed solution for your data migration process from both the sources to your desired data warehouse. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully-automated solution to manage data in real-time and always have analysis-ready data at your data warehouse.

Let’s look at some salient features of Hevo:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within pipelines.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Explore more about Hevo by signing up for a 14-day free trial today.

How Kafka and Cassandra can be Useful Together?

Kafka and Cassandra integration can work in the following ways:

  1. Kafka as an Event Fabric between Microservices: In this method, you can use data stored in Cassandra as part of event processing. A service consumes events from a Kafka stream and performs computations on them. In the end, the data is written to Cassandra.
  2. Cassandra as a Sink for Kafka: In this method, data is ingested into Cassandra from Kafka. It uses the DataStax Kafka Connector. This method is particularly useful when there is a need to use Cassandra’s multi-DC replication or querying among multiple events.
  3. Using Kafka for Change Data Capture from Cassandra: This methodology makes use of the Kafka Connect framework to perform CDC (Change Data Capture) from Cassandra or other databases via plugins. 

In this article, you will focus on the second approach: using Cassandra as a Sink for Kafka.

Connect API in Kafka and Cassandra Sink

The Connect API in Kafka is a scalable and robust framework for streaming data into and out of Apache Kafka, the engine powering modern streaming platforms.

When building data pipelines, a significant amount of time is spent loading data and making the pipelines solid, reliable, and robust. This is not a straightforward process and requires considerable effort and technical knowledge.

Fortunately, the Kafka Connect API takes care of most of the difficult parts. The Connect API Sources and Sinks act as sensors on the edge of your analytics platform, loading and unloading events as they happen in real-time. Under the hood, they are Kafka consumers and producers with a simple and elegant API that allows developers to focus on moving data to and from Kafka.

Kafka Connect has the following features:

  • A Common Framework for Kafka Connectors: It is used to connect Kafka with data systems. It standardizes the integrations of many data systems with Kafka, including development, deployment, and management. 
  • Distributed and Standalone Modes: It can scale up or down according to need. In distributed mode, more workers can be added when you need to scale up the Connect cluster.
  • REST Interface: An easy-to-use REST API for submitting and managing connectors.
  • Automatic Offset Management: Kafka Connect can manage the offset commits very well, which means that developers don’t have to worry about this challenging part.
  • Streaming/Batch Integration: Building on Kafka, Kafka Connect is suitable for both streaming and batch data integration.

The sink supports the following features:

  • Field Selection and Topic Routing: Kafka Topic Payload field selection is supported, allowing you to write the selected fields or all fields to Cassandra. 
  • Error Policies for Handling Errors: In case of an error, the Sink will throw a RetriableException, telling Connect to redeliver the messages on the next poll.
  • Secure Connections via SSL: The connection between Kafka and Cassandra is secured using standard Secure Sockets Layer (SSL).
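
To make the retry behaviour in the error-policy bullet more concrete, here is a minimal Python sketch of the idea. It is not the connector's code: `RetriableError`, `deliver`, and `make_flaky_put` are hypothetical names that only mimic the "fail, then redeliver the same batch" pattern in memory.

```python
class RetriableError(Exception):
    """Hypothetical stand-in for Kafka Connect's RetriableException."""


def deliver(batches, put, max_retries=3):
    """Call put(batch) for each batch, redelivering the same batch when
    put raises RetriableError, up to max_retries extra attempts."""
    delivered = []
    for batch in batches:
        attempts = 0
        while True:
            try:
                put(batch)
                delivered.append(batch)
                break
            except RetriableError:
                attempts += 1
                if attempts > max_retries:
                    raise  # give up and surface the error
    return delivered


def make_flaky_put(failures):
    """Return (put, written): put fails `failures` times, then succeeds
    and appends each batch to the written list."""
    state = {"remaining": failures}
    written = []

    def put(batch):
        if state["remaining"] > 0:
            state["remaining"] -= 1
            raise RetriableError("Cassandra temporarily unavailable")
        written.append(batch)

    return put, written
```

The point of the pattern is that a temporary failure does not lose messages: the same batch is offered again until it is written or the retry budget runs out.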

The Cassandra Sink supports the following model for saving data from the Kafka topic:

INSERT INTO tableA SELECT * FROM topicA
INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB

Field selection and topic routing are handled by the Kafka Connect Query Language (KCQL). Details can be found in the KCQL documentation.

KCQL helps in making complex processes simple. Imagine a Sink where you source from different topics and from each topic you want to cherry-pick the payload fields or rename them. Furthermore, you might want the storage structure to be automatically created and evolve, or you might add new support for the likes of bucketing.
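
As a rough illustration of what the two KCQL statements above ask the Sink to do with each record, the Python sketch below applies the same selection and renaming to a plain dictionary. The `project` helper is hypothetical and only imitates the effect; it is not how the connector is implemented.

```python
def project(record, fields=None):
    """Mimic KCQL field selection on one record.

    fields=None behaves like SELECT *; otherwise fields maps each source
    field name to the column name it should be written under.
    """
    if fields is None:
        return dict(record)
    return {column: record[source] for source, column in fields.items()}


# INSERT INTO tableA SELECT * FROM topicA
row_a = project({"x": 1, "y": 2, "z": 3})

# INSERT INTO tableB SELECT x AS a, y AS b, z AS c FROM topicB
row_b = project({"x": 1, "y": 2, "z": 3}, {"x": "a", "y": "b", "z": "c"})
```

Here `row_a` keeps every field under its original name, while `row_b` carries the same values under the renamed columns `a`, `b`, and `c`.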

Integrate Kafka and Cassandra Using the Cassandra Sink

Sources and Sinks in the Kafka Connect API require configuration. For the Cassandra Sink, a typical configuration looks like the following:

name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=draft-topics
connect.cassandra.export.route.query=INSERT INTO Data SELECT * FROM draft-topics
connect.cassandra.contact.points= localhost
connect.cassandra.port= 9042
connect.cassandra.key.space= demo-draft
connect.cassandra.username= cassandra
connect.cassandra.password= cassandra

Additionally, you need to start the Connect API in Kafka. 

bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

You can also use Kafka Connect’s REST API to confirm that your Sink is available.

curl http://localhost:8083/connector-plugins
[{"class":"io.connect.hdfs.tools.SchemaSourceConnector"},
{"class":"com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector"},
{"class":"io.connect.jdbc.JdbcSourceConnector"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"},
{"class":"io.connect.hdfs.HdfsSinkConnector"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"},
{"class":"com..connect.cassandra.sink.CassandraSinkConnector"}]
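
If you would rather check the response programmatically than read it by eye, something along these lines could work. This is a sketch that assumes the endpoint returns a JSON array of objects with a `class` field, as in the output above. `has_connector` is a hypothetical helper, and the sample response is trimmed for illustration.

```python
import json


def has_connector(plugins_json, class_name):
    """Return True if the /connector-plugins response lists class_name."""
    plugins = json.loads(plugins_json)
    return any(plugin.get("class") == class_name for plugin in plugins)


SINK_CLASS = (
    "com.datamountaineer.streamreactor.connect.cassandra.sink."
    "CassandraSinkConnector"
)

# Trimmed sample response for illustration; a real one comes from
# GET http://localhost:8083/connector-plugins
sample = json.dumps([
    {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector"},
    {"class": SINK_CLASS},
])
found = has_connector(sample, SINK_CLASS)
```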

The following steps show how to download and start Cassandra, which you will need next:

  1. Make a folder for Cassandra.
mkdir cassandra
  2. Download Cassandra using the following command.
wget http://apache.cs.uu.nl/cassandra/3.5/apache-cassandra-3.5-bin.tar.gz
  3. Extract the archive to the Cassandra folder.
tar -xvf apache-cassandra-3.5-bin.tar.gz -C cassandra
  4. Set up environment variables. The archive unpacks into a folder named apache-cassandra-3.5.
export CASSANDRA_HOME=~/cassandra/apache-cassandra-3.5
export PATH=$PATH:$CASSANDRA_HOME/bin
  5. Start Cassandra using the following command.
sudo sh $CASSANDRA_HOME/bin/cassandra

Start the CQL shell and set up the keyspace and table. The `cqlsh` executable can be found in the `bin` folder of the Cassandra install.

CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
use demo;
create table orders (id int, created varchar, product varchar, qty int, price float, PRIMARY KEY (id, created))
WITH CLUSTERING ORDER BY (created asc);

Steps to Write Data from Apache Kafka to Cassandra

Now you are ready to post your Cassandra Sink configuration. To submit tasks to the Connect API in distributed mode, you need to post your config as JSON to the REST endpoint, which the Connect API exposes by default on port 8083. The command below does this with the kafka-connect-cli tool, reading the configuration from a properties file:

java -jar kafka-connect-cli-0.5-all.jar create cassandra-sink-orders < cassandra-sink-distributed-orders.properties
#Connector `cassandra-sink-orders`:
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.contact.points=localhost
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
#task ids: 0
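
If you prefer to call the REST endpoint directly, the properties first have to be turned into the JSON body that `POST /connectors` expects, an object with a `name` and a `config` map. The following Python sketch shows one way to do that conversion. `properties_to_payload` is a hypothetical helper, and the properties text is a shortened copy of the configuration above.

```python
import json


def properties_to_payload(text):
    """Convert Java-style key=value lines into the JSON body that
    POST /connectors expects: {"name": ..., "config": {...}}."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")  # split on the first "=" only
        config[key.strip()] = value.strip()
    return json.dumps({"name": config["name"], "config": config}, indent=2)


props = """
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
"""
payload = properties_to_payload(props)
```

The resulting `payload` string could then be sent to http://localhost:8083/connectors with any HTTP client, using the Content-Type header application/json.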

Once Kafka is running, you can start an Avro console producer:

bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders-topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'

Now you can enter records such as the following, one per line, replacing each placeholder in angle brackets with a real value. The price must be an unquoted number to match the double type in the schema:

{"id": 1, "created": "<date>", "product": "<name of product>", "price": <price>}
{"id": 2, "created": "<date>", "product": "<name of product>", "price": <price>}
{"id": 3, "created": "<date>", "product": "<name of product>", "price": <price>}
{"id": 4, "created": "<date>", "product": "<name of product>", "price": <price>}
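
If you want to generate such lines rather than type them by hand, a small helper like the one below could produce them. This is only a sketch: `order_record` is a hypothetical function, and the values passed to it are placeholders chosen for illustration, not data from this tutorial.

```python
import json


def order_record(order_id, created, product, price):
    """Serialize one order as a single-line JSON record whose field
    names and types match the value.schema given to the producer."""
    record = {
        "id": int(order_id),
        "created": str(created),
        "product": str(product),
        "price": float(price),  # an Avro double must be a JSON number
    }
    return json.dumps(record)


# Placeholder values for illustration only
line = order_record(1, "2020-11-17 10:00:00", "example-product", 9.99)
```

Each returned line can be pasted into the console producer as one message.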

That completes the steps to write data from Apache Kafka to Cassandra. You can also check the logs to verify that the data has been written successfully.

Conclusion

Connecting Kafka and Cassandra has many benefits and is especially useful for providing data streams to Machine Learning Engineers or Data Scientists. The manual method can be intimidating as well as challenging to deal with. So, if you want to consolidate Kafka and Cassandra data into a data warehouse, then try Hevo.

Hevo is a No-code Data Pipeline. It supports pre-built integrations from 100+ data sources at a reasonable price. With Hevo, you can integrate Kafka and Cassandra data in real-time.

Share your experience of integrating Kafka and Cassandra in the comment section below. 