Apache Kafka is an open-source, distributed message streaming platform for building event-driven applications. Its ecosystem comprises a dedicated set of servers, or brokers, that store Real-Time Streaming Data or messages. The Real-Time Data stored in Kafka servers can be further processed and analyzed to extract meaningful insights.

Usually, developers or data analysts use the Kafka Streams API to perform stream processing and analytics on the Real-Time Data stored in Kafka servers. However, implementing data operations with Kafka Streams requires a strong technical understanding of programming languages like Java or Scala. To eliminate this prerequisite, Confluent introduced KSQL, which allows you to perform Data Analytics Operations on Real-Time Data Streams.

Since KSQL queries work much like SQL queries, even non-technical users can effectively process Real-Time Data instead of writing lengthy Java or Scala code. In this article, you will learn about Kafka KSQL and how to query a Kafka topic using KSQL to process the Real-Time Data in Kafka servers.

What is KSQL?


KSQL is a SQL engine that allows you to process and analyze the Real-Time Streaming Data present in the Apache Kafka platform. In other words, KSQL provides an Interactive Framework for performing Stream Processing activities such as Data Aggregation, Filtering, Joining, Sessionization, Windowing, and more.

Furthermore, executing queries using KSQL is similar to running SQL queries in Relational Databases. Core KSQL keywords such as SELECT, LIMIT, JOIN, and WHERE behave like their SQL counterparts. KSQL is used in various applications to process and analyze Streaming Data, including continuous monitoring, Real-Time Streaming Analytics, Online Data Integration, Anomaly Detection, Data Exploration, and Arbitrary Filtering.
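For instance, filtering a stream reads much like everyday SQL. The following sketch assumes a hypothetical "orders" stream with order_id and amount columns:

-- Familiar SQL keywords applied to a (hypothetical) stream of orders
SELECT order_id, amount
FROM orders
WHERE amount > 100
LIMIT 5;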

Simplify Kafka ETL and Data Analysis with Hevo’s No-code Data Pipeline

Hevo is the only real-time ELT No-code Data Pipeline platform that cost-effectively automates data pipelines that are flexible to your needs.

You can use Hevo Pipelines to replicate the data from your Apache Kafka Source or Kafka Confluent Cloud to the Destination system. It loads the data onto the desired Data Warehouse/destination and transforms it into an analysis-ready form without having to write a single line of code.

Hevo’s fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. Hevo supports two variations of Kafka as a Source. Both these variants offer the same functionality, with Confluent Cloud being the fully-managed version of Apache Kafka.

Get Started with Hevo for Free

When to Use KSQL?

Real-time Monitoring and Real-time Analytics

KSQL can be used for real-time monitoring and real-time analytics on business data. Using Kafka KSQL, your users can define custom business-level metrics, perform computations in real-time and monitor them simultaneously with ease. Here’s an example query to help you do the same:

CREATE TABLE error_counts AS
  SELECT error_code, COUNT(*)
  FROM monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE type = 'ERROR'
  GROUP BY error_code;

Security and Anomaly Detection

In KSQL, you can define aggregation and anomaly detection queries on your real-time streams. It’s a simple and straightforward process that looks like this:

CREATE TABLE possible_fraud AS
  SELECT card_number, COUNT(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 5 SECONDS)
  GROUP BY card_number
  HAVING COUNT(*) > 3;

Online Data Integration

Using Kafka KSQL, you and your teams can merge different streams of data in real time. Here’s one sample of how you can do so in KSQL.

CREATE STREAM vip_users AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id
  WHERE u.level = 'Platinum';

Application Development

In the case of more complex applications developed in Java, Kafka’s native streams API might be the ideal solution. However, for uncomplicated applications or teams less inclined toward Java programming, a straightforward SQL interface might be the preferable option.
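For instance, a filter that would require a small Kafka Streams topology in Java is a single statement in KSQL. The sketch below assumes the same hypothetical "orders" stream as above:

-- One persistent query replaces a hand-written Java topology:
-- continuously route large orders into a new stream
CREATE STREAM big_orders AS
  SELECT *
  FROM orders
  WHERE amount > 100;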

Materialized Views

ksqlDB enables you to create materialized views over streams and tables through what it calls a “persistent query.” These views attain persistence because they preserve and incrementally update their results in a table. Here’s an example query to help you do the same:

CREATE TABLE hourly_metrics AS
  SELECT url, COUNT(*)
  FROM page_views
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY url EMIT CHANGES;

CREATE TABLE unique_visitors_per_day AS
  SELECT event_type, COUNT_DISTINCT(user_id) AS unique_visitors
  FROM user_events
  WINDOW TUMBLING (SIZE 1 DAY)
  WHERE event_type = 'visit'
  GROUP BY event_type
  EMIT CHANGES;

Streaming ETL

Apache Kafka is the most widely used option for powering big data pipelines. ksqlDB allows seamless data transformation within the pipeline, ensuring that messages are prepared for transfer to another system.

CREATE STREAM vip_actions AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id
  WHERE u.level = 'Platinum' EMIT CHANGES;

CREATE STREAM premium_user_activity AS
  SELECT c.user_id AS user_id, c.page AS visited_page, c.action AS action_performed
  FROM clickstream c
  INNER JOIN users u ON c.user_id = u.id
  WHERE u.tier = 'PREMIUM'
  EMIT CHANGES;

Integration with External Data Sources and Sinks

ksqlDB integrates seamlessly with Kafka Connect data sources and sinks, providing a centralized SQL interface over a diverse range of external systems. The following persistent streaming queries deliver all of their output into a Kafka topic; the second variant names the target topic explicitly as transformed_clicks:

CREATE STREAM Transformed_clicks AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id EMIT CHANGES;
CREATE STREAM enriched_user_actions WITH (KAFKA_TOPIC = 'transformed_clicks') AS
  SELECT c.user_id AS user_id, c.page AS visited_page, c.action AS action_performed, u.level AS user_tier
  FROM clickstream c
  LEFT JOIN users u ON c.user_id = u.id
  EMIT CHANGES;

Instead of directing continuous query results to a Kafka topic, it is often useful to channel the output into an alternative data store. ksqlDB’s Kafka Connect integration simplifies the implementation of this pattern.

The following statement creates a Kafka Connect sink connector that sends all output from the above streaming ETL query directly into Elasticsearch:

CREATE SINK CONNECTOR es_sink WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'topics'          = 'transformed_clicks',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'type.name'       = '',
  'connection.url'  = 'http://elasticsearch:9200');

Fundamental Abstractions in KSQL

KSQL uses Kafka’s Streams API internally and shares its fundamental abstractions for stream processing on Kafka. KSQL exposes two fundamental abstractions that align with Kafka Streams and help you manipulate Kafka topics; a short sketch follows these definitions.

1. Stream:

A stream represents a limitless sequence of structured data, often referred to as “facts.” Facts within a stream are unchangeable, meaning new information can be appended to a stream, but existing facts remain unchanged. Streams can be formed from a Kafka topic or derived from pre-existing streams and tables.

2. Table:

A table is a view of a stream, or of another table, presenting a collection of evolving facts. It is similar to a conventional database table, but it incorporates streaming semantics such as windowing. Unlike in streams, facts in a table are mutable: new facts can be added, and existing ones can be updated or deleted. Tables can be formed from a Kafka topic or derived from existing streams and tables.
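To make the distinction concrete, here is a minimal sketch that builds both abstractions over a hypothetical "orders" topic: a stream of immutable order events, and a table of evolving running totals derived from it.

-- A stream of immutable facts read from a (hypothetical) Kafka topic
CREATE STREAM orders_stream (order_id VARCHAR, amount DOUBLE)
  WITH (kafka_topic='orders', value_format='JSON');

-- A table of evolving state derived from that stream:
-- the latest running total per order_id
CREATE TABLE order_totals AS
  SELECT order_id, SUM(amount) AS total
  FROM orders_stream
  GROUP BY order_id;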

How Does KSQL Work?

KSQL contains a server process that executes queries against Kafka. A set of these processes can run together as a cluster. If you want to add more processing power, just launch more instances of the KSQL server. These instances are fault-tolerant: if one of them fails, the others take over its work.

The querying procedure is straightforward: the KSQL Command Line Interface (CLI) sends each query to the cluster over the REST API. The KSQL CLI is a useful tool for inspecting all of your existing Kafka topics, creating streams and tables, and checking the progress of your queries.
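For reference, the CLI’s REST round trip can also be made directly with any HTTP client. The sketch below assumes the KSQL server is listening on the default port 8088 and that a pageviews_original stream exists (it is created later in this tutorial):

curl -X POST http://localhost:8088/query \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SELECT pageid FROM pageviews_original LIMIT 3;", "streamsProperties": {}}'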


Steps to Stream Real-Time Data in Apache Kafka Using KSQL

You can seamlessly stream Real-Time Data in Apache Kafka using KSQL by following the steps given below:

Step 1: Setting up the Kafka Environment

  • Step 1: Before you start querying the data present in Kafka using KSQL, you must have Kafka, Zookeeper, the KSQL server, and the KSQL CLI running on your local machine. You can download Kafka based on your version preferences.
  • Step 2: After installing Kafka, ensure that Java 8+ is installed and running on your computer. In addition, set the PATH and JAVA_HOME environment variables so that your operating system can locate the Java utilities.
  • Step 3: After installing and configuring the Kafka files, you are ready to start the Kafka and Zookeeper instances. Kafka runs alongside Zookeeper, since Zookeeper acts as a centralized service for managing the metadata of the Kafka servers, producers, and consumers.
  • Step 4: Start the Zookeeper instance first. To do so, open a new command prompt and execute the following command.
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Note: In the command prompt, make sure you are in the correct file path.

  • Step 5: Now, you can start the Kafka server. Open a new command prompt and execute the following command. 
.\bin\windows\kafka-server-start.bat .\config\server.properties

At this stage, both the Zookeeper instance and Kafka server are running on your local machine. Ensure that you do not accidentally close the command prompts that run Zookeeper and Kafka servers.

Step 2: Installing KSQL

  • Step 1: You have to install the Confluent Platform CLI to perform Real-Time Data processing operations in Apache Kafka using KSQL. This CLI serves as a one-stop solution comprising several Kafka services, such as the Kafka server, Zookeeper, Confluent Schema Registry, and Kafka Connect.
  • Step 2: If you have already installed and configured Apache Kafka on your local machine, you do not have to install the Confluent Platform to work with KSQL. You just need to clone the GitHub repository to access and work with KSQL.

    Though KSQL is just a Kafka client for executing data processing queries against Kafka servers, it needs a dedicated machine or instance to run on, just like the Kafka and Zookeeper instances.

Step 3: Setting up the KSQL Server and CLI

  • Step 1: For starting the KSQL server, execute the following command. 
bin/ksql-server-start ./etc/ksql/ksql-server.properties

In the above command, ksql-server-start is the script that starts a KSQL server process, while ksql-server.properties is the KSQL server configuration file. This configuration file holds the Kafka cluster information, such as the bootstrap servers and port settings.

The bootstrap servers parameter points the KSQL engine at the Kafka cluster on which data processing operations are executed. The listeners parameter sets the address and port on which the KSQL server accepts connections from clients such as the KSQL CLI.

By default, the port for the KSQL Server is 8088, while the ports for Kafka and Zookeeper instances are 9092 and 2181, respectively.
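For illustration, a minimal ksql-server.properties could look like the following sketch, assuming the local single-broker setup described in this tutorial:

# Kafka broker(s) the KSQL engine connects to
bootstrap.servers=localhost:9092

# Address and port on which the KSQL server accepts CLI/REST connections
listeners=http://localhost:8088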

  • Step 2: KSQL has a command-line interface that communicates with the KSQL server for performing querying operations on Kafka data. Execute the below command to start the KSQL CLI. 
bin/ksql http://localhost:8088

In the above command, “localhost:8088” is the listener port address of the KSQL server. The above command helps you to start a KSQL CLI instance.

A newly started KSQL CLI instance serves as a command-line tool for writing and executing queries to perform data processing operations. On executing the above steps, you have successfully started the Kafka, Zookeeper, and KSQL instances.

  • Step 3: If you have downloaded and installed the Confluent Platform CLI, you eliminate the need to separately write commands in the command prompt to start the Kafka, Zookeeper, and KSQL instances.

    You just need to execute the “confluent start” command to start all the required instances, including the Kafka Connect, KSQL server, Kafka, and Zookeeper instances.

Step 4: Running Basic KSQL Kafka Queries

In the following steps, you will learn how to execute KSQL queries for reading and processing Kafka data. KSQL lets you treat Kafka topics like tables, where queries are written to fetch and process the data inside those topics.

With Kafka KSQL, the querying process is the same as writing SQL queries in an RDBMS. You can write KSQL queries for grouping, aggregating, filtering, merging, and processing the data present in Kafka. These capabilities make the Kafka platform a Real-Time Data warehouse for processing and analyzing continuously Streaming Data.

As this is a KSQL introduction tutorial, you will learn how to implement basic KSQL queries in Kafka.

  • Step 1: Initially, you will create topics with a unique name. Then, you will generate or produce data into the newly created Kafka topic.

    Execute the following command to create a topic named “pageviews.” The command also produces data into the “pageviews” topic using the data generator (ksql-datagen) tool, which continuously generates records with random values in a delimited format.
<path-to-confluent>/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500

The above single line of code performs two operations: topic creation and data generation.

  • Step 2: Similarly, you will create another Kafka topic named “users” and generate random values into it in JSON format. Execute the below command to create the second Kafka topic.
$ <path-to-confluent>/bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100

On executing the above steps, you successfully created topics in Kafka. 

  • Step 3: Now, open your KSQL CLI to start querying to inspect and manipulate data present in Kafka topics. To show the list of topics you created previously, execute the below command.
SHOW TOPICS;

You will get the output, as shown below. In the output, “pageviews” and “users” are the newly created topics, whereas the other two are previously created Kafka topics.

[Image: SHOW TOPICS output]
  • Step 4: You can now proceed with inspecting or viewing the data present inside the Kafka topics. Execute the following command to view the data generated inside the Kafka topic called “users.”

PRINT 'users';

The output of the above command will look like this:

[Image: PRINT 'users' output]

Since you generated the data for the “users” topic in JSON format, the values appear as key-value pairs inside the respective topic.

  • Step 5: As values are continuously generated inside the Kafka topic, the print command will print messages endlessly. To terminate the printing operation, press CTRL+C.
  • Step 6: You can also view the “pageviews” topic by executing the below command.

PRINT 'pageviews';

The data present inside the pageviews topic will look as shown below.

[Image: PRINT 'pageviews' output]
  • Step 7: The data appears in a comma-separated format, since you used the delimited format while creating the pageviews topic. Press CTRL+C to terminate the printing of messages.

Kafka topics can be used as a schema or table that contains the Real-Time Data or messages received from producers. However, KSQL does not allow you to interact with Kafka topics directly when performing querying operations. Instead, you have to create a table or stream over the respective topics before you can start querying with KSQL.

A stream in Kafka is a collection of all records present in the topic with a proper schema. It comprises all the past and present data, where new records will be constantly added to the stream with respect to time. A table in Kafka is a snapshot or particular state of the stream at any given point in time.

  • Step 8: You can use the command given below to create the stream from the Kafka topic “pageviews.”
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
(kafka_topic='pageviews', value_format='DELIMITED');

The above code creates a new stream named “pageviews_original”.

  • Step 9: Now, execute the below command to create a new table from the “users” topic.
CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH
(kafka_topic='users', value_format='JSON', key = 'userid');

This command creates a new table named “users_original.”

  • Step 10: On successfully creating the streams and tables, run the commands “SHOW STREAMS;” and “SHOW TABLES;” to display the previously created streams and tables, respectively.

You will get the output as shown below. 

[Image: SHOW STREAMS output. Source: Confluent.io]
  • Step 11: Now, you are ready to start querying using KSQL. Execute the below command to select pageid from the stream called pageviews_original. The LIMIT clause is given to limit the number of rows fetched from the respective stream. Here, you have limited the number of rows to three.
SELECT pageid FROM pageviews_original LIMIT 3;

After executing the above command, your output will be similar to the following output: 

Page_24
Page_73
Page_78
LIMIT reached for the partition.
Query terminated
  • Step 12: You can also create a new stream with the “CREATE STREAM” command. Here, the new stream is populated by joining the pageviews_original stream with the users_original table.
CREATE STREAM pageviews_enriched AS
SELECT users_original.userid AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.userid;

The above code creates a new stream, pageviews_enriched, which combines columns such as “userid” and “pageid” from the pageviews_original stream with “regionid” and “gender” from the users_original table. On success, KSQL confirms that the stream was created and is running:

 Message
----------------------------
 Stream created and running
----------------------------

By executing further KSQL commands like these, you can perform various querying operations, including data filtering, aggregation, and joining, as the sketch below illustrates.
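For example, the following sketch builds on the objects created above to count page views per region in 30-second tumbling windows:

-- Windowed aggregation over the enriched stream created in Step 12
CREATE TABLE pageviews_per_region AS
  SELECT regionid, COUNT(*) AS view_count
  FROM pageviews_enriched
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY regionid;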

Conclusion 

In this article, you learned about Kafka and Kafka KSQL, and followed a tutorial on how to install KSQL and execute KSQL queries for processing Kafka data. The article covered the fundamental operations of creating new tables and streams and executing basic KSQL queries. Using KSQL, you can also implement advanced querying techniques, such as joining, windowing, and filtering, on Kafka’s Real-Time Streaming Data.

For a complete analysis of your business performance, it is crucial to consolidate data from Apache Kafka, Apache Confluent Cloud, and all the other applications used across your business. As data grows exponentially, it becomes a time-consuming and resource-intensive task to extract complex data from all these sources.

To achieve this efficiently, you would need to invest a portion of your engineering bandwidth to integrate data from all sources, clean and transform it, and finally load it into a Cloud Data Warehouse or a destination of your choice for further Business Analytics. All of these challenges can be comfortably solved by a cloud-based ETL tool such as Hevo Data. Hevo is the only real-time ELT No-code Data Pipeline platform that cost-effectively automates data pipelines that are flexible to your needs.

Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of 150+ data sources such as Apache Kafka & Kafka Confluent Cloud to a Data Warehouse or a Destination of your choice to be visualized in a BI Tool. It is a reliable, completely automated, and secure service that doesn’t require you to write any code! 

If you are using Apache Kafka & Kafka Confluent Cloud as your Message Streaming Platform and searching for a no-fuss alternative to Manual Data Integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 150+ sources & BI tools (including 40+ free sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Tell us about your experience of working with KSQL for Real-Time Data Streaming using Apache Kafka! Share your thoughts with us in the comments section below.

References

  1. KSQL GitHub repository: https://github.com/confluentinc/ksql

FAQs

What is KSQL used for?

KSQL is used for real-time stream processing on Kafka topics. It allows developers to write SQL-like queries to filter, transform, and aggregate streaming data without writing custom code, enabling faster insights.

What is the difference between Kafka Stream and KSQL?

Kafka Streams is a Java library for building stream processing applications, offering programmatic control. KSQL, on the other hand, is a SQL-based interface over Kafka Streams, simplifying streaming data manipulation for non-developers.

What is the difference between KSQL and Flink?

KSQL is tailored for Kafka-centric use cases with SQL-like simplicity. Flink is a more versatile, general-purpose stream and batch processing framework, supporting complex stateful operations and broader integrations beyond Kafka.

Ishwarya M
Technical Content Writer, Hevo Data

Ishwarya is a skilled technical writer with over 5 years of experience. Having worked extensively with B2B SaaS companies in the data industry, she channels her passion for data science into producing informative content that helps individuals understand the complexities of data integration and analysis.