"Batch job" is a term that originated when programs were run from punch cards. Back then, multiple cards were stacked on one another, and the programs would run one after another until the stack was depleted. Modern Batch Jobs work the same way. These jobs run without the intervention of an end-user: they are scheduled to perform tasks, and once the tasks start executing, no interaction is required to keep them running. Apache Kafka is a robust messaging queue that enables the transfer of high volumes of messages from one endpoint to another.
Creating a Kafka Batch Process allows for processing multiple messages with ease. In this tutorial, we will be using Spring Boot and Spring Kafka to create the Kafka batch processing application.
What is Batch Processing?
Batch Processing is a methodology for executing high-volume, repetitive tasks. These jobs are scheduled so that they execute as soon as resources are available and do not require human interaction.
Batch processing improves efficiency by running large numbers of repetitive jobs with no user interaction. The data and messages are accumulated and then processed collectively during a period called the batch window.
Batch Processing is important for enterprises that need to manage huge amounts of data efficiently. It is employed for tasks that are frequent and monotonous, and its basic prerequisites remain the same across all industries.
Key Features of Batch Processing
- Efficiency: An enterprise usually has only limited resources readily available at a particular instance of time. Batch processing enables enterprises to prioritize jobs by separating those that are time-dependent and provide real-time value from those that are less essential at that moment. It also enables processes to run offline, reducing processor load at peak work times.
- Simplicity: Batch processing is less complex than stream processing, as it does not require any special hardware or support systems for data entry. It is also less expensive to maintain than stream processing systems.
- Higher Data Quality: Batch processing automates many enterprise processes and reduces the manual effort required. This results in fewer errors and redundancies, higher data accuracy, and a considerable increase in data quality.
- Accelerated Business Intelligence: Batch processing helps enterprises process large volumes of data swiftly. Because it can handle many records at once, processing times drop considerably and data becomes available sooner. With multiple jobs taken care of by batch processing, there is far more bandwidth for better business intelligence.
Hevo Data, a No-code Data Pipeline, helps to load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services and simplifies the ETL process. It supports 100+ data sources like Kafka (including 40+ free data sources), and loading data is a 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data onto the desired Data Warehouse/destination but also enriches the data and transforms it into an analysis-ready form without having to write a single line of code.
GET STARTED WITH HEVO FOR FREE
Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!
What is Kafka?
Apache Kafka is an open-source distributed event streaming platform. It was created in 2010 at LinkedIn with the goal of building robust, scalable recommendation systems and event-driven applications. As per a recent survey, Kafka is used by about 20,000 business enterprises, with 80% of its users being Fortune 500 companies like Walmart, Netflix, and Spotify.
Kafka consists of three different components: Kafka Producers, Kafka Servers, and Kafka Consumers. Kafka Producers are the components that generate the messages or data to be streamed. The Kafka Server is the mediator that transfers this information. Kafka Consumers are the components that receive this stream of data.
Since Kafka is a distributed system, it provides strong fault tolerance while streaming data through Kafka Servers. This results in high throughput and real-time messaging.
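To make these roles concrete, here is a minimal, illustrative sketch of the producer side using the plain kafka-clients API. The topic name demo.t and the localhost broker address are assumptions for the example; the rest of this tutorial uses Spring Kafka instead:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DemoProducer {

    public static void main(String[] args) {
        // the producer connects to a Kafka Server (broker) on localhost:9092
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // publish one message to the topic; any consumer subscribed to
        // "demo.t" will receive it from the broker
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo.t", "key", "hello kafka"));
        }
    }
}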
Key Features of Kafka
Apache Kafka is very popular for its rich feature set. Some key features of Kafka are:
- Scalable: Kafka follows a partitioned log model that distributes data over multiple servers, allowing it to cope with high volumes of data by using the resources of many servers instead of a single machine.
- Fast: Kafka decouples data streams, which results in low latency and high speed.
- Durability: Kafka writes data to disk, where the data partitions are duplicated and spread across multiple servers to ensure durability. This protects the data from being lost even when there is a server failure. It also makes Kafka fault-tolerant and long-lasting.
- Fault-Tolerant: Kafka's components are robust and can handle errors within the cluster on their own. Kafka is also capable of restarting a server by itself.
- Extensibility: Kafka's popularity and usage have grown in recent years, and now even Fortune 500 companies use it. To keep pace with this growth, many software products have built integrations with Kafka, allowing advanced features to be installed and put to use in a matter of seconds.
- Log Aggregation: Data records gathered from various sources are stored in a centralized location to avoid complexity.
Steps to Implement Kafka Batch Processing
Prerequisites:
- Spring Kafka: v2.1.4.RELEASE
- Spring Boot: v2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: v3.5 or above
Step 1: Install Maven Dependencies
Apache Maven is used to manage the project dependencies of the Kafka batch processing application. Use the following pom.xml to declare them.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hevo.spring.kafka</groupId>
    <artifactId>listener-batch</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://hevodata.com</url>
    <description>Kafka Batch Processing using Spring</description>
    <name>Kafka Batch Processing - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <!-- core Spring Boot starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring for Apache Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Step 2: Receiving Kafka Messages in Batches
We will be using a BatchListener to receive messages in batches and create the Kafka batch processing.
Step 2.1: Configuring a Batch Listener
@KafkaListener methods can receive the entire batch of consumer records for the Kafka batch process. To support this, the batchListener property of the ConcurrentKafkaListenerContainerFactory needs to be set to true.
You can also attach a BatchErrorHandler to the factory, which gives you control over how errors in a whole batch are handled during Kafka batch processing.
The batch size for Spring Kafka can be configured using the ConsumerConfig.MAX_POLL_RECORDS_CONFIG property. In this example of Kafka batch processing, we will use a batch size of 5.
package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        // limit each poll to at most 5 records, i.e. a batch size of 5
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // enable batch mode so @KafkaListener methods receive a List of messages
        factory.setBatchListener(true);
        // log the entire failed batch when an error occurs
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        return factory;
    }
}
Step 2.2: Batch Receive Kafka Messages
We will update the receive() method so that it accepts a list of messages for the Kafka batch processing. When receiving batch messages, the accompanying headers (partitions and offsets) also arrive as lists, index-aligned with the payloads.
package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "listener-batch", topics = "${app.topic.batch}")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
        LOG.info("beginning to consume batch messages");
        // the payload, partition, and offset lists are index-aligned
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received message='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
        LOG.info("all batch messages consumed");
    }
}
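As a variation, a batch listener can also receive the underlying ConsumerRecord objects directly, which carry the payload, partition, and offset together instead of in parallel header lists. Here is a minimal sketch under the same configuration; the class name RecordsListener and the listener id are illustrative:
package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class RecordsListener {

    private static final Logger LOG = LoggerFactory.getLogger(RecordsListener.class);

    // with factory.setBatchListener(true), Spring Kafka can bind the whole
    // polled batch as a List of ConsumerRecord objects
    @KafkaListener(id = "listener-batch-records", topics = "${app.topic.batch}")
    public void receive(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("received message='{}' with partition-offset='{}'",
                    record.value(), record.partition() + "-" + record.offset());
        }
    }
}
Note that the listener id doubles as the consumer group id by default, so running this alongside the Listener above would form a separate consumer group, and both listeners would receive every message.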
Step 3: Configure the Application
We will create an application.yml file for the Kafka batch processing and configure the Spring Boot properties in it.
spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    batch: batch.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG
Providing a high-quality Data Pipeline solution can be a cumbersome task if you just have a Data Warehouse and raw data. Hevo’s automated, No-code platform empowers you with everything you need to have a smooth data pipeline experience. Our platform has the following in store for you!
Check out what makes Hevo amazing:
- Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
- Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
- Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making.
- Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
- Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
- Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-day free trial!
Step 4: Running the Application
The application connects to the Kafka broker on localhost:9092 and drives the Kafka batch processing: on startup it sends twelve messages, which the listener then consumes in batches.
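Before running it, a Kafka broker must be listening on localhost:9092. Assuming the kafka_2.11-1.0.0 distribution from the prerequisites has been extracted locally, ZooKeeper and the broker can typically be started from the extracted directory with the scripts shipped in the distribution:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties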
package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    @Autowired
    private Sender sender;

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        // send 12 messages so the listener receives batches of 5, 5, and 2
        for (int i = 1; i < 13; i++) {
            sender.send("message-" + i);
        }
    }
}
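The Sender autowired above is not shown in the original tutorial. A minimal sketch of what it could look like, assuming the KafkaTemplate auto-configured by Spring Boot (which defaults to String serializers) and the app.topic.batch property from Step 3:
package com.memorynotfound.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    // Spring Boot auto-configures a KafkaTemplate from the spring.kafka.*
    // properties; key and value serializers default to StringSerializer
    private final KafkaTemplate<String, String> kafkaTemplate;

    // topic name resolved from application.yml (assumption: app.topic.batch)
    @Value("${app.topic.batch}")
    private String topic;

    public Sender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}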
The output of the Kafka batch processing application:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='message-1' to topic='batch.t'
sending message='message-2' to topic='batch.t'
sending message='message-3' to topic='batch.t'
sending message='message-4' to topic='batch.t'
sending message='message-5' to topic='batch.t'
sending message='message-6' to topic='batch.t'
sending message='message-7' to topic='batch.t'
sending message='message-8' to topic='batch.t'
sending message='message-9' to topic='batch.t'
sending message='message-10' to topic='batch.t'
sending message='message-11' to topic='batch.t'
sending message='message-12' to topic='batch.t'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-1' with partition-offset='0-295'
received message='message-2' with partition-offset='0-296'
received message='message-3' with partition-offset='0-297'
received message='message-4' with partition-offset='0-298'
received message='message-5' with partition-offset='0-299'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-6' with partition-offset='0-300'
received message='message-7' with partition-offset='0-301'
received message='message-8' with partition-offset='0-302'
received message='message-9' with partition-offset='0-303'
received message='message-10' with partition-offset='0-304'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-11' with partition-offset='0-305'
received message='message-12' with partition-offset='0-306'
all batch messages consumed
Conclusion
Batch Processing is an important technique that helps enterprises understand their data as well as process huge volumes of it efficiently. Kafka is a leading event streaming platform used by many companies, including Fortune 500 enterprises. This article provided a comprehensive guide to Batch Processing and Kafka, along with the steps to set up Kafka Batch Processing with the help of Spring Boot. You will also find examples that will help you better understand and implement Kafka Batch Processing.
Companies use various trusted sources like Kafka for the many benefits they provide, but transferring data from them into a Data Warehouse is a hectic task. An automated data pipeline helps solve this issue, and this is where Hevo comes into the picture. Hevo Data is a No-code Data Pipeline with 100+ pre-built Integrations that you can choose from.
Hevo can help you integrate your data from 100+ data sources and load it into a destination for real-time analysis. It will make your life easier and data migration hassle-free. It is user-friendly, reliable, and secure.
SIGN UP for a 14-day free trial and see the difference!
Share your experience of learning about the Kafka batch processing in the comments section below.