Apache Kafka is a popular platform that is widely used today, not only for messaging and communication but also for a variety of other use cases. While its real-time streaming capabilities are robust and widely deployed, Apache Kafka logs also offer some unique features for easy, scalable logging. Understanding how Apache Kafka logs work can help you use the platform to its full potential.

This article focuses on the logging functionality supported by Apache Kafka, giving you in-depth knowledge of how logging works, why it matters, and the operations and commands you can use to implement Apache Kafka logging. Follow our easy step-by-step guide to implement these concepts and understand your data logs better!

Introduction to Apache Kafka

Apache Kafka is a popular real-time data streaming platform that allows users to store, read, and analyze streaming data using its open-source framework. Being open-source, it is available to users free of cost. Thanks to its distributed architecture, users can achieve high throughput and minimal latency, and handle large volumes of data with ease.

Written in Scala, Apache Kafka can ingest data from a large variety of sources and stores it in the form of “topics” by processing the information stream. It uses two types of clients: Producers, which act as an interface between the data source and Kafka topics, and Consumers, which allow users to read and transfer the data stored in Apache Kafka.
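
To make these two roles concrete, here is a minimal Scala sketch that writes and reads a single record using Kafka's public client API (the broker address, topic name, and group id are placeholder values):

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Producer: publishes a record to the "events" topic (placeholder name).
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](producerProps)
producer.send(new ProducerRecord[String, String]("events", "key-1", "hello, kafka"))
producer.close()

// Consumer: subscribes to the same topic and reads the record back.
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("group.id", "demo-group")
consumerProps.put("auto.offset.reset", "earliest")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(List("events").asJava)
consumer.poll(Duration.ofSeconds(5)).asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
consumer.close()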

Key features of Apache Kafka:

  • Scalability: Apache Kafka has exceptional scalability and can be scaled easily without downtime.
  • Data Transformation: Apache Kafka offers KStream and KSQL (in the case of Confluent Kafka) for on-the-fly data transformation.
  • Fault-Tolerant: Apache Kafka uses brokers to replicate data and persists the data to make it a fault-tolerant system.
  • Security: Apache Kafka can be combined with various security measures like Kerberos to stream data securely.
  • Performance: Apache Kafka is distributed, partitioned, and has very high throughput for publishing and subscribing to the messages.

For further information on Apache Kafka, you can check the official website here.

Introduction to Logs in Apache Kafka

Apache Kafka logs are a collection of data segments present on your disk, each named after the topic partition it belongs to. Each Kafka log provides a logical representation of a single topic partition.

Apache Kafka can also serve as an external commit log for a distributed system: nodes replicate data by committing records to the log, and can re-read it not only to consume data but also to restore data when required.

Simplify ETL with Hevo’s No-code Data Pipelines

Hevo Data, a No-code Data Pipeline, helps to transfer data from 100+ sources to your desired data warehouse/ destination and visualize it in a BI Tool. Hevo is fully-managed and completely automates the process of not only loading data from your desired source but also enriching the data and transforming it into an analysis-ready form without having to write a single line of code. Its fault-tolerant architecture ensures that the data is handled in a secure, consistent manner with zero data loss.

It provides a consistent & reliable solution to manage data in real-time and always have analysis-ready data in your desired destination. It allows you to focus on key business needs and perform insightful analysis using various BI tools such as Power BI, Tableau, etc. 

Check out what makes Hevo amazing:

  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects schema of incoming data and maps it to the destination schema.
  • Minimal Learning: Hevo, with its simple and interactive UI, is extremely easy for new customers to work with and perform operations on.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
  • Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.

Simplify your data analysis with Hevo today! Sign up here for a 14-day free trial!

Prerequisites

  • Working knowledge of Apache Kafka.
  • A general idea about logging.
  • Apache Kafka installed at the host workstation.

Working with Apache Kafka Logs

Logging in Apache Kafka offers plenty of benefits: it provides an industry-standard solution for appending data logs as well as a highly scalable way of storing them. Apache Kafka logs also provide robust integration support for numerous pre-existing applications, allowing users to integrate their applications with an easy-to-follow configuration setup. Also, read Kafka Log Compaction.

Apache Kafka logs collect data from various points across the system, irrespective of whether it has been processed or transmitted yet. Data in today’s world is crucial, but not all of it is necessary: data logs often contain redundant pieces of information that you will not always need. For example, a server socket logs each time a connection closes as follows:

2014/08/24 00:00:12.145 INFO [Processor] [kafka-network-thread-10251-1] [kafka-server] [] Closing socket connection to /172.17.198.21.
2014/08/24 00:00:12.152 INFO [Processor] [kafka-network-thread-10251-6] [kafka-server] [] Closing socket connection to /172.24.25.165.
2014/08/24 00:00:12.191 INFO [Processor] [kafka-network-thread-10251-4] [kafka-server] [] Closing socket connection to /172.17.198.24.
2014/08/24 00:00:12.209 INFO [Processor] [kafka-network-thread-10251-4] [kafka-server] [] Closing socket connection to /172.17.232.135.
2014/08/24 00:00:12.218 INFO [Processor] [kafka-network-thread-10251-6] [kafka-server] [] Closing socket connection to /172.17.198.41.
2014/08/24 00:00:12.235 INFO [Processor] [kafka-network-thread-10251-5] [kafka-server] [] Closing socket connection to /172.17.198.30.

On the other hand, data logs can also record errors and warnings that prove useful, especially while debugging:

ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1025 unsuccessful
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
 
[2014-08-11 15:09:56,078] DEBUG [KafkaApi-1] Error while fetching metadata for [item_topic_0,0]. Possible cause: null (kafka.server.KafkaApis)

The following sections cover how to enable logging in Apache Kafka, what you should and should not log, and the operations you can perform on your Apache Kafka logs:

Understanding What You Need to Log & What You Don’t

When working with Apache Kafka logs, make sure you avoid logging redundant data and reserve your logs for information that serves a purpose. To do this, you must know when it is necessary to log your data.

Typically, you create log entries at the very beginning and end of a module’s lifecycle, in startup() and shutdown(). You can also log at the start and end of specific phases where you need to run “special” logic or carry out updates, for example:

LogManager.cleanupLogs() or Controller.onBrokerFailure()

In either case, you should only log essential information associated with the local or global variables of your code block. For exception logging, use WARN or ERROR entries to record the cause of the exception, the handling logic, and other crucial details.

The log entry is usually limited to one line, with concise, distinctive details represented in a canonical structure, for example:

"topic %s".format(TopicAndPartition(topic, partitionId))

These are some of the situations where you can use Apache Kafka logs to save crucial data associated with your application.
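
Putting these conventions together, here is a minimal sketch using SLF4J (the module, method, and variable names are illustrative, not Kafka internals):

import org.slf4j.LoggerFactory

object CleanupModule {
  private val log = LoggerFactory.getLogger(getClass)

  // Log at the very beginning and end of the module's lifecycle.
  def startup(): Unit = log.info("Starting cleanup module")
  def shutdown(): Unit = log.info("Shutting down cleanup module")

  // Log at the start and end of a "special" phase, one concise line each.
  def cleanupLogs(topic: String, partitionId: Int): Unit = {
    log.info(s"Beginning cleanup for topic $topic-$partitionId")
    try {
      // ... cleanup work ...
      log.info(s"Finished cleanup for topic $topic-$partitionId")
    } catch {
      case e: Exception =>
        // WARN/ERROR records the cause and how the exception was handled.
        log.error(s"Cleanup failed for topic $topic-$partitionId; skipping this pass", e)
    }
  }
}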

Enabling Logs in Apache Kafka

To enable logging in Apache Kafka, you can use a simple log4j configuration like the following, which ships application logs (here, Flink’s) to a Kafka topic through a Kafka log4j appender:

# Enable both file and kafka based logging
log4j.rootLogger=INFO, file, kafka
log4j.appender.kafka=com.cloudera.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=flink.logs
log4j.appender.kafka.brokerList=<broker_host>:9092
# Log layout configuration
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=yarnContainerId:${yarnContainerId}

Once you’ve enabled logging, you will find the log events on the topic you configured, in JSON format. For example, a log event looks as follows:

{ "source_host": "<flink_host>",
  "method": "completePendingCheckpoint",
  "level": "INFO",
  "message": "Completed checkpoint 1 for job 5e70cf704ed010372e2007333db10cf0 (50738 bytes in 2721 ms).",
  "mdc": {},
  "yarnContainerId": "container_1571051884501_0001_01_000001",
  "@timestamp": "2019-10-14T11:21:07.400Z",
  "file": "CheckpointCoordinator.java",
  "line_number": "906",
  "thread_name": "jobmanager-future-thread-1",
  "@version": 1,
  "logger_name":    "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
  "class": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator"
}

This is how you can enable logs in Apache Kafka to start logging your data.
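
Since the log events land on an ordinary Kafka topic, you can tail them with any consumer. Here is a minimal Scala sketch that prints the JSON events from the flink.logs topic configured above (the broker address and group id are placeholders):

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "<broker_host>:9092")
props.put("group.id", "log-reader")
props.put("auto.offset.reset", "earliest")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("flink.logs").asJava)
while (true) {
  // Each record value is one JSON log event like the example above.
  consumer.poll(Duration.ofSeconds(1)).asScala.foreach(r => println(r.value))
}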

Operations & Commands associated with Apache Kafka Logs

Apache Kafka implements these operations with numerous methods on its internal Log class (written in Scala), which run while logging takes place in the background. The Scala signatures below illustrate the main ones:

Logging Segments

Internally, a Kafka log keeps its segments in a concurrent, navigable map, keyed by each segment’s base offset:

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
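
This layout makes offset lookups cheap: the segment containing a given offset is the entry with the greatest base offset not exceeding it. A small illustrative sketch (this LogSegment is a hypothetical stand-in for Kafka’s internal class):

import java.util.concurrent.ConcurrentSkipListMap

final case class LogSegment(baseOffset: Long)

val segments = new ConcurrentSkipListMap[java.lang.Long, LogSegment]()
segments.put(0L, LogSegment(0L))
segments.put(500L, LogSegment(500L))
segments.put(1000L, LogSegment(1000L))

// floorEntry finds the segment whose base offset is <= the target offset.
def segmentFor(offset: Long): LogSegment = segments.floorEntry(offset).getValue

println(segmentFor(750L)) // LogSegment(500)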

Creating a Log instance

You can create a Log instance for your Apache Kafka logs via the Log.apply factory, which has the following signature:

apply(
dir: File,
config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log

Reading Records

The following methods read records from a data log, attaching any aborted transactions so that read-committed consumers can filter them out:

addAbortedTransactions(
  startOffset: Long,
  segmentEntry: JEntry[JLong, LogSegment],
  fetchInfo: FetchDataInfo): FetchDataInfo

read(
  startOffset: Long,
  maxLength: Int,
  maxOffset: Option[Long],
  minOneMessage: Boolean,
  includeAbortedTxns: Boolean): FetchDataInfo
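
These read methods are broker internals, but the public consumer API offers a close analogue: assigning a partition and seeking to a start offset before polling. A minimal sketch (topic, partition, and offset are placeholder values):

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("events", 0)
consumer.assign(List(tp).asJava)
consumer.seek(tp, 42L) // start reading at offset 42
consumer.poll(Duration.ofSeconds(5)).asScala.foreach(r => println(s"${r.offset}: ${r.value}"))
consumer.close()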

Appending Records

The following methods append new records to a data log, rolling over to a new segment first if the active one is full:

maybeRoll(
  messagesSize: Int,
  appendInfo: LogAppendInfo): LogSegment

append(
  records: MemoryRecords,
  isFromClient: Boolean,
  interBrokerProtocolVersion: ApiVersion,
  assignOffsets: Boolean,
  leaderEpoch: Int): LogAppendInfo

Cleaning Segments & Building an Offset Map

To clean segments and build an offset map, the log first collects the aborted transactions in the affected offset range:

collectAbortedTransactions(
  startOffset: Long,
  upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
  startOffset: Long,
  upperBoundOffset: Long,
  startingSegmentEntry: JEntry[JLong, LogSegment],
  accumulator: List[AbortedTxn] => Unit): Unit

Deleting Segments

Segment deletion is asynchronous: the log rolls to a new active segment if needed, then schedules the old segment for deletion:

roll(
  expectedNextOffset: Option[Long] = None): LogSegment
asyncDeleteSegment(segment: LogSegment): Unit
deleteSeg(): Unit

Truncating Records

To carry out a truncate operation for your Apache Kafka logs, you can use the following lines of code:

truncateTo(targetOffset: Long): Boolean

truncateFullyAndStartAt(newOffset: Long): Unit
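
Note that truncateTo trims a log from the tail and is invoked only by the broker itself. From the outside, the closest public operation is the Admin API’s deleteRecords, which trims from the head by advancing the log start offset. A minimal sketch (topic, partition, and offset are placeholder values):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
val admin = AdminClient.create(props)

// Delete everything before offset 100 on partition 0 of "events".
val target = Map(new TopicPartition("events", 0) -> RecordsToDelete.beforeOffset(100L)).asJava
admin.deleteRecords(target).all().get()
admin.close()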

Loading a Partition

When loading a partition from disk, Kafka recovers the topic and partition number from the log directory’s name:

parseTopicPartitionName(dir: File): TopicPartition
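
Partition directories are named “&lt;topic&gt;-&lt;partition&gt;”, for example payments-3. A hypothetical re-implementation in plain Scala shows the idea:

import java.io.File
import org.apache.kafka.common.TopicPartition

// Illustrative only: split the directory name at the last '-' to recover
// the topic ("payments") and the partition number (3).
def parseTopicPartitionName(dir: File): TopicPartition = {
  val name = dir.getName
  val idx = name.lastIndexOf('-')
  new TopicPartition(name.substring(0, idx), name.substring(idx + 1).toInt)
}

println(parseTopicPartitionName(new File("/var/kafka-logs/payments-3"))) // payments-3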

Adding & Converting Segments

You can add a new segment to a data log, or convert an offset into its full offset metadata, using the following methods:

addSegment(
  segment: LogSegment): LogSegment

convertToOffsetMetadata(
  offset: Long): Option[LogOffsetMetadata]

Modifying the Log Configuration

In case you want to update the configuration of a data log, you can do so using the following method:

updateConfig(
  updatedKeys: Set[String],
  newConfig: LogConfig): Unit
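
updateConfig itself is broker-internal; from the outside, per-topic log settings can be changed through the public Admin API instead. A minimal sketch that sets a topic’s retention (the topic name and value are example placeholders):

import java.util.{Arrays, HashMap, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
val admin = AdminClient.create(props)

// Set retention.ms to 7 days for the "events" topic.
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "events")
val op = new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET)
val configs = new HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
configs.put(resource, Arrays.asList(op))
admin.incrementalAlterConfigs(configs).all().get()
admin.close()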

Creating a Log File

You can construct the file object for a log segment at a given base offset using the following method:

logFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File

Opening a Log Segment

Each log segment has companion index files. The following methods construct the offset, time, and transaction index files for a segment:

offsetIndexFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File

timeIndexFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File

transactionIndexFile(
  dir: File,
  offset: Long,
  suffix: String = ""): File
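
All of these files share one naming convention: the segment’s base offset, zero-padded to 20 digits, plus a per-type suffix. A small illustrative sketch (the helper name is hypothetical):

// Kafka names segment files by zero-padding the base offset to 20 digits.
def segmentFileName(offset: Long, suffix: String): String = f"$offset%020d$suffix"

println(segmentFileName(0L, ".log"))          // 00000000000000000000.log
println(segmentFileName(4580L, ".index"))     // 00000000000000004580.index
println(segmentFileName(4580L, ".timeindex")) // 00000000000000004580.timeindex
println(segmentFileName(4580L, ".txnindex"))  // 00000000000000004580.txnindex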

Recovering and Rebuilding Segments

Apache Kafka allows you to recover a segment and rebuild the producer state using the following methods:

recoverSegment(
  segment: LogSegment,
  leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

rebuildProducerState(
  lastOffset: Long,
  reloadFromCleanShutdown: Boolean,
  producerStateManager: ProducerStateManager): Unit

Closing a Log

To close a data log in Apache Kafka, you can use the close method:

close(): Unit

These are some of the commands that you can use to perform various operations on your Apache Kafka logs.

For further information on the internal functions and operations of logs in Apache Kafka, you can check the official documentation here.

Conclusion

This article outlined various concepts related to Apache Kafka server logs. It explained how you can enable them and perform numerous operations on your data logs in Apache Kafka to keep track of the crucial data associated with various operations. With data growing at an exponential rate in today’s world, integrating and managing such large volumes is no small feat, especially for a beginner, and this is where Hevo saves the day. For any information on Kafka Exactly Once, you can visit the following link.

Hevo Data, a No-code Data Pipeline, helps you transfer data from a source of your choice in a fully-automated and secure manner without having to write the code repeatedly. Hevo, with its strong integration with 100+ sources & BI tools, allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a spin? Sign up here for the 14-day free trial and experience the feature-rich Hevo suite first hand.

Tell us about your experience of learning about Apache Kafka logs! Let us know your thoughts in the comments section below.

Aman Sharma
Freelance Technical Content Writer, Hevo Data

Driven by a problem-solving approach and guided by analytical thinking, Aman loves to help data practitioners solve problems related to data integration and analysis through his extensively researched content pieces.
