Processing large volumes of data efficiently is critical for many modern applications. Kafka provides an excellent publish-subscribe messaging system for handling real-time data feeds, but its batch-processing capabilities are less well-known. Integrating Kafka batch processing with Spring Boot’s strong foundations for building robust production-grade services can yield highly scalable data pipelines.

In this concise 4-step guide, we will show you how to rapidly create a Spring Boot based batch processing application for Kafka.

Whether you need to periodically crunch large log files, sync databases, or perform ETL jobs, following these simple steps will get you quickly up and running with your own Kafka powered batch processing service.

Steps to implement Kafka batch processing

Step 1: Install Maven dependencies

Apache Maven is used to manage the project dependencies for Kafka batch processing. Update your pom.xml with the following:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hevo.spring.kafka</groupId>
    <artifactId>listener-batch</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://hevodata.com</url>
    <description>Kafka Batch Processing using Spring</description>
    <name>Kafka Batch Processing - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Step 2: Receiving Kafka Messages in Batches

We will configure a batch listener so that the consumer receives messages in batches rather than one at a time. (The Sender that publishes the test messages is sketched in Step 4.)

Step 2.1: Configuring a Batch Listener

A @KafkaListener method can receive the entire batch of consumer records at once. To enable this, set the batchListener property of the ConcurrentKafkaListenerContainerFactory to true.

You can also configure a BatchErrorHandler on the container so that failures are handled for the batch as a whole rather than per record.
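
The configuration below uses the built-in BatchLoggingErrorHandler. If you need custom behavior, you can implement the BatchErrorHandler interface yourself. Here is a minimal illustrative sketch (not part of the original tutorial) that logs the exception once, together with the size of the failed batch:

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.BatchErrorHandler;

public class CountingBatchErrorHandler implements BatchErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(CountingBatchErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
        // log the failure once for the whole batch instead of once per record
        LOG.error("error while processing a batch of {} record(s)",
                data == null ? 0 : data.count(), thrownException);
    }
}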

The batch size is capped with the ConsumerConfig.MAX_POLL_RECORDS_CONFIG property, which limits how many records a single poll returns. In this example, we will use a maximum of 5 records per batch.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        // cap the number of records returned by a single poll (the batch size)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // enable batch mode so the listener receives a List of messages per poll
        factory.setBatchListener(true);
        // log the entire failed batch when an exception is thrown
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        return factory;
    }

}
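
If you rely on Spring Boot's auto-configuration instead of the hand-built consumer factory above, the same batch-size cap can be set declaratively in application.yml. This is an equivalent alternative, not something the original listing uses:

spring:
  kafka:
    consumer:
      max-poll-records: 5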

Step 2.2: Batch Receive Kafka Messages

We will update the receive() method so that it accepts a list of messages instead of a single message. The partition and offset headers arrive as parallel lists, one entry per message.

package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "listener-batch", topics = "${app.topic.batch}")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
        LOG.info("Starting the process to recieve batch messages");

        for (int i = 0; i < messages.size(); i++) {

            LOG.info("received message='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));

        }
        LOG.info("all the batch messages are consumed");
    }

}
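
As an aside, a batch listener can also receive the complete ConsumerRecord objects rather than separate payload and header lists. The following variant is an alternative sketch, not part of the original tutorial (run one listener or the other, since both would otherwise consume the same topic):

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class RecordListener {

    private static final Logger LOG = LoggerFactory.getLogger(RecordListener.class);

    @KafkaListener(id = "listener-batch-records", topics = "${app.topic.batch}")
    public void receive(List<ConsumerRecord<String, String>> records) {
        // each record carries its own topic, partition, offset, key, and value
        records.forEach(record ->
                LOG.info("received message='{}' with partition-offset='{}-{}'",
                        record.value(), record.partition(), record.offset()));
    }
}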

Step 3: Configure the application

We will create an application.yml file to configure the Spring Boot application. Note that the app.topic.batch property supplies the topic name referenced by the listener's ${app.topic.batch} placeholder:

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    batch: batch.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

Step 4: Running the Application

With a Kafka broker running on localhost:9092, the application below publishes twelve test messages at startup and then consumes them in batches.
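
The runner autowires a Sender bean, which the original article does not show. Here is a minimal sketch, assuming Spring Boot's auto-configured KafkaTemplate (string serializers are Boot's default) and the app.topic.batch property from Step 3:

package com.memorynotfound.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    // auto-configured by Spring Boot when spring-kafka is on the classpath
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // topic name taken from application.yml (app.topic.batch)
    @Value("${app.topic.batch}")
    private String topic;

    public void send(String message) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

The application class below then wires this sender into a CommandLineRunner: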

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        for (int i = 1; i < 13; i++){
            sender.send("message-" + i);
        }
    }
}

The output of the Kafka batch processing application. With max.poll.records set to 5, the twelve messages are consumed in three batches of five, five, and two:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='message-1' to topic='batch.t'
sending message='message-2' to topic='batch.t'
sending message='message-3' to topic='batch.t'
sending message='message-4' to topic='batch.t'
sending message='message-5' to topic='batch.t'
sending message='message-6' to topic='batch.t'
sending message='message-7' to topic='batch.t'
sending message='message-8' to topic='batch.t'
sending message='message-9' to topic='batch.t'
sending message='message-10' to topic='batch.t'
sending message='message-11' to topic='batch.t'
sending message='message-12' to topic='batch.t'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-1' with partition-offset='0-295'
received message='message-2' with partition-offset='0-296'
received message='message-3' with partition-offset='0-297'
received message='message-4' with partition-offset='0-298'
received message='message-5' with partition-offset='0-299'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-6' with partition-offset='0-300'
received message='message-7' with partition-offset='0-301'
received message='message-8' with partition-offset='0-302'
received message='message-9' with partition-offset='0-303'
received message='message-10' with partition-offset='0-304'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-11' with partition-offset='0-305'
received message='message-12' with partition-offset='0-306'
all batch messages consumed

The Pros and Cons of Kafka Batch Processing using Spring Boot

Pros:

  • No Excessive Byte Copying: Excessive byte copying is a common source of inefficiency. With few messages this is not a problem, but under load the impact can be significant. Kafka avoids it by employing a standardized binary message format shared by the producer, broker, and consumer, so data chunks are transferred without modification.
  • Batches I/O operations: To mitigate numerous small I/O operations between the client and server, as well as within the server’s persistent operations, the Kafka protocol utilizes a “message set” abstraction to naturally group messages. This approach enables network requests to bundle messages together, reducing the overhead of network roundtrips compared to sending individual messages.
  • End-to-end batch compression: When network bandwidth becomes a bottleneck in data pipelines spanning data centers, Kafka addresses this challenge by supporting the compression of message batches using an efficient batching format. Unlike compressing messages individually, Kafka allows grouping, compressing, and sending batches of messages together, enhancing compression ratios. The broker validates and decompresses the batch before writing it to disk in compressed form, ensuring data integrity. The compressed batch is then transmitted to consumers, who decompress the data upon receipt. This approach optimizes compression efficiency and network utilization in scenarios where network bandwidth is a limiting factor. (See the configuration sketch below.)
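
Most of this batching and compression behavior is controlled on the producer side. As an illustrative sketch using Spring Boot's standard producer properties (the values shown are examples, not recommendations):

spring:
  kafka:
    producer:
      batch-size: 16384        # maximum batch size in bytes per partition
      compression-type: gzip   # compress whole batches end to end
      properties:
        linger.ms: 50          # wait up to 50 ms for a batch to fill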

Cons:

  • Complexity: Integrating Kafka and Spring Boot adds complexity to your application compared to simpler messaging solutions. It requires understanding both technologies and their configuration options.
  • Latency: While parallel processing can improve throughput, there may be some additional latency introduced by batching messages and waiting for the entire batch to be processed before committing offsets.
  • Limited stream processing: Kafka batch processing is not suitable for real-time stream processing where immediate reaction to each message is required.
  • Data consistency: Depending on your implementation, there might be potential for data inconsistencies if errors occur during batch processing and retries are involved.

Leverage Real-Time Data Processing with Hevo’s No-code Data Pipeline

Hevo is the only real-time ELT No-code Data Pipeline platform that cost-effectively automates data pipelines that are flexible to your needs. With integrations with 150+ data sources (40+ free sources), we help you not only export data from sources and load it into destinations, but also transform and enrich your data to make it analysis-ready.


Conclusion

Batch processing is an important technique that helps enterprises understand their data and process huge volumes of it efficiently. Kafka is a leading event-streaming platform used by many companies, including Fortune 500 enterprises. This article provided a guide to batch processing with Kafka, along with the steps to set up Kafka batch processing using Spring Boot, and examples to help you better understand how it works in practice.


Companies use trusted sources like Kafka because of the benefits it provides, but transferring data from Kafka into a data warehouse is a tedious task. An automated data pipeline helps solve this problem, and that is where Hevo comes into the picture.


Sign up for a 14-day free trial and see the difference! Check out our pricing plans to choose the best plan for your requirements.

Share your experience of learning about Kafka batch processing in the comments section below.

Former Research Analyst, Hevo Data

Arsalan is a data science enthusiast with a keen interest in data analysis and architecture, and he writes highly technical content. He has written around 100 articles on various topics related to the data industry.
