Do you want to use Kafka to monitor PostgreSQL data, but bottlenecks are making it difficult to connect PostgreSQL to Kafka? Look no further: you are in the right place!
This article will answer all of your questions and take the stress out of finding a truly efficient solution. Follow this step-by-step guide to seamlessly connect PostgreSQL to Kafka.
After reviewing this blog thoroughly, you will be able to connect PostgreSQL to Kafka and transfer data to the destination of your choice for real-time analysis. This article will give you a thorough understanding of the platforms in question as well as their key features.
Let’s begin…
What is PostgreSQL?
PostgreSQL is an open-source, free-to-use relational database management platform. It supports SQL and is highly extensible. Unlike MongoDB, which stores data as documents, Postgres uses a traditional relational structure and stores data in tables.
It follows a monolithic architecture in which all components work together as a single unit. It is written in the C programming language and has widespread community support. Being an open-source solution also contributes to its steady stream of updates.
One significant difference between PostgreSQL and standard relational database systems, and the one that makes it highly extensible, is that PostgreSQL stores much more information in its catalogs. Besides information about tables and columns, the catalogs hold information about data types, functions, access methods, and so on.
Key Features of PostgreSQL
PostgreSQL has become one of the most popular Database Management Systems due to the following appealing features:
- Data Integrity: PostgreSQL ensures data integrity with primary keys, foreign keys, explicit locks, advisory locks, and exclusion constraints.
- Multiple Data Types: PostgreSQL supports a wide range of data types, including INTEGER, NUMERIC, BOOLEAN, CHAR, VARCHAR, DATE, INTERVAL, TIMESTAMP, and so on.
- Data Security: Data is protected by several layers of authentication and protection. PostgreSQL supports various authentication methods, such as the Lightweight Directory Access Protocol (LDAP).
What is Apache Kafka?
Apache Kafka is a free and open-source Distributed Streaming Platform that enables the creation of Real-Time Event-Driven Applications. It enables developers to create applications that use a Message Broker to consistently produce and consume streams of data records.
Apache Kafka is built for speed and for durable, fault-tolerant storage of data records. These data records are kept in the order in which they arrive within each partition, on “Clusters” that can span multiple Servers or even multiple Data Centers. Apache Kafka replicates and partitions these records so that a large number of users can use the application at the same time.
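To make the idea of partitioned, ordered records more concrete, here is a minimal in-memory sketch. It is not Kafka's actual partitioner: CRC32 stands in for Kafka's real hash function, and the partition count and record keys are made up for illustration. It only shows that records sharing a key land in the same partition and keep their arrival order there.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # assumed partition count, chosen only for this illustration


def choose_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a partition (CRC32 is a stand-in for Kafka's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def append_records(records):
    """Append (key, value) pairs to per-partition logs in arrival order."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[choose_partition(key)].append((key, value))
    return partitions


# Hypothetical records: three events for order-1, one for order-2.
records = [
    ("order-1", "created"),
    ("order-2", "created"),
    ("order-1", "paid"),
    ("order-1", "shipped"),
]
partitions = append_records(records)

# All events for one key sit in one partition, in the order they were produced.
order_1_events = [
    value
    for key, value in partitions[choose_partition("order-1")]
    if key == "order-1"
]
print(order_1_events)  # ['created', 'paid', 'shipped']
```

Ordering here holds per key within a partition, not across the whole topic, which is why the choice of key matters when consumers depend on event order.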
Key Features of Apache Kafka
The core capabilities of Apache Kafka are:
- Exceptional Throughput: Kafka uses a cluster of machines to deliver messages at high throughput with latencies as low as 2ms.
- Scalable: Kafka can handle thousands of brokers, trillions of messages per day, petabytes of data, and hundreds of thousands of partitions.
- Long-Term Storage: Kafka can securely store data streams in distributed, long-lasting, fault-tolerant cluster machines.
- Built-in Stream Processing: Kafka includes built-in stream processing features that process event streams in real-time using joins, aggregations, filters, transformations, and more.
- Connect Interface: Kafka includes a Connect interface that integrates with hundreds of event sources and sinks, including AWS S3, Postgres, JMS, Elasticsearch, and others.
- High Availability: Kafka is fast and designed for high availability, so your data stays accessible even when individual brokers fail.
- Libraries for Clients: Kafka can read, write, and process event streams in a variety of programming languages, including Python, Java, C/C++, Scala, and others.
Why is PostgreSQL Kafka Connector used?
The PostgreSQL Kafka Connector streams real-time data between a PostgreSQL database and an Apache Kafka cluster. Here are some key reasons why it is utilized:
- Real-Time Data Streaming: It enables capturing changes in a PostgreSQL database (using Change Data Capture or CDC) and streams these changes to Kafka topics in real time. This is particularly useful for applications that need to react to database changes immediately.
- Data Integration: The connector facilitates seamless integration between PostgreSQL and other systems or applications connected to Kafka. Data from PostgreSQL can be consumed by various consumers downstream, such as microservices, data warehouses, or analytics platforms.
- Scalability: Using Kafka as a buffer allows for scalable and distributed processing of PostgreSQL data. This helps in handling large volumes of data across different environments.
- Fault Tolerance: Kafka provides strong durability and fault tolerance guarantees. By using the PostgreSQL Kafka Connector, you can ensure that changes already captured from the PostgreSQL database remain available to consumers even if the database or a downstream system becomes temporarily unavailable.
Connecting PostgreSQL to Kafka
Debezium is an open-source platform that builds on Change Data Capture features found in various databases. It includes a set of Kafka Connect connectors that listen for row-level changes (via CDC) in database table(s) and convert them to event streams. Apache Kafka receives these event streams. Below you’ll discover two methods of how you can use PostgreSQL Kafka Connector.
Method 1: Using Debezium as a PostgreSQL Kafka Connector
To capture data changes, the following technologies will be used.
- Apache Kafka: It will be used to create a messaging topic that will store the database changes.
- Kafka Connect: It is a tool that allows for scalable and dependable data streaming between Apache Kafka and other systems. It is used to specify connectors that can move data from entire databases into and out of Kafka.
- Debezium: It is a tool that converts the database’s write-ahead logs (WALs) into data streams by utilizing the best underlying mechanism provided by the database system. The database data is then streamed into Kafka via the Kafka Connect API.
Debezium uses the PostgreSQL logical decoding feature to extract all persistent changes to the database in a simple format that can be interpreted without detailed knowledge of the database’s internal state.
Debezium uses the Kafka Connect API to register itself as one of the connectors of a data source once the changed data is available in an understandable format. Debezium uses checkpoints to read only committed data from the transaction log.
Step 1: Running Multiple Instances for PostgreSQL Kafka Connector
Here you’ll start the PostgreSQL database and expose it on port 5000 of the host system. You also need to start Zookeeper, which Apache Kafka uses for cluster coordination and metadata. Finally, you can launch a Debezium instance and connect it to the existing containers, namely Postgres, Kafka, and Zookeeper. The linking lets the containers communicate with each other.
Launch a PostgreSQL instance
#Postgresql Kafka Connector--
docker run --name postgres -p 5000:5432 debezium/postgres
- docker run: starts a new container using Docker.
- --name postgres: names the container “postgres” for easy reference.
- -p 5000:5432: maps port 5432 of the PostgreSQL service inside the container to port 5000 on the host machine, allowing external access.
- debezium/postgres: specifies the Docker image to use, which is a PostgreSQL image configured by Debezium for Kafka integration.
- This command starts a PostgreSQL container, making it accessible via port 5000 on the host.
Launch a Zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
- docker run -it: runs a new Docker container in interactive mode with terminal access.
- --name zookeeper: names the container “zookeeper” for easy identification.
- -p 2181:2181 -p 2888:2888 -p 3888:3888: maps Zookeeper’s internal ports (2181, 2888, 3888) to the same ports on the host, allowing network access.
- debezium/zookeeper: specifies the Docker image, which includes Zookeeper configured for Kafka.
- This command starts a Zookeeper container, set up to communicate through essential ports for Kafka integration.
Begin a Kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
- docker run -it: starts a new Docker container in interactive mode with terminal access.
- --name kafka: names the container “kafka” for easy reference.
- -p 9092:9092: maps Kafka’s internal port 9092 to the same port on the host, allowing external access.
- --link zookeeper:zookeeper: links this Kafka container to a running Zookeeper container, enabling them to communicate.
- debezium/kafka: specifies the Kafka Docker image from Debezium for setting up a Kafka server.
Begin a Debezium instance
#Postgresql Kafka Connector---
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect
- docker run -it: starts a new container in interactive mode with terminal access.
- --name connect: names the container “connect”.
- -p 8083:8083: makes the container’s port 8083 accessible on the host, allowing access to Kafka Connect’s API.
- Environment variables like GROUP_ID and CONFIG_STORAGE_TOPIC configure Kafka Connect’s group and storage settings.
- --link: connects this container to other containers (Zookeeper, PostgreSQL, and Kafka), enabling communication between them.
Connect to PostgreSQL and create a tracking database
#Postgresql Kafka Connector---
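psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);
Once the setup is complete, all that remains is to register a PostgreSQL Kafka Connector with Kafka Connect. Note the \c inventory step: without switching to the new database first, the table would be created in the default postgres database rather than in inventory.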
Step 2: Connect the PostgreSQL Kafka Connector
#Postgresql Kafka Connector---
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
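If you would rather register the connector from a script than from curl, the same request can be sketched in Python using only the standard library. This is a minimal sketch, not an official client: the function names are made up for illustration, the payload carries only the core connection properties from the curl call, and register_connector is defined but not called because it needs the Connect container from Step 1 to be running.

```python
import json
import urllib.request

# Same endpoint as the curl call; assumes Kafka Connect is published on port 8083.
CONNECT_URL = "http://localhost:8083/connectors/"


def build_connector_config() -> dict:
    """Core connection properties taken from the curl payload."""
    return {
        "name": "inventory-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "inventory",
            "database.server.name": "dbserver1",
        },
    }


def build_request(config: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request without sending it."""
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def register_connector(config: dict) -> int:
    """Send the request and return the HTTP status (needs a running Connect instance)."""
    with urllib.request.urlopen(build_request(config)) as response:
        return response.status


request = build_request(build_connector_config())
print(request.get_method(), request.full_url)
```

Separating build_request from register_connector keeps the payload easy to inspect or log before anything is sent over the network.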
Now you can start a Kafka console consumer and monitor the change events arriving from PostgreSQL. Use the psql CLI to perform some SQL inserts, updates, and deletes. For each change, the console consumer will print some JSON-like output.
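To give a feel for what a downstream consumer might do with that output, here is a small sketch that replays change events into an in-memory copy of the table. It assumes the events follow Debezium's usual envelope, with an op code (c for create, u for update, d for delete, r for snapshot read) plus before and after row images. The sample events are hand-written for illustration and are not captured from a real run.

```python
import json


def apply_change(state: dict, raw_event: str) -> dict:
    """Apply one change event to an in-memory table keyed by primary key."""
    event = json.loads(raw_event)
    # Depending on converter settings, the envelope may be wrapped in "payload".
    payload = event.get("payload", event)
    op = payload["op"]
    if op in ("c", "r", "u"):
        row = payload["after"]
        state[row["id"]] = row
    elif op == "d":
        state.pop(payload["before"]["id"], None)
    return state


# Hand-written sample events for the dumb_table(id, name) table.
sample_events = [
    json.dumps({"payload": {"op": "c", "before": None,
                            "after": {"id": 1, "name": "alice"}}}),
    json.dumps({"payload": {"op": "u", "before": {"id": 1, "name": "alice"},
                            "after": {"id": 1, "name": "bob"}}}),
    json.dumps({"payload": {"op": "c", "before": None,
                            "after": {"id": 2, "name": "carol"}}}),
    json.dumps({"payload": {"op": "d", "before": {"id": 2, "name": "carol"},
                            "after": None}}),
]

state = {}
for raw in sample_events:
    apply_change(state, raw)

print(state)  # {1: {'id': 1, 'name': 'bob'}}
```

A real consumer would read these messages from the Kafka topic rather than from a list, but the replay logic stays the same.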
Method 2: Using Confluent CLI as a PostgreSQL Kafka Connector
You can use Confluent CLI to set up PostgreSQL to Kafka. Here are steps to get started:
Step 1: Set up the Connector Configuration
To display the required connector properties, run the following command:
confluent connect plugin describe PostgresSource
Step 2: Create the connector configuration file
Create a JSON file with the connector configuration properties. The required connector properties are illustrated in the following example.
#Postgresql Kafka Connector---
{
"name" : "confluent-postgresql-source",
"connector.class": "PostgresSource",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.secret" : "<my-kafka-api-secret>",
"topic.prefix" : "postgresql_",
"ssl.mode" : "prefer",
"connection.host" : "<my-database-endpoint>",
"connection.port" : "5432",
"connection.user" : "postgres",
"connection.password": "<my-database-password>",
"db.name": "postgres",
"table.whitelist": "passengers",
"timestamp.column.name": "created_at",
"output.data.format": "JSON",
"db.timezone": "UTC",
"tasks.max" : "1"
}
Please take note of the following property definitions:
- "name": Gives your connector a name.
- "connector.class": Identifies the name of the connector plugin.
- "kafka.auth.mode": Specifies which connector authentication mode to use. There are two choices: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, set the configuration properties kafka.api.key and kafka.api.secret as shown in the example configuration (above). To use a service account instead, supply its Resource ID in the connector’s service account property.
- "topic.prefix": The connector generates Kafka topics automatically using the naming convention <prefix><table-name>, so the example configuration above would produce a topic named postgresql_passengers. The topics are created with topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want to create topics with specific settings, do so before running this connector.
- "ssl.mode": Lets you use an encrypted connection to the PostgreSQL database server.
- "db.name": The name of your PostgreSQL database.
- "timestamp.column.name": Names the timestamp column the connector uses to detect new and modified rows when reading from PostgreSQL tables. When combined with "incrementing.column.name", this property enables timestamp and incrementing mode, which handles updates using a timestamp together with a globally unique ID so that each row can be assigned a unique stream offset.
- "output.data.format": Specifies the preferred output format for Kafka records (data coming from the connector). Acceptable formats include AVRO, JSON_SR, PROTOBUF, and JSON.
- "db.timezone": Sets the timezone of the database. UTC is the default timezone.
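The timestamp and incrementing mode described above can be illustrated with a small in-memory simulation. This is only a sketch of the idea, not the connector's implementation: the rows are plain dictionaries, created_at matches the column in the example configuration, and the id column and the poll_changes helper are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def poll_changes(rows, last_ts, last_id):
    """Return rows past the stored (timestamp, id) offset, plus the new offset."""
    fresh = [r for r in rows if (r["created_at"], r["id"]) > (last_ts, last_id)]
    fresh.sort(key=lambda r: (r["created_at"], r["id"]))
    if fresh:
        last_ts, last_id = fresh[-1]["created_at"], fresh[-1]["id"]
    return fresh, last_ts, last_id


t0 = datetime(2024, 1, 1, 12, 0, 0)
# Hypothetical contents of the passengers table.
rows = [
    {"id": 1, "created_at": t0},
    {"id": 2, "created_at": t0},
    {"id": 3, "created_at": t0 + timedelta(seconds=5)},
]

first, ts, row_id = poll_changes(rows, datetime.min, 0)
print([r["id"] for r in first])  # [1, 2, 3]

# A new row arrives with the same timestamp as the last row already read.
rows.append({"id": 4, "created_at": t0 + timedelta(seconds=5)})
second, ts, row_id = poll_changes(rows, ts, row_id)
print([r["id"] for r in second])  # [4]
```

With a timestamp alone, row 4 would be indistinguishable from row 3 and could be skipped or re-read. Pairing the timestamp with an incrementing ID gives every row a unique offset.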
Step 3: Load the Properties file and Run the Connector
To load the configuration and start the connector, type the following command:
#Postgresql Kafka Connector---
confluent connect create --config postgres-source.json
Example Output:
#Postgresql Kafka Connector---
Created connector confluent-postgresql-source lcc-ix4dl
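If the create command is rejected, or the connector fails shortly after starting, one thing worth checking is whether any placeholder values such as <my-kafka-api-key> were left in the JSON file. The helper below is a hypothetical convenience script, not part of the Confluent CLI. It flags properties whose values still look like angle-bracket placeholders, using a shortened copy of the example configuration as input.

```python
import json
import re

# Matches values that are still template placeholders, e.g. "<my-database-password>".
PLACEHOLDER = re.compile(r"^<[^<>]+>$")


def unfilled_properties(config: dict) -> list:
    """Return the sorted names of properties that still hold a placeholder value."""
    return sorted(
        key
        for key, value in config.items()
        if isinstance(value, str) and PLACEHOLDER.match(value)
    )


# Shortened copy of the example configuration, inlined so the sketch is self-contained.
example_config = json.loads("""
{
  "name": "confluent-postgresql-source",
  "connector.class": "PostgresSource",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "connection.host": "<my-database-endpoint>",
  "connection.port": "5432",
  "connection.password": "<my-database-password>",
  "table.whitelist": "passengers"
}
""")

missing = unfilled_properties(example_config)
print(missing)
# ['connection.host', 'connection.password', 'kafka.api.key', 'kafka.api.secret']
```

In practice you would load postgres-source.json with json.load instead of the inline string and fix any properties the check reports before rerunning the create command.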
Step 4: Examine the Connector’s status
To check the connector status, use the following command:
#Postgresql Kafka Connector---
confluent connect list
Example Output:
lcc-ix4dl | confluent-postgresql-source | RUNNING | source
Limitations of PostgreSQL Kafka Connector
Here are some limitations you may encounter while using the PostgreSQL Kafka Connector:
- PostgreSQL’s logical decoding feature is used by the connector. As a result, it does not capture DDL changes and cannot reflect these events in topics.
- Because it relies on replication slots, the connector can only connect to the primary database instance.
- If the connector user has read-only access, you must manually create the replication slot and database publication.
- The connector will attempt to reconnect to the source. The connector defaults to making 16 attempts using the exponential backoff algorithm. The task will be marked as failed after the 16th failed attempt. In this case, you must manually restart it.
- Certain network access restrictions may exist depending on the service environment. Make sure that the connector can reach your service. See Networking and DNS Considerations for more information.
- To use a schema-based message format, such as Avro, a valid schema must be available in the Confluent Cloud Schema Registry.
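The reconnect behaviour in the list above (16 attempts with exponential backoff, then a failed task) can be sketched as follows. Only the attempt count comes from the text. The base delay, the delay cap, and the function names are assumptions chosen for illustration, and the connector's actual timings may differ.

```python
import time

MAX_ATTEMPTS = 16    # default number of attempts stated above
BASE_DELAY_S = 1.0   # assumed initial delay
MAX_DELAY_S = 60.0   # assumed upper bound on any single delay


def backoff_delays(max_attempts=MAX_ATTEMPTS, base=BASE_DELAY_S, cap=MAX_DELAY_S):
    """Delay after each failed attempt: doubles every time, up to the cap."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_attempts)]


def connect_with_retry(connect, sleep=time.sleep):
    """Call connect() until it succeeds or the attempts are exhausted."""
    delays = backoff_delays()
    last_error = None
    for attempt, delay in enumerate(delays, start=1):
        try:
            return connect()
        except ConnectionError as exc:
            last_error = exc
            if attempt < len(delays):
                sleep(delay)
    raise RuntimeError(f"task failed after {len(delays)} attempts") from last_error


# Demo: a source that is unreachable for the first three attempts.
attempts = {"count": 0}


def flaky_connect():
    attempts["count"] += 1
    if attempts["count"] < 4:
        raise ConnectionError("source unreachable")
    return "connected"


waited = []
result = connect_with_retry(flaky_connect, sleep=waited.append)  # record delays instead of sleeping
print(result, waited)  # connected [1.0, 2.0, 4.0]
```

Once the final attempt fails, the sketch raises an error instead of retrying forever, which mirrors the point above that a failed task has to be restarted manually.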
Conclusion
This article demonstrated how to implement the PostgreSQL Kafka Connector. It introduced the Debezium and Confluent CLI concepts you need to understand in order to set each of them up.
The real question for most businesses is how to leverage and make sense of this data. Although many people are aware of Business Analytics Systems, in order to benefit from them, you must have consolidated data in a single repository, such as a Data Warehouse. Data replication from PostgreSQL or Kafka to a Data Warehouse necessitates the creation of ETL Pipelines, which in turn necessitate continuous monitoring and routine maintenance.
Try Hevo and see the magic for yourself. Sign up for a free 14-day trial to streamline your data integration process. You may examine Hevo’s pricing plans and decide on the best plan for your business needs.
Frequently Asked Questions
1. What is a Kafka connector?
A Kafka connector is a component used in Apache Kafka to integrate Kafka with various data sources or sinks, enabling data to be ingested into or extracted from Kafka topics.
2. What is a PostgreSQL connector?
A PostgreSQL connector is a software library or component used to connect and interact with a PostgreSQL database from various applications or systems.
3. How to enable CDC in PostgreSQL?
Change Data Capture (CDC) is a technique used to track and capture changes made to data in a database. In PostgreSQL, CDC can be enabled using logical replication, which allows you to stream changes made to the database.
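As a rough sketch of what enabling logical replication involves, the helper below only assembles the SQL statements, so you can review them before running them through psql or a database driver. The publication and slot names are illustrative defaults, pgoutput is PostgreSQL's built-in logical decoding plugin, and the helper itself is hypothetical.

```python
def cdc_setup_statements(
    publication: str = "dbz_publication",  # illustrative publication name
    slot: str = "debezium",                # illustrative replication slot name
    plugin: str = "pgoutput",              # PostgreSQL's built-in output plugin
) -> list:
    """Return the SQL statements typically involved in enabling logical replication."""
    return [
        # Changing wal_level only takes effect after a server restart.
        "ALTER SYSTEM SET wal_level = logical;",
        # A publication defines which tables' changes are streamed.
        f"CREATE PUBLICATION {publication} FOR ALL TABLES;",
        # A replication slot tracks how far a consumer has read in the WAL.
        f"SELECT pg_create_logical_replication_slot('{slot}', '{plugin}');",
    ]


statements = cdc_setup_statements()
for statement in statements:
    print(statement)
```

These statements require sufficient privileges, and a replication slot that is never consumed holds back WAL cleanup, so drop any slot you no longer use.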
Davor DSouza is a data analyst with a passion for using data to solve real-world problems. His experience with data integration and infrastructure, combined with his Master's in Machine Learning, equips him to bridge the gap between theory and practical application. He enjoys diving deep into data and emerging with clear and actionable insights.