How to Create MySQL CDC Kafka Pipeline: Easy Steps

Ofem Eteng • Last Modified: December 29th, 2022


This article takes you through the concepts and steps you need in order to create a MySQL CDC Kafka pipeline and transfer your data seamlessly.

Introduction

In this blog post, you will learn how to perform Change Data Capture (CDC) through a MySQL database connected to Apache Kafka. Don’t worry if you do not understand some of these terms or how they fit together. Every term will be explained thoroughly and by the time you are done reading this, you will be able to not only understand the concepts but you will have a hands-on experience of how to implement them.

The intended audience for this post is anyone who wants to extend the capabilities of their database so that it can better respond to changes in its stored records. This blog post assumes that the reader has little to no experience working with Kafka, but it does assume that the reader understands the concept of a database.

What is MySQL?

MySQL is a relational database engine that is used to store structured data, that is, data organized into rows and columns (data with a defined structure). Columns are generally referred to as fields, while rows are instances of a specific record. MySQL is relational in that the data stored in its tables can be related through the concept of primary and foreign keys. Data can be retrieved or queried from the database using SQL syntax.

What is Change Data Capture (CDC)?

Change Data Capture is a software design pattern in which a change to data triggers a notification announcing that change, so that the change can be handled somewhere else in the application. The concept extends naturally to databases. For example, say you have a table called “orders” in your database and you want to respond to any change to a record in that table. Implementing CDC means that whenever an order record changes, you are notified through some channel and can respond to that change, because the underlying information associated with the record is now different.

There are several ways of implementing CDC, such as using the triggers built into most relational databases, scanning transaction logs, or leveraging event programming. In this article, you will use a log-based Change Data Capture (CDC) tool that integrates with Apache Kafka. That brings us to the next point in our journey: what exactly is Apache Kafka?

Hevo, A Simpler Alternative to Integrate your Data for Analysis

Hevo offers a faster way to move data from databases or SaaS applications such as Kafka, into your data warehouse to be visualized in a BI tool. Hevo is fully automated and hence does not require you to code.

Get started with Hevo for free
  • Completely Automated: The Hevo platform can be set up in just a few minutes and requires minimal maintenance.
  • Real-time Data Transfer: Hevo provides real-time data migration, so you can have analysis-ready data always.
  • 100% Complete & Accurate Data Transfer: Hevo’s robust infrastructure ensures reliable data transfer with zero data loss.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources that can help you scale your data infrastructure as required.
  • 24/7 Live Support: The Hevo team is available round the clock to extend exceptional support to you through chat, email, and support calls.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Live Monitoring: Hevo allows you to monitor the data flow so you can check where your data is at a particular point in time.
Sign up here for a 14-day free trial!

What is Apache Kafka?

Apache Kafka is a distributed streaming platform that can be used to build streaming data pipelines. It can also be used to build real-time applications that depend on a constant flow of data from a source. In our case, the source of data will be the MySQL database and changes to records in the database will be streamed as events into Apache Kafka. You will use the Kafka Connect API which is an interface between Apache Kafka and other systems in the data pipeline setup.

MySQL CDC Kafka pipeline architecture (Image source: https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc/)

In the diagram above, the data source (MySQL) interfaces with Kafka through Kafka Connect, and because of the way Kafka's architecture works, it can then be connected to any number of applications, other data sources, or stream processors.

Integrating MySQL with Kafka

In order to integrate MySQL with Apache Kafka, you will make use of the MySQL transaction logs. The transaction log stores every change to the database, whether it is an insert, an update, or a delete operation; essentially, any change made in the database is recorded in its transaction log. As a result, every single event from the database can be streamed (passed) to Apache Kafka.
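Binary logging is what makes this possible, so before changing any configuration it is worth checking whether it is already enabled on your MySQL server. The statements below are a quick check you can run from any MySQL client:

SHOW VARIABLES LIKE 'log_bin';   -- 'ON' means binary logging is enabled
SHOW BINARY LOGS;                -- lists the binlog files currently on the server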

The Change Data Capture (CDC) tool you will use to enable this streaming between the MySQL database and Apache Kafka, through the Kafka Connect API, is Debezium. Debezium is an open-source distributed platform for Change Data Capture that works well with Apache Kafka to stream changes from your database into Kafka. It connects to the database through Kafka Connect, and because this log-based approach to Change Data Capture has low impact, low latency, and high data fidelity, changes in the database are captured quickly, reliably, and efficiently.

Performing MySQL CDC through Debezium and Kafka

Creating a MySQL CDC Kafka pipeline

To create a MySQL CDC Kafka pipeline for data transfer, you will make use of Debezium. The Debezium MySQL connector takes a snapshot of your MySQL database the first time it is connected to Kafka through the Kafka Connect API. Subsequently, it uses the binary logs from the MySQL database to track all changes and streams them into Kafka. The MySQL binary logs, or binlogs, contain all transactions that occurred in the database and preserve the order of those transactions. They also include schema changes and are what the MySQL engine uses to perform backup and replication of data.

In order to use the Debezium MySQL connector, the associated MySQL database needs to be set up for row-level binary logging, and the connector needs access to a database user with the required privileges. Below is an example of the required settings in the MySQL server configuration file:

server-id         = your_server_id
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

If you are running the MySQL database with several replicas, then you need to enable global transaction identifiers, or GTIDs. GTIDs allow you to fail over from one replica to another while keeping data consistent. To set up GTIDs, the associated lines in your MySQL config file should look like this:

gtid_mode                 = on
enforce_gtid_consistency  = on
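After restarting MySQL, you can confirm that GTIDs are active by querying the server variables directly:

SHOW GLOBAL VARIABLES LIKE 'gtid_mode';                 -- should report ON
SHOW GLOBAL VARIABLES LIKE 'enforce_gtid_consistency';  -- should report ON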

The final setup for MySQL is to enable query log events as shown below:

binlog_rows_query_log_events = on

The next step is to create a database user for the Debezium connector and grant it the required privileges. This can be done with the following commands:

CREATE USER 'your_db_user' IDENTIFIED BY 'your_db_user_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
REPLICATION CLIENT ON *.* TO 'your_db_user';
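With the database prepared, the remaining step is to register the Debezium MySQL connector with your Kafka Connect cluster. Below is a minimal sketch of a connector configuration posted to the Kafka Connect REST API; the host names, ports, credentials, server id, and the inventory database name are placeholders for your own environment, and exact property names can differ slightly between Debezium versions:

# register the connector with the Kafka Connect REST API (default port 8083)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "your_db_user",
    "database.password": "your_db_user_password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'

Once Kafka Connect accepts this configuration, the connector takes its initial snapshot of the database and then starts streaming binlog events.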

Once this has been set up, Debezium emits change events in a structured format that contains all the information about any change in the MySQL database. The format of an event, also known as an envelope, is shown below:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

As can be seen in the example above, the “before” field contains the previous values of a specific row in the database, while “after” contains the new (updated) values. For an insert operation, the “before” field will be null; similarly, for a delete operation, the “after” field will be null.
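For example, a delete event (shown below for illustration, reusing the field names from the update envelope above) carries the op code “d”, the last known values in “before”, and a null “after”:

{
	"op": "d",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : null
}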

The Debezium MySQL connector writes events for insert, update, and delete operations carried out on a specific table in the database to a single Kafka topic; that is, each table is mapped to its own Kafka topic. The convention for the topic names is:

serverName.databaseName.tableName
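For example, if the connector's logical server name is dbserver1 (the placeholder used in the connector configuration sketched earlier), changes to a customers table in an inventory database would be written to the topic dbserver1.inventory.customers. You can inspect the raw change events with the console consumer that ships with Kafka:

# read the change events for one table from the beginning of the topic
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dbserver1.inventory.customers \
  --from-beginning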

You can also use the architecture shown in this post to migrate data from one database to another through Apache Kafka. This may be useful if, for example, you want to move from an on-premise MySQL database to a cloud-hosted database.
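One way to complete such a migration, sketched below under the assumption that a JDBC sink connector plugin (such as Confluent's kafka-connect-jdbc) is installed alongside Kafka Connect, is to register a sink that writes the change events into the target database. The connection URL, credentials, and topic name are placeholders, and Debezium's ExtractNewRecordState transform is used so that only the “after” state of each change is written to the target:

# a hypothetical JDBC sink registration; adjust the class, URL, and topics to your setup
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "jdbc-sink-customers",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://target-host:3306/inventory",
    "connection.user": "target_user",
    "connection.password": "target_password",
    "topics": "dbserver1.inventory.customers",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "auto.create": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}'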

Conclusion

You have now learnt the basic concepts required to create a MySQL CDC Kafka pipeline and have worked with tools in the ecosystem to build your own real-time response to changes in your database. You may be wondering what the next step is. You can dig deeper into the technologies mentioned in this blog post, or you may just want a simple managed service that does most of the heavy lifting. Fortunately, there is one such service: Hevo.

Visit our Website to Explore Hevo

Hevo Data, a No-code Data Pipeline, helps you transfer data from a source of your choice in a fully automated and secure manner without having to write code repeatedly. Hevo, with its strong integration with 100+ sources & BI tools, allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a spin? Sign Up for the 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at our unbeatable pricing that will help you choose the right plan for your business needs!

What is your preferred method to perform MySQL CDC? Let us know in the comments below.
