Debezium is a real-time change data capture platform that continuously captures row-level changes made in external database systems and streams them into Kafka, where every modification is stored as an event. By default, Debezium serializes each change as a structured change-event message, which is verbose and difficult for end users to read directly. To remove this complication, Debezium allows end applications or consumers to deserialize the complex message structure into a logical, readable format using the Debezium SerDes process.
In this article, you will learn about Debezium and how to implement the change event deserialization using the Debezium SerDes.
Prerequisites
- Fundamentals of real-time data streaming.
Understanding Debezium
Originally developed by Red Hat, Debezium is an open-source, distributed platform for Change Data Capture (CDC). It enables you to capture, record, and track real-time, row-level modifications made in external database systems such as MySQL, SQL Server, and Oracle. Debezium also provides a vast collection of source connectors that connect to these databases and stream the captured changes into Kafka.
Hevo Data is a No-code Data Pipeline that offers a fully managed solution to set up Data Integration for 100+ Data Sources (including 40+ Free sources) like Kafka and will let you directly load data from sources to a Data Warehouse or the Destination of your choice. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data.
Get Started with Hevo for Free
Let’s look at some of the salient features of Hevo:
- Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
- Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer.
- Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
- Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
- Connectors: Hevo supports 100+ integrations to SaaS platforms, FTP/SFTP, Files, Databases, BI tools, and Native REST API & Webhooks Connectors. It supports various destinations including Data Warehouses such as Google BigQuery, Amazon Redshift, Snowflake, and Firebolt; Amazon S3 Data Lakes; Databricks; and Databases such as MySQL, SQL Server, TokuDB, MongoDB, and PostgreSQL, to name a few.
- Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
- Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
- Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within your Data Pipelines.
- Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-Day Free Trial!
Steps to implement Change Event Deserialization using Debezium SerDes
Step 1: Starting the Kafka Environment
To implement change event deserialization using Debezium SerDes, you first have to start the Kafka environment: a Zookeeper instance, a Kafka broker, and the Kafka Connect platform. After setting up the Kafka environment, you will install the Debezium MySQL connector to capture row-level changes from the MySQL database. In the below steps, you will set up and start the Kafka environment and the Debezium MySQL connector.
- Since you will use Docker and Docker container images to set up the Kafka instances, ensure that Docker is installed and configured on your local machine.
- First, start a Zookeeper instance in a Docker container. Open a new command prompt and execute the below command.
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
- In the above command, debezium/zookeeper:1.8 is Debezium's Zookeeper Docker image, version 1.8.
- You can start a Kafka server in a new Docker container. Open another new command prompt and execute the following command.
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
- In the above command, debezium/kafka:1.8 is Debezium's Kafka Docker image, version 1.8.
- In the next step, you have to start the Kafka Connect platform, which runs and manages the Debezium connectors.
- Execute the following command to start a Kafka Connect service in a Docker container.
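A typical way to start Kafka Connect in a Docker container is sketched below, assuming the debezium/connect image (which already bundles the Debezium source connectors) and following the same pattern as the Zookeeper and Kafka containers above. The GROUP_ID value and the three storage-topic names are placeholders you can change, the container exposes the Kafka Connect REST API on port 8083 (used by the registration command in Step 2), and the --link mysql:mysql flag assumes the MySQL container from Step 2 is already running.
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql debezium/connect:1.8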
Step 2: Setting up MySQL database
- You have to start the MySQL database server from which Debezium will capture real-time, row-level changes. Open a command terminal and execute the following command to start a Docker container that runs MySQL. The newly started MySQL server comes pre-configured with the sample “inventory” database.
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8
- Run the command given below to start a MySQL command-line client to access the MySQL database for querying purposes.
$ docker run -it --rm --name mysqlterm --link mysql mysql:8.0 \
  sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
- Now, you have to download the Debezium MySQL connector for capturing data changes from the MySQL database. You can download the MySQL connector from the official website of Confluent.
- After downloading the MySQL connector, add it to the plugin path of the Kafka Connect setup.
- Next, configure the connector by creating a registration file, mysql-source.json, as sketched below.
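A sketch of what mysql-source.json can look like is shown below. The database user, password, server id, and history-topic names here are assumed placeholder values matching the example MySQL container started above, while database.server.name (dbserver1), table.whitelist, and the unwrap transform correspond to the settings referenced in the rest of this article.
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "table.whitelist": "inventory.customers,inventory.addresses",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}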
- Since you are capturing changes made to the customers and addresses tables inside the inventory database, the “table.whitelist” parameter lists exactly two values: “inventory.customers” and “inventory.addresses”.
- By setting transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope, the connector strips the Debezium change-event envelope and streams only the new row state of each modification to the respective Kafka topics.
- Now, register the MySQL source connector with Kafka Connect by executing the following command.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql-source.json
Step 3: Implementing Change Event Deserialization using Debezium SerDes
- At this stage, you are ready to consume the real-time changes captured from the database. You can use the Kafka console consumer, which ships with Kafka by default. Execute the following command to run the console consumer in the Kafka container.
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic dbserver1.inventory.customers
- Now, open a new terminal to consume the Kafka topic that streams the changes made to the addresses table.
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic dbserver1.inventory.addresses
- On executing the above commands, the console consumer prints each change event as a Key-Value pair, showing the new modifications made to the database; a simplified sketch of such a record is shown below.
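With the envelope already unwrapped by the source connector, a record in the dbserver1.inventory.customers topic has roughly the key-value shape sketched below. The field values are purely illustrative, and with the default converter settings each key and value is additionally wrapped in a schema/payload structure.
{"id":1001}    {"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}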
- The source connector serializes every data modification into JSON and stores it in the Kafka topics. Because these topics record the changes in Debezium’s JSON format, the raw messages are hard for consumers to read or interpret directly. On the consumer end, a reader application can deserialize the JSON data into Java classes, making the real-time modifications easy to read and work with.
- To perform this deserialization on the consumer side, Debezium provides a SerDes (Serializer/Deserializer) that encodes and decodes data as needed during application execution.
- In the next step, you will use Kafka Streams together with Debezium SerDes to process the JSON data in the Kafka topics, loading it into a KTable for further processing. A sketch of how such serdes can be constructed is shown below.
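Below is a sketch of one way to build these serdes using Debezium’s bundled io.debezium.serde.DebeziumSerdes helper (from the debezium-core artifact). DefaultId, Customer, and Address are assumed to be plain Java classes mirroring the message keys and row columns; the original example application may wire its serdes differently.
import io.debezium.serde.DebeziumSerdes;
import org.apache.kafka.common.serialization.Serde;
import java.util.Collections;

// Key serde: deserializes the JSON message key into a DefaultId object.
final Serde<DefaultId> defaultIdSerde = DebeziumSerdes.payloadJson(DefaultId.class);
defaultIdSerde.configure(Collections.emptyMap(), true);

// Value serdes: deserialize the (already unwrapped) row state into Customer and Address objects.
// If the source connector did not apply UnwrapFromEnvelope, you could instead pass
// Collections.singletonMap("from.field", "after") to extract the new row state from the full envelope.
final Serde<Customer> customerSerde = DebeziumSerdes.payloadJson(Customer.class);
customerSerde.configure(Collections.emptyMap(), false);

final Serde<Address> addressSerde = DebeziumSerdes.payloadJson(Address.class);
addressSerde.configure(Collections.emptyMap(), false);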
- Execute the following Java code to process the customer table.
KTable<DefaultId, Customer> customerTable = builder.table(parentTopic, Consumed.with(defaultIdSerde, customerSerde));
- Now, run the below code to process the address table.
KStream<DefaultId, Address> addressStream = builder.stream(childrenTopic, Consumed.with(defaultIdSerde, addressSerde));
- In the next step, you have to implement a pseudo-grouping step that re-keys and groups the address records by their customer key so that they can later be merged with the customer records. The code snippet below sketches the grouping step.
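A sketch of this grouping step is shown below, assuming Address exposes a getCustomerId() accessor (the accessor name is an assumption) and reusing the serdes defined earlier.
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;

// Re-key every address record by the id of the customer it belongs to, so that
// addresses and customers share the same key, then group the stream by that key.
KGroupedStream<DefaultId, Address> groupedAddresses = addressStream
        .selectKey((addressId, address) -> new DefaultId(address.getCustomerId()))
        .groupByKey(Grouped.with(defaultIdSerde, addressSerde));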
- Now, you have to build a new “addresses” table that stores, for each customer, a continuously updated map of that customer’s address records. This map is needed because a customer’s addresses are consolidated by their primary key during the grouping process.
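Continuing the sketch, the grouped addresses can be folded into a per-customer map keyed by address id. Here addressMapSerde is a hypothetical serde for Map<DefaultId, Address>, and getId() is again an assumed accessor on Address.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// For each customer, maintain a map of its addresses; every incoming change event
// simply upserts the latest state of the corresponding address.
KTable<DefaultId, Map<DefaultId, Address>> addresses = groupedAddresses.aggregate(
        HashMap::new,
        (customerId, address, addressMap) -> {
            addressMap.put(new DefaultId(address.getId()), address);
            return addressMap;
        },
        Materialized.with(defaultIdSerde, addressMapSerde));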
- In the next step, you have to join the customer table with the newly created addresses table. The following code snippet sketches the merging process and writes the result to a Kafka topic.
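A sketch of the join is given below. CustomerWithAddresses is an assumed aggregate class holding one customer together with its address map, and customerWithAddressesSerde is a hypothetical serde for it; the result is streamed to the final_ddd_aggregates topic that the MongoDB sink connector reads later in this article.
import org.apache.kafka.streams.kstream.Produced;

// Join each customer with the map of its addresses to form one aggregate per customer.
KTable<DefaultId, CustomerWithAddresses> customersWithAddresses = customerTable.join(
        addresses,
        (customer, addressMap) -> new CustomerWithAddresses(customer, addressMap));

// Write the aggregates to the output topic consumed by the MongoDB sink connector.
customersWithAddresses.toStream()
        .to("final_ddd_aggregates", Produced.with(defaultIdSerde, customerWithAddressesSerde));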
- Now, build and package the application using Maven by executing the command given below.
mvn clean package -f poc-ddd-aggregates/pom.xml
- After executing the above command, Maven compiles and packages the Kafka Streams application; the build should finish with a BUILD SUCCESS message.
- Next, you have to set up a MongoDB sink connector that reads the aggregated real-time changes from the respective Kafka topic.
- Execute the following command to start the MongoDB sink connector.
docker-compose up -d mongodb connect_sink
- Now, configure the sink connector by creating a registration file, mongodb-sink.json, as sketched below.
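A sketch of what mongodb-sink.json can look like is shown below, assuming the community MongoDB sink connector whose options use the mongodb.* prefix referenced later in this article. The connection URI assumes the mongodb container started above, while the topic and collection names match the ones used in the rest of this article.
{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "topics": "final_ddd_aggregates",
    "mongodb.connection.uri": "mongodb://mongodb:27017/inventory",
    "mongodb.collection": "customers_with_addresses"
  }
}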
- After configuring, deploy the sink connector by executing the following command.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8084/connectors/ -d @mongodb-sink.json
- The deployed sink connector consumes messages from the final_ddd_aggregates topic, which stores the aggregated real-time changes made on the MySQL database.
- After the MongoDB sink connector consumes data from the Kafka topic, the real-time changes are stored in the customers_with_addresses collection.
- Since you set the mongodb.collection parameter to customers_with_addresses in the sink configuration, you can query that collection to fetch all the real-time changes.
- Now, open the MongoDB console inside the running MongoDB container.
docker-compose exec mongodb bash -c 'mongo inventory'
- Execute the below command to fetch the data stored in the customers_with_addresses collection.
db.customers_with_addresses.find().pretty()
- On executing the above command, you will see the real-time changes as complete customer documents with their embedded addresses; a sketch of the shape of such a document is shown below.
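The exact values depend on your sample data and on the sink connector’s id strategy, but each document in the collection has roughly the shape sketched below, with the customer’s latest state stored alongside its addresses (all field values here are placeholders).
{
    "_id" : { "id" : 1001 },
    "id" : 1001,
    "first_name" : "...",
    "last_name" : "...",
    "email" : "...",
    "addresses" : [
        { "id" : "...", "street" : "...", "city" : "...", "zip" : "..." }
    ]
}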
Conclusion
In this article, you learned about Debezium and how to set up the Kafka environment to implement change event deserialization using Debezium SerDes. This article mainly focused on configuring the MySQL source and MongoDB sink connectors to write and read real-time changes, respectively. However, you can also configure other Debezium database connectors, such as the PostgreSQL, Oracle, and SQL Server connectors, to implement change event deserialization. In case you want to transfer data into your desired Database/destination, then Hevo Data is the right choice for you!
Visit our Website to Explore Hevo
Hevo Data, a No-code Data Pipeline provides you with a consistent and reliable solution to manage Data transfer between a variety of sources and destinations with a few clicks. Hevo with its strong integration with 100+ sources like Kafka allows you to not only export Data from your desired Data sources & load it to the destination of your choice, but also transform & enrich your Data to make it analysis-ready so that you can focus on your key business needs and perform insightful analysis using BI tools.
Want to take Hevo for a spin? Sign Up for a 14-day free trial. You may also have a look at our pricing, which will assist you in selecting the best plan for your requirements.
Share with us your experience of learning about the Debezium SerDes process in the comments below!