Debezium is a distributed, open-sourced platform for tracking real-time changes in databases. It is called an event streaming platform as it converts data changes on databases into events, and when such changes are accessed by different applications to process the information further. Debezium uses the Change Data Capture approach (CDC) to retrieve the real-time changes from databases.

To handle data seamlessly, you can connect Debezium with databases through respective connectors like PostgreSQL, SQL, MongoDB, MySQL, etc. When the connector is connected to a database, it will initially release helpful information about the data change events.

This information is called log messages, and these messages are stored in Kafka topics. Therefore, Kafka’s topic follows the producer-consumer approach to distributing these messages. In contrast, the producers are the ones who produce the data change event messages, the consumer access these messages.

However, it is not straightforward to handle real-time changes from several data sources without a proper testing mechanism in place. Implementing Debezium Testing enables users to have automated tests for their CDC set up and ensure that everything is configured as intended.

In this tutorial, you will learn about the Debezium Testing for CDC using Testcontainers through automated tests.

What is CDC? 

Change Data Capture (CDC) is used to replicate data between databases in real-time. It is a process where any changes in databases are detected and synchronized in real-time to help other applications react to changes and reduce the time taken for data migration.

However, along with CDC, different approaches are used to detect real-time changes in databases like pooling, dual wires, or DB triggers. But, since these methods fail to maintain the integrity of databases, the performance is affected.

Therefore, CDC is an effective way to synchronize the row-level changes of databases in real-time to multiple applications.

What is Debezium?

debezium testing - debezium logo

Debezium is an open-sourced, event streaming platform that keeps handling real-time changes in databases. It uses the Changa Data Capture (CDC) approach, replicating data between databases in real-time.

Debezium uses different connectors for different databases like MySQL, PostgreSQL, MongoDB, SQL, etc. When the connector is connected to the database, it releases helpful information about the change data events. This information is stored in Kafka, called log messages.

The message in Kafka follows the producer-consumer approach where the producer means the data change events which produce log messages. In contrast, the consumer is the one that accesses these messages.

How to set up Integration Testing with Test containers?

When you are setting up Change Data Capture pipelines with Debezium, it is essential to implement automated Debezium testing in place to ensure the following:

  • The source database is set up so that the changes can be streamed off effectively.
  • Connectors are configured correctly.

The Debezium Testing extension for Testcontainers focuses on simplifying tests by running all the required infrastructure like Apache Kafka, Kafka Connect, etc., via Linux containers and making it easily accessible for java-based tests. Follow the simple steps to set up the Integration Testing for Debezium:

Integration Testing Step 1: Add Dependencies

To use Debezium Testing Testcontainers, use the below dependencies for your project.

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>1.8.1.Final</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

<!-- Add the TC dependency matching your database -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>

You need the JDBC driver of your database and a client for Apache Kafka to insert some test data and a database that should be the source of change events in Kafka. 

Integration Testing Step 2: Test Setup

When writing an integration test for a Debezium Connector configuration, you need to set up Apache Kafka and a database that acts as a source of all change events.

With the Debezium’s DebeziumContainer class, the setup will look as shown in the below code.

public class DebeziumContainerTest {

    private static Network network = Network.newNetwork(); 

    private static KafkaContainer kafkaContainer = new KafkaContainer()
            .withNetwork(network); 

    public static PostgreSQLContainer<?> postgresContainer =
            new PostgreSQLContainer<>("debezium/postgres:11")
                .withNetwork(network)
                .withNetworkAliases("postgres"); 

    public static DebeziumContainer debeziumContainer =
            new DebeziumContainer("debezium/connect:1.8.1.Final")
                .withNetwork(network)
                .withKafka(kafkaContainer)
                .dependsOn(kafkaContainer); 

    @BeforeClass
    public static void startContainers() { 
        Startables.deepStart(Stream.of(
                kafkaContainer, postgresContainer, debeziumContainer))
                .join();
    }
}

The above Debezium Testing Code contains the following elements:

  • private static Network network = Network.newNetwork(): It defines a docker network to be used by all the services.
  • private static KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network): It sets a up container for Apache Kafka.
  • public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>(“debezium/postgres:11”).withNetwork(network).withNetworkAliases(“postgres”) : It sets up the container for Postgre using Debezium’s Postgre container image.
  • public static DebeziumContainer debeziumContainer = new DebeziumContainer(“debezium/connect:1.8.1.Final”).withNetwork(network).withKafka(kafkaContainer).dependsOn(kafkaContainer) : It sets up a container for Kafka Connect with Debezium 1.8.1.
  • public static void startContainers(){Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, debeziumContainer)).join() : It starts all three containers.

Integration Testing Step 3: Test Implementation

After declaring all the required containers, you can register an instance of the Debezium PostgreSQL Connector to insert some data into Postgres and use the Apache Kafka client for reading the data change record from the corresponding topic.

@Test
public void canRegisterPostgreSqlConnector() throws Exception {
    try (Connection connection = getConnection(postgresContainer);
            Statement statement = connection.createStatement();
            KafkaConsumer<String, String> consumer = getConsumer(
                    kafkaContainer)) {

        statement.execute("create schema todo"); 
        statement.execute("create table todo.Todo (id int8 not null, " +
                "title varchar(255), primary key (id))");
        statement.execute("alter table todo.Todo replica identity full");
        statement.execute("insert into todo.Todo values (1, " +
                "'Learn CDC')");
        statement.execute("insert into todo.Todo values (2, " +
                "'Learn Debezium')");

        ConnectorConfiguration connector = ConnectorConfiguration
                .forJdbcContainer(postgresContainer)
                .with("database.server.name", "dbserver1");

        debeziumContainer.registerConnector("my-connector",
                connector); 

        consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

        List<ConsumerRecord<String, String>> changeEvents =
                drain(consumer, 2); 

        assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
                "$.id")).isEqualTo(1);
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.after.title")).isEqualTo("Learn CDC");

        assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
                "$.id")).isEqualTo(2);
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.after.title")).isEqualTo("Learn Debezium");

        consumer.unsubscribe();
    }
}

// Helper methods below

private Connection getConnection(
        PostgreSQLContainer<?> postgresContainer)
                throws SQLException {

    return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
            postgresContainer.getUsername(),
            postgresContainer.getPassword());
}

private KafkaConsumer<String, String> getConsumer(
            KafkaContainer kafkaContainer) {

    return new KafkaConsumer<>(
            ImmutableMap.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            kafkaContainer.getBootstrapServers(),
                    ConsumerConfig.GROUP_ID_CONFIG,
                            "tc-" + UUID.randomUUID(),
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                            "earliest"),
            new StringDeserializer(),
            new StringDeserializer());
}

private List<ConsumerRecord<String, String>> drain(
        KafkaConsumer<String, String> consumer,
        int expectedRecordCount) {

    List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();

    Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
        consumer.poll(Duration.ofMillis(50))
                .iterator()
                .forEachRemaining(allRecords::add);

        return allRecords.size() == expectedRecordCount;
    });

    return allRecords;
}
Debezium Query Integration Testing
Debezium Query Integration Testing

The above query contains the following elements that create a table in the Postgre database and insert two records:

  • connectorConfiguration connector = ConnectorConfiguration .forJdbcContainer(postgresContainer).with(“database.server.name”, “dbserver1”);debeziumContainer.registerConnector(“my-connector”,connector): It registers an instance of the Debezium Postgre connector, the connector type, and properties like database host, database name, user, etc., derived from the container.
  • consumer.subscribe(Arrays.asList(“dbserver1.todo.todo”)); List<ConsumerRecord<String,String>> changeEvents = drain(consumer,2) : It reads two records from the change event topic in Kafka and asserts their attributes.

How to set up Debezium Testing for CDC using Test containers?

Setting up Change Data Capture with Debezium involves a lot of configuration consisting of the source database and Debezium connector. The source database should be set up so that Debezium can connect and retrieve change events from it.

The source database can be any database like MySQL, SQL, PostgreSQL, etc. The Debezium connector should be configured using the correct database host and credentials by using tables, filters, and SMTs.

For Debezium Testing, you can use Debezium Testcontainers that allows setting all the required components like Apache Kafka, Kafka Connect, and using Linux container image. It configures and deploys the Debezium connector and runs assertions against produced change data events.

To setup Debezium Testing for CDC using Test Containers, you can follow the simple steps given below:

Debezium Testing Step 1: Test Setup

If you are working with Apache Maven for dependency management, you must add the below dependencies to your pom.xml file.

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>1.1.0.CR1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

Add the Testcontainers dependency to your database, e.g., PostgreSQL in this tutorial. But you can use any database.

<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>

Step 2: Initialize the Testcontainers

After declaring the required dependencies, you need to write a CDC integration test with Testcontainers. Integration tests can be implemented using Linux containers and Docker. You need to use Apache Kafka, Kafka Connect, and a PostgreSQL database to initialize the Testconatiners below.

public class CdcTest {

  private static Network network = Network.newNetwork(); 

  private static KafkaContainer kafkaContainer = new KafkaContainer()
      .withNetwork(network); 

  public static PostgreSQLContainer<?> postgresContainer =
      new PostgreSQLContainer<>("debezium/postgres:11")
          .withNetwork(network)
          .withNetworkAliases("postgres"); 

  public static DebeziumContainer debeziumContainer =
      new DebeziumContainer("1.1.0.CR1")
          .withNetwork(network)
          .withKafka(kafkaContainer)
          .dependsOn(kafkaContainer); 

  @BeforeClass
  public static void startContainers() { 
    Startables.deepStart(Stream.of(
        kafkaContainer, postgresContainer, debeziumContainer))
            .join();
  }
}

The above Debezium Testing code has the following elements:

  • private static Network network = Network.newNetwork(): It defines a docker network that all the services can use.
  • private static KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network): It sets up a container for Apache Kafka.
  • public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>(“debezium/postgres:11”).withNetwork(network).withNetworkAliases(“postgres”): It sets up a container for the database Postgre.
  • public static DebeziumContainer debeziumContainer = new DebeziumContainer(“1.1.0.CR1”).withNetwork(network).withKafka(kafkaContainer).dependsOn(kafkaContainer): It sets up a container for Kafka to connect with Debezium.
  • @BeforeClass public static void startContainers(){Startables.deepStart(Stream.of(kafkaContainer,postgresContainer,debeziumContainer)).join(): It starts all three containers in the @Beforeclass method.

Step 3: Test Implementation

To write a test for CDC setup, follow the below steps.

  • Step 1: Configure the Debezium connector for the database. Follow the below command for the test.
@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
  try (Connection connection = getConnection(postgresContainer);
      Statement statement = connection.createStatement();
      KafkaConsumer<String, String> consumer =
          getConsumer(kafkaContainer)) {

      // TODO ...
  }
}
  • Connect to your PostgreSQL Database:
private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
    throws SQLException {

  return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
      postgresContainer.getUsername(),
      postgresContainer.getPassword());
}
  • Connect to Kafka:
private KafkaConsumer<String, String> getConsumer(
    KafkaContainer kafkaContainer) {

  return new KafkaConsumer<>(
      ImmutableMap.of(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          kafkaContainer.getBootstrapServers(),

          ConsumerConfig.GROUP_ID_CONFIG,
          "tc-" + UUID.randomUUID(),

          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
          "earliest"),
      new StringDeserializer(),
      new StringDeserializer());
}
  • Step 2: Execute some SQL statements to change some data.
statement.execute("create schema todo"); 
statement.execute("create table todo.Todo (" +
                    "id int8 not null, " +
                    "title varchar(255), " +
                    "primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (1, 'Learn CDC')");
statement.execute("insert into todo.Todo values (2, 'Learn Debezium')");
  • Step 3: Retrieve the resulting change data events from the Kafka topic using Kafka consumer.
ConnectorConfiguration connector = ConnectorConfiguration
        .forJdbcContainer(postgresContainer)
        .with("database.server.name", "dbserver1");

debeziumContainer.registerConnector("my-connector",
        connector); 

consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

List<ConsumerRecord<String, String>> changeEvents =
        drain(consumer, 2); 
  • Step 4: Run assertions against such events.
ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
  .isEqualTo(1);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
  .isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
  .isEqualTo("Learn CDC");

changeEvent = changeEvents.get(1);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
  .isEqualTo(2);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
  .isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
  .isEqualTo("Learn Debezium");

consumer.unsubscribe();

The above Debezium Testing code contains the following elements:

  • statement.execute(“create schema todo”): It creates a table in the Postgre database and inserts two records.
  • debeziumContainer.registerConnector(“my-connector”, connector) : It registers an instance of the Debezium Postgre connector.
  • List<ConsumerRecord<String, String>> changeEvents = drain(consumer,2): It read two records from the change event topic in Kafka and asserted their attributes.

Conclusion

In this article, you have learned about automated Debezium Testing through Testcontainers with dependencies, test setup, test configuration, and test implementation. Whenever you implement Debezium Testing, you have to retrieve resulting change data events from the respective Kafka.

You need to run assertions against these change events. This article uses JsonPath-based assertions, but you can also use the JSON API. The Debezium Testing approach explained in this tutorial can be expanded in multiple ways. You can add your connector configuration under revision control to manage and track any configuration changes and then drive the test using the tracked configuration.

You can also test your entire data streaming pipeline by deploying the Debezium connector with the sink connector. For a more in-depth and holistic analysis of business performance and financial health, it is essential to consolidate from Apache Kafka and all the other applications used across your business.

However, to extract this complex data with everchanging data connectors, you would require to invest a section of your engineering bandwidth to Integrate, Clean, Transform & Load data to your Data Warehouse or a destination of your choice. On the other hand, a more effortless & economical choice is exploring a Cloud-Based ETL Tool like Hevo Data.

Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources such as PostgreSQL, Apache Kafka & Kafka Confluent Cloud to a Data Warehouse or a destination of your choice to be visualised in a BI Tool. Hevo also supports PostgreSQL as a Destination for loading data seamlessly. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

Want to take Hevo for a spin? Sign Up or a 14-day free trial and experience the feature-rich Hevo suite firsthand. Also checkout our unbeatable pricing to choose the best plan for your organization.

Tell us about your experience of setting up Debezium Testing for CDC using Test Containers! Share your thoughts with us in the comments section below.

Manjiri Gaikwad
Technical Content Writer, Hevo Data

Manjiri is a proficient technical writer and a data science enthusiast. She holds an M.Tech degree and leverages the knowledge acquired through that to write insightful content on AI, ML, and data engineering concepts. She enjoys breaking down the complex topics of data integration and other challenges in data engineering to help data professionals solve their everyday problems.

No-code Data Pipeline for Apache Kafka