Organizations use Kafka and Debezium to track real-time changes in databases and stream them to different applications. But with a very large number of messages in Kafka topics, serializing these messages efficiently becomes a challenge.

Every message in a Kafka topic has a key and a value. You have to decide how the Debezium event keys and values are serialized to binary form. You can use the JSON converter, but it produces verbose records.

As an alternative to the JSON converter, you can use the Avro converter together with the Apicurio registry to perform Debezium Serialization. Read along to learn more about Debezium Serialization!

Prerequisites

  • Fundamental understanding of Debezium Connectors.

What is Debezium?

Debezium is an open-source, distributed platform that captures real-time changes in databases so that applications can notice such changes and react to them. It consists of connectors that record all real-time data changes and store them as events in Kafka topics. Debezium provides connectors for different databases, such as MySQL, SQL Server, Oracle, MongoDB, and many more.

What is Debezium Serialization?

Debezium uses Debezium connectors in the Kafka Connect framework to capture changes in databases through change data events. For each record, the Debezium connector performs the following actions.

  • Applies the configured transformations.
  • Serializes the record keys and values into binary form through the configured Kafka Connect converter.
  • Writes the records to the appropriate Kafka topics.

Kafka Connect includes a JSON converter that serializes the record keys and values into JSON documents. However, by default, this converter includes the record’s message schema in every record, which makes each record very verbose. As an alternative, you can use Avro and Apicurio Registry to serialize the record keys and values.

Getting Started with Debezium Serialization

Apicurio Registry is an open-source project that provides an Avro converter along with an API and schema registry. You specify the Avro converter in the Debezium connector configuration. The converter then maps Kafka Connect schemas to Avro schemas.

The converter uses these Avro schemas to serialize keys and values into Avro’s compact binary form, reducing the size of each record. The API and schema registry track the Avro schemas used in Kafka topics, as well as the schemas generated by the Avro converter.

Since the Avro schemas are stored in this registry, each record needs to contain only a small schema identifier. Kafka consumer applications written to consume change data events can use the Avro Serdes to deserialize the change event records.
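
To illustrate why a schema identifier keeps records small, here is a minimal sketch in Python. It assumes a Confluent-style framing (one zero magic byte followed by a 4-byte big-endian schema id). The actual wire layout depends on the converter and its configuration, and the function names are hypothetical.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # assumed framing marker (Confluent-style), not taken from the article


def frame_record(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix the serialized payload with a magic byte and a 4-byte schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe_record(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


framed = frame_record(48, b"\xd2\x0f")
print(len(framed))              # header bytes plus payload bytes
print(unframe_record(framed))   # the consumer recovers the id and the payload
```

Under this assumption the per-record overhead is a fixed five bytes, no matter how large the schema is. The consumer uses the recovered id to fetch the schema from the registry once and can cache it afterwards.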

A) Debezium Serialization with Avro and Apicurio Registry

In this tutorial, you will run an example that uses Avro for Debezium Serialization and the Apicurio service registry for tracking the schemas of Debezium events, using the steps below.

  • Install the latest version of Docker.
  • Install Kafkacat, the generic non-JVM producer and consumer for Kafka.
  • Install jq, the command-line utility for JSON processing.
  • Clone the debezium-examples Git repository.
  • Change into the example directory of the cloned repository.
cd debezium-examples/debezium-registry-avro
  • Start the environment.
docker-compose up -d

The above command starts the following services.

  • A single-node ZooKeeper and Kafka cluster.
  • A single-node Kafka Connect cluster.
  • An Apicurio service registry instance.
  • A MySQL database.
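
Before moving on, it can help to confirm that the services are reachable. The following is a minimal sketch, assuming the services are exposed on localhost with the ports used by the commands in this tutorial (9092 for Kafka, 8083 for Kafka Connect, 8081 for the registry). The helper name is hypothetical.

```python
import socket

# Ports taken from the commands used in this tutorial.
SERVICES = {
    "Kafka": 9092,
    "Kafka Connect": 8083,
    "Apicurio registry": 8081,
}


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        state = "up" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} (port {port}): {state}")
```

An open port only shows that the container is listening. A service may still need a few more seconds before it accepts requests.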

Configuration of the Debezium Connector

The connector configuration file instructs the Debezium connector to use Avro serialization and deserialization and specifies the location of the Apicurio registry.

The container image used in this tutorial includes all the libraries needed for the connectors and converters. The settings below configure the key and value converters along with their registry configuration.

Debezium Serialization: Commands for Setting Keys and Values
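
The exact settings are only shown in the image, so the following is a hedged sketch of what such converter settings might look like rather than the file shipped with the example. It reuses the property names that appear in the registry section of this article. The Avro converter class name and the registry URL are assumptions and may differ between Apicurio versions.

```python
import json

# Assumed values: the converter class and registry URL are illustrative and
# may differ from the file shipped with the example repository.
AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter"
REGISTRY_URL = "http://apicurio:8080"  # hypothetical endpoint for this sketch
ID_STRATEGY = "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy"


def converter_settings(registry_url: str) -> dict:
    """Build matching key.* and value.* converter settings."""
    settings = {}
    for side in ("key", "value"):
        settings[f"{side}.converter"] = AVRO_CONVERTER
        settings[f"{side}.converter.apicurio.registry.url"] = registry_url
        settings[f"{side}.converter.apicurio.registry.global-id"] = ID_STRATEGY
    return settings


print(json.dumps(converter_settings(REGISTRY_URL), indent=2))
```

Generating both sides from one loop keeps the key and value converters consistent, which matters because consumers must deserialize both with the same registry.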

Creating the Connector

You have to create a connector to capture changes in the database. This tutorial uses the REST API of the Kafka Connect cluster to create the Debezium connector.

curl -X POST http://localhost:8083/connectors -H 'content-type:application/json' -d @dbz-mysql-connector-avro.json
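
The same request can be issued from code. Below is a minimal sketch using only the Python standard library. It targets the same endpoint as the curl call, and the function names are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # same endpoint as the curl call


def build_create_request(connector: dict) -> urllib.request.Request:
    """Prepare (but do not send) the POST that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connector: dict) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(build_create_request(connector)) as response:
        return response.status


# Hypothetical usage; the connector definition would normally be loaded
# from dbz-mysql-connector-avro.json:
#   with open("dbz-mysql-connector-avro.json") as f:
#       print(create_connector(json.load(f)))
```

Separating request construction from sending makes it possible to inspect the request without a running Kafka Connect cluster.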

Review the Data

Once you have created and started the MySQL connector, Debezium captures the data in the database and sends it to Kafka as events. You can use the kafkacat command below to review the data.

kafkacat -b localhost:9092 -t avro.inventory.customers -e

Deserializing the Record

When Avro serializes the data, the result is not in a human-readable format.

To get readable data, kafkacat can query the schema from the Apicurio service registry and use it to deserialize the records.

Run the command below, which passes the registry URL to kafkacat.

kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e

To pretty-print the output, pipe it through the jq JSON utility that you installed earlier.

kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e | jq

The Kafka record stores only the payload information. It does not contain the Debezium schema, which is needed to understand the logical structure of the captured data. Therefore, the Apicurio service registry is used to externalize the Debezium schema.

Checking the Apicurio Schema Registry

If you want to view the list of all schemas, you can check the Apicurio schema registry at http://localhost:8081/ as shown in the below image.

Apicurio Registry

B) Multiple Approaches to Schema Association

Change data events carry all kinds of data captured by Debezium connectors, and consumers that process these events need to know the structure and types of that data. There are several ways to pass this message-type information to consumers:

  • The message structure is passed out of band to the consumer, which can then process the data.
  • The schema is embedded in the message itself.
  • The message includes a reference to a registry that contains the associated metadata.

Consider the example of JsonConverter. It operates in two modes: with and without schemas. When it works without schemas, JsonConverter generates a plain JSON message, and the consumer either needs to know the type of each field beforehand or has to apply rules to guess and map values to data types. Although this approach is flexible, it can fail in more advanced cases. A message produced by JsonConverter without a schema contains only the payload fields, with no type information.
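
The following sketch shows why a schemaless consumer needs knowledge from outside the message. The field names are taken from the payload shown later in this article. The decoder table is a hypothetical piece of consumer-side configuration.

```python
import json
from datetime import datetime, timezone

# A schemaless change event: only the payload, no type information.
raw = (
    '{"after": {"id": 1001, "email": "sally.thomas@acme.com"},'
    ' "op": "c", "ts_ms": 1586331101491}'
)
event = json.loads(raw)

# JSON alone says ts_ms is "a number"; nothing marks it as a timestamp.
print(type(event["ts_ms"]).__name__)

# Hypothetical out-of-band knowledge the consumer has to maintain itself.
FIELD_DECODERS = {
    "ts_ms": lambda ms: datetime.fromtimestamp(ms / 1000, tz=timezone.utc),
}


def decode(event: dict) -> dict:
    """Apply the consumer's own type mapping to the top-level fields."""
    return {k: FIELD_DECODERS.get(k, lambda v: v)(v) for k, v in event.items()}


print(decode(event)["ts_ms"].isoformat())
```

If the producer changes the meaning or type of a field, nothing in the message tells the consumer, so the decoder table silently goes out of date.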

When the JsonConverter works with schemas, the JSON message consists of two parts: schema and payload. The schema describes the message, its field types, and type constraints, which enables consumers to process messages safely. The drawback of this approach is message size, which increases significantly because the schema is large. Since the schema rarely changes, adding it to every event is wasteful.

The example below shows a message with an embedded schema. The schema is significantly larger than the payload, which makes this format inefficient.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "1.1.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1586331101491,
    "transaction": null
  }
}
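
The overhead can be measured with a few lines of Python. This is a minimal sketch on a trimmed-down version of the message above, with one schema field and one payload field. The helper name is hypothetical.

```python
import json

# Trimmed-down version of the message above: one schema field, one payload field.
message = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "string", "optional": False, "field": "op"},
        ],
        "optional": False,
        "name": "dbserver1.inventory.customers.Envelope",
    },
    "payload": {"op": "c"},
}


def size_in_bytes(value) -> int:
    """Size of the compact JSON encoding in UTF-8 bytes."""
    return len(json.dumps(value, separators=(",", ":")).encode("utf-8"))


total = size_in_bytes(message)
payload_only = size_in_bytes(message["payload"])
print(f"payload: {payload_only} bytes, full message: {total} bytes")
print(f"schema overhead: {total - payload_only} bytes per event")
```

Running the same measurement on the full message above shows an even larger gap, because the complete envelope schema describes many more fields.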

Registry

The third approach combines the first two approaches and resolves their drawbacks by adding a new component, a registry, which stores and versions message schemas. With the Apicurio registry, Debezium and consumers exchange messages whose schemas are stored in the registry, and each message carries only a reference to its schema.

To understand Debezium Serialization with the Apicurio registry, consider the message that the connector produces when it uses the Apicurio JSON converter.

The resulting JSON message consists of the full payload and a reference to a schema with id 48. You can query the schema from the registry either by this id or by the symbolic name given by Debezium, using the two commands below.

$ docker run --rm --tty \
    --network tutorial_default \
    debezium/tooling bash -c 'http http://apicurio:8080/ids/48 | jq .'

$ docker run --rm --tty \
    --network tutorial_default \
    debezium/tooling bash -c 'http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .'

Output:

{
  "type": "struct",
  "fields": [
    {
      "type": "struct",
      "fields": [
        {
          "type": "int32",
          "optional": false,
          "field": "id"
        },
        {
          "type": "string",
          "optional": false,
          "field": "first_name"
        },
        {
          "type": "string",
          "optional": false,
          "field": "last_name"
        },
        {
          "type": "string",
          "optional": false,
          "field": "email"
        }
      ],
      "optional": true,
      "name": "dbserver1.inventory.customers.Value",
      "field": "before"
    },
...
  ],
  "optional": false,
  "name": "dbserver1.inventory.customers.Envelope"
}

The output matches the schema part of the JSON-with-schema example shown earlier. The connector registration request differs from the previous one in a few lines:

...
"key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter", 
"key.converter.apicurio.registry.url": "http://apicurio:8080", 
"key.converter.apicurio.registry.global-id":
    "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy", 

"value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter", 
"value.converter.apicurio.registry.url": "http://apicurio:8080", 
"value.converter.apicurio.registry.global-id":
    "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy" 
...
  • io.apicurio.registry.utils.converter.ExtJsonConverter: The Apicurio JSON converter, used as both the key and the value converter.
  • key.converter.apicurio.registry.url and value.converter.apicurio.registry.url: The Apicurio registry endpoint, here http://apicurio:8080.
  • io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy: The strategy that lets the converter register a schema automatically and obtain its id.
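
The idea behind a get-or-create id strategy can be sketched with a toy in-memory registry. This is an illustration of the concept only, not the Apicurio client API. The class, its methods, and the "schemaId" field name are assumptions made for this example.

```python
import json
from typing import Dict


class InMemoryRegistry:
    """Toy stand-in for a schema registry; not the Apicurio client API."""

    def __init__(self) -> None:
        self._ids_by_schema: Dict[str, int] = {}
        self._schemas_by_id: Dict[int, dict] = {}

    def get_or_create_id(self, schema: dict) -> int:
        """Return the id of a known schema, registering it on first use."""
        key = json.dumps(schema, sort_keys=True)
        if key not in self._ids_by_schema:
            new_id = len(self._ids_by_schema) + 1
            self._ids_by_schema[key] = new_id
            self._schemas_by_id[new_id] = schema
        return self._ids_by_schema[key]

    def get_schema(self, schema_id: int) -> dict:
        """Look up a previously registered schema by its id."""
        return self._schemas_by_id[schema_id]


registry = InMemoryRegistry()
schema = {"type": "struct", "name": "dbserver1.inventory.customers.Envelope"}

# Producer side: send the payload plus a reference instead of the schema.
# "schemaId" is an assumed field name for this sketch.
message = {"schemaId": registry.get_or_create_id(schema), "payload": {"op": "c"}}

# Consumer side: resolve the reference back to the full schema.
resolved = registry.get_schema(message["schemaId"])
print(message, resolved["name"])
```

Because the lookup is idempotent, the producer can call it for every record and the schema is still registered only once.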

Avro Converter

So far, you have seen messages serialized into the JSON format. JSON is human-readable, but it is not space-efficient. To transfer the data without significant overhead, it helps to use a binary serialization format such as Avro. With Avro, the data is packed without any field names, and the message again contains only a reference to a schema stored in the registry.

Use the commands below to try Apicurio’s Avro converter.
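
To make the space saving concrete, here is a hand-rolled sketch of Avro's binary encoding for the four customer fields used in this article. It follows the Avro rules for ints (zig-zag varints) and strings (length prefix plus UTF-8 bytes). It is an illustration only; a real application would use an Avro library, and the function names are hypothetical.

```python
import json


def encode_long(value: int) -> bytes:
    """Avro int/long: zig-zag encode, then emit 7 bits per byte (varint)."""
    zigzag = (value << 1) ^ (value >> 63)
    out = bytearray()
    while zigzag > 0x7F:
        out.append((zigzag & 0x7F) | 0x80)
        zigzag >>= 7
    out.append(zigzag)
    return bytes(out)


def encode_string(text: str) -> bytes:
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = text.encode("utf-8")
    return encode_long(len(data)) + data


def encode_customer(row: dict) -> bytes:
    """Encode fields in schema order; no field names are written."""
    return (
        encode_long(row["id"])
        + encode_string(row["first_name"])
        + encode_string(row["last_name"])
        + encode_string(row["email"])
    )


row = {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "sally.thomas@acme.com",
}
avro_bytes = encode_customer(row)
json_bytes = json.dumps(row, separators=(",", ":")).encode("utf-8")
print(len(avro_bytes), len(json_bytes))
```

The binary form contains only values, so a reader cannot interpret it without the schema. That is why the message has to carry a reference to the schema in the registry.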

# Tear down the previous deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml down

# Start the deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml up -d --build

# Start the connector
curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/ \
    -d @register-mysql-apicurio-converter-avro.json
  • Query the registry using the same name.
$ docker run --rm --tty \
    --network tutorial_default \
    debezium/tooling \
    bash -c 'http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .'
  • The resulting schema is as follows.
{
  "type": "record",
  "name": "Envelope",
  "namespace": "dbserver1.inventory.customers",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": "int"
            },
            {
              "name": "first_name",
              "type": "string"
            },
            {
              "name": "last_name",
              "type": "string"
            },
            {
              "name": "email",
              "type": "string"
            }
          ],
          "connect.name": "dbserver1.inventory.customers.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
...
  ],
  "connect.name": "dbserver1.inventory.customers.Envelope"
}
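
Note that the after field refers to the Value record by name instead of repeating its definition. The following sketch, run on a shortened copy of the schema above, shows how a reader could resolve such a named reference. The helper function is hypothetical and not part of any Avro library.

```python
import json

# Shortened copy of the schema above, keeping one field of the Value record.
schema = json.loads("""
{
  "type": "record", "name": "Envelope",
  "fields": [
    {"name": "before", "type": ["null", {"type": "record", "name": "Value",
       "fields": [{"name": "id", "type": "int"}]}], "default": null},
    {"name": "after", "type": ["null", "Value"], "default": null}
  ]
}
""")


def collect_named_types(node, found=None) -> dict:
    """Walk the schema and index every record definition by its name."""
    found = {} if found is None else found
    if isinstance(node, dict):
        if node.get("type") == "record":
            found[node["name"]] = node
        for child in node.values():
            collect_named_types(child, found)
    elif isinstance(node, list):
        for child in node:
            collect_named_types(child, found)
    return found


named = collect_named_types(schema)
after_branch = schema["fields"][1]["type"][1]  # the string "Value"
print([f["name"] for f in named[after_branch]["fields"]])
```

Defining a record once and referring to it by name keeps the Avro schema itself compact, in contrast to the JSON schema earlier in this article, which repeats the field list for before and after.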

This is how you can perform Debezium Serialization using Avro and Apicurio Registry.

Conclusion

Debezium makes it easy to capture the data changes in databases, but it’s also essential to know how to serialize these data changes in Kafka. In this tutorial, you have learned multiple approaches to schema association and how they affect the size of change event messages.

Debezium Serialization allows you to serialize keys and values using the Avro converter together with the Apicurio registry, which stores versioned schemas externally and minimizes the message payload. You have also learned how to use the Apicurio registry with Debezium connectors so that consumers can obtain the schema for the messages they receive.

As your business grows, data is generated at an exponential rate across all of your company’s SaaS applications, databases, and other sources. To meet these growing storage and computing needs, you would need to invest a portion of your engineering bandwidth to integrate data from all sources, clean and transform it, and finally load it into a cloud data warehouse for further business analytics. All of these challenges can be handled efficiently by a cloud-based ETL tool such as Hevo Data.

Hevo Data, a no-code data pipeline, provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of destinations, with a few clicks.

With its strong integration with 150+ sources (including 40+ free sources), Hevo Data allows you not only to export data from your desired data sources and load it to the destination of your choice, but also to transform and enrich your data to make it analysis-ready, so that you can focus on your key business needs and perform insightful analysis using BI tools.

Manjiri Gaikwad
Technical Content Writer, Hevo Data

Manjiri is a proficient technical writer and a data science enthusiast. She holds an M.Tech degree and leverages the knowledge acquired through that to write insightful content on AI, ML, and data engineering concepts. She enjoys breaking down the complex topics of data integration and other challenges in data engineering to help data professionals solve their everyday problems.