A 101 Guide to PostgreSQL Kafka Connector

June 24th, 2022

Do you want to use Kafka to monitor PostgreSQL data, but bottlenecks are making it difficult to connect PostgreSQL to Kafka? Look no further: you are in the right place!

This article will answer all of your questions and take the stress out of finding a truly efficient solution. Follow this step-by-step guide to seamlessly connect PostgreSQL to Kafka.

After reviewing this blog thoroughly, you will be able to connect PostgreSQL to Kafka and transfer data to the destination of your choice for real-time analysis. This article will give you a thorough understanding of the platforms in question as well as their key features.

Let’s begin…

What is PostgreSQL?

PostgreSQL is an open-source, free-to-use relational database management platform. It supports SQL and is highly extensible. Unlike MongoDB, which stores data as documents, Postgres uses a more traditional structure and stores data in tables.

It follows a monolithic architecture in which all the components work together as a single unit. It is written in the C programming language and has widespread community support. Being an open-source solution also contributes to its steady stream of updates.

One significant difference between PostgreSQL and standard relational database systems, and the one that makes it highly extensible, is that PostgreSQL stores much more information in its catalogs. Besides tables and columns, the catalogs hold information about data types, functions, access methods, and so on.

Key Features of PostgreSQL

PostgreSQL has become one of the most popular Database Management Systems due to the following appealing features:

  • Data Integrity: PostgreSQL ensures data integrity with primary keys, foreign keys, explicit locks, advisory locks, and exclusion constraints.
  • Multiple Data Types: PostgreSQL supports a wide range of data types, including INTEGER, NUMERIC, BOOLEAN, CHAR, VARCHAR, DATE, INTERVAL, TIMESTAMP, and so on.
  • Data Security: Data is protected by several layers of authentication and protection. PostgreSQL supports various authentication methods, such as Lightweight Directory Access Protocol (LDAP).

Replicate PostgreSQL and Kafka Data in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a fully-managed Data Pipeline platform, can help you automate and simplify your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources like PostgreSQL and Kafka straight into your Data Warehouse or any database. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code.

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

What is Apache Kafka?

Apache Kafka is a free and open-source Distributed Streaming Platform that enables the creation of Real-Time Event-Driven Applications. It enables developers to create applications that use a Message Broker to consistently produce and consume streams of data records.

Apache Kafka is extremely fast and ensures that all data records are accurate. These data records are kept in the order in which they appear within “Clusters,” which can span multiple Servers or even multiple Data Centers. Apache Kafka replicates and partitions these records so that a large number of users can use the application at the same time.

Key Features of Apache Kafka

The core capabilities of Apache Kafka are:

  • Exceptional Throughput: Kafka uses a cluster of machines to deliver messages at high throughput with latencies as low as 2ms.
  • Scalable: Kafka can handle thousands of brokers, trillions of messages per day, petabytes of data, and hundreds of thousands of partitions.
  • Long-Term Storage: Kafka can securely store data streams in distributed, long-lasting, fault-tolerant cluster machines.
  • Built-in Stream Processing: Kafka includes built-in stream processing features that process event streams in real-time using joins, aggregations, filters, transformations, and more.
  • Connect Interface: Kafka includes the Kafka Connect interface, which integrates with hundreds of event sources and sinks, including AWS S3, Postgres, JMS, Elasticsearch, and others.
  • High Availability: Kafka is fast and designed for high availability, so your data remains accessible.
  • Libraries for Clients: Kafka can read, write, and process event streams in a variety of programming languages, including Python, Java, C/C++, Scala, and others.

Why is the PostgreSQL Kafka Connector used?

Almost every application saves its data to a database, so the reasons for setting up a PostgreSQL Kafka Connector are fairly straightforward. Some motivations are as follows:

  1. You want to react to changes in the database in real-time, for example, whenever records are updated.
  2. You want to avoid a situation in which each application has its piece of code that sends a message when something in the database has been modified.

PostgreSQL Kafka Connector is useful for streaming data into Kafka. This connection allows you to extract data from PostgreSQL and track all row-level changes to your data.

The PostgreSQL Kafka Connector supports multiple data formats such as Avro, JSON Schema, Protobuf, or JSON. You can use the PostgreSQL Kafka Connector to transfer important data from your Postgres database tables, such as customer information and stakeholder data, and perform Stream Processing on this data using Kafka’s built-in functionalities.

Many major players in the industry have found Apache Kafka to be an ideal fit for data streaming, including Pinterest, Airbnb, Cisco, Cloudflare, Goldman Sachs, LinkedIn, Mozilla Firefox, Oracle, PayPal, Spotify, Shopify, Tencent, Twitter, and many more.

Debezium has proved to be a popular CDC connector for Postgres (and other databases). Kafka Connect is the layer used to communicate between Apache Kafka and Debezium. It is an out-of-the-box solution that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems.

Connecting PostgreSQL to Kafka

Debezium is an open-source platform that builds on Change Data Capture (CDC) features found in various databases. It includes a set of Kafka Connect connectors that listen for row-level changes (via CDC) in database table(s) and convert them to event streams, which are then sent to Apache Kafka. Below you’ll find two methods for setting up a PostgreSQL Kafka Connector.

Method 1: Using Debezium as a PostgreSQL Kafka Connector

To capture data change, the following technologies will be used.

  • Apache Kafka: It will be used to create a messaging topic that will store the database changes.
  • Kafka Connect: It is a tool that allows for scalable and dependable data streaming between Apache Kafka and other systems. It is used to specify connectors that can move data from entire databases into and out of Kafka.
  • Debezium: It is a tool that converts the database’s write-ahead logs (WALs) into data streams by utilizing the best underlying mechanism provided by the database system. The database data is then streamed into Kafka via the Kafka Connect API.

Debezium uses the PostgreSQL logical decoding feature to extract all persistent changes to the database in a simple format that can be interpreted without detailed knowledge of the database’s internal state.

Debezium uses the Kafka Connect API to register itself as one of the connectors of a data source once the changed data is available in an understandable format. Debezium uses checkpoints to read only committed data from the transaction log.

Step 1: Running Multiple Instances for PostgreSQL Kafka Connector

Here you’ll start the PostgreSQL database and map its port 5432 to port 5000 on the host. You also need to start ZooKeeper, which Apache Kafka uses for cluster coordination and metadata. Finally, you can launch a Debezium instance and connect it to the existing containers, namely Postgres, Kafka, and ZooKeeper. The linking lets the containers communicate with each other.

Launch a PostgreSQL instance

docker run --name postgres -p 5000:5432 -e POSTGRES_PASSWORD=postgres debezium/postgres

Launch a Zookeeper instance

docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

Begin a Kafka instance

docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

Begin a Debezium instance

docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect

Connect to PostgreSQL and create a tracking database

psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);

Once the setup is complete, all that remains is to register a PostgreSQL Kafka Connector with Kafka Connect.

Step 2: Connect the PostgreSQL Kafka Connector

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
 "name": "inventory-connector",
 "config": {
 "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
 "tasks.max": "1",
 "database.hostname": "postgres",
 "database.port": "5432",
 "database.user": "postgres",
 "database.password": "postgres",
 "database.dbname" : "inventory",
 "database.server.name": "dbserver1",
 "database.whitelist": "inventory",
 "database.history.kafka.bootstrap.servers": "kafka:9092",
 "database.history.kafka.topic": "schema-changes.inventory"
 }
}'
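
If you would rather script the registration than type the curl command, the sketch below builds the same POST request with Python's standard library. The helper name build_register_request is hypothetical, and the default URL assumes Kafka Connect is reachable on localhost:8083 as in the command above. The request is only built, not sent, so the snippet runs without a live Kafka Connect worker.

```python
import json
import urllib.request

# Connector settings copied from the curl example; adjust for your environment.
CONNECTOR = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "inventory",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
    },
}


def build_register_request(connector, url="http://localhost:8083/connectors/"):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


if __name__ == "__main__":
    request = build_register_request(CONNECTOR)
    print(request.get_method(), request.full_url)
    # To actually register the connector against a running Kafka Connect worker:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status, response.read().decode("utf-8"))
```

Keeping the configuration in a dictionary makes it easy to store it in version control and reuse it across environments by changing only the host names and credentials.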

Now you can start the Kafka console consumer to monitor the data arriving from PostgreSQL. Use the psql CLI to perform some SQL inserts, updates, and deletes; the console consumer will print each change as a JSON message.
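
To make sense of those JSON messages, it helps to know that a Debezium change event carries the row state before and after the change, plus an "op" code (c for create, u for update, d for delete, r for a snapshot read). With the server name dbserver1 used above, changes to dumb_table would normally arrive on a topic named after the server, schema, and table, such as dbserver1.public.dumb_table. The following is a minimal sketch of reading one event; the function name summarize_change and the sample message are hand-written for illustration and are not captured output.

```python
import json

# Debezium operation codes found in the "op" field of a change event.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_change(raw_message):
    """Return (operation, before_row, after_row) for one Debezium change event."""
    event = json.loads(raw_message)
    # When the JSON converter includes schemas, the change data sits under "payload".
    payload = event.get("payload", event)
    operation = OPERATIONS.get(payload["op"], "unknown")
    return operation, payload.get("before"), payload.get("after")


# Hand-written sample shaped like an insert into dumb_table (illustrative only).
sample = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1, "name": "first row"},
        "op": "c",
    }
})

print(summarize_change(sample))
```

A real consumer would also need to handle messages with an empty value, which this sketch does not attempt.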

What Makes Hevo’s ETL Process Best-In-Class

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo’s Automated, No-Code Platform empowers you with everything you need to have for a smooth data replication experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ Data Sources (with 40+ Free Sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

Method 2: Using Confluent CLI as a PostgreSQL Kafka Connector

You can use Confluent CLI to set up PostgreSQL to Kafka. Here are steps to get started:

Step 1: Set up the Connector Configuration

To display the required connector properties, run the following command:

confluent connect plugin describe PostgresSource

Step 2: Create the connector configuration file

Create a JSON file with the connector configuration properties. The required connector properties are illustrated in the following example.

{
    "name" : "confluent-postgresql-source",
    "connector.class": "PostgresSource",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.secret" : "<my-kafka-api-secret>",
    "topic.prefix" : "postgresql_",
    "ssl.mode" : "prefer",
    "connection.host" : "<my-database-endpoint>",
    "connection.port" : "5432",
    "connection.user" : "postgres",
    "connection.password": "<my-database-password>",
    "db.name": "postgres",
    "table.whitelist": "passengers",
    "timestamp.column.name": "created_at",
    "output.data.format": "JSON",
    "db.timezone": "UTC",
    "tasks.max" : "1"
}
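
Before handing the file to the CLI, it can be worth checking that no property is missing and that every placeholder in angle brackets has been replaced. The sketch below is one way to do that in Python. The EXPECTED_KEYS list is an assumption taken from the example above rather than an official list of required properties, so confirm it against the output of the describe command from Step 1. The function names are hypothetical.

```python
import json

# Properties taken from the example configuration. Treat this list as an
# assumption and confirm it with `confluent connect plugin describe PostgresSource`.
EXPECTED_KEYS = [
    "name", "connector.class", "kafka.api.key", "kafka.api.secret",
    "connection.host", "connection.port", "connection.user",
    "connection.password", "db.name", "output.data.format", "tasks.max",
]


def check_config(config):
    """Return a list of problems: missing keys and unreplaced <placeholders>."""
    problems = [f"missing: {key}" for key in EXPECTED_KEYS if key not in config]
    for key, value in config.items():
        if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
            problems.append(f"placeholder not replaced: {key}")
    return problems


def load_and_check(path):
    """Load a JSON configuration file and run check_config on it."""
    with open(path, encoding="utf-8") as handle:
        return check_config(json.load(handle))


# Example: a config that still contains a placeholder value.
print(check_config({"name": "confluent-postgresql-source",
                    "connection.password": "<my-database-password>"}))
```

Calling load_and_check on your saved JSON file returns an empty list when nothing looks wrong.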

Please take note of the following property definitions:

  • “name”: Gives your connector a name.
  • “connector.class”: Identifies the name of the connector plugin.
  • “kafka.auth.mode”: Specifies which connector authentication mode to use. There are two choices: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, set the configuration properties kafka.api.key and kafka.api.secret as shown in the example configuration (above). To use a service account instead, enter its Resource ID in the corresponding service account property.
  • “topic.prefix”: The connector generates Kafka topics automatically using the naming convention <prefix>.<table-name>. The topics are created with topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, do so before running this connector.
  • “ssl.mode”: Enables an encrypted connection to the PostgreSQL database server.
  • “db.name”: The name of your PostgreSQL database.
  • “timestamp.column.name”: Selects a method for reading data from PostgreSQL tables: the connector detects new and modified rows by using a timestamp column. When combined with “incrementing.column.name”, this property enables timestamp and incrementing mode, which handles updates using a globally unique ID that can be assigned a unique stream offset.
  • “output.data.format”: Specifies the preferred output format for Kafka records (data coming from the connector). Acceptable formats include AVRO, JSON_SR, PROTOBUF, and JSON.
  • “db.timezone”: Determines the timezone of the database. UTC is the default timezone.
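
The timestamp and incrementing mode mentioned above can be easier to picture with a toy simulation. The sketch below is not the connector's code; it only illustrates the idea that the offset is a (timestamp, id) pair, so rows sharing the same timestamp are still delivered exactly once. The poll function and the in-memory passengers list are hypothetical stand-ins for the connector and the table.

```python
from datetime import datetime


def poll(rows, offset):
    """Return the rows newer than offset, plus the updated offset.

    rows   -- list of dicts with "id" and "created_at" keys
    offset -- (timestamp, id) pair of the last row already delivered
    """
    last_ts, last_id = offset
    fresh = [
        row for row in rows
        if row["created_at"] > last_ts
        or (row["created_at"] == last_ts and row["id"] > last_id)
    ]
    fresh.sort(key=lambda row: (row["created_at"], row["id"]))
    if fresh:
        offset = (fresh[-1]["created_at"], fresh[-1]["id"])
    return fresh, offset


# Toy stand-in for the passengers table.
passengers = [
    {"id": 1, "created_at": datetime(2022, 6, 24, 10, 0)},
    {"id": 2, "created_at": datetime(2022, 6, 24, 10, 0)},
]
start = (datetime.min, 0)
first_batch, offset = poll(passengers, start)

# A new row arrives with the same timestamp as the last delivered row.
passengers.append({"id": 3, "created_at": datetime(2022, 6, 24, 10, 0)})
second_batch, offset = poll(passengers, offset)
print([row["id"] for row in first_batch], [row["id"] for row in second_batch])
```

With a timestamp column alone, the third row could be missed or re-read because its timestamp equals the stored offset; adding the incrementing ID breaks the tie.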

Step 3: Load the Configuration File and Run the Connector

To load the configuration and start the connector, type the following command:

confluent connect create --config postgres-source.json

Example Output:

Created connector confluent-postgresql-source lcc-ix4dl

Step 4: Examine the Connector’s status

To check the connector status, use the following command:

confluent connect list

Example Output:

lcc-ix4dl | confluent-postgresql-source | RUNNING | source
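
If you want to check the status from a script, for instance in a deployment pipeline, the line above can be split on the pipe character. The sketch below assumes the four-column layout shown in the example output (ID, name, status, type); real CLI output may include header rows or differ between versions, so treat the parsing as illustrative. Both function names are hypothetical.

```python
def parse_connector_list(output):
    """Split each pipe-delimited line into id, name, status, and type."""
    connectors = []
    for line in output.splitlines():
        fields = [field.strip() for field in line.split("|")]
        if len(fields) != 4:
            continue  # ignore lines that do not have the four expected columns
        connector_id, name, status, kind = fields
        connectors.append(
            {"id": connector_id, "name": name, "status": status, "type": kind}
        )
    return connectors


def not_running(connectors):
    """Return the names of connectors whose status is anything but RUNNING."""
    return [c["name"] for c in connectors if c["status"] != "RUNNING"]


example = "lcc-ix4dl | confluent-postgresql-source | RUNNING | source"
print(not_running(parse_connector_list(example)))
```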

Limitations of PostgreSQL Kafka Connector

Here are some limitations you encounter while using the PostgreSQL Kafka Connector:

  • PostgreSQL’s logical decoding feature is used by the connector. As a result, it does not capture DDL changes and cannot reflect these events in topics.
  • Because it relies on replication slots, the connector can only connect to the primary database instance.
  • If the connector user has read-only access, you must manually create the replication slot and database publication.
  • The connector will attempt to reconnect to the source. The connector defaults to making 16 attempts using the exponential backoff algorithm. The task will be marked as failed after the 16th failed attempt. In this case, you must manually restart it.
  • Certain network access restrictions may exist depending on the service environment. Make sure that the connector can reach your service. See Networking and DNS Considerations for more information.
  • To use a schema-based message format, such as Avro, a valid schema must be available in the Confluent Cloud Schema Registry.
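
The reconnection behaviour described above (16 attempts with exponential backoff) can be pictured with a short sketch. This is not the connector's implementation: the base delay of one second and the 60-second cap are assumptions chosen for illustration, and both function names are hypothetical. Only the limit of 16 attempts comes from the text.

```python
import time


def backoff_delays(max_attempts=16, base_seconds=1.0, cap_seconds=60.0):
    """Yield the wait after each failed attempt: base * 2**attempt, capped."""
    for attempt in range(max_attempts):
        yield min(cap_seconds, base_seconds * 2 ** attempt)


def run_with_retries(connect, max_attempts=16, sleep=time.sleep):
    """Call connect() until it succeeds or the attempts run out."""
    for attempt, delay in enumerate(backoff_delays(max_attempts), start=1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # the task would be marked as failed at this point
            sleep(delay)


print(list(backoff_delays(5)))
```

Doubling the wait after each failure gives a briefly unavailable database time to recover without flooding it with reconnect attempts, while the attempt limit prevents the task from retrying forever.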

Conclusion

This article demonstrated how to implement the PostgreSQL Kafka Connector. It introduced Debezium and the Confluent CLI and showed how to use each of them to stream PostgreSQL data into Kafka.

The real question for most businesses is how to leverage and make sense of this data. Although many people are aware of Business Analytics Systems, in order to benefit from them, you must have consolidated data in a single repository, such as a Data Warehouse. Data replication from PostgreSQL or Kafka to a Data Warehouse necessitates the creation of ETL Pipelines, which in turn necessitate continuous monitoring and routine maintenance.

Luckily, with Hevo, you can build Data Pipelines from PostgreSQL, Kafka, and 100+ Data Sources to a Data Warehouse, but without the need for technical resources or know-how. That’s right! Hevo Pipelines can be set up in minutes, even by non-data professionals.

This article describes how to get started with PostgreSQL Kafka Connector in a few easy steps.

Hevo connects your applications to Data Warehouses like Amazon Redshift, Snowflake, Google BigQuery, Firebolt, or even Database Destinations like PostgreSQL, MySQL, or MS SQL Server where you can run transformations for analytics, and deliver Operational Intelligence to Business Tools.

Try Hevo and see the magic for yourself. Sign Up here for a 14-day full-feature access trial and experience the feature-rich Hevo suite first hand. You can also check Hevo’s pricing and make a decision on your best-suited plan. 

Have any questions on Hevo Data Pipelines? Let us know in the comment section below. We would love to help you out.
