Do you want to transfer your data to Amazon S3 using Kafka? Are you finding it challenging to connect Kafka to S3? If so, you’ve landed in the right place! This article answers those questions and walks you through an easy, step-by-step process for efficiently transferring your data to Amazon S3 using Kafka.

It will help you take charge of the process in a hassle-free way without compromising on efficiency, and aims to make the data export process as smooth as possible.

Upon a complete walkthrough of this article, you will be able to set up a connection between Kafka & Amazon S3 and seamlessly transfer data to Amazon S3 for real-time analysis. It will further help you build a customized ETL pipeline for your organization and give you a deeper understanding of the tools and techniques involved.

What is Kafka?


Apache Kafka is a popular real-time data streaming platform that allows users to store, read, and analyze streaming data using its open-source framework. Being open-source, it is available free of cost to users. Thanks to its distributed architecture, users can achieve high throughput and low latency and handle large volumes of data with ease.

Written in Scala, Kafka supports bringing in data from a large variety of sources and stores it in the form of “topics” by processing the information stream. It relies on two core client APIs: Producers, which act as an interface between the data source and Kafka Topics, and Consumers, which allow users to read and transfer the data stored in Kafka.
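
To get a feel for how Producers and Consumers work together, here is a minimal sketch using the console clients that ship with Kafka (the broker address localhost:9092 and the topic name example-topic are placeholders; depending on your distribution, the scripts may carry a .sh suffix):

# Produce messages typed into the terminal into a topic
kafka-console-producer --broker-list localhost:9092 --topic example-topic

# In another terminal, read the messages stored in that topic from the beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic --from-beginning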

Key features of Kafka:

  • Scalability: Kafka has exceptional scalability and can be scaled easily without downtime.
  • Data Transformation: Kafka offers KStream and KSQL (in the case of Confluent Kafka) for on-the-fly data transformation.
  • Fault-Tolerant: Kafka uses brokers to replicate data and persists the data to make it a fault-tolerant system.
  • Security: Kafka can be combined with various security measures like Kerberos to stream data securely.
  • Performance: Kafka is distributed and partitioned, and offers very high throughput for publishing and subscribing to messages.

For further information on Kafka, you can check the official website here.

What is Amazon S3?


Amazon S3 (Simple Storage Service) is a highly scalable cloud-based storage service provided by Amazon. It allows users to create online backups of their data from numerous data sources and lets them store individual objects of up to 5 TB in size. Amazon S3 provides users with object-based data storage functionality and lets them store data in Amazon S3 buckets, ensuring 99.999999999% (11 9’s) data durability and 99.99% object availability.

It stores data in the form of objects, each consisting of a file along with its metadata, and lets users choose the storage class they want to use, such as S3 Standard, S3 Standard-Infrequent Access, and S3 Glacier. Amazon S3 offers an easy-to-use platform, provides support for numerous programming languages such as Java, Python, and Scala, and lets users transfer data to Amazon S3 buckets by leveraging the S3 APIs and various other ETL tools, connectors, etc.
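
As a quick illustration of the kind of API-driven transfer mentioned above, here is a minimal sketch that uploads and lists objects using the AWS CLI (assuming the CLI is installed and configured; my-example-bucket and the file name are placeholders):

# Upload a local file as an object into an S3 bucket
aws s3 cp ./events.json s3://my-example-bucket/raw/events.json

# List the objects stored under the raw/ prefix of the bucket
aws s3 ls s3://my-example-bucket/raw/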

For further information on Amazon S3, you can check the official website here.

Integrate Your Kafka & S3 ETL Using Hevo’s No-code Data Pipeline

Hevo Data, an Automated No-code Data Pipeline, helps you directly transfer data from Kafka and S3 to Data Warehouses, Databases, or any other destination of your choice in a completely hassle-free manner. Hevo initializes a connection with Kafka Bootstrap Servers and collects the data stored in their Topics & Clusters. Hevo offers native support for Data Streaming & File Storage platforms and lets you load/stream data straight from your S3 Buckets and Kafka Topics. Moreover, S3 stores its files after compressing them into a Gzip format. Hevo’s Data Pipeline automatically unzips any Gzipped files on ingestion and also performs file re-ingestion in case there is any data update.

Hevo is fully managed and completely automates the process of not only loading data from 100+ data sources (including 40+ free sources) but also enriching the data and transforming it into an analysis-ready form without having to write a single line of code. Its fault-tolerant architecture ensures that the data is handled in a secure and flexible manner with zero data loss. Hevo’s consistent & reliable solution to manage data in real-time allows you to focus more on Data Analysis, instead of Data Consolidation.

ETL your data from Kafka & S3 to your destination warehouse with Hevo’s easy-to-setup and No-code interface. Try our 14-day full-access free trial!

Sign up here for a 14-Day Free Trial!

Prerequisites

To connect Kafka to S3, you must have:

  • Working knowledge of Kafka.
  • Working knowledge of Amazon S3.
  • A general idea of ETL.
  • A general idea of APIs.
  • Kafka installed at the host workstation.
  • An Amazon S3 account & bucket. 

Steps for Connecting Kafka to S3

One straightforward way to load data from Kafka to S3 is to use Confluent’s in-built Kafka S3 connector. Kafka allows users to transfer their data to a destination of their choice, such as Amazon S3, by using one of the connectors provided by Confluent Hub. This method requires you to install the Kafka S3 connector and then transfer data from Kafka to an Amazon S3 bucket by making REST API calls.

Kafka supports connecting with Amazon S3 and numerous other databases/data warehouses with the help of various in-built connectors. These connectors help bring in data from a source of your choice to Kafka and then stream it to the destination of your choice from Kafka Topics.

Kafka to S3: Kafka to S3 Connector.

You can connect Kafka to S3 using the following steps:

Solve your data replication problems with Hevo’s reliable, no-code, automated pipelines with 150+ connectors.
Get your free trial right away!

Step 1: Installing Kafka on your Workstation

To connect Kafka to S3, you will have to download and install Kafka, either in standalone or distributed mode. You can follow Kafka’s official documentation, which will help you get started with the installation process.

Step 2: Installing the Amazon S3 Sink Connector for Kafka

Confluent provides users with a diverse set of in-built connectors that act as a source or a sink and allow users to transfer data to the destination of their choice such as Amazon S3 from their desired data source via Kafka. One such connector that lets users connect Kafka to S3 is the Confluent Kafka S3 connector.

To install the Kafka S3 connector, go to Confluent Hub’s official website and search for Amazon S3 using the search bar found at the top of your screen. In case you’re finding it challenging to locate the correct connector, you can click here to go to the connector’s page directly.

Kafka to S3: Kafka S3 Connector Page.

Once you’ve found the desired Kafka S3 connector, you can now download the connector by executing the following command on the Confluent CLI:

confluent-hub install confluentinc/kafka-connect-s3:5.5.2

In case you don’t have the Confluent CLI installed on your system, you can click here and follow Confluent’s official documentation that will help you install and configure the Confluent CLI. 

You can also click on the download button to install the connector. Once you’ve clicked on it, a zip file will start downloading on your system. Extract the zip file and copy all the JAR files found in its lib folder into your Confluent installation.
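
For example, a minimal sketch of this manual installation could look like the following (the zip file name and the /opt/confluent installation path are assumptions; adjust them to match your download and setup):

# Extract the connector archive downloaded from Confluent Hub
unzip confluentinc-kafka-connect-s3-5.5.2.zip

# Copy the connector JARs into a directory picked up by the Connect worker's plugin path
mkdir -p /opt/confluent/share/java/kafka-connect-s3
cp confluentinc-kafka-connect-s3-5.5.2/lib/*.jar /opt/confluent/share/java/kafka-connect-s3/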

This is how you can install the Kafka S3 connector that will help you connect Kafka to S3.

Step 3: Starting the Kafka, Zookeeper & Connect Server

Once you’ve installed the Kafka S3 connector, you now need to start Zookeeper, Kafka, and the Schema Registry. To do this, execute the following commands in separate terminals:

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

./bin/kafka-server-start ./etc/kafka/server.properties

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

In case you’re running Confluent Platform v5.3 or above, you can also use the following line of code:

confluent local start connect
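
If you would like to confirm that all the required services came up correctly, you can run a quick status check (a sketch based on the Confluent Platform 5.x CLI; newer CLI versions use confluent local services status instead):

confluent local status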

This is how you can start the Kafka, Zookeeper & Connect server.

Step 4: Ingesting Data into Kafka 

With the Kafka servers now up & running on your system, you can start ingesting data from a source of your choice, such as the sample public Meetup feed. To do this, you will have to create a Kafka Topic with five partitions on the locally running Kafka broker. You can do this using the following command:

bin/kafka-topics --zookeeper localhost:2181 --create --topic meetups --replication-factor 1 --partitions 5
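
Optionally, you can verify that the topic was created with the expected number of partitions (a quick sketch, assuming the same locally running Zookeeper on localhost:2181):

bin/kafka-topics --zookeeper localhost:2181 --describe --topic meetups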

Once you’ve created the Kafka Topic, use the curl command as follows to ingest events associated with the Meetup feed:

curl -s http://stream.meetup.com/2/rsvps | confluent local produce meetups
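
If the Confluent CLI is not available on your system, a rough equivalent is to pipe the feed into Kafka’s console producer instead (a sketch, assuming a broker listening on localhost:9092):

curl -s http://stream.meetup.com/2/rsvps | bin/kafka-console-producer --broker-list localhost:9092 --topic meetups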

You can also verify if Kafka is writing the data correctly into the desired Kafka Topic using the following command:

confluent local consume meetups -- --from-beginning

This is how you can start ingesting data into Kafka Topics from a source of your choice such as the Meetup feed. 

Step 5: Setting up your Amazon S3 Bucket

Once you’ve made all the necessary configurations and successfully set up Kafka, you now need to create an Amazon S3 bucket. To create an Amazon S3 bucket, go to the official website of AWS Console for S3 and log in with your credentials such as username and password. 

Kafka to S3: AWS S3 Console Login Page.

Click on the Create bucket option. A new dialog box will now open up on your screen, where you need to provide a name for your Amazon S3 bucket and select a region for it. You now need to set the appropriate permissions for your Amazon S3 bucket and then click on Create.
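
If you prefer the command line, you can also create the bucket with the AWS CLI (a sketch, assuming the CLI is installed and configured; note that S3 bucket names must be globally unique, so meetups is only a placeholder here):

aws s3 mb s3://meetups --region us-west-2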

Kafka to S3: Creating an S3 Bucket.

With your Amazon S3 bucket now ready, you need to configure the Kafka S3 connector to allow it to authenticate while establishing a connection. To do this, you will have to export your AWS credentials to set them up as follows:

export AWS_ACCESS_KEY_ID=your_Id
export AWS_SECRET_ACCESS_KEY=your_key
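
If you have the AWS CLI installed, you can optionally sanity-check that these credentials are valid before starting the connector (this simply prints the AWS account and IAM identity that the exported credentials resolve to):

aws sts get-caller-identity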

The Kafka S3 connector also ships with a default credentials provider, available as part of the AWS SDK. In case you want to modify the authentication mechanism, you can do so by setting the “s3.credentials.provider.class” property of your Kafka S3 connector to a custom credentials provider.

This is how you can set up your Amazon S3 bucket to connect Kafka to S3.

Step 6: Enabling Amazon S3 Connector to Connect Kafka to S3

Once you’ve created your Amazon S3 bucket, you now need to enable the Kafka S3 connector. To do this, create a JSON property file with the following name:

Source_name-to-s3.json

Since the data source in use here is Meetup feeds, the file name would be:

meetups-to-s3.json

There are multiple ways in which the Kafka S3 connector can help you partition your records, such as Default, Field, Time-based, Daily partitioning, etc. You can click here to know more about how these partitioning strategies work.

For example, if you want to partition your Amazon S3 records based on the Kafka record timestamp and then group them as time-based Amazon S3 objects, you can create the JSON property file as follows:

{
  "name": "meetups-to-s3",
  "config": {

    "_comment": "The S3 sink connector class",
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",

    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max":"1",

    "_comment": "Which topics to export to S3",
    "topics":"meetups",

    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name":"meetups",

    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region":"us-west-2",

    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size":"5242880",

    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
    "flush.size":"100000",

    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",

    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",

    "_comment": "The type of storage for this storage cloud connector",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",

    "_comment": "The storage format of the objects uploaded to S3",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",

    "_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",
    "schema.compatibility":"NONE",

    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",

    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale":"en",

    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone":"UTC",

    "_comment": "The date-based part of the S3 object key",
    "path.format":"'date'=YYYY-MM-dd/'hour'=HH",

    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms":"3600000",

    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",
    "rotate.interval.ms":"60000",

    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor":"Record"
  }
}

To start the Kafka S3 connector, use the Confluent CLI to make a REST API call as follows:

confluent local load meetups-to-s3 -- -d ./meetups-to-s3.json

You can also call the Kafka Connect REST API directly with the curl command to perform the same operation (assuming Connect is listening on its default port, 8083):

curl -s -X POST -H "Content-Type: application/json" --data @meetups-to-s3.json http://localhost:8083/connectors

You can now check the logs of your Kafka S3 connector to verify that the connector is functioning correctly. 
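
Alternatively, you can query the Kafka Connect REST API for the connector’s status (again assuming Connect is listening on its default port, 8083):

curl -s http://localhost:8083/connectors/meetups-to-s3/status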

Kafka to S3: Checking S3 Connector Logs.

You’ll also be able to see that the Kafka S3 connector publishes a new object in your Amazon S3 bucket almost every minute, starting with the records that were already present in Kafka.

Kafka to S3: Checking S3 Connector publishing new objects.

You can further verify that the Kafka S3 connector creates a new object every minute by checking your Amazon S3 bucket using the AWS Console.
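
You can also list the uploaded objects from the command line with the AWS CLI (a sketch, assuming the connector’s default topics.dir of "topics" and the placeholder bucket name used above):

aws s3 ls s3://meetups/topics/meetups/ --recursive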

Kafka to S3: Checking Amazon S3 Bucket for new records.

This is how you can use the Kafka S3 connector to establish a connection from Kafka to S3.  

Limitations of Using the Kafka S3 Connector

Connecting Kafka to S3 using the above method comes along with the following limitations:

  • The Kafka S3 sink connector can suffer from high consumer lag in case you have the connector configured to consume a large number of Kafka Topics with numerous partitions. This further results in poor throughput.
  • A feedback loop can occur in case you end up connecting your sink connector to Kafka Topics your source connector is writing data to, which leads to an increase in the number of duplicate records.
  • Using the Kafka S3 connector requires you to write custom code and make API calls, and hence you must have strong technical knowledge.

Conclusion

This article teaches you how to connect Kafka to S3 with ease. It provides in-depth knowledge about the concepts behind every step to help you understand and implement them efficiently. The mentioned method can be challenging, especially for a beginner, as writing code to ETL unstructured data can be quite taxing and resource-intensive.

Hevo can abstract away all of the above challenges by letting users ETL data in minutes and without code. Hevo’s Data Pipeline offers a much faster and easier-to-set-up no-code alternative for unifying data from Kafka and S3. Hevo caters to 150+ data sources (including 40+ free sources) and can seamlessly transfer your Kafka and S3 data to the Data Warehouse of your choice in real time. Hevo’s Data Pipeline enriches your data and manages the transfer process in a fully automated and secure manner, making your life easier and data migration hassle-free.

Learn more about Hevo

Want to take Hevo for a spin? Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.

Tell us about your experience of connecting Kafka to S3! Share your thoughts in the comments section below.

Divij Chawla
Former Marketing Operations and Analytics Manager, Hevo Data

Divij Chawla worked with Hevo in driving the Marketing Operations and Analytics team. He has a keen interest in data analysis, data, software architecture, and the creation of technical content related to the realm.
