Do you want to use Kafka to monitor PostgreSQL data, but bottlenecks are making it difficult for you to connect the two? Look no further: you are in the right place!
This article will answer all of your questions and take the stress out of finding a truly efficient solution. Follow this step-by-step guide to seamlessly connect PostgreSQL to Kafka.
After reviewing this blog thoroughly, you will be able to connect PostgreSQL to Kafka and transfer data to the destination of your choice for real-time analysis. This article will give you a thorough understanding of the platforms in question as well as their key features.
Let’s begin…
What is PostgreSQL?
PostgreSQL is a free, open-source relational database management system. It supports SQL and is highly extensible. Unlike MongoDB, which stores data as documents, Postgres takes the more traditional approach of storing data in tables.
It follows a monolithic design in which all components work together in a unified manner. Written in C, it enjoys widespread community support, and being open source also contributes significantly to its steady stream of updates.
One significant difference between PostgreSQL and standard relational database systems, and the one that makes it highly extensible, is that PostgreSQL stores far more information in its catalogs: not just tables and columns, but also data types, functions, access methods, and so on.
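You can see this for yourself by querying the system catalogs directly. A quick sketch, assuming a default installation (both tables live in PostgreSQL's built-in pg_catalog schema):

```sql
-- List a few of the base data types the server knows about
SELECT typname FROM pg_type WHERE typtype = 'b' LIMIT 5;

-- List the access methods registered in the catalog
SELECT amname FROM pg_am;
```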
Key Features of PostgreSQL
PostgreSQL has become one of the most popular Database Management Systems due to the following appealing features:
- Data Integrity: PostgreSQL ensures data integrity with primary keys, foreign keys, explicit locks, advisory locks, and exclusion constraints.
- Multiple Data Types: PostgreSQL supports a wide range of data types, including INTEGER, NUMERIC, BOOLEAN, CHAR, VARCHAR, DATE, INTERVAL, TIMESTAMP, and so on.
- Data Security: Data is protected by several layers of authentication and access control. PostgreSQL supports various authentication methods, such as the Lightweight Directory Access Protocol (LDAP).
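To illustrate the two features above, here is a hypothetical table definition combining several of those data types with integrity constraints:

```sql
-- Hypothetical table mixing common PostgreSQL data types and constraints
CREATE TABLE bookings (
    id         SERIAL PRIMARY KEY,            -- integrity: primary key
    passenger  VARCHAR(100) NOT NULL,
    seats      INTEGER CHECK (seats > 0),     -- integrity: check constraint
    fare       NUMERIC(10, 2),
    paid       BOOLEAN DEFAULT FALSE,
    booked_on  DATE,
    duration   INTERVAL,
    created_at TIMESTAMP DEFAULT now()
);
```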
Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo's Automated, No-Code Platform empowers you with everything you need for a smooth data replication experience.
Check out what makes Hevo amazing:
- Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
- Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
- Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making.
Sign up here for a 14-Day Free Trial!
What is Apache Kafka?
Apache Kafka is a free and open-source Distributed Streaming Platform that enables the creation of Real-Time Event-Driven Applications. It enables developers to create applications that use a Message Broker to consistently produce and consume streams of data records.
Apache Kafka is extremely fast and ensures that all data records are accurate. These data records are kept in the order in which they appear within “Clusters,” which can span multiple Servers or even multiple Data Centers. Apache Kafka replicates and partitions these records so that a large number of users can use the application at the same time.
Key Features of Apache Kafka
The core capabilities of Apache Kafka are:
- Exceptional Throughput: Kafka uses a cluster of machines to deliver messages at high throughput with latencies as low as 2ms.
- Scalable: Kafka can handle thousands of brokers, trillions of messages per day, petabytes of data, and hundreds of thousands of partitions.
- Long-Term Storage: Kafka can securely store data streams in distributed, long-lasting, fault-tolerant cluster machines.
- Built-in Stream Processing: Kafka includes built-in stream processing features that process event streams in real-time using joins, aggregations, filters, transformations, and more.
- Connect Interface: Kafka includes a Connect interface that integrates with hundreds of event sources and sinks, including AWS S3, Postgres, JMS, Elasticsearch, and others.
- High Availability: Kafka is lightning fast and guarantees zero downtime, ensuring that your data is always accessible.
- Libraries for Clients: Kafka can read, write, and process event streams in a variety of programming languages, including Python, Java, C/C++, Scala, and others.
Why is PostgreSQL Kafka Connector used?
The PostgreSQL Kafka Connector streams real-time data between a PostgreSQL database and an Apache Kafka cluster. Here are some key reasons why it is utilized:
- Real-Time Data Streaming: It enables capturing changes in a PostgreSQL database (using Change Data Capture or CDC) and streams these changes to Kafka topics in real time. This is particularly useful for applications that need to react to database changes immediately.
- Data Integration: The connector facilitates seamless integration between PostgreSQL and other systems or applications connected to Kafka. Data from PostgreSQL can be consumed by various consumers downstream, such as microservices, data warehouses, or analytics platforms.
- Scalability: Using Kafka as a buffer allows for scalable and distributed processing of PostgreSQL data. This helps in handling large volumes of data across different environments.
- Fault Tolerance: Kafka provides strong durability and fault-tolerance guarantees. By using the PostgreSQL Kafka Connector, you can ensure that changes to the PostgreSQL database are reliably delivered even in the face of failures.
Connecting PostgreSQL to Kafka
Debezium is an open-source platform built on the Change Data Capture features found in various databases. It includes a set of Kafka Connect connectors that listen for row-level changes in database tables via CDC and convert them into event streams, which are then delivered to Apache Kafka. Below you'll find two methods for using the PostgreSQL Kafka Connector.
Method 1: Using Debezium as a PostgreSQL Kafka Connector
To capture data changes, the following technologies will be used.
- Apache Kafka: It will be used to create a messaging topic that will store the database changes.
- Kafka Connect: It is a tool that allows for scalable and dependable data streaming between Apache Kafka and other systems. It is used to specify connectors that can move data from entire databases into and out of Kafka.
- Debezium: It is a tool that converts write-ahead logs (WALs) into data streams by utilizing the best underlying mechanism provided by the database system. The database data is then streamed into Kafka via the Kafka Connect API.
Debezium uses the PostgreSQL logical decoding feature to extract all persistent changes to the database in a simple format that can be interpreted without detailed knowledge of the database’s internal state.
Debezium uses the Kafka Connect API to register itself as one of the connectors of a data source once the changed data is available in an understandable format. Debezium uses checkpoints to read only committed data from the transaction log.
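You can watch logical decoding in action without Debezium by using PostgreSQL's built-in test_decoding output plugin (Debezium ships its own plugins, but the mechanism is the same). A sketch, assuming wal_level is already set to logical and a table like the dumb_table created later in this guide:

```sql
-- Create a logical replication slot backed by the built-in test_decoding plugin
SELECT * FROM pg_create_logical_replication_slot('demo_slot', 'test_decoding');

-- Make a committed change
INSERT INTO dumb_table(name) VALUES ('alice');

-- Read the decoded change stream from the slot
SELECT * FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);
```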
Step 1: Running Multiple Instances for PostgreSQL Kafka Connector
Here you'll start the PostgreSQL database and map it to port 5000 on the host system. You also need to start Zookeeper, which Apache Kafka uses to store consumer offsets. Finally, you can launch a Debezium instance and link it to the existing containers, namely Postgres, Kafka, and Zookeeper. The linking enables communication between the containers.
Launch a PostgreSQL instance
#Postgresql Kafka Connector--
docker run --name postgres -p 5000:5432 debezium/postgres

- docker run: starts a new container using Docker.
- --name postgres: names the container "postgres" for easy reference.
- -p 5000:5432: maps port 5432 of the PostgreSQL service inside the container to port 5000 on the host machine, allowing external access.
- debezium/postgres: specifies the Docker image to use, which is a PostgreSQL image configured by Debezium for Kafka integration.
- This command starts a PostgreSQL container, making it accessible via port 5000 on the host.
Launch a Zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

- docker run -it: runs a new Docker container in interactive mode with terminal access.
- --name zookeeper: names the container "zookeeper" for easy identification.
- -p 2181:2181 -p 2888:2888 -p 3888:3888: maps Zookeeper's internal ports (2181, 2888, 3888) to the same ports on the host, allowing network access.
- debezium/zookeeper: specifies the Docker image, which includes Zookeeper configured for Kafka.
- This command starts a Zookeeper container, set up to communicate through the ports essential for Kafka integration.
Begin a Kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

- docker run -it: starts a new Docker container in interactive mode with terminal access.
- --name kafka: names the container "kafka" for easy reference.
- -p 9092:9092: maps Kafka's internal port 9092 to the same port on the host, allowing external access.
- --link zookeeper:zookeeper: links this Kafka container to the running Zookeeper container, enabling them to communicate.
- debezium/kafka: specifies the Kafka Docker image from Debezium for setting up a Kafka server.
Begin a Debezium instance
#Postgresql Kafka Connector---
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect

- docker run -it: starts a new container in interactive mode with terminal access.
- --name connect: names the container "connect."
- -p 8083:8083: makes the container's port 8083 accessible on the host, allowing access to Kafka Connect's REST API.
- Environment variables like GROUP_ID, CONFIG_STORAGE_TOPIC, and OFFSET_STORAGE_TOPIC configure Kafka Connect's group and storage settings.
- --link: connects this container to the other containers (Zookeeper, PostgreSQL, and Kafka), enabling communication between them.
Connect to PostgreSQL and create a tracking database
#Postgresql Kafka Connector---
psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
\c inventory
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);

Note the \c inventory step: without it, the table would be created in the default postgres database rather than the inventory database the connector will watch. Once the setup is complete, all that remains is to register a PostgreSQL Kafka Connector with Kafka Connect.
Step 2: Connect the PostgreSQL Kafka Connector
#Postgresql Kafka Connector---
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
Now you can start the Kafka console consumer and monitor the data coming from PostgreSQL. Use the psql CLI to perform some SQL inserts, updates, and deletes; for each change, you will see JSON-like output in the console consumer.
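Each of those messages follows Debezium's change-event envelope, carrying an op code and the row's before/after state. As a rough sketch of how a consumer might summarize such payloads (the sample event below is hypothetical and trimmed to only the fields used):

```python
import json

# Debezium op codes: c = create (insert), u = update, d = delete, r = snapshot read
OPS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}

def summarize(event_json: str) -> str:
    """Return a one-line summary of a Debezium change-event payload."""
    payload = json.loads(event_json)["payload"]
    op = OPS.get(payload["op"], "UNKNOWN")
    # Inserts/updates carry the new row in "after"; deletes carry it in "before"
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f"{op}: {row}"

# Hypothetical sample event for an insert into dumb_table
sample = json.dumps({
    "payload": {
        "op": "c",
        "before": None,
        "after": {"id": 1, "name": "alice"},
    }
})
print(summarize(sample))  # INSERT: {'id': 1, 'name': 'alice'}
```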
Method 2: Using Confluent CLI as a PostgreSQL Kafka Connector
You can use Confluent CLI to set up PostgreSQL to Kafka. Here are steps to get started:
Step 1: Set up the Connector Configuration
To display the required connector properties, run the following command:
confluent connect plugin describe PostgresSource
Step 2: Create the connector configuration file
Create a JSON file with the connector configuration properties. The required connector properties are illustrated in the following example.
#Postgresql Kafka Connector---
{
"name" : "confluent-postgresql-source",
"connector.class": "PostgresSource",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.secret" : "<my-kafka-api-secret>",
"topic.prefix" : "postgresql_",
"ssl.mode" : "prefer",
"connection.host" : "<my-database-endpoint>",
"connection.port" : "5432",
"connection.user" : "postgres",
"connection.password": "<my-database-password>",
"db.name": "postgres",
"table.whitelist": "passengers",
"timestamp.column.name": "created_at",
"output.data.format": "JSON",
"db.timezone": "UTC",
"tasks.max" : "1"
}
Please take note of the following property definitions:
- “name“: Gives your connector a name.
- “connector.class“: Identifies the name of the connector plugin.
- “kafka.auth.mode“: Specifies which connector authentication mode to use. There are two choices: KAFKA_API_KEY or SERVICE_ACCOUNT (the default). To use an API key and secret, set the configuration properties kafka.api.key and kafka.api.secret to the values shown in the example configuration above. To use a service account, enter its Resource ID in the corresponding property.
- “topic.prefix”: The connector automatically generates Kafka topics using the naming convention <prefix>.<table-name>. Topics are created with topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. If you want topics with specific settings, create them before running this connector.
- “ssl.mode”: Allows an encrypted connection to the PostgreSQL database server.
- “db.name”: References the name of your PostgreSQL database.
- “timestamp.column.name”: A method for reading data from PostgreSQL tables. It detects new and modified rows by using a timestamp column. When combined with “incrementing.column.name,” this property enables timestamp and incrementing mode for handling updates using a globally unique ID that can be assigned a unique stream offset.
- “output.data.format”: This property specifies the preferred output format for Kafka records (data coming from the connector). Acceptable formats include AVRO, JSON_SR, PROTOBUF, and JSON.
- “db.timezone“: It determines the timezone of the database. UTC is the default timezone.
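Before loading the file in the next step, it can help to sanity-check it for the properties listed above. A minimal sketch (the required-property list is taken from the example configuration in this article, not from the Confluent CLI itself):

```python
# Properties used by the example configuration above
REQUIRED = [
    "name", "connector.class", "kafka.api.key", "kafka.api.secret",
    "topic.prefix", "connection.host", "connection.port",
    "connection.user", "connection.password", "db.name",
    "table.whitelist", "output.data.format", "tasks.max",
]

def missing_properties(config: dict) -> list:
    """Return the required properties absent from a connector config dict."""
    return [key for key in REQUIRED if key not in config]

# Hypothetical partial config (in practice, read postgres-source.json with json.load)
config = {"name": "confluent-postgresql-source", "connector.class": "PostgresSource"}
print(missing_properties(config))  # lists everything still to fill in
```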
Step 3: Load the Properties file and Run the Connector
To load the configuration and start the connector, type the following command:
#Postgresql Kafka Connector---
confluent connect create --config postgres-source.json
Example Output:
#Postgresql Kafka Connector---
Created connector confluent-postgresql-source lcc-ix4dl
Step 4: Examine the Connector’s status
To check the connector status, use the following command:
#Postgresql Kafka Connector---
confluent connect list
Example Output:
lcc-ix4dl | confluent-postgresql-source | RUNNING | source
Limitations of PostgreSQL Kafka Connector
Here are some limitations you encounter while using the PostgreSQL Kafka Connector:
- PostgreSQL’s logical decoding feature is used by the connector. As a result, it does not capture DDL changes and cannot reflect these events in topics.
- Replication slots exist only on the primary database instance, so the connector can connect only to the primary, not to read replicas.
- If the connector user has read-only access, you must manually create the replication slot and database publication.
- If the connection to the source is lost, the connector will attempt to reconnect. By default it makes 16 attempts using an exponential backoff algorithm; after the 16th failed attempt, the task is marked as failed and must be restarted manually.
- Certain network access restrictions may exist depending on the service environment. Ascertain that the connector can reach your service. See Networking and DNS Considerations for more information.
- To use a schema-based message format, such as Avro, a valid schema must be available in the Confluent Cloud Schema Registry.
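The reconnect behavior described in the limitations above can be pictured with a short sketch: 16 attempts whose delays grow exponentially up to a cap (the base delay and cap here are hypothetical, not the connector's documented values):

```python
MAX_ATTEMPTS = 16   # per the connector's default retry limit
BASE_DELAY = 1.0    # hypothetical initial delay, in seconds
MAX_DELAY = 300.0   # hypothetical cap on a single delay

def backoff_delays(attempts: int = MAX_ATTEMPTS) -> list:
    """Exponential backoff: the delay doubles each attempt, up to a cap."""
    return [min(BASE_DELAY * (2 ** i), MAX_DELAY) for i in range(attempts)]

delays = backoff_delays()
print(delays[:4])   # [1.0, 2.0, 4.0, 8.0]
print(len(delays))  # 16
```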
Conclusion
This article demonstrated how to implement the PostgreSQL Kafka Connector. It introduced the relevant Debezium and Confluent CLI concepts and walked you through implementing both methods.
The real question for most businesses is how to leverage and make sense of this data. Although many people are aware of Business Analytics Systems, in order to benefit from them, you must have consolidated data in a single repository, such as a Data Warehouse. Data replication from PostgreSQL or Kafka to a Data Warehouse necessitates the creation of ETL Pipelines, which in turn necessitate continuous monitoring and routine maintenance.
Try Hevo and see the magic for yourself. Sign Up here for a 14-day full-feature access trial and experience the feature-rich Hevo suite first hand. You can also check Hevo’s pricing and make a decision on your best-suited plan.
Have any questions on Hevo Data Pipelines? Let us know in the comment section below. We would love to help you out.
Frequently Asked Questions
1. What is a Kafka connector?
A Kafka connector is a component used in Apache Kafka to integrate Kafka with various data sources or sinks, enabling data to be ingested into or extracted from Kafka topics.
2. What is PostgreSQL connector?
A PostgreSQL connector is a software library or component used to connect and interact with a PostgreSQL database from various applications or systems.
3. How to enable CDC in PostgreSQL?
Change Data Capture (CDC) is a technique used to track and capture changes made to data in a database. In PostgreSQL, CDC can be enabled using logical replication, which allows you to stream changes made to the database.
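As a sketch of what that setup typically looks like (exact steps depend on your hosting environment and connector; my_publication and passengers are placeholder names):

```sql
-- Raise the WAL level so logical decoding is available (requires a restart)
ALTER SYSTEM SET wal_level = logical;

-- After the restart, publish the tables whose changes you want to stream
CREATE PUBLICATION my_publication FOR TABLE passengers;

-- Confirm the setting took effect
SHOW wal_level;
```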
Davor DSouza is a data analyst with a passion for using data to solve real-world problems. His experience with data integration and infrastructure, combined with his Master's in Machine Learning, equips him to bridge the gap between theory and practical application. He enjoys diving deep into data and emerging with clear and actionable insights.