When a Debezium connector connects to a database, it tracks real-time changes and generates change data events. These events are written to Kafka, where applications consume them independently. Kafka Connect, running as a distributed service, ensures that all registered connectors are configured and running correctly.

If one Kafka Connect node in the cluster fails, the remaining nodes restart the connectors that were running on the failed node. This provides excellent scalability and fault tolerance.

However, not every application requires this level of fault tolerance or wants to run an external cluster of Kafka brokers and Kafka Connect services. For such cases, you can get started with Change Data Capture using the Debezium Engine.

In this article, you will learn how to effectively employ Debezium Engine for Change Data Capture using Embedded Debezium.

What is Debezium?


Debezium is an open-source, distributed platform for Change Data Capture (CDC) that captures real-time changes in databases. It provides connectors for databases such as MySQL, SQL Server, PostgreSQL, MongoDB, and many more, which track these changes and write them as events to the corresponding Kafka topics.

You can deploy one or more Debezium connectors to a Kafka Connect cluster and configure them to monitor your databases. Running Kafka Connect in distributed mode provides the fault tolerance and scalability needed to keep all connectors running.

What is Debezium Engine?

Debezium connectors are usually operated by deploying them to a Kafka Connect service and configuring them to monitor one or more upstream databases. The connectors produce data change events for every change in those databases; these events are written to Kafka and consumed independently by different applications.

Since Kafka Connect runs as a distributed service, it ensures that all registered and configured connectors are always running. The Debezium Engine provides an alternative way of using Debezium connectors: you embed the engine as a library directly in a custom Java application.

With this approach, change events are consumed directly within the application, without deploying Kafka and Kafka Connect clusters. The Debezium Engine is a good fit when an application does not need that level of fault tolerance or scalability.

How to get started with CDC using Debezium Engine?

Debezium engine can be configured in the following ways:

Method 1: Using Debezium Engine for PostgreSQL Connector


In this article, you will use a Spring Boot application called “Student CDC Relay.” It runs the Debezium Engine and tails the transaction log of the PostgreSQL database for changes to the “Student” table.

Use the docker-compose file below to start the services: PostgreSQL listens on port 5432, and Elasticsearch exposes HTTP on port 9200 and the transport protocol on port 9300. Follow the steps below to run the Debezium Engine.

  • Step 1: Install and start the required services using the docker-compose file below.

version: "3.5"

services:
  # Install postgres and setup the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password

  # Install Elasticsearch.
  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.0
    environment:
      - discovery.type=single-node
    ports:
      - 9200:9200
      - 9300:9300

Use the debezium/postgres image rather than the stock PostgreSQL image because it ships with a pre-built logical decoding plugin that allows Debezium to extract the changes committed to the transaction log.

  • Step 2: After installation, add the Maven dependencies for debezium-embedded and the Debezium connector you need. This example uses the PostgreSQL connector, so add the dependencies below.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium.version}</version>
</dependency>
  • Step 3: Configure the PostgreSQL connector in the Student CDC application so it can listen for changes on the table. The offset.storage setting helps the application track how far it has read in the transaction log; if the application stops while processing, it can restart from where it left off.

There are multiple ways to store the offset; in this article, FileOffsetBackingStore is used to store it in the local file defined by offset.storage.file.filename. The offset recorded in this file marks the last change the connector has read, and the Debezium Engine periodically flushes it to the file at the interval set by offset.flush.interval.ms.

The connector configuration also includes the connection properties for the PostgreSQL database that holds the Student table, as shown below.

@Bean
public io.debezium.config.Configuration studentConnector() {
    return io.debezium.config.Configuration.create()
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage",  "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", "/path/cdc/offset/student-offset.dat")
            .with("offset.flush.interval.ms", 60000)
            .with("name", "student-postgres-connector")
            .with("database.server.name", studentDBHost+"-"+studentDBName)
            .with("database.hostname", studentDBHost)
            .with("database.port", studentDBPort)
            .with("database.user", studentDBUserName)
            .with("database.password", studentDBPassword)
            .with("database.dbname", studentDBName)
            .with("table.whitelist", STUDENT_TABLE_NAME).build();
}
  • Step 4: With embedded Debezium configured, it needs to start when the Student CDC application starts. Use the EmbeddedEngine class, which acts as a wrapper around the connector and manages its lifecycle. The engine is created from the connector configuration together with a function that is called for each data change event; here, the engine is wired to a handleEvent() method as follows.

private CDCListener(Configuration studentConnector, StudentService studentService) {
    this.engine = EmbeddedEngine
            .create()
            .using(studentConnector)
            .notifying(this::handleEvent).build();

    this.studentService = studentService;
}

Inside handleEvent(), each event is parsed, and StudentService uses Spring Data to create, update, or delete the corresponding document in Elasticsearch.
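For reference, here is a minimal sketch of what handleEvent() could look like. It assumes the legacy EmbeddedEngine API used above, which delivers Kafka Connect SourceRecord objects; the save and delete methods on StudentService are hypothetical placeholders for the Spring Data calls that write to Elasticsearch.

import io.debezium.data.Envelope.Operation;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.HashMap;
import java.util.Map;

private void handleEvent(SourceRecord record) {
    Struct payload = (Struct) record.value();
    if (payload == null) {
        return; // tombstone events carry no payload
    }
    Operation operation = Operation.forCode(payload.getString("op"));
    // "after" holds the new row state for creates/updates, "before" holds it for deletes
    Struct row = operation == Operation.DELETE
            ? payload.getStruct("before")
            : payload.getStruct("after");
    if (row == null) {
        return;
    }
    // Flatten the row into a simple map of column name -> value
    Map<String, Object> student = new HashMap<>();
    for (Field field : row.schema().fields()) {
        student.put(field.name(), row.get(field));
    }
    // save/delete are hypothetical StudentService methods that index or remove
    // the corresponding document in Elasticsearch via Spring Data
    if (operation == Operation.DELETE) {
        studentService.delete(student);
    } else {
        studentService.save(student);
    }
}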

  • Step 5: Use the executor to start the Debezium engine.

private final Executor executor = Executors.newSingleThreadExecutor();

...

@PostConstruct
private void start() {
    this.executor.execute(engine);
}

@PreDestroy
private void stop() {
    if (this.engine != null) {
        this.engine.stop();
    }
}
  • Step 6: Bring up the containers with the command below, then start the “Student CDC Relay” application.
docker-compose up -d
  • Step 7: Create the student table using the below command.

CREATE TABLE public.student
(
    id integer NOT NULL,
    address character varying(255),
    email character varying(255),
    name character varying(255),
    CONSTRAINT student_pkey PRIMARY KEY (id)
);
  • Step 8: Run the below query to insert a record into the Student table of the PostgreSQL database.
INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Jason','San Diego, CA','jason@gmail.com');
  • Step 9: Confirm that the data has reached Elasticsearch using the below command.
$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 31,
  "_seq_no" : 30,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : "Jason",
    "address" : "San Diego, CA",
    "email" : "jason@gmail.com"
  }
}
  • Step 10: To update the record, use the below query.
UPDATE STUDENT SET EMAIL='john@gmail.com', NAME='John' WHERE ID = 1;
  • Step 11: Query Elasticsearch again to verify that the record was updated.
$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 32,
  "_seq_no" : 31,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : "John",
    "address" : "San Diego, CA",
    "email" : "john@gmail.com"
  }
}

You have seen that the CDC Relay application works well in this simple setup. Keep in mind, however, that if the application stops and restarts, it does not guard against duplicate events.

Accelerate Apache ETL Using Hevo’s No-code Data Pipeline

Hevo is the only real-time ELT No-code Data Pipeline platform that cost-effectively automates data pipelines that are flexible to your needs. With integration with 150+ Data Sources (40+ free sources), we help you not only export data from sources & load data to the destinations but also transform & enrich your data, & make it analysis-ready.

Method 2: Using Debezium Engine for MySQL Connector

To use the Debezium Engine module, you need to add the debezium-api to your application’s dependencies.

Follow the below steps to use the Debezium engine for MySQL connector.

  • Step 1: Add the below dependencies to your application.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

From above, ${version.debezium} is either the Debezium version itself or a Maven property whose value is the Debezium version string.

  • Step 2: Add the dependencies to each of the connectors that the application uses. In this case, it is a MySQL connector.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>
  • Step 3: You can create the Debezium Engine instance using its builder API, which requires:
    • The format in which you want to receive messages, such as JSON, Avro, or Kafka Connect SourceRecord.
    • Configuration properties that define the environment for both the engine and the connector.
    • A method that is called for every data change event produced by the connector.
  • Step 4: The snippet below shows a Debezium Engine configuration for the MySQL connector.
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");

The above configuration uses a Properties object to set several fields required by the engine. The first property is the name of the engine instance.

The connector.class defines the name of the class that extends the Kafka Connect org.apache.kafka.connect.source.SourceConnector abstract class.

When a Kafka Connect connector runs, it reads information from the source and periodically records an offset that marks how much of that information it has processed. The connector itself does not know how to persist this offset, so the Debezium Engine does it on the connector's behalf; here, FileOffsetBackingStore stores the offsets in the local file configured by offset.storage.file.filename (/tmp/offsets.dat in this example).

  • Step 5: The remaining connector configuration properties are as follows.
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("database.server.name", "my-app-connector");
props.setProperty("database.history",
      "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename",
      "/path/to/storage/dbhistory.dat");

From above, set the hostname and port of the machine where the MySQL database server is running, and define the username and password used to connect to it.

FileDatabaseHistory stores the historical changes of the database schema in the /path/to/storage/dbhistory.dat local file system.

  • Step 6: Create the Debezium Engine using the below command.
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        })
        .build()) {
    // Run the engine asynchronously here (see the executor example below)
}

All change data events are passed to the handler method, which must match the java.util.function.Consumer<R> interface, where <R> matches the type of the format specified when calling create().
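For illustration, here is a minimal sketch of such a handler declared as a named Consumer, assuming the Json format chosen above (so keys and values arrive as JSON strings):

import io.debezium.engine.ChangeEvent;
import java.util.function.Consumer;

// A named handler that could be passed to .notifying(...) instead of the inline lambda
Consumer<ChangeEvent<String, String>> handler = event -> {
    // destination() is the logical topic name, key() the JSON-encoded primary key,
    // and value() the JSON-encoded change envelope (before/after/op/source)
    System.out.printf("destination=%s key=%s value=%s%n",
            event.destination(), event.key(), event.value());
};

Such a handler could then be supplied to .notifying(handler) in place of the inline lambda shown above.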

The DebeziumEngine is then executed using an Executor or ExecutorService, as shown below.

// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// Do something else or wait for a signal or an event
  • Step 7: You can stop the Debezium engine using the below command.

engine.close();

The engine stops reading information from the source system and forwards all remaining change data events to the handler function. If the application needs to wait for the engine to stop completely before exiting, use the ExecutorService's shutdown and awaitTermination methods.

try {
    executor.shutdown();
    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        logger.info("Waiting another 5 seconds for the embedded engine to shut down");
    }
}
catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
}

You can also register a CompletionCallback when creating the DebeziumEngine; it is invoked to notify you when the engine terminates.
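A minimal sketch of registering a CompletionCallback is shown below; the callback receives a success flag, a completion message, and an error (if any) once the engine has fully terminated.

// Build the engine with a completion callback in addition to the change event handler
DebeziumEngine.CompletionCallback completionCallback = (success, message, error) -> {
    // Invoked once, after the engine has stopped reading and delivering events
    if (!success) {
        System.err.println("Engine terminated with an error: " + message);
        if (error != null) {
            error.printStackTrace();
        }
    }
};

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> System.out.println(record))
        .using(completionCallback)
        .build();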

Conclusion

In this article, you have learned about CDC using the Debezium Engine without Kafka, via embedded Debezium. The configuration and setup of the Debezium Engine with MySQL and PostgreSQL databases have also been explained.

You can also explore the configuration and setup of other databases like MongoDB, SQL Server, Oracle, etc., for embedded Debezium.

To extract this complex data with ever-changing data connectors, you would need to invest your engineering bandwidth to Integrate, Clean, Transform & Load data to your Data Warehouse or a destination of your choice. On the other hand, an easier & more economical choice is exploring a Cloud-Based ETL Tool like Hevo Data.

Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources such as PostgreSQL, MySQL, Apache Kafka & Kafka Confluent Cloud to a Data Warehouse or a destination of your choice to be visualised in a BI Tool.

It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

If you are using Apache Kafka & Kafka Confluent Cloud as your Message Streaming Platform and searching for a stress-free alternative to manual data integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 150+ sources & BI tools (including 40+ free sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of setting up CDC using Debezium Engine! Share your thoughts with us in the comments section below.

Talha
Software Developer, Hevo Data

Talha is a Software Developer with over eight years of experience in the field. He is currently driving advancements in data integration at Hevo Data, where he has been instrumental in shaping a cutting-edge data integration platform for the past four years. Prior to this, he spent 4 years at Flipkart, where he played a key role in projects related to their data integration capabilities. Talha loves to explain complex information related to data engineering to his peers through writing. He has written many blogs related to data integration, data management aspects, and key challenges data practitioners face.
