Processing large volumes of data efficiently is critical for many modern applications. Kafka provides an excellent publish-subscribe messaging system for handling real-time data feeds, but its batch-processing capabilities are less well-known. Integrating Kafka batch processing with Spring Boot’s strong foundations for building robust production-grade services can yield highly scalable data pipelines.

In this concise 4-step guide, we will show you how to rapidly create a Spring Boot based batch processing application for Kafka.

Whether you need to periodically crunch large log files, sync databases, or perform ETL jobs, following these simple steps will get you quickly up and running with your own Kafka powered batch processing service.

Steps to implement Kafka batch processing

Step 1: Install Maven dependencies

Apache Maven is used to manage the project dependencies for Kafka batch processing. Use the following pom.xml to declare them.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hevo.spring.kafka</groupId>
    <artifactId>listener-batch</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://hevodata.com</url>
    <description>Kafka Batch Processing using Spring</description>
    <name>Kafka Batch Processing - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Step 2: Receiving Kafka messages in batches

We will configure a batch listener so that the application receives messages from Kafka in batches rather than one at a time.

Step 2.1: Configuring a Batch Listener

A @KafkaListener method can receive the entire batch of consumer records. To enable this, call setBatchListener(true) on the ConcurrentKafkaListenerContainerFactory.

You can also configure a BatchErrorHandler to control how errors are handled for an entire batch of records.

You can also configure the batch size by setting the ConsumerConfig.MAX_POLL_RECORDS_CONFIG property. In this example, we will use a batch size of 5.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        // Cap each poll at 5 records, so the listener receives at most 5 messages per batch
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        return factory;
    }

}

Step 2.2: Batch Receive Kafka Messages

We will update the receive() method to accept a list of messages. The corresponding Kafka headers (partition and offset) also arrive as lists, with one entry per message in the batch.

package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "listener-batch", topics = "${app.topic.batch}")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
        LOG.info("Starting the process to recieve batch messages");

        for (int i = 0; i < messages.size(); i++) {

            LOG.info("received message='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));

        }
        LOG.info("all the batch messages are consumed");
    }

}

Step 3: Configure the application

We will create an application.yml file to configure the Spring Boot properties for Kafka batch processing.

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    batch: batch.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

Step 4: Running the Application

Kafka is expected to be running on localhost:9092, as configured above. When the application starts (for example, via mvn spring-boot:run), the CommandLineRunner below sends twelve messages through a Sender, and the batch listener consumes them in batches of at most five.
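
The Sender used below is not shown in the steps above, so here is a minimal sketch of one. It assumes the KafkaTemplate that Spring Boot auto-configures (String serializers by default) and the app.topic.batch property from Step 3:

package com.memorynotfound.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Topic name resolved from application.yml (app.topic.batch)
    @Value("${app.topic.batch}")
    private String topic;

    public Sender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }

}

The main application class then uses this Sender to publish the test messages: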

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        // Send 12 messages; with max.poll.records=5 they arrive in batches of 5, 5, and 2
        for (int i = 1; i < 13; i++) {
            sender.send("message-" + i);
        }
    }

}

The output of the Kafka batch processing application:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='message-1' to topic='batch.t'
sending message='message-2' to topic='batch.t'
sending message='message-3' to topic='batch.t'
sending message='message-4' to topic='batch.t'
sending message='message-5' to topic='batch.t'
sending message='message-6' to topic='batch.t'
sending message='message-7' to topic='batch.t'
sending message='message-8' to topic='batch.t'
sending message='message-9' to topic='batch.t'
sending message='message-10' to topic='batch.t'
sending message='message-11' to topic='batch.t'
sending message='message-12' to topic='batch.t'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-1' with partition-offset='0-295'
received message='message-2' with partition-offset='0-296'
received message='message-3' with partition-offset='0-297'
received message='message-4' with partition-offset='0-298'
received message='message-5' with partition-offset='0-299'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-6' with partition-offset='0-300'
received message='message-7' with partition-offset='0-301'
received message='message-8' with partition-offset='0-302'
received message='message-9' with partition-offset='0-303'
received message='message-10' with partition-offset='0-304'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-11' with partition-offset='0-305'
received message='message-12' with partition-offset='0-306'
all batch messages consumed

The Pros and Cons of Kafka Batch Processing using Spring Boot

Pros:

  • No Excessive Byte Copying: Excessive byte copying often results in inefficiency. When there are few messages this is not a problem, but under load the impact can be significant. Kafka avoids it by employing a standardized binary message format shared by the producer, broker, and consumer, so data chunks are transferred without modification.
  • Batches I/O operations: To mitigate numerous small I/O operations between the client and server, as well as within the server’s persistent operations, the Kafka protocol utilizes a “message set” abstraction to naturally group messages. This approach enables network requests to bundle messages together, reducing the overhead of network roundtrips compared to sending individual messages.
  • End-to-end batch compression: When network bandwidth becomes a bottleneck in data pipelines spanning data centers, Kafka addresses this challenge by supporting the compression of message batches using an efficient batching format. Unlike compressing messages individually, Kafka allows grouping, compressing, and sending batches of messages together, enhancing compression ratios. The broker validates and decompresses the batch before writing it to disk in compressed form, ensuring data integrity. The compressed batch is then transmitted to consumers, who decompress the data upon receipt. This approach optimizes compression efficiency and network utilization in scenarios where network bandwidth is a limiting factor. A sketch of the producer settings that control batching and compression follows this list.
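
To illustrate the batching and compression behavior described above, here is a minimal, self-contained producer sketch using the standard Kafka client properties batch.size, linger.ms, and compression.type; the class name, broker address, and topic are assumptions matching the example in this guide:

package com.memorynotfound.kafka.producer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducerExample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Accumulate up to 16 KB of records per partition before sending a batch...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // ...or wait at most 50 ms for more records to fill the batch.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);
        // Compress whole batches rather than individual messages.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 12; i++) {
                producer.send(new ProducerRecord<>("batch.t", "message-" + i));
            }
        }
    }

}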

Cons:

  • Complexity: Integrating Kafka and Spring Boot adds complexity to your application compared to simpler messaging solutions. It requires understanding both technologies and their configuration options.
  • Latency: While parallel processing can improve throughput, there may be some additional latency introduced by batching messages and waiting for the entire batch to be processed before committing offsets.
  • Limited stream processing: Kafka batch processing is not suitable for real-time stream processing where immediate reaction to each message is required.
  • Data consistency: Depending on your implementation, there might be potential for data inconsistencies if errors occur during batch processing and retries are involved.

Conclusion

Batch processing is an important technique that helps enterprises understand their data and process huge volumes of it efficiently. Kafka is a leading event-streaming platform used by many companies, including Fortune 500 enterprises. This article provided a guide to batch processing with Kafka, along with the steps to set up Kafka batch processing using Spring Boot and examples to help you better understand and implement it.

Companies rely on trusted sources like Kafka because of the many benefits it provides, but transferring data from it into a data warehouse can be a hectic task. An automated data pipeline solves this problem, which is where Hevo comes into the picture. Sign up for Hevo’s 14-day free trial and experience seamless data migration.

FAQs

Does Kafka Streams support batch processing?

No, Kafka Streams is designed for real-time, continuous stream processing and does not support traditional batch processing. It processes data in real-time as it arrives, offering low-latency processing. For batch processing with Kafka, tools like Apache Spark or Apache Flink are typically used.

What is the difference between Spring Batch and Kafka Streams?

Spring Batch is a framework for batch processing large datasets through jobs, focusing on reading, processing, and writing. Kafka Streams is a library for real-time stream processing in Apache Kafka, supporting features like stateful processing and windowing. Spring Batch is for batches, while Kafka Streams is for real-time data.

Arsalan Mohammed
Research Analyst, Hevo Data

Arsalan is a research analyst at Hevo and a data science enthusiast with over two years of experience in the field. He completed his B.Tech in computer science with a specialization in Artificial Intelligence and finds joy in sharing his knowledge with data practitioners. His interest in data analysis and architecture has driven him to write nearly a hundred articles on various topics related to the data industry.