A Quick-Start Guide to Databricks Kafka Integration: 5 Comprehensive Aspects

• June 3rd, 2022


“Without big data analytics, companies are blind and deaf, wandering out onto the web like deer on a freeway.”

With the growing complexity of data and the need to deliver faster insights, it is becoming increasingly difficult to work with Big Data. Single-machine performance has plateaued, and basic computer architecture hasn't changed in decades; if you need more computational power, you have to invest in more machines.

Today, there are more people and devices generating data than ever before, and companies are struggling to extract meaningful insights. To help companies solve this problem, Databricks has emerged as one of the most popular Big Data platforms that can store, clean, and visualize vast amounts of data from disparate sources. It’s an end-to-end solution for many common data tasks, ranging from basic ETL to Business Intelligence all the way to Machine Learning and Artificial Intelligence.

Kafka has become the de-facto solution for Real-time Streaming Applications. It is the technological foundation for capturing data from databases, sensors, mobile devices, cloud services, and software applications in the form of event streams and later storing them or routing them as needed.

With Databricks Kafka integration, you can create reliable Event-driven Microservices and Scalable Stream-Processing Applications for Big Data. The advantages of this integration include the ability to constantly obtain resources to analyze your data by simply scaling up the computational power in a brief burst. Your teams may also create and deploy powerful analytics apps without any DevOps knowledge.

This article covers the five most important aspects of integrating Kafka with Databricks.


What Is Databricks?


Databricks is a Big Data Service founded by the team that created Apache Spark. It is a Unified Analytics Platform that provides an end-to-end solution for Data Ingestion, Data Exploration, Advanced Analytics, Production Deployment, and Data Visualization.

Databricks combines the best elements of a Data Lake and a Data Warehouse to bring unmatched performance, strong governance, and reliability. It is by far the quickest and easiest way to harness the full potential of Apache Spark. Spark is the de facto standard for Big Data, and Databricks is built by the same team on the same technology, which makes it a natural choice.

One of the main advantages of using Databricks is that it works on top of your cloud, whether you have Amazon Web Services (AWS), Microsoft Azure, Google Cloud, or a combination of those. Databricks provides unified management, security, and governance experience across all clouds, ensuring that your data teams can focus more on capitalizing on the data rather than on cloud data governance.

Databricks lies at the heart of cloud computing and Spark applications. A number of organizations across the globe, such as Viacom, Samsung, the American Diabetes Association, and NBCUniversal, use the Databricks platform to explore large data sets, extract actionable insights, and meet their data needs.

It is said that Databricks offers 5x performance over open-source Spark with features like collaborative notebooks, integrated workflows, and enterprise security. For more information on Databricks architecture and integrations, you can visit What is Databricks: The Ultimate Guide for Beginners.

Business Benefits of Using Databricks

Fast and Easy Access to Data At Scale

Databricks makes Big Data Processing and Analysis easy. It can process both structured and unstructured data and offers fast and easy access to data at scale.

Orchestrated Apache Spark in the Cloud

Databricks has a highly secure and reliable production environment in the cloud. Powered by strong cluster management capabilities, you can create new clusters in seconds, dynamically scale them up and down, and share them across teams. It also has secure Data Integration capabilities built on top of Spark, which allows you to unify your data smoothly.

Better Decision Making

Databricks features interactive Data Exploration & Visualization to help you and your teams work with and understand data better. You can build real-time dashboards to sync new changes into your Data Visualizations or tables. Databricks can also seamlessly connect to multiple BI tools to assist in sound decision-making.

Enterprise Security Framework

The Databricks platform uses best-in-class standards such as SSL and keys stored in the AWS Key Management Service (KMS) to ensure robust encryption at rest and in flight.

It also offers fine-grained management access to every component of the company data infrastructure, including files, clusters, code, application deployments, dashboards, and reports, and enables seamless connection with enterprise identity providers through SAML 2.0 and Active Directory.

What Is Apache Kafka?


Apache Kafka is an open-source, Distributed Streaming Platform that allows for the development of Real-time Event-Driven Applications. It enables developers to create applications that consistently produce and consume streams of data records, relying on a Message Broker.

This Message Broker relays messages from the Publishers (systems that transform data into the desired format from Data Producers) to the Subscribers (systems that manipulate or analyze data in order to find alerts and insights and deliver them to Data Consumers). 

Apache Kafka is superfast and maintains a high level of accuracy for all data records. These data records are maintained in order of their occurrence inside “Clusters” that can span multiple Servers or even multiple Data Centers. Apache Kafka replicates these records and partitions them in such a way that allows for a high volume of users to use the application simultaneously.

[Figure: Apache Kafka architecture. Image Source: CloudKarafka]

As a result, Apache Kafka has a fault-tolerant and resilient architecture. A Broker is a working Server or Node that acts as a facilitator between Data Producers and Consumer Groups. To ensure robustness, Kafka replicates each partition from its elected leader Broker to other Brokers (the replicas). All writes and reads to a Topic are routed through the leader, which coordinates the updating of replicas with new data.
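The per-key ordering guarantee follows from how records are routed: a keyed record always hashes to the same partition. Here is a minimal Python sketch of that routing idea (Kafka's Java client actually uses murmur2 hashing; the hash below is a simplified stand-in, purely for illustration):

```python
# Simplified sketch of how a keyed record is routed to a partition.
# Kafka's default partitioner uses murmur2; this stand-in hash only
# illustrates the "hash(key) % num_partitions" routing idea.

def route_to_partition(key: bytes, num_partitions: int) -> int:
    """Pick a partition for a keyed record (illustrative, not murmur2)."""
    h = 0
    for b in key:
        h = (h * 31 + b) & 0x7FFFFFFF  # keep the hash non-negative
    return h % num_partitions

# All records with the same key land on the same partition,
# which is what preserves per-key ordering.
p1 = route_to_partition(b"user-42", 6)
p2 = route_to_partition(b"user-42", 6)
assert p1 == p2
```

Because ordering is only guaranteed within a partition, choosing a good key (for example, a user or order ID) is what gives you meaningful per-entity ordering.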

Business Benefits of Using Apache Kafka

Low Latency Publish-Subscribe Messaging Service

For huge amounts of data, Apache Kafka has very low end-to-end latency, as low as 10 milliseconds. This means the time between a data record being produced to Kafka and retrieved by the Consumer is very short. This is because Kafka decouples the message from its consumption, allowing the Consumer to retrieve it at any moment.

Seamless Messaging and Streaming Functionality

Apache Kafka provides a unique capacity to publish, subscribe, store, and process Data Records in real-time, thanks to its special ability to decouple messages and store them in a highly efficient manner.

With such seamless messaging functionality, dealing with huge volumes of data becomes simple and easy, giving business communications and scalability a considerable edge over conventional communication approaches.

Consumer Friendly

Kafka can integrate with a wide range of Consumers. The best thing about Kafka is that it can behave differently depending on the Consumer it connects with, because each Consumer has a different capacity to handle the messages coming out of Kafka. Furthermore, Kafka integrates nicely with Consumers written in a wide range of languages.

Replicate Data From Kafka in Minutes Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources such as Kafka straight into Databricks, Data Warehouse or any Databases. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!


Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

How to Set Up Databricks Kafka Connection?

Setting up your Databricks Kafka connection is a very straightforward process. You can use the Databricks Connector for Apache Kafka to create a connection. This connector is available on the Confluent Hub, and you can find the link to the connector page here – Confluent Databricks Kafka Connector.

All Kafka versions above 0.10 are supported by this connector. For connecting to Kafka version 0.8+, the kafka08 connector can be used. 

Note: If you are using kafka08, please change your code format to format(“kafka08”) in the upcoming examples.

Quick Example

At times, you may want to keep track of the total number of words received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming on Databricks.

import org.apache.spark.sql.functions.{explode, split}

// Setting Up Databricks Kafka Connection

val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")   // comma-separated list of host:port
  .option("subscribe", "topic1,topic2")    // comma-separated list of topics
  .option("startingOffsets", "latest") // read data from the end of the stream
  .load()

// split lines by whitespace and explode the array as rows of `word`
val df = kafka.select(explode(split($"value".cast("string"), "\\s+")).as("word"))
  .groupBy($"word")
  .count

// follow the word counts as it updates
display(df.select($"word", $"count"))

This example uses Kafka as a Structured Streaming Source and runs stateful operations (groupBy) to calculate running counts.
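The stateful logic itself is easy to see in plain Python. This illustrative sketch mirrors what the streaming query computes, splitting each line on whitespace and keeping a running count per word, without any of the streaming machinery:

```python
import re
from collections import Counter

def running_word_counts(lines):
    """Mirror of the streaming example: split each line on whitespace
    (the "\\s+" regex) and keep a running count per word."""
    counts = Counter()
    for line in lines:
        counts.update(w for w in re.split(r"\s+", line) if w)
    return dict(counts)

print(running_word_counts(["hello world", "hello kafka"]))
# {'hello': 2, 'world': 1, 'kafka': 1}
```

In the streaming version, Spark maintains this running state for you across micro-batches and updates the displayed counts as new records arrive.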

Schema of Records

Each record in your source system should have the following schema:

Column               Type
------               ----
key                  binary
value                binary
topic                string
partition            int
offset               long
timestamp            long
timestampType        int
headers (optional)   array
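To make the schema concrete, here is an illustrative plain-Python record with those fields. The key and value arrive as raw bytes, so the usual first step is to decode them (the Scala examples later do this with CAST(value AS STRING)); the field values here are invented for illustration:

```python
import json

# One Kafka record as Structured Streaming surfaces it: key and value
# are raw bytes, everything else is metadata. Field names match the
# schema above; the values are made up for this example.
record = {
    "key": b"user-42",
    "value": b'{"event": "click"}',
    "topic": "topic1",
    "partition": 0,
    "offset": 23,
    "timestamp": 1654214400000,   # epoch milliseconds
    "timestampType": 0,
}

# Typical first step: cast key/value from binary to string,
# then parse the payload.
payload = json.loads(record["value"].decode("utf-8"))
assert payload["event"] == "click"
```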

Configuration

You must set the following options on your Kafka source, for both Batch and Streaming Queries.

It should be noted that Apache Kafka only guarantees at-least-once write semantics. As a result, certain records may be duplicated when writing to Kafka from Databricks using Streaming Queries or Batch Queries.

This might happen if Kafka has to retry a message that was not acknowledged by a Broker even if the Broker received and wrote the message record. Because of these Kafka write semantics, Structured Streaming cannot prevent such duplicates from occurring. 

If the query is successfully written, you may presume that the query output was written at least once. One approach to removing duplicates when reading written data is to use the primary (unique) key to perform de-duplication. 
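That de-duplication step is straightforward to sketch in Python. This illustrative helper keeps one row per primary key, assuming records are read in offset order so the last occurrence wins:

```python
def dedupe_by_key(records):
    """At-least-once delivery can replay records; keep the last value
    seen per primary key (here, the record key)."""
    latest = {}
    for rec in records:          # records arrive in offset order
        latest[rec["key"]] = rec["value"]
    return latest

rows = [
    {"key": "order-1", "value": "created"},
    {"key": "order-1", "value": "created"},   # duplicate from a broker retry
    {"key": "order-2", "value": "created"},
]
assert dedupe_by_key(rows) == {"order-1": "created", "order-2": "created"}
```

In Spark itself you would express the same idea with dropDuplicates on the key column after reading the written data back.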

assign (Kafka 0.8, 0.10)
  Value: JSON string, e.g. {"topicA":[0,1],"topicB":[2,4]}
  Specifies the exact TopicPartitions to consume. For the Kafka source, only
  one of the options "assign", "subscribe", or "subscribePattern" can be given.

subscribe (Kafka 0.8, 0.10)
  Value: a comma-separated list of topics
  Names the topics to subscribe to. For the Kafka source, only one of the
  options "assign", "subscribe", or "subscribePattern" can be given.

subscribePattern (Kafka 0.10)
  Value: Java regex string
  A pattern used to subscribe to matching topic(s). For the Kafka source, only
  one of the options "assign", "subscribe", or "subscribePattern" can be given.

kafka.bootstrap.servers (Kafka 0.8, 0.10)
  Value: a comma-separated list of host:port
  The Kafka "bootstrap.servers" configuration.
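For example, the "assign" value is a plain JSON string, and the three subscription options are mutually exclusive. A small Python sketch of both points (the validate_source_options helper is hypothetical, written here only to mirror the rule stated above):

```python
import json

# The "assign" option takes a JSON string mapping topics to partition lists.
assign = json.dumps({"topicA": [0, 1], "topicB": [2, 4]})

def validate_source_options(opts):
    """Hypothetical guard: exactly one subscription option may be set."""
    chosen = [k for k in ("assign", "subscribe", "subscribePattern") if k in opts]
    if len(chosen) != 1:
        raise ValueError("exactly one of assign/subscribe/subscribePattern required")
    return chosen[0]

assert validate_source_options({"assign": assign}) == "assign"
```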

Kafka source also supports additional configuration options which you can refer to here –  Structured Streaming Kafka Integration Guide.

Structured Streaming With Databricks Kafka 

For Structured Streaming with Databricks Kafka, you can use the following code to set up a connection to Apache Kafka.

import org.apache.spark.sql.functions.{get_json_object, json_tuple}
 
var streamingInputDF = 
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "topic1")     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "10")  
    .option("failOnDataLoss", "true")
    .load()
// streamingInputDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

Creating a Kafka Source for Streaming Queries

Structured Streaming handles Kafka queries using a micro-batch processing engine, which processes Data Streams as a series of small batch jobs. This architecture enables end-to-end latencies as low as 100 milliseconds and provides fault tolerance to its users.
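A toy Python loop shows the micro-batch idea: a continuous stream is drained in small, bounded batches, each processed as an ordinary batch job:

```python
def micro_batch_engine(stream, batch_size, process):
    """Toy micro-batch loop: drain the stream in small batches, the way
    Structured Streaming turns a continuous stream into batch jobs."""
    results = []
    batch = []
    for record in stream:
        batch.append(record)
        if len(batch) == batch_size:
            results.append(process(batch))
            batch = []
    if batch:                      # flush the final partial batch
        results.append(process(batch))
    return results

out = micro_batch_engine(range(7), 3, lambda b: sum(b))
assert out == [3, 12, 6]   # batches [0,1,2], [3,4,5], [6]
```

The real engine additionally checkpoints offsets between batches, which is what makes recovery after a failure possible.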

Here is an example to create a Databricks Kafka source for Streaming Queries.

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Creating a Kafka Source for Batch Queries

This is an example of how to create a Databricks Kafka source for Batch Queries.

// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
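In the explicit-offsets example above, the JSON maps each topic to per-partition offsets, where -2 means the earliest available offset and -1 means the latest. A small Python helper (hypothetical, for illustration) that builds such a string:

```python
import json

EARLIEST, LATEST = -2, -1   # special values in startingOffsets/endingOffsets JSON

def offsets_json(spec):
    """Build the per-partition offset JSON used above, e.g.
    offsets_json({"topic1": {0: 23, 1: EARLIEST}}).
    Partition numbers become string keys, as Kafka's JSON format expects."""
    return json.dumps({t: {str(p): o for p, o in parts.items()}
                       for t, parts in spec.items()})

s = offsets_json({"topic1": {0: 23, 1: EARLIEST}, "topic2": {0: EARLIEST}})
assert json.loads(s) == {"topic1": {"0": 23, "1": -2}, "topic2": {"0": -2}}
```

Note that -1 is only meaningful for endingOffsets; a batch query cannot start reading from "latest".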

Creating a Databricks Kafka Sink for Streaming Queries

Once you have created your Kafka sources, you need a Kafka Sink to process and distribute your data. Here’s an example of creating a Databricks Kafka Sink for Streaming Queries:

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

Creating a Databricks Kafka Sink for Batch Queries

And here’s an example of creating a Databricks Kafka Sink for Batch Queries.

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

Metrics

Metrics help you monitor how far behind your streaming query is with respect to the latest available offset among all the subscribed topics. Some useful ones are:

  • avgOffsetsBehindLatest which points to the average number of trailing offsets
  • maxOffsetsBehindLatest which points to the maximum number of trailing offsets
  • minOffsetsBehindLatest which points to the minimum number of trailing offsets

Note: Metrics feature is available only on Databricks Runtime 8.1 and above.

You can also get an estimate of the total number of bytes by which the streaming query lags behind all the subscribed topics. This estimate is based on the last 300 seconds of processed batches. You can change the timeframe for the estimate by setting the bytesEstimateWindowLength option to a different value. For example, to set it to 7 minutes, you can execute a command as follows:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "7m"))  # m for minutes; you can also use "420s" for 420 seconds

If you are running the stream in a notebook, you can see the metrics under the Raw Data tab. These are visible in the Streaming Query Progress Dashboard.

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    }
  } ]
}
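Since the metric values in this progress JSON arrive as strings, a monitoring script should cast them before comparing against a lag threshold. An illustrative sketch:

```python
import json

# Sample streaming-query progress payload, as shown above.
progress = """{
  "sources": [{
    "description": "KafkaV2[Subscribe[topic]]",
    "metrics": {
      "avgOffsetsBehindLatest": "4.0",
      "maxOffsetsBehindLatest": "4",
      "minOffsetsBehindLatest": "4",
      "estimatedTotalBytesBehindLatest": "80.0"
    }
  }]
}"""

# Metric values arrive as strings; cast before alerting on lag.
metrics = json.loads(progress)["sources"][0]["metrics"]
lag = float(metrics["avgOffsetsBehindLatest"])
assert lag == 4.0

# A hypothetical alert rule on top of that:
LAG_THRESHOLD = 1000
behind = lag > LAG_THRESHOLD
assert behind is False
```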

Use SSL Connection

An SSL, or Secure Sockets Layer, connection secures sensitive data exchanged between two systems. Using this might be essential in your Databricks Kafka integration. You can supply Kafka configurations as options by prefixing them with “kafka.”. For example, the option kafka.ssl.truststore.location defines the location of the trust store.

Databricks suggests that you:

  • Keep your certificates in S3/Azure Blob storage/Azure Data Lake Storage Gen2/GCS and access them using a DBFS mount point. When combined with cluster and task ACLs, you may limit access to the certificates to clusters that can connect to Kafka.
  • Keep your certificate passwords in a hidden scope as secrets.

After mounting pathways and storing secrets, you may run the following code:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <dbfs-truststore-location>)
  .option("kafka.ssl.keystore.location", <dbfs-keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>, key=<truststore-password-key-name>)))

More information about encryption and SSL configuration can be found in this Confluent documentation – Encryption, and Authentication with SSL.

A Simpler Way to Set Up Databricks Kafka Connection

For businesses, real-time streams have become the core that connects applications and data systems and makes available in real-time a stream of everything happening in the business. Since this data is vital, there is also a necessity to replicate it and store it safely in a cloud storage solution like a Data Warehouse. Replicating your data from Kafka to a warehouse or lakehouse like Databricks requires you to build an ETL pipeline.

But for starters, crafting an in-house ETL solution can get challenging. Ask most businesses and they’ll tell you how complicated and demanding the job is. 

A better alternative is to use an Automated & No-Code Data Pipeline solution like Hevo Data. Using Hevo you can reduce the time to deployment for your Kafka Data Pipelines significantly from weeks and months to just minutes. Hevo can readily connect to your Kafka source, among 100+ Data Sources, and streamline Data Replication from Kafka to Databricks within minutes.


Hevo Data not only ingests your data but also enriches it and transforms it into an analysis-ready form without your having to write a single line of code. Its fault-tolerant architecture ensures that the data is handled in a secure, consistent manner with zero data loss.

Here are more reasons to try Hevo:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-Day Free Trial!


Conclusion

This guide presented the key aspects of integrating Kafka with Databricks. With multiple configuration options and Databricks’ multi-cloud support on Amazon Web Services (AWS), Microsoft Azure, and Google Cloud, setting up your Databricks Kafka connection is easy, as this guide has shown.

When you use Hevo, the greatest benefit you derive is that setting up your Data Pipelines is a breeze. It’s a simple 3-step process: select your source, provide credentials, choose your target destination, and you are done.

Hevo Data can connect your frequently used applications among 100+ Data Sources to Data Warehouses like Amazon Redshift, Snowflake, Google BigQuery, Firebolt, or even Database Destinations like PostgreSQL, MySQL, or MS SQL Server in a matter of minutes. As a matter of fact, you need no extensive training to use our ETL solution.

Visit our Website to Explore Hevo

Try Hevo and see the magic for yourself. Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also check our unbeatable pricing and make a decision on your best-suited plan. 

Have any questions on Databricks Kafka integration? Do let us know in the comment section below. We’d be happy to help.
