Debezium Engine 101: Change Data Capture Simplified

on Database Management Systems, Debezium, MySQL, PostgreSQL • February 25th, 2022


Debezium focuses on detecting real-time data changes in databases and generating events from them. It uses connectors to track these changes and writes them to Kafka topics. Debezium provides connectors for MySQL, MongoDB, SQL Server, Oracle, PostgreSQL, and many more. When a Debezium connector is connected to a database, it tracks changes in real time and generates change data events. These events are written to Kafka and then consumed by applications independently. Kafka Connect, which runs as a distributed service, ensures that all registered connectors are configured and running. If one Kafka Connect endpoint in the cluster fails, the remaining endpoints restart the connectors that were running on the terminated endpoint.

As a result, this setup provides excellent scalability and fault tolerance. However, not every application requires this level of fault tolerance, and some may not want to depend on an external cluster of Kafka brokers and Kafka Connect services. In such cases, you can get started with Change Data Capture using the Debezium Engine, which lets an application embed the Debezium connector directly within its own application space. The application space holds the application's security information and the roles that determine who may perform which activity in the application.

In this article, you will learn how to effectively employ Debezium Engine for Change Data Capture using Embedded Debezium.


What is Debezium?


Debezium is an open-source, distributed Change Data Capture (CDC) platform that captures real-time changes in databases and streams them as events. It provides connectors for databases like MySQL, SQL Server, PostgreSQL, MongoDB, and many more, which track such changes. Debezium connectors then store these changes as events in their respective Kafka topics. You can deploy one or more connectors to a Kafka Connect cluster and configure them to monitor databases. Because Kafka Connect is distributed, it provides the fault tolerance and scalability needed to keep all connectors running.

What is Debezium Engine?

Debezium connectors are usually operated by deploying them to a Kafka Connect service and configuring one or more of them to monitor upstream databases. They produce data change events for all the changes they see in those databases. These data change events are written to Kafka and independently consumed by different applications. Since Kafka Connect runs as a distributed service, it ensures that all registered and configured connectors are always running.

The Debezium engine provides an alternative way of using Debezium connectors: users embed the engine as a library in a custom Java application. With this approach, change events are consumed directly within the application, without deploying Kafka and Kafka Connect clusters. The Debezium engine suits applications that do not need the level of fault tolerance and scalability that Kafka Connect provides.


How to get started with CDC using Debezium Engine?

Debezium engine can be configured in the following ways:

Method 1: Using Debezium engine for PostgreSQL connector


In this method, you will use a Spring Boot application called “Student CDC Relay.” It runs the Debezium engine, tails the PostgreSQL transaction log for the “Student” table of the database, and relays the captured changes to Elasticsearch.

The Docker Compose file below starts the PostgreSQL database on port 5432 and Elasticsearch on port 9200 (HTTP) and port 9300 (transport). Follow the steps below to run the Debezium engine.

  • Step 1: Set up the tools using the Docker Compose file below.

version: "3.5"

services:
  # Install postgres and setup the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password

  # Install Elasticsearch.
  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.0
    environment:
      - discovery.type=single-node
    ports:
      - 9200:9200
      - 9300:9300

The debezium/postgres image is used because it comes prebuilt with the logical decoding feature, which allows you to extract the changes committed to the transaction log.

  • Step 2: After installation, add the Maven dependencies for debezium-embedded and for the Debezium connector you need. This example uses the PostgreSQL connector, so add the dependencies below.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium.version}</version>
</dependency>
  • Step 3: Configure the PostgreSQL connector so that the Student CDC application notices changes in the table. The offset.storage setting helps the Student CDC application track how much of the transaction log it has processed. If the application fails while processing, it can resume from where it stopped after a restart. There are multiple ways to store the offset, but in this article, FileOffsetBackingStore is used to store it in the local file defined by offset.storage.file.filename. The connector records an offset for every change it reads, and the Debezium engine periodically flushes the offsets to the file at the interval set by offset.flush.interval.ms.

The remaining connector parameters are the properties of the PostgreSQL database that hosts the Student table, as shown below.

@Bean
public io.debezium.config.Configuration studentConnector() {
    return io.debezium.config.Configuration.create()
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage",  "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", "/path/cdc/offset/student-offset.dat")
            .with("offset.flush.interval.ms", 60000)
            .with("name", "student-postgres-connector")
            .with("database.server.name", studentDBHost+"-"+studentDBName)
            .with("database.hostname", studentDBHost)
            .with("database.port", studentDBPort)
            .with("database.user", studentDBUserName)
            .with("database.password", studentDBPassword)
            .with("database.dbname", studentDBName)
            .with("table.whitelist", STUDENT_TABLE_NAME).build();
}
  • Step 4: The final part of setting up embedded Debezium is to start it when the Student CDC application starts. For this, use the EmbeddedEngine class, which acts as a wrapper for the connector and manages its lifecycle. The engine is created from the connector configuration and a function that it calls for every data change event. In this case, that function is the handleEvent() method.

private CDCListener(Configuration studentConnector, StudentService studentService) {
    this.engine = EmbeddedEngine
            .create()
            .using(studentConnector)
            .notifying(this::handleEvent).build();

    this.studentService = studentService;
}

In handleEvent(), each event is parsed to identify which operation took place, and StudentService is then invoked to perform the corresponding create, update, or delete operation on Elasticsearch using Spring Data JPA.
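As a rough illustration of this kind of dispatch, the following self-contained sketch routes events by operation type. It does not use the Debezium API or the article's StudentService: the ChangeEventSketch and StudentEventRouter classes are hypothetical, and an in-memory map stands in for the Elasticsearch index. Only the operation codes ("c" for create, "u" for update, "d" for delete, "r" for snapshot reads) follow Debezium's event envelope.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a parsed change event (not a Debezium class).
final class ChangeEventSketch {
    final String op;                 // "c" = create, "u" = update, "d" = delete, "r" = snapshot read
    final int id;                    // primary key of the affected row
    final Map<String, String> after; // row state after the change (null for deletes)

    ChangeEventSketch(String op, int id, Map<String, String> after) {
        this.op = op;
        this.id = id;
        this.after = after;
    }
}

// Routes each event to an upsert or a delete on an in-memory "index".
final class StudentEventRouter {
    // Stand-in for the Elasticsearch index, keyed by student id.
    final Map<Integer, Map<String, String>> index = new HashMap<>();

    void handleEvent(ChangeEventSketch event) {
        switch (event.op) {
            case "c":
            case "r":
            case "u":
                // Creates, snapshot reads, and updates all become an upsert of the new row state.
                index.put(event.id, new HashMap<>(event.after));
                break;
            case "d":
                index.remove(event.id);
                break;
            default:
                throw new IllegalArgumentException("Unknown operation: " + event.op);
        }
    }
}
```

Treating creates and updates as the same upsert keeps the handler simple and means that replaying an event leaves the index in the same state.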

  • Step 5: Use the executor to start the Debezium engine.

private final Executor executor = Executors.newSingleThreadExecutor();

...

@PostConstruct
private void start() {
    this.executor.execute(engine);
}

@PreDestroy
private void stop() {
    if (this.engine != null) {
        this.engine.stop();
    }
}

Code credit: cdmana

  • Step 6: Start the containers with the command below, and then start the 'Student CDC Relay' application.
docker-compose up -d
  • Step 7: Create the Student table using the statement below.

CREATE TABLE public.student
(
    id integer NOT NULL,
    address character varying(255),
    email character varying(255),
    name character varying(255),
    CONSTRAINT student_pkey PRIMARY KEY (id)
);
  • Step 8: Run the query below to insert a row into the Student table of the PostgreSQL database.
INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Jason','San Diego, CA','jason@gmail.com');
  • Step 9: Confirm that the data in Elasticsearch has changed using the command below.
$ curl -X GET 'http://localhost:9200/student/student/1?pretty=true'
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 31,
  "_seq_no" : 30,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : "Jason",
    "address" : "San Diego, CA",
    "email" : "jason@gmail.com"
  }
}
  • Step 10: To update the record, use the below query.
UPDATE STUDENT SET EMAIL='john@gmail.com', NAME='John' WHERE ID = 1;
  • Step 11: Query Elasticsearch again to verify that the record was updated.
$ curl -X GET 'http://localhost:9200/student/student/1?pretty=true'
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 32,
  "_seq_no" : 31,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : "John",
    "address" : "San Diego, CA",
    "email" : "john@gmail.com"
  }
}

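The CDC-Relay application works well in this simple case of a single source record. However, if the application stops and restarts, the engine resumes from the last flushed offset and may deliver some events again, and the application as shown is not built to tolerate such duplicate events.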
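One common way to tolerate redelivered events is to make the handler idempotent. The sketch below is a minimal illustration and not part of the original application. It assumes that each event carries a monotonically increasing position (for PostgreSQL, this could be derived from the log sequence number) and skips anything at or below the last position already applied. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: applies each event at most once, based on its log position.
final class IdempotentApplier {
    private long lastAppliedPosition = -1L;          // highest position applied so far
    final List<String> applied = new ArrayList<>();  // stand-in for the downstream system

    // Returns true if the event was applied, false if it was skipped as a duplicate.
    boolean apply(long position, String payload) {
        if (position <= lastAppliedPosition) {
            return false; // already seen, for example redelivered after a restart
        }
        applied.add(payload);
        lastAppliedPosition = position;
        return true;
    }
}
```

In a real deployment, the last applied position would need to be stored durably, ideally together with the downstream write. Here it lives only in memory to keep the example short.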

Method 2: Using Debezium engine for MySQL connector

To use the Debezium Engine module, add the debezium-api module to your application’s dependencies.

Follow the below steps to use the Debezium engine for MySQL connector.

  • Step 1: Add the below dependencies to your application.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

Here, ${version.debezium} is either the Debezium version you are using or a Maven property whose value contains the Debezium version string.

  • Step 2: Add a dependency for each connector that the application uses. In this case, it is the MySQL connector.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>
  • Step 3: Create the Debezium engine instance using its builder API, providing the following:
    • The format in which you want to receive messages, such as JSON, Avro, or Kafka Connect SourceRecord.
    • Configuration properties that define the environment for both the engine and the connector.
    • A method that is called for every data change event produced by the connector.
  • Step 4: The code below shows a Debezium engine configuration for the MySQL connector.
// Requires: import java.util.Properties;
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
// setProperty() only accepts String values, so the interval is passed as a string.
props.setProperty("offset.flush.interval.ms", "60000");

The above configuration uses a Properties object to set several fields required by the engine. The first property is the name of the engine. The connector.class property defines the name of the class that extends the Kafka Connect org.apache.kafka.connect.source.SourceConnector abstract class; here, it is Debezium's MySqlConnector.

When a Kafka Connect connector runs, it reads information from the source and periodically records offsets that indicate how much of that information it has processed. If the connector is restarted, it uses the last recorded offset to determine where to resume reading. Connectors do not know how offsets are stored, so it is up to the Debezium engine to store and recover them. In this configuration, the engine uses FileOffsetBackingStore to store offsets in the local file /tmp/offsets.dat and flushes them every 60 seconds.
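To make the idea of offset storage more concrete, here is a small sketch of a file-backed offset store. It is a simplified stand-in and not Kafka Connect's FileOffsetBackingStore: the class name, the single numeric offset, and the plain-text file format are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: remembers how far a reader has progressed and persists it to a file.
final class SimpleFileOffsetStore {
    private final Path file;
    private long offset; // last position processed, held in memory until flushed

    SimpleFileOffsetStore(Path file) {
        this.file = file;
        // On startup, resume from the last flushed offset if one exists.
        this.offset = readFlushedOffset(file);
    }

    private static long readFlushedOffset(Path file) {
        if (!Files.exists(file)) {
            return 0L;
        }
        try {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return text.isEmpty() ? 0L : Long.parseLong(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    long current() {
        return offset;
    }

    // Called for every processed change; cheap because it only touches memory.
    void record(long newOffset) {
        this.offset = newOffset;
    }

    // Called periodically (compare offset.flush.interval.ms) to make progress durable.
    void flush() {
        try {
            Files.write(file, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Anything recorded after the last flush is lost if the process crashes, so a restarted reader resumes slightly earlier than where it actually stopped and sees some changes again.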

  • Step 5: The connector-specific part of the configuration is as follows.
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("database.server.name", "my-app-connector");
props.setProperty("database.history",
      "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename",
      "/path/to/storage/dbhistory.dat");

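In this configuration, database.hostname and database.port specify the host machine and port on which the MySQL database server is running, while database.user and database.password supply the credentials used to connect to it. The database.server.id property is a unique numeric ID for this client within the MySQL cluster, and database.server.name is a logical name for the MySQL server.

FileDatabaseHistory stores the history of database schema changes in the local file /path/to/storage/dbhistory.dat.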
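Because a missing property typically only surfaces once the engine starts, it can help to check the configuration up front. The following sketch is an optional addition and not part of Debezium: the ConnectorConfigCheck helper and its list of expected keys are assumptions based solely on the properties set in the steps above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that reports which expected properties are missing or blank.
final class ConnectorConfigCheck {
    // Keys taken from the configuration shown in this article; adjust them for your connector version.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "name", "connector.class", "offset.storage", "offset.storage.file.filename",
            "database.hostname", "database.port", "database.user", "database.password",
            "database.server.id", "database.server.name");

    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling ConnectorConfigCheck.missingKeys(props) before building the engine returns an empty list when every expected key has a non-blank value.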

  • Step 6: Create the Debezium engine using the code below.
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        })
        .build()) {
    // Run the engine inside this block; it is closed automatically when the block exits.
}

All change data events are passed to the handler method, which must match the signature of the java.util.function.Consumer<R> interface, where <R> matches the type of the format specified when calling create().
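Since the handler only has to fit java.util.function.Consumer, either a lambda or a method reference works. The sketch below shows that shape using only the standard library. It uses String as a stand-in for the event type R, and the HandlerShapes class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class showing two ways to supply a Consumer-shaped handler.
final class HandlerShapes {
    final List<String> received = new ArrayList<>();

    // A method with one parameter and no return value fits Consumer<String>.
    void handle(String event) {
        received.add(event);
    }

    // The same method exposed as a Consumer through a method reference.
    Consumer<String> asConsumer() {
        return this::handle;
    }
}
```

A method reference such as this::handle keeps the event-processing logic in a named, testable method, while an inline lambda is convenient for short handlers like the println call above.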

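DebeziumEngine implements Runnable, so you run it asynchronously by submitting it to an Executor or ExecutorService, as shown below. Place this code inside the try block above so that engine is still in scope.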

// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// Do something else or wait for a signal or an event
  • Step 7: You can stop the Debezium engine by calling its close() method. When the engine is created in a try-with-resources block, close() is called automatically when the block exits.

engine.close();

The engine then stops reading information from the source system and forwards all remaining change data events to the handler function. If the application needs to wait for the engine to stop completely before exiting, you can use the ExecutorService shutdown() and awaitTermination() methods.

try {
    executor.shutdown();
    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        logger.info("Waiting another 5 seconds for the embedded engine to shut down");
    }
}
catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
}

You can also register a CompletionCallback when creating the DebeziumEngine to be notified when the engine terminates.
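The lifecycle described in this step (run asynchronously, close, wait for termination, and get notified on completion) can be tried without a database using the sketch below. It uses a hypothetical StubEngine in place of DebeziumEngine, and its CompletionListener interface is an illustrative stand-in rather than Debezium's CompletionCallback type.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a completion callback.
interface CompletionListener {
    void handle(boolean success, String message);
}

// Hypothetical stand-in for an engine: runs until closed, then notifies the listener.
final class StubEngine implements Runnable, AutoCloseable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CompletionListener listener;

    StubEngine(CompletionListener listener) {
        this.listener = listener;
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                Thread.sleep(10); // pretend to poll the source for changes
            }
            listener.handle(true, "engine stopped normally");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            listener.handle(false, "engine interrupted");
        }
    }

    @Override
    public void close() {
        running.set(false); // ask the run loop to finish
    }
}

final class EngineLifecycleDemo {
    // Starts the stub engine, stops it, and returns the message passed to the listener.
    static String runOnce() {
        AtomicReference<String> result = new AtomicReference<>();
        StubEngine engine = new StubEngine((success, message) -> result.set(message));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        engine.close();
        executor.shutdown();
        try {
            while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // keep waiting until the engine thread has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }
}
```

The listener fires on the engine's own thread, so the caller still needs awaitTermination() (or a similar mechanism) before it can rely on the callback having run.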

Conclusion

In this article, you have learned how to perform CDC without Kafka by using the embedded Debezium Engine. The configuration and setup of the Debezium engine with MySQL and PostgreSQL databases have been explained. You can also explore the configuration and setup of embedded Debezium for other databases like MongoDB, SQL Server, Oracle, etc.

For a more in-depth and holistic analysis of business performance and financial health, it is essential to consolidate data from Apache Kafka and all the other applications used across your business. However, to extract this complex data with ever-changing data connectors, you would need to invest a portion of your engineering bandwidth to Integrate, Clean, Transform & Load data to your Data Warehouse or a destination of your choice. On the other hand, a more effortless & economical choice is exploring a Cloud-Based ETL Tool like Hevo Data.


Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources such as PostgreSQL, MySQL, Apache Kafka & Kafka Confluent Cloud to a Data Warehouse or a destination of your choice to be visualised in a BI Tool. Hevo also supports both MySQL & PostgreSQL as Destinations for loading data seamlessly. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

If you are using Apache Kafka & Kafka Confluent Cloud as your Message Streaming Platform and searching for a stress-free alternative to manual data integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources & BI tools (Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of setting up CDC using Debezium Engine! Share your thoughts with us in the comments section below.
