Debezium Testing for CDC using Testcontainers: 3 Easy Steps

on Database Management Systems, Debezium, Kafka, Kafka Consumers, PostgreSQL • February 23rd, 2022

Debezium is a distributed, open-source platform for tracking real-time changes in databases. It is called an event streaming platform because it converts data changes in databases into events that different applications can consume and process further. Debezium uses the Change Data Capture (CDC) approach to retrieve real-time changes from databases. You connect Debezium to a database through the corresponding connector, such as the connectors for PostgreSQL, SQL Server, MongoDB, and MySQL. Once a connector is connected to a database, it emits messages describing the data change events, and these messages are stored in Kafka topics. Kafka distributes these messages following the producer-consumer approach.

Producers are the ones that produce the data change event messages, while consumers read them. However, handling real-time changes from several data sources is not straightforward without a proper testing mechanism in place. Debezium Testing lets you write automated tests for your CDC setup and ensure that everything is configured as intended.

In this tutorial, you will learn how to set up automated Debezium Testing for CDC using Testcontainers.

What is CDC? 

Change Data Capture (CDC) is used to replicate data between databases in real time. It is a process in which changes in a database are detected and propagated in real time, so that other applications can react to them and data migration takes less time. Other approaches, such as polling, dual writes, or database triggers, can also detect changes. However, these methods can fail to keep the data consistent or can hurt database performance. CDC is therefore an effective way to synchronize row-level database changes to multiple applications in real time.
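
To make the idea concrete, here is a minimal sketch of what a downstream application does with row-level change events: it applies them, in order, to its own copy of the data. The RowChange and ReplicaSync classes are hypothetical names used only for this illustration; they are not part of Debezium or any other CDC tool.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of Debezium or any CDC library.
class RowChange {
    enum Kind { INSERT, UPDATE, DELETE }

    final Kind kind;
    final long id;       // primary key of the changed row
    final String title;  // new column value; null for DELETE

    RowChange(Kind kind, long id, String title) {
        this.kind = kind;
        this.id = id;
        this.title = title;
    }
}

class ReplicaSync {
    // Applies change events in order to a copy of the replica keyed by primary key.
    static Map<Long, String> apply(Map<Long, String> replica, List<RowChange> changes) {
        Map<Long, String> result = new LinkedHashMap<>(replica);
        for (RowChange change : changes) {
            if (change.kind == RowChange.Kind.DELETE) {
                result.remove(change.id);
            } else {
                // INSERT and UPDATE both leave the row in its new state.
                result.put(change.id, change.title);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<RowChange> changes = Arrays.asList(
                new RowChange(RowChange.Kind.INSERT, 1L, "Learn CDC"),
                new RowChange(RowChange.Kind.INSERT, 2L, "Learn Debezium"),
                new RowChange(RowChange.Kind.UPDATE, 1L, "Learn CDC basics"),
                new RowChange(RowChange.Kind.DELETE, 2L, null));
        System.out.println(apply(new LinkedHashMap<Long, String>(), changes));
    }
}
```

Because every change arrives as an ordered event, the replica never has to re-scan the source table, which is the main difference from a polling-based approach.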

What is Debezium?

Debezium is an open-source event streaming platform that captures real-time changes in databases. It uses the Change Data Capture (CDC) approach to replicate data between databases in real time. Debezium provides a separate connector for each supported database, such as MySQL, PostgreSQL, MongoDB, and SQL Server. Once a connector is connected to a database, it emits messages describing the data change events, and these messages are stored in Kafka. Kafka follows the producer-consumer approach: the producer is the connector that publishes a message for each data change event, and the consumer is the application that reads these messages.
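
Each Debezium change event carries an "op" field that names the kind of change, and the tests later in this tutorial assert on it. The sketch below decodes the four common values. The ChangeOps class is a hypothetical helper written for this illustration; Debezium does not ship it, and connectors may emit additional op values that fall into the default branch here.

```java
// Hypothetical helper for illustration; Debezium does not ship this class.
class ChangeOps {
    // Maps the "op" field of a Debezium change event to a readable description.
    static String describe(String op) {
        switch (op) {
            case "c": return "create (row inserted)";
            case "u": return "update (row modified)";
            case "d": return "delete (row removed)";
            case "r": return "read (row captured during the initial snapshot)";
            default:  return "other (" + op + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("r"));
    }
}
```

Rows that already exist when a connector is registered are picked up by its initial snapshot, which is why the tests in this tutorial expect "r" rather than "c" for the two inserted rows.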

How to set up Integration Testing with Testcontainers?

When you are setting up Change Data Capture pipelines with Debezium, it is essential to have automated Debezium testing in place to ensure the following:

  • The source database is set up so that the changes can be streamed off effectively.
  • Connectors are configured correctly.

The Debezium Testing extension for Testcontainers simplifies such tests by running all the required infrastructure, such as Apache Kafka and Kafka Connect, in Linux containers and making it easily accessible to Java-based tests. Follow these steps to set up Integration Testing for Debezium:

Integration Testing Step 1: Add Dependencies

To use Debezium Testing Testcontainers, add the following dependencies to your project.

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>1.8.1.Final</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

<!-- Add the TC dependency matching your database -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>

You also need the JDBC driver for your database and an Apache Kafka client, so that the test can insert data into the source database and read the resulting change events from Kafka.

Integration Testing Step 2: Test Setup

When writing an integration test for a Debezium Connector configuration, you need to set up Apache Kafka and a database that acts as a source of all change events.

With Debezium's DebeziumContainer class, the setup looks like the code below.

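import java.util.stream.Stream;

import org.junit.BeforeClass;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.lifecycle.Startables;

import io.debezium.testing.testcontainers.DebeziumContainer;

public class DebeziumContainerTest {

    // Docker network shared by all the containers
    private static Network network = Network.newNetwork();

    // Apache Kafka broker
    private static KafkaContainer kafkaContainer = new KafkaContainer()
            .withNetwork(network);

    // PostgreSQL, using Debezium's PostgreSQL container image
    public static PostgreSQLContainer<?> postgresContainer =
            new PostgreSQLContainer<>("debezium/postgres:11")
                .withNetwork(network)
                .withNetworkAliases("postgres");

    // Kafka Connect with Debezium 1.8.1
    public static DebeziumContainer debeziumContainer =
            new DebeziumContainer("debezium/connect:1.8.1.Final")
                .withNetwork(network)
                .withKafka(kafkaContainer)
                .dependsOn(kafkaContainer);

    // Start all three containers once, before any test runs
    @BeforeClass
    public static void startContainers() {
        Startables.deepStart(Stream.of(
                kafkaContainer, postgresContainer, debeziumContainer))
                .join();
    }
}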

The above Debezium Testing Code contains the following elements:

  • private static Network network = Network.newNetwork(): It defines a Docker network to be used by all the services.
  • private static KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network): It sets up a container for Apache Kafka.
  • public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>("debezium/postgres:11").withNetwork(network).withNetworkAliases("postgres"): It sets up a container for PostgreSQL using Debezium's PostgreSQL container image.
  • public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:1.8.1.Final").withNetwork(network).withKafka(kafkaContainer).dependsOn(kafkaContainer): It sets up a container for Kafka Connect with Debezium 1.8.1.
  • @BeforeClass public static void startContainers() { Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, debeziumContainer)).join(); }: It starts all three containers before the tests run.

Integration Testing Step 3: Test Implementation

After declaring all the required containers, you can insert some data into Postgres, register an instance of the Debezium PostgreSQL connector, and use the Apache Kafka client to read the data change records from the corresponding topic.

@Test
public void canRegisterPostgreSqlConnector() throws Exception {
    try (Connection connection = getConnection(postgresContainer);
            Statement statement = connection.createStatement();
            KafkaConsumer<String, String> consumer = getConsumer(
                    kafkaContainer)) {

        statement.execute("create schema todo"); 
        statement.execute("create table todo.Todo (id int8 not null, " +
                "title varchar(255), primary key (id))");
        statement.execute("alter table todo.Todo replica identity full");
        statement.execute("insert into todo.Todo values (1, " +
                "'Learn CDC')");
        statement.execute("insert into todo.Todo values (2, " +
                "'Learn Debezium')");

        ConnectorConfiguration connector = ConnectorConfiguration
                .forJdbcContainer(postgresContainer)
                .with("database.server.name", "dbserver1");

        debeziumContainer.registerConnector("my-connector",
                connector); 

        consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

        List<ConsumerRecord<String, String>> changeEvents =
                drain(consumer, 2); 

        assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
                "$.id")).isEqualTo(1);
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.after.title")).isEqualTo("Learn CDC");

        assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
                "$.id")).isEqualTo(2);
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.after.title")).isEqualTo("Learn Debezium");

        consumer.unsubscribe();
    }
}

// Helper methods below

private Connection getConnection(
        PostgreSQLContainer<?> postgresContainer)
                throws SQLException {

    return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
            postgresContainer.getUsername(),
            postgresContainer.getPassword());
}

private KafkaConsumer<String, String> getConsumer(
            KafkaContainer kafkaContainer) {

    return new KafkaConsumer<>(
            ImmutableMap.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            kafkaContainer.getBootstrapServers(),
                    ConsumerConfig.GROUP_ID_CONFIG,
                            "tc-" + UUID.randomUUID(),
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                            "earliest"),
            new StringDeserializer(),
            new StringDeserializer());
}

private List<ConsumerRecord<String, String>> drain(
        KafkaConsumer<String, String> consumer,
        int expectedRecordCount) {

    List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();

    Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
        consumer.poll(Duration.ofMillis(50))
                .iterator()
                .forEachRemaining(allRecords::add);

        return allRecords.size() == expectedRecordCount;
    });

    return allRecords;
}

The above test creates a table in the PostgreSQL database, inserts two records, and contains the following elements:

  • ConnectorConfiguration connector = ConnectorConfiguration.forJdbcContainer(postgresContainer).with("database.server.name", "dbserver1"); debeziumContainer.registerConnector("my-connector", connector): It registers an instance of the Debezium PostgreSQL connector. The connector type and properties such as database host, database name, and user are derived from the database container.
  • consumer.subscribe(Arrays.asList("dbserver1.todo.todo")); List<ConsumerRecord<String, String>> changeEvents = drain(consumer, 2): It reads two records from the change event topic in Kafka, after which the test asserts their attributes.
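
The topic name in the subscribe call is not arbitrary: with the configuration used here, the PostgreSQL connector names each topic after the logical server name, the schema, and the table. The sketch below shows why the table created as todo.Todo ends up in the topic dbserver1.todo.todo. TopicNames is a hypothetical helper for illustration, not part of Debezium's API, and it assumes unquoted identifiers (which PostgreSQL folds to lower case) and the connector's default topic naming.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not part of Debezium's API.
class TopicNames {
    // Builds <server name>.<schema>.<table>, lower-casing the schema and table
    // the way PostgreSQL folds unquoted identifiers.
    static String forUnquotedTable(String serverName, String schema, String table) {
        return serverName + "."
                + schema.toLowerCase(Locale.ROOT) + "."
                + table.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // prints "dbserver1.todo.todo"
        System.out.println(forUnquotedTable("dbserver1", "todo", "Todo"));
    }
}
```

If the test subscribes to a topic that does not match this pattern, no records arrive and the drain helper times out, so a mismatch here is a common cause of a failing test.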

How to set up Debezium Testing for CDC using Testcontainers?

Setting up Change Data Capture with Debezium involves a fair amount of configuration on both the source database and the Debezium connector. The source database must be set up so that Debezium can connect to it and retrieve change events. It can be any supported database, such as MySQL, SQL Server, or PostgreSQL. The Debezium connector must be configured with the correct database host and credentials and, where needed, with table filters and SMTs (single message transformations).

For Debezium Testing, you can use Debezium Testcontainers, which sets up all the required components, such as Apache Kafka and Kafka Connect, from Linux container images. It lets you configure and deploy the Debezium connector and run assertions against the produced change data events. To set up Debezium Testing for CDC using Testcontainers, follow the steps given below:

Debezium Testing Step 1: Test Setup

If you are working with Apache Maven for dependency management, you must add the below dependencies to your pom.xml file.

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>1.1.0.CR1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

Add the Testcontainers dependency for your database. This tutorial uses PostgreSQL, but you can use any database that Debezium supports.

<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>

Debezium Testing Step 2: Initialize the Testcontainers

After declaring the required dependencies, you can write a CDC integration test with Testcontainers, which runs the required services as Linux containers using Docker. Initialize the Testcontainers for Apache Kafka, Kafka Connect, and a PostgreSQL database as shown below.

public class CdcTest {

  private static Network network = Network.newNetwork(); 

  private static KafkaContainer kafkaContainer = new KafkaContainer()
      .withNetwork(network); 

  public static PostgreSQLContainer<?> postgresContainer =
      new PostgreSQLContainer<>("debezium/postgres:11")
          .withNetwork(network)
          .withNetworkAliases("postgres"); 

  public static DebeziumContainer debeziumContainer =
      new DebeziumContainer("1.1.0.CR1")
          .withNetwork(network)
          .withKafka(kafkaContainer)
          .dependsOn(kafkaContainer); 

  @BeforeClass
  public static void startContainers() { 
    Startables.deepStart(Stream.of(
        kafkaContainer, postgresContainer, debeziumContainer))
            .join();
  }
}

The above Debezium Testing code has the following elements:

  • private static Network network = Network.newNetwork(): It defines a Docker network that all the services can use.
  • private static KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network): It sets up a container for Apache Kafka.
  • public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>("debezium/postgres:11").withNetwork(network).withNetworkAliases("postgres"): It sets up a container for the PostgreSQL database.
  • public static DebeziumContainer debeziumContainer = new DebeziumContainer("1.1.0.CR1").withNetwork(network).withKafka(kafkaContainer).dependsOn(kafkaContainer): It sets up a container for Kafka Connect with Debezium 1.1.0.CR1.
  • @BeforeClass public static void startContainers() { Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, debeziumContainer)).join(); }: It starts all three containers in the @BeforeClass method.

Debezium Testing Step 3: Test Implementation

To write a test for your CDC setup, follow the steps below.

  • Step 1: Create the test method. It opens a JDBC connection to the database and a Kafka consumer; the following steps fill in the body marked TODO.
@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
  try (Connection connection = getConnection(postgresContainer);
      Statement statement = connection.createStatement();
      KafkaConsumer<String, String> consumer =
          getConsumer(kafkaContainer)) {

      // TODO ...
  }
}
  • Connect to your PostgreSQL Database:
private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
    throws SQLException {

  return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
      postgresContainer.getUsername(),
      postgresContainer.getPassword());
}
  • Connect to Kafka:
private KafkaConsumer<String, String> getConsumer(
    KafkaContainer kafkaContainer) {

  return new KafkaConsumer<>(
      ImmutableMap.of(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          kafkaContainer.getBootstrapServers(),

          ConsumerConfig.GROUP_ID_CONFIG,
          "tc-" + UUID.randomUUID(),

          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
          "earliest"),
      new StringDeserializer(),
      new StringDeserializer());
}
  • Step 2: Execute some SQL statements to change some data.
statement.execute("create schema todo"); 
statement.execute("create table todo.Todo (" +
                    "id int8 not null, " +
                    "title varchar(255), " +
                    "primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (1, 'Learn CDC')");
statement.execute("insert into todo.Todo values (2, 'Learn Debezium')");
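  • Step 3: Register the Debezium connector and retrieve the resulting change data events from the Kafka topic using the Kafka consumer. The drain() helper is the one defined under Integration Testing Step 3 above.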
ConnectorConfiguration connector = ConnectorConfiguration
        .forJdbcContainer(postgresContainer)
        .with("database.server.name", "dbserver1");

debeziumContainer.registerConnector("my-connector",
        connector); 

consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

List<ConsumerRecord<String, String>> changeEvents =
        drain(consumer, 2); 
  • Step 4: Run assertions against these events.
ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
  .isEqualTo(1);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
  .isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
  .isEqualTo("Learn CDC");

changeEvent = changeEvents.get(1);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
  .isEqualTo(2);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
  .isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
  .isEqualTo("Learn Debezium");

consumer.unsubscribe();

The above Debezium Testing code contains the following elements:

  • statement.execute(...): These calls create a schema and a table in the PostgreSQL database and insert two records.
  • debeziumContainer.registerConnector("my-connector", connector): It registers an instance of the Debezium PostgreSQL connector.
  • List<ConsumerRecord<String, String>> changeEvents = drain(consumer, 2): It reads two records from the change event topic in Kafka, whose attributes are then asserted.
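
The drain(consumer, 2) call hides a small but important pattern: change events arrive asynchronously, so the test has to keep polling until the expected number of records has arrived or a timeout is reached. The sketch below shows that pattern using only the Java standard library. Drainer and its Supplier-based source are hypothetical stand-ins for illustration; the tutorial's real helper polls a KafkaConsumer and relies on Testcontainers' Unreliables utility. Unlike that helper, this sketch returns as soon as at least the expected number of items has arrived.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; the tutorial's real helper polls a KafkaConsumer.
class Drainer {
    // Repeatedly polls `source` until at least `expectedCount` items have arrived,
    // or fails once `timeout` has elapsed.
    static <T> List<T> drain(Supplier<List<T>> source, int expectedCount, Duration timeout) {
        List<T> all = new ArrayList<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (all.size() < expectedCount) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException(
                        "Expected " + expectedCount + " records but got " + all.size());
            }
            all.addAll(source.get());
        }
        return all;
    }

    public static void main(String[] args) {
        // Simulated source that yields one record per poll.
        int[] counter = {0};
        Supplier<List<String>> source = () -> {
            List<String> batch = new ArrayList<>();
            batch.add("event-" + (++counter[0]));
            return batch;
        };
        System.out.println(drain(source, 2, Duration.ofSeconds(1)));
    }
}
```

Failing with a clear message on timeout is what turns a misconfigured connector or a wrong topic name into a readable test failure instead of a test that hangs.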

Conclusion

In this article, you have learned how to automate Debezium Testing with Testcontainers, covering the dependencies, test setup, and test implementation. Whenever you implement Debezium Testing, you retrieve the resulting change data events from the respective Kafka topic and run assertions against them. This article uses JsonPath-based assertions, but you can also use another JSON API of your choice. The Debezium Testing approach explained in this tutorial can be expanded in multiple ways. You can put your connector configuration under revision control to manage and track any configuration changes, and then drive the test from the tracked configuration. You can also test your entire data streaming pipeline by deploying the Debezium connector together with a sink connector.

For a more in-depth and holistic analysis of business performance and financial health, it is essential to consolidate data from Apache Kafka and all the other applications used across your business. However, to extract this complex data with ever-changing data connectors, you would need to invest a section of your engineering bandwidth to Integrate, Clean, Transform & Load data to your Data Warehouse or a destination of your choice. On the other hand, a more effortless & economical choice is exploring a Cloud-Based ETL Tool like Hevo Data.

Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources such as PostgreSQL, Apache Kafka & Kafka Confluent Cloud to a Data Warehouse or a destination of your choice to be visualised in a BI Tool. Hevo also supports PostgreSQL as a Destination for loading data seamlessly. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

If you are using Apache Kafka & Kafka Confluent Cloud as your Message Streaming Platform and searching for a stress-free alternative to manual data integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources & BI tools(Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of setting up Debezium Testing for CDC using Test Containers! Share your thoughts with us in the comments section below.
