Debezium Serialization with Avro and Apicurio Registry Simplified: A Comprehensive Guide 101

Debezium, Kafka • February 28th, 2022

Organizations use Kafka and Debezium to track real-time changes in databases and stream them to different applications. With a very large number of messages flowing through Kafka topics, how those messages are serialized starts to matter. Every message in a Kafka topic has a key and a value, and you have to decide how the Debezium event keys and values are serialized to binary form. You can use the JSON converter, but the resulting messages are lengthy. As an alternative to the JSON converter, you can use the Avro converter together with the Apicurio registry to perform Debezium Serialization. Read along to learn more about Debezium Serialization!

Prerequisites

  • Fundamental understanding of Debezium Connectors.

What is Debezium?

Debezium is an open-source, distributed system that captures real-time changes in databases so that applications can notice those changes and react to them. It consists of connectors that record all real-time data changes and store them as events in Kafka topics. Debezium provides connectors for different databases, such as MySQL, SQL Server, Oracle, MongoDB, and many more.

What is Debezium Serialization?

Debezium connectors run in the Kafka Connect framework and capture changes in databases as change data events. For each record, the Debezium connector performs the following actions.

  • It applies any configured transformations.
  • It serializes the record key and value into binary form through the configured Kafka Connect converter.
  • It writes the record to the appropriate Kafka topic.

Kafka Connect includes a JSON converter that serializes the record keys and values into JSON documents, which is one way to perform Debezium Serialization. However, by default, the converter includes the record's message schema in every message, making each record very lengthy. As an alternative, you can use Avro and the Apicurio Registry to serialize the record keys and values.

Simplify Your Data Analysis Using Hevo’s No-code Data Pipeline

Hevo Data is a No-code Data Pipeline that offers a fully managed solution to set up Data Integration from 100+ Data Sources (including 40+ Free sources) and will let you directly load data from various sources to a Data Warehouse or the Destination of your choice. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data. 

Check out why Hevo is the Best:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Connectors: Hevo supports 100+ integrations to SaaS platforms, FTP/SFTP, files, databases, BI tools, and native REST API and Webhooks connectors. It supports various destinations, including Google BigQuery, Amazon Redshift, Snowflake, and Firebolt data warehouses; Amazon S3 data lakes; Databricks; and MySQL, SQL Server, TokuDB, MongoDB, and PostgreSQL databases, to name a few.
  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within Data Pipelines.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Getting started with Debezium Serialization

Apicurio Registry is an open-source project that provides an API and schema registry along with an Avro converter. It reduces the size of each message by storing the schemas externally, with versioning, instead of inside every record. You specify the Avro converter in the Debezium connector configuration. The converter then maps Kafka Connect schemas to Avro schemas and serializes keys and values into Avro's compact binary form.

The Apicurio API and schema registry tracks the Avro schemas used in Kafka topics, including the schemas generated by the Avro converter. Since the Avro schemas are stored in this registry, each record needs to carry only a small schema identifier. Kafka consumer applications written to consume change data events can use the Avro Serdes to deserialize the change event records.
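To make the idea of "a record plus a schema identifier" concrete, here is a minimal sketch of one common framing. It assumes the Confluent-compatible layout (one zero "magic" byte followed by a 4-byte big-endian schema id, then the Avro bytes), which is the layout that tools reading through a Confluent-compatible registry endpoint expect. Apicurio's native framing may differ, so treat the byte layout here as an assumption rather than a description of this tutorial's exact output.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # assumed Confluent-compatible framing marker


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with a magic byte and a 4-byte schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# The payload bytes here are placeholders, not a real Debezium event.
framed = frame(48, b"\x02\x0aSally")
schema_id, payload = unframe(framed)
print(schema_id, len(framed) - len(payload))  # prints "48 5"
```

The header costs five bytes per record regardless of how large the schema is, which is the whole point of moving the schema into a registry.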

A) Debezium Serialization with Avro and Apicurio Registry

In this tutorial, you will run Debezium with Avro serialization and use the Apicurio service registry to track the schemas of Debezium events, using the steps below.

  • Install the latest version of Docker.
  • Install kafkacat, the generic non-JVM producer and consumer for Kafka.
  • Install jq, the command-line utility for JSON processing.
  • Clone the debezium-examples Git repository.
  • Change into the tutorial directory of the cloned repository.
cd debezium-examples/debezium-registry-avro
  • Start the environment.
docker-compose up -d

The above command starts the below services.

  • A single-node ZooKeeper and Kafka cluster.
  • A single-node Kafka Connect cluster.
  • An Apicurio service registry instance.
  • A MySQL database.

Configuration of the Debezium Connector

The connector configuration file tells the Debezium connector to use Avro serialization and deserialization and specifies the location of the Apicurio registry.

The container image used in this tutorial includes all the libraries needed for the connectors and converters. The converter settings give the key and the value their own converter class and registry configuration.
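As a rough illustration of what such converter settings look like, the sketch below builds the key and value properties programmatically. The property name pattern (`<role>.converter` and `<role>.converter.apicurio.registry.url`) follows the JSON converter example later in this article. The Avro converter class name and the registry URL are assumptions for illustration; check them against the connector file shipped with the example repository.

```python
import json


def converter_settings(role: str, converter_class: str, registry_url: str) -> dict:
    """Build the converter properties for one role ("key" or "value")."""
    if role not in ("key", "value"):
        raise ValueError("role must be 'key' or 'value'")
    prefix = f"{role}.converter"
    return {
        prefix: converter_class,
        f"{prefix}.apicurio.registry.url": registry_url,
    }


# Assumed values: adjust to the classes and hostnames in your own deployment.
AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter"
REGISTRY_URL = "http://registry.example:8080/api"  # hypothetical endpoint

config = {}
for role in ("key", "value"):
    config.update(converter_settings(role, AVRO_CONVERTER, REGISTRY_URL))

print(json.dumps(config, indent=2))
```

Generating both roles from one helper keeps the key and value settings from drifting apart, which is an easy mistake when the properties are written by hand.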

[Image: Setting the keys & values for Debezium Serialization]

Creating the Connector

You have to create a connector to capture changes in the database. This tutorial uses the Kafka Connect cluster's REST API to create the Debezium connector.

curl -X POST http://localhost:8083/connectors -H 'content-type:application/json' -d @dbz-mysql-connector-avro.json
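The same request can be prepared from a script. The sketch below only builds the HTTP request so it can be inspected without a running cluster; the connector name and the trimmed config are hypothetical placeholders, and a real registration needs the full contents of the connector JSON file.

```python
import json
import urllib.request


def build_create_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST request that registers a connector with Kafka Connect."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


connector = {
    "name": "example-connector",  # hypothetical name
    "config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector"},
}
request = build_create_request("http://localhost:8083", connector)
print(request.get_method(), request.full_url)

# To actually send it (requires a running Kafka Connect instance):
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```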

Review the Data

Once you have created and started the MySQL connector, Debezium captures the data in the database and sends it to Kafka as events. You can use the kafkacat command below to review the data.

kafkacat -b localhost:9092 -t avro.inventory.customers -e

Deserializing the Record

When Avro serializes the data, the information returned is not in a human-readable format.

To get readable data, you can have kafkacat query the schema from the Apicurio service registry and deserialize the records.

Run the command below, which points kafkacat at the registry.

kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e

For easier reading, you can pipe the output through the jq JSON utility you installed earlier.

kafkacat -b localhost:9092 -t avro.inventory.customers -s avro -r http://localhost:8081/api/ccompat -e | jq

The Kafka record stores only the payload; it does not contain the Debezium schema. The schema is needed to interpret the structure of the captured data. Therefore, the Apicurio service registry is used to externalize the Debezium schema.

Checking the Apicurio Schema Registry

If you want to view the list of all schemas, you can check the Apicurio schema registry at http://localhost:8081/ as shown in the below image.

[Image: Apicurio Registry]

B) Multiple Approaches to Schema Association

Change data events carry many types of data collected through Debezium connectors. Consumers of those events need to know the structure and types of that data. There are several ways to pass this message type information to consumers.

  • The message structure can be passed out of band to the consumer, who can then process the data.
  • The schema can be embedded in the message itself.
  • The message can include a reference to a registry that contains the associated metadata.

Consider the example of JsonConverter. It operates in two modes: with and without schemas. When it works without a schema, JsonConverter generates a plain JSON message, and consumers need to know the type of each field in advance or apply rules to guess and map values to data types. Although this approach is flexible, it can fail for more advanced cases.
A message produced without a schema contains only the fields that appear under "payload" in the JSON example further below.
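To make the guessing problem concrete, here is a small sketch of what a consumer sees when it parses a schemaless event. The event is trimmed for illustration and its values mirror the customers example used in this article; the helper name is hypothetical.

```python
import json

# A schemaless change event value, trimmed for illustration.
raw = (
    '{"after": {"id": 1001, "email": "sally.thomas@acme.com"}, '
    '"op": "c", "ts_ms": 1586331101491, "transaction": null}'
)


def guess_types(obj: dict) -> dict:
    """Map each field to the Python type name the JSON parser produced."""
    return {name: type(value).__name__ for name, value in obj.items()}


event = json.loads(raw)
print(guess_types(event))
print(guess_types(event["after"]))
```

The parser reports `ts_ms` as a plain `int`, with nothing to indicate that it is a 64-bit timestamp, and `transaction` as `NoneType`, which says nothing about the structure the field would have when it is present. That missing information is exactly what a schema supplies.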

When the JsonConverter works with a schema, the JSON message consists of two parts: schema and payload. The schema describes the message structure, field types, and type constraints, which enables consumers to process messages safely. The drawback of this approach is message size, which increases significantly because the schemas are large. Although the schema rarely changes, it is repeated in every event, which is wasteful.

The example below shows a message whose schema is significantly larger than its payload, which makes this approach inefficient.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "1.1.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1586331101491,
    "transaction": null
  }
}
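To put a number on that overhead, a sketch like the following compares the encoded size of the schema and the payload. It uses a trimmed stand-in for the event above (only the four customer fields), so the ratio it prints is illustrative, not a measurement of the full Debezium envelope.

```python
import json

# Trimmed stand-in for a change event with an embedded schema.
event = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int32", "optional": False, "field": "id"},
            {"type": "string", "optional": False, "field": "first_name"},
            {"type": "string", "optional": False, "field": "last_name"},
            {"type": "string", "optional": False, "field": "email"},
        ],
        "optional": False,
        "name": "dbserver1.inventory.customers.Value",
    },
    "payload": {
        "id": 1001,
        "first_name": "Sally",
        "last_name": "Thomas",
        "email": "sally.thomas@acme.com",
    },
}


def encoded_size(obj) -> int:
    """Size in bytes of the compact JSON encoding of obj."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


schema_bytes = encoded_size(event["schema"])
payload_bytes = encoded_size(event["payload"])
print(f"schema: {schema_bytes} bytes, payload: {payload_bytes} bytes")
print(f"schema share of message: {schema_bytes / (schema_bytes + payload_bytes):.0%}")
```

Even in this small case the schema outweighs the data it describes, and the gap grows with the full envelope, which adds the source, transaction, and before/after structures.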

Registry

The third approach combines the first two approaches and resolves their drawbacks by adding a new component, the registry, which stores and versions message schemas. The Apicurio registry enables Debezium and consumers to exchange messages whose schema is stored in the registry, so that each message carries only a reference to the schema.

To understand Debezium Serialization with the Apicurio registry, follow the steps below.

  • The resulting message is as follows.
[Image: Resulting Message]

As shown above, the JSON message consists of a full payload and a reference to a schema with id 48. You can query the schema from the registry using either the id or the symbolic name given by Debezium. The commands below query the schema first by id and then by name.

$ docker run --rm --tty \
    --network tutorial_default \
    debezium/tooling bash -c 'http http://apicurio:8080/ids/48 | jq .'

$ docker run --rm --tty \
    --network tutorial_default \
    debezium/tooling bash -c 'http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .'

Output:

{
  "type": "struct",
  "fields": [
    {
      "type": "struct",
      "fields": [
        {
          "type": "int32",
          "optional": false,
          "field": "id"
        },
        {
          "type": "string",
          "optional": false,
          "field": "first_name"
        },
        {
          "type": "string",
          "optional": false,
          "field": "last_name"
        },
        {
          "type": "string",
          "optional": false,
          "field": "email"
        }
      ],
      "optional": true,
      "name": "dbserver1.inventory.customers.Value",
      "field": "before"
    },
...
  ],
  "optional": false,
  "name": "dbserver1.inventory.customers.Envelope"
}
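The two lookups above differ only in the URL path, which a small helper can make explicit. This is a sketch based on the paths used in the commands in this section (`/ids/<id>` and `/artifacts/<name>`). The `-key` suffix for key schemas is an assumption by analogy with the `-value` name shown above, and the function names are hypothetical.

```python
from urllib.parse import quote


def schema_url_by_id(registry_url: str, global_id: int) -> str:
    """URL that looks up a schema by its numeric id."""
    return f"{registry_url.rstrip('/')}/ids/{global_id}"


def schema_url_by_artifact(registry_url: str, topic: str, is_key: bool = False) -> str:
    """URL that looks up a schema by its symbolic name, <topic>-key or <topic>-value."""
    suffix = "key" if is_key else "value"
    artifact_id = f"{topic}-{suffix}"
    return f"{registry_url.rstrip('/')}/artifacts/{quote(artifact_id, safe='')}"


print(schema_url_by_id("http://apicurio:8080", 48))
print(schema_url_by_artifact("http://apicurio:8080", "dbserver1.inventory.customers"))
```

Looking up by name is usually the more stable choice in scripts, because ids are assigned by the registry and can differ between deployments.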

The output matches the schema portion of the earlier JSON-with-schema example. The connector registration request differs from the previous one in only a few lines:

...
"key.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter", 
"key.converter.apicurio.registry.url": "http://apicurio:8080", 
"key.converter.apicurio.registry.global-id":
    "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy", 

"value.converter": "io.apicurio.registry.utils.converter.ExtJsonConverter", 
"value.converter.apicurio.registry.url": "http://apicurio:8080", 
"value.converter.apicurio.registry.global-id":
    "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy" 
...
  • io.apicurio.registry.utils.converter.ExtJsonConverter: It is the Apicurio JSON converter, used here as both the key and the value converter.
  • key.converter.apicurio.registry.url: Set to http://apicurio:8080, it is the Apicurio registry endpoint.
  • io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy: It is the strategy that looks up the schema id in the registry and automatically registers the schema if it does not exist yet.

Avro Converter

So far, you have learned how to serialize messages into JSON format. Because JSON is a human-readable format, the messages still take more space than necessary. A binary serialization format like Avro helps here: the data is packed without any field names, and each message contains only a reference to a schema stored in the Apicurio registry.
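The following sketch shows what "packed without any field names" means in practice. It hand-encodes the four customer fields using the Avro binary rules for `int` (zigzag variable-length integers) and `string` (a length prefix followed by UTF-8 bytes). It covers only the flat customer record, not the full Debezium envelope with its nullable unions, and it is meant for illustration; a real application would use an Avro library.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer the way Avro encodes int and long values."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes become small unsigned numbers
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def avro_string(s: str) -> bytes:
    """Encode a string as its byte length followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_customer(id_: int, first_name: str, last_name: str, email: str) -> bytes:
    """Concatenate field values in schema order; no field names are written."""
    return (zigzag_varint(id_) + avro_string(first_name)
            + avro_string(last_name) + avro_string(email))


encoded = encode_customer(1001, "Sally", "Thomas", "sally.thomas@acme.com")
print(len(encoded), encoded.hex())
```

The four fields fit in 37 bytes, compared with more than 80 bytes for the same values as compact JSON, because the reader recovers the field names and order from the schema instead of from the message.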

Use the commands below to switch to Apicurio's Avro converter.

# Tear down the previous deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml down

# Start the deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml up -d --build

# Start the connector
$ curl -i -X POST -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors/ \
    -d @register-mysql-apicurio-converter-avro.json
  • Query the registry using the same name.
$ docker run --rm --tty \
    --network tutorial_default \
    debezium/tooling \
    bash -c 'http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .'
  • The resulting schema is as follows.
{
  "type": "record",
  "name": "Envelope",
  "namespace": "dbserver1.inventory.customers",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": "int"
            },
            {
              "name": "first_name",
              "type": "string"
            },
            {
              "name": "last_name",
              "type": "string"
            },
            {
              "name": "email",
              "type": "string"
            }
          ],
          "connect.name": "dbserver1.inventory.customers.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
...
  ],
  "connect.name": "dbserver1.inventory.customers.Envelope"
}

This is how you can perform Debezium Serialization using Avro and Apicurio Registry.

Conclusion

Debezium makes it easy to capture data changes in databases, but it is also essential to know how to serialize these changes in Kafka. In this tutorial, you have learned multiple approaches to schema association and how they affect message size. With Debezium Serialization, you can serialize keys and values using the Avro converter with the Apicurio registry, which stores versioned schemas externally and minimizes the size of each message. You have also learned how to use the Apicurio registry with Debezium connectors so that consumers can retrieve the schema for each message.

As your business begins to grow, data is generated at an exponential rate across all of your company's SaaS applications, databases, and other sources. To meet these growing storage and computing needs, you would need to invest a portion of your engineering bandwidth to integrate data from all sources, clean and transform it, and finally load it to a Cloud Data Warehouse for further business analytics. All of these challenges can be efficiently handled by a cloud-based ETL tool such as Hevo Data.

Hevo Data, a No-code Data Pipeline, provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of desired destinations, with a few clicks. With its strong integration with 100+ sources (including 40+ free sources), Hevo Data allows you to not only export data from your desired data sources and load it to the destination of your choice, but also transform and enrich your data to make it analysis-ready, so that you can focus on your key business needs and perform insightful analysis using BI tools.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Share with us your experience of learning about Debezium Serialization. Share your thoughts in the comments below!
