Creating Spring Kafka Consumer Applications Simplified 101

Data Streaming, Java, Kafka, Kafka Consumers, Spring • January 31st, 2022

Application Development is critical in Software Development. Since the mid-1990s, Java has been a popular choice for many programmers for building software. As a result, several frameworks have been developed to make Software Development easier using Java. Spring Boot is a good platform for Java Developers to create a stand-alone, production-ready Spring application that can be run straight away.

Spring for Apache Kafka is a project that uses fundamental Spring concepts to create Kafka-based messaging solutions. In this article, you will learn how to create a Spring Kafka Consumer application. At the end of this article, you will learn some of the limitations and challenges faced while creating the Spring Kafka Consumer application using Spring Boot and Java. So, read along to gain insights and understand the easy steps to create a Spring Kafka Consumer application.

Prerequisites

To create a Spring Kafka Consumer application, you need to meet the following requirements:

  • Apache Kafka Clients 3.0.0.
  • Spring Framework 5.3.x.
  • Java version: 8 or above.

What is Apache Kafka?

Apache Kafka is a Distributed Event Streaming framework that enables applications to efficiently manage large volumes of data. Its fault-tolerant, highly scalable architecture can easily manage billions of events. The Apache Kafka framework is a Java and Scala-based distributed Publish-Subscribe Messaging system that receives Data Streams from several sources and allows real-time analysis of Big Data streams. It can easily scale up with minimal downtime. Kafka’s global appeal has grown as a result of its data replication and fault tolerance.

Key Features of Apache Kafka

With high throughput, low latency, and fault tolerance, Kafka is the most popular open-source stream-processing software. It can process thousands of messages each second. Take a peek at some of these impressive features:

  • Fault-Tolerant & Durable: By distributing partitions and replicating data over multiple servers, Kafka protects data from server failure and makes it fault-tolerant. If a broker fails, a replica on another broker can take over as leader for its partitions.
  • High Scalability: Kafka’s partitioned log model distributes data over multiple servers, allowing it to extend beyond the capabilities of a single server.
  • Extensibility: Kafka interfaces have been implemented with plenty of other applications. More features can be added in a matter of seconds as a result of this. Check out how you can integrate Kafka with Hadoop, GitHub, and other connectors.
  • Metrics and Monitoring: Kafka is a popular tool for tracking operational data. It helps you to seamlessly gather data from several platforms and consolidate it into centralized feeds with metrics.

What is Spring Boot?

Spring Boot is a good platform for Java Developers to create a stand-alone, production-ready Spring application that can be run instantly. Spring Boot is a project that uses the Spring Framework as its foundation. It makes setting up, configuring, and running both simple and web-based applications easier and faster.

Spring Boot is a Spring module that adds the RAD (Rapid Application Development) feature to the Spring Framework, allowing you to construct a standalone Spring-based application with minimal Spring settings. Spring Boot is a hybrid framework that combines Spring Framework and Embedded Servers.

Key Features of Spring Boot

Let’s explore the key features of the Spring Boot framework below:

  • Hassle-Free Development: Spring Boot reduces the time it takes to set up, build, and deploy an application, whether it’s a proof-of-concept or a production-ready application.
  • Easy Set-Up: If you don’t specify specific configurations, Spring Boot uses default setup and starts. Spring Boot, for example, will start with an in-memory database if you don’t provide a database driver and associated options. Moreover, it will respect your custom setups and remove the auto-configurations for the relevant components.
  • Production Ready: The main method bootstraps the Spring Boot application. Running a Spring Boot application locally is similar to running it on a Production Box. Furthermore, having a strong properties management mechanism allows for a concise way to externalize environment-specific attributes, making it simple to deploy the same jar to any box. To execute a Spring Boot application, you simply need JRE.
  • In-Built Container: There’s no need to configure the servers or deploy the application archive to any external containers. Spring Boot includes an embedded Tomcat server that is launched from the application’s main method. The entire application can be started with just the command java -jar.
  • Efficient Version Management: For version compatibility, Spring Boot leverages the spring.io platform. That is, you only need to indicate the Spring Boot version you wish to use, and Spring will automatically locate and install relevant libraries.

Simplify Kafka ETL and Data Analysis with Hevo’s No-code Data Pipeline

Hevo Data, a No-code Data Pipeline, helps load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services and simplifies the ETL process. It supports 100+ Data Sources including Apache Kafka, Kafka Confluent Cloud, and other 40+ Free Sources. You can use Hevo Pipelines to replicate the data from your Apache Kafka Source or Kafka Confluent Cloud to the Destination system. It loads the data onto the desired Data Warehouse/destination and transforms it into an analysis-ready form without having to write a single line of code.

Hevo’s fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. Hevo supports two variations of Kafka as a Source. Both these variants offer the same functionality, with Confluent Cloud being the fully-managed version of Apache Kafka.

Check out why Hevo is the Best:

  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled securely and consistently with zero data loss.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
  • Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.

Simplify your ETL & Data Analysis with Hevo today! 

Building a Spring Kafka Consumer Application using Spring Boot and Java

Spring for Apache Kafka is a project that uses fundamental Spring methods to create Kafka-based messaging solutions. With a KafkaTemplate and Message-driven POJOs via the @KafkaListener annotation, Spring Kafka extends the simple and common Spring template programming style. In this section, you will learn to send and receive messages using the Spring Kafka Consumer application. Follow the steps below to get started with Spring Kafka Consumer and Producer:

Step 1: Set Up the Spring Kafka Dependencies

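Before you proceed further to set up Spring Kafka Consumer and Producer, let’s configure the required Kafka and Spring Boot dependencies. Add the following dependency to your pom.xml. In a Spring Boot project the version is managed for you, so the version element can be omitted: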

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Step 2: Build a Spring Kafka Consumer

Now let’s create a Spring Kafka Consumer. The following Java class declares a topic and a listener that prints every message it receives. You can customize the code according to your requirements.

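import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication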
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

Step 3: Build a Spring Kafka Producer

After creating the above Spring Kafka Consumer script, you can use the script below to set up the Spring Kafka Producer.

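import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication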
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }

}

To read more about how to configure your Spring Kafka Consumer and Producer properties, you can refer to Spring Kafka Consumer and Producer Documentation.  

Step 4: With Java Configuration [without Boot]

Here’s an example of a non-Spring Boot application that has both a Consumer and a Producer.

public class Sender {

	public static void main(String[] args) {
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
		context.getBean(Sender.class).send("test", 42);
	}

	private final KafkaTemplate<Integer, String> template;

	public Sender(KafkaTemplate<Integer, String> template) {
		this.template = template;
	}

	public void send(String toSend, int key) {
		this.template.send("topic1", key, toSend);
	}

}

public class Listener {

    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }

}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}

As you can see, when you’re not using Spring Boot you have to define a lot of infrastructure beans yourself.

Producing Messages in Spring Kafka

To send messages, you must first set up a ProducerFactory. This determines how Kafka Producer instances are created.

Then we’ll need a KafkaTemplate, which wraps a Producer instance and provides methods for sending messages to Kafka topics with ease.

Producer instances are thread-safe, so using a single instance throughout an application context gives better performance. KafkaTemplate instances are also thread-safe, and using one instance is recommended.

Producer Configuration in Spring Kafka

@Configuration
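public class KafkaProducerConfig {

    // Assumption: the broker address is held in a field; localhost:9092 matches the other examples in this article.
    private String bootstrapAddress = "localhost:9092";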

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Publishing Messages in Spring Kafka

The KafkaTemplate class can be used to send messages:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

The send API returns a ListenableFuture. You can call the get API of the ListenableFuture object to block the sending thread and retrieve the result of a sent message, but the producer slows down while the thread waits for the result.

Kafka is a platform that allows you to process data in real time. As a result, it’s preferable to handle the results asynchronously so that subsequent messages aren’t held up by the previous message’s outcome.
A callback can be used for this:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

Consuming Messages in Spring Kafka

Consumer Configuration in Spring Kafka

A ConsumerFactory and a KafkaListenerContainerFactory must be configured before messages can be consumed. POJO-based consumers can be configured with the @KafkaListener annotation once these beans are available in the Spring bean factory.

To detect the @KafkaListener annotation on spring-managed beans, the @EnableKafka annotation on the configuration class is required:

@EnableKafka
@Configuration
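public class KafkaConsumerConfig {

    // Assumption: the broker address and group id are held in fields; the values here are placeholders.
    private String bootstrapAddress = "localhost:9092";
    private String groupId = "foo";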

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Consuming Messages in Spring Kafka

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

For a given topic, you can use multiple listeners, each with its own group Id. Additionally, one consumer can listen to messages from several topics:

@KafkaListener(topics = { "topic1", "topic2" }, groupId = "foo")

The @Header annotation in the listener can also be used to retrieve one or more message headers in Spring:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message
        + " from partition: " + partition);
}

Consuming Messages from a Specific Partition in Spring Kafka

For a topic with multiple partitions, a @KafkaListener can also subscribe to specific partitions of the topic with an initial offset:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message
        + " from partition: " + partition);
}

Because this listener’s initialOffset is set to 0, every time this listener is initialised, all previously consumed messages from partitions 0 and 3 will be re-consumed.

If the offset isn’t required, we can use the @TopicPartition annotation’s partitions property to specify only the partitions without the offset:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

Adding Message Filter for Listeners in Spring Kafka

By using a custom filter, you can configure listeners to consume specific types of messages. Set a RecordFilterStrategy for the KafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

After that, you can set up a listener to use this container factory:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

All messages that match the filter are discarded by this listener. Here, that means any message whose value contains “World” never reaches listenWithFilter.
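
To make the discard rule concrete, here is a minimal plain-Java sketch that mimics the filtering step without a broker. The class FilterSketch and its deliver method are hypothetical names used only for illustration and are not part of Spring Kafka; the one behaviour carried over is that a record is dropped when the filter returns true.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-JDK model of a record filter: values for which the predicate returns true are dropped. */
final class FilterSketch {

    // Hypothetical helper, not a Spring Kafka API: returns the values a listener would still receive.
    static List<String> deliver(List<String> values, Predicate<String> discardIf) {
        List<String> delivered = new ArrayList<>();
        for (String value : values) {
            if (!discardIf.test(value)) { // true means "discard", so only non-matching values pass
                delivered.add(value);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        Predicate<String> containsWorld = value -> value.contains("World");
        List<String> incoming = Arrays.asList("Hello World", "Hello Kafka", "World Cup");
        System.out.println(deliver(incoming, containsWorld));
    }
}
```

With the three sample values above, only "Hello Kafka" survives, because the other two contain "World".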

Custom Message Converters in Spring Kafka

Custom Java objects can also be sent and received. This necessitates configuring the appropriate serializer and deserializer in ProducerFactory and ConsumerFactory, respectively.

public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}

Producing Custom Messages in Spring Kafka

You’ll be using JsonSerializer in this example.
Here is the code for the ProducerFactory and KafkaTemplate:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

This new KafkaTemplate can be used to send the Greeting message:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

Consuming Custom Messages in Spring Kafka

Modify the ConsumerFactory and KafkaListenerContainerFactory in the same way to correctly deserialize the Greeting message:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

The spring-kafka JSON serializer and deserializer use the Jackson library, which is an optional Maven dependency of the spring-kafka project.

As a result, we’ll include it in our pom.xml:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

It is recommended that you use the version of Jackson that is declared in spring-kafka’s pom.xml instead of the most recent version.

Finally, you must write a listener that will consume Greeting messages:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

Example of a Spring Boot Kafka Consumer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.10.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

The dependencies for our Maven build are defined in the pom.xml file. Take note of how you’ve included both the Spring Boot and Kafka dependencies.

It’s worth noting that the jackson-databind dependency was added to account for bugs in earlier versions of the Spring Kafka library. Depending on the version of Kafka you’re using, you may not need to include this.

DemoApplication.java

@SpringBootApplication
public class DemoApplication {
	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}
}

Your Spring Boot application is run by the DemoApplication.java class.

Spring Boot configures Kafka for you via @SpringBootApplication. This annotation enables auto-configuration, so Spring Boot applies its KafkaAutoConfiguration class when spring-kafka is on the classpath.

Consumer.java

package com.example.demo.messaging;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
    @KafkaListener(topics = "test")
    public void processMessage(String content){
        System.out.println("Message received: " + content);
    }
}

The Consumer.java class is responsible for receiving messages. Within the @KafkaListener annotation, notice how we specify the topics to listen to.

You can quickly set up a consumer for a given topic by simply using the annotations provided by the Spring Boot Kafka library.

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

To configure the consumer, you only need to define a few things in application.properties thanks to auto-configuration. Take note of how our app is set up to use Kafka locally.

Running the example

This example uses a local Kafka topic to consume data. To see messages being consumed from the Spring Boot app, you’ll need Kafka installed and running on your local machine.

Starting Kafka from the CLI:

Both the Zookeeper and Kafka servers must be started locally before you can use Kafka.

./zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

The zookeeper server will be started with the default settings. Please keep in mind that the path to your local Kafka installation may differ.

./kafka-server-start /usr/local/etc/kafka/server.properties 

The Kafka server will be started with the default configurations. Please keep in mind that the path to your local Kafka installation may differ.

Producing messages from the CLI:

You’ll need to create some messages for the test topic in order to test the application consumer. Start a Kafka console producer with the following command:

./kafka-console-producer --bootstrap-server=localhost:9092 --topic test

This starts a Kafka console producer for the test topic. It’s worth noting that the bootstrap-server argument matches the value we set in our application.properties file, and the topic argument matches the topic named in the @KafkaListener annotation.

You can send a message like this to the test topic once the producer is up and running.

> Hello world

If everything is working properly, the following message should be logged by the Spring application:

Message received: Hello world

Example of Spring Boot Kafka Multiple Consumers

Within the same Spring Boot application, Spring Kafka enables multiple consumers to read from multiple topics. You’ll look at a few different examples because “multiple consumers” can mean various things.

1) Multiple consumers in the same consumer group

@Component
public class Consumer {
    @KafkaListener(topics = "test", concurrency = "2", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }
}

This creates a SINGLE consumer group with two members. On the @KafkaListener annotation, notice how we set the concurrency to “2”. This will result in the creation of two members of the same consumer group.

Take note of how the annotation refers to the test topic and the groupId myGroup.

If your test topic has three partitions, the first member will be assigned two and the second member will be assigned one.

You’ll see something like this if you send messages to the test topic.

Message received: abc
Message received: 123
Message received: 345

2) Multiple consumer groups reading from the same topic

@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
    @KafkaListener(topics = "test", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}

This results in two consumer groups, each with one member. Both listener methods read from the same test topic, but the groupId is different.

If your test topic has three partitions, three partitions will be assigned to myGroup’s sole member, and three partitions will be assigned to anotherGroup’s sole member.

You’ll see something like this if you send messages to the test topic.

Message received by consumer 1: abc
Message received by consumer 2: abc
Message received by consumer 1: 123
Message received by consumer 2: 123

It’s worth noting how the same message is consumed TWICE. This is due to the fact that different groupIds correspond to different consumer groups. This is a significant difference from the first example, in which multiple members belong to a single consumer group.
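
The reason each group sees every message is that the read position is tracked per consumer group. The following plain-Java sketch models that idea with an in-memory list. It is a simplification that assumes a single partition and advances the offset on every poll, and GroupOffsetsSketch, publish, and poll are hypothetical names rather than Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of one topic partition read by several consumer groups. */
final class GroupOffsetsSketch {

    private final List<String> log = new ArrayList<>();
    // Each group id keeps its own position in the log (hypothetical stand-in for committed offsets).
    private final Map<String, Integer> nextOffsetByGroup = new HashMap<>();

    void publish(String message) {
        log.add(message);
    }

    /** Returns everything the given group has not read yet and advances that group's offset. */
    List<String> poll(String groupId) {
        int from = nextOffsetByGroup.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        nextOffsetByGroup.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        GroupOffsetsSketch topic = new GroupOffsetsSketch();
        topic.publish("abc");
        topic.publish("123");
        System.out.println("myGroup: " + topic.poll("myGroup"));
        System.out.println("anotherGroup: " + topic.poll("anotherGroup"));
        System.out.println("myGroup again: " + topic.poll("myGroup"));
    }
}
```

Both groups receive abc and 123, while a second poll by myGroup returns nothing new because that group's offset has already moved past both messages.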

3) Multiple consumer groups reading from different topics

@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
    @KafkaListener(topics = "test2", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}

This creates two distinct consumer groups, each with a single member. The member of the first group reads from the test topic. The member of the second group reads from the test2 topic.

4) Multiple consumers in the same consumer group reading from different topics

@Component
public class Consumer {
    @KafkaListener(topics = {"test", "test2"}, groupId = "myGroup", concurrency = "2")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
}

This results in a SINGLE consumer group consisting of two members. Reading from both topics is shared among the members.

By default, the two members will NOT receive an equal number of partitions if both our test and test2 topics have three partitions. One member might get four partitions, while the other gets two. This is because the RangeAssignor, Kafka’s default PartitionAssignor, divides the partitions of each topic separately. By specifying the RoundRobinAssignor in your configuration, you can change partition assignment to a round robin approach if desired:

spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
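
The difference between the two strategies comes down to simple arithmetic, which the following plain-Java sketch works through. It is a simplified model that only counts partitions per member and assumes every member subscribes to every topic; AssignmentSketch and its methods are hypothetical names, not Kafka classes.

```java
import java.util.Arrays;

/** Counts how many partitions each group member would hold under two assignment styles. */
final class AssignmentSketch {

    /** Range style: each topic is split on its own, and leftover partitions go to the first members. */
    static int[] rangeCounts(int[] partitionsPerTopic, int members) {
        int[] counts = new int[members];
        for (int partitions : partitionsPerTopic) {
            int base = partitions / members;
            int extra = partitions % members;
            for (int m = 0; m < members; m++) {
                counts[m] += base + (m < extra ? 1 : 0);
            }
        }
        return counts;
    }

    /** Round robin style: the partitions of all topics are dealt out one at a time across members. */
    static int[] roundRobinCounts(int[] partitionsPerTopic, int members) {
        int[] counts = new int[members];
        int next = 0;
        for (int partitions : partitionsPerTopic) {
            for (int p = 0; p < partitions; p++) {
                counts[next % members]++;
                next++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] topics = {3, 3}; // test and test2, three partitions each
        System.out.println("range:       " + Arrays.toString(rangeCounts(topics, 2)));
        System.out.println("round robin: " + Arrays.toString(roundRobinCounts(topics, 2)));
    }
}
```

For two topics with three partitions each and two members, the range calculation gives four and two partitions, while the round robin calculation gives three each. With a single three-partition topic, the range calculation gives two and one, which matches the first example in this section.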

Limitations of Working with Spring Kafka Consumers

Now that you have gained a basic understanding of how to build a Spring Kafka Consumer application, let’s explore the challenges and limitations that you might face.

  • The Apache Kafka required versions are determined by Spring Boot’s dependency management when leveraging Spring for Apache Kafka in a Spring Boot application. You must override the version used by Spring Boot dependency management and add 2 test artifacts for Apache Kafka if you want to use a different version of kafka-clients or kafka-streams and use the embedded Kafka broker for testing.
  • When using the embedded broker on Microsoft Windows, there is a bug in Apache Kafka 3.0.0 called KAFKA-13391. To use the embedded broker on Windows, you must first downgrade Apache Kafka to version 2.8.1 until version 3.0.1 is released. You must additionally exclude zookeeper dependency from the spring-kafka-test when using 2.8.1.
  • With the increasing complexity and volume of data, you might face challenges to stream your data. Moreover, it might become cumbersome to manage your code and data streams.

Conclusion

In a nutshell, this article provided a comprehensive overview of how to create a Spring Kafka Consumer application. You also explored various robust features of Kafka and Spring Boot and understood what makes them so popular. After learning how you can set up a Spring Kafka Consumer application using Spring Boot and Java, you also discovered some limitations with this method.

Hence, streaming data from various sources to Apache Kafka or vice versa can be quite challenging and cumbersome. If you are facing these challenges and are looking for some solutions, then check out a simpler alternative like Hevo.

Hevo Data is a No-Code Data Pipeline that offers a faster way to move data from 100+ Data Sources including Apache Kafka, Kafka Confluent Cloud, and other 40+ Free Sources, into your Data Warehouse to be visualized in a BI tool. You can use Hevo Pipelines to replicate the data from your Apache Kafka Source or Kafka Confluent Cloud to the Destination system. Hevo is fully automated and hence does not require you to code. 

Want to take Hevo for a spin?

SIGN UP and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Have you streamed data using Kafka and Spring Boot? If yes, feel free to share your experience of building the Spring Kafka Consumer application with us in the comments section below!
