Snowflake is a fully managed, Cloud-based data warehouse built for Data Analytics. It runs on AWS, Azure, and GCP infrastructure, and you use SQL queries to transform your data and extract insights.
Apache Kafka is a distributed, open-source platform that lets you publish and subscribe to high volumes of messages. Kafka uses Brokers to replicate and persist messages in a fault-tolerant manner and organizes them into topics.
Kafka is used to build real-time streaming data pipelines and streaming applications that transform data and move it from a source to a destination.
This blog post discusses Kafka, an event streaming platform, and Snowflake, a Cloud-hosted data warehouse. You will also learn how to connect Snowflake to Kafka. Snowflake to Kafka export cannot be done directly. Instead, you first export data from Snowflake as CSV files and then load the CSV files into Kafka.
What is Snowflake?
Snowflake is a fully managed SaaS (Software as a Service) platform that brings together Data Warehousing, Data Lakes, Data Engineering, Data Science, Data Application Development, and Secure Sharing and Consumption of Real-time / Shared Data into one platform.
Snowflake delivers out-of-the-box capabilities including Storage and Compute Separation, On-the-fly Scalable Compute, Data Sharing, Data Cloning, and third-party Tool Support to suit the demanding needs of expanding enterprises.
Snowflake isn’t built on top of any existing database or “Big Data” software platform like Hadoop. Instead, it combines a Cloud-native architecture with a brand-new SQL query engine. With Snowflake, users get all of the features and capabilities of an enterprise analytic database, plus a lot more.
Key Features of Snowflake
Snowflake as a Software as a Service (SaaS) solution has the following features:
- Snowflake enables you to break down silos and ensure that everyone in your organization has access to relevant data, leading to better data-driven decision-making. This is a critical first step toward improving partner relationships, optimizing pricing, lowering operational costs, and increasing sales effectiveness, among other things.
- Snowflake allows you to improve the quality and speed of your analytics by switching from nightly Batch Loads to Real-time Data Streams.
- Snowflake caches query results, so repeated queries can return quickly.
- You can use Snowflake to better understand customer behavior and product usage.
- Snowflake gives you the ability to build your own Data Exchange, allowing you to securely share live, governed data.
What is Kafka?
Apache Kafka is a widely used Distributed Data Streaming platform that enables the creation of real-time event-driven applications. Kafka is a free, open-source application that lets you store, read, and analyze data streams.
Kafka is a distributed system that can run as a cluster across numerous machines. Users can obtain high throughput, minimal latency, great compute power, and more by leveraging its distributed nature, and can manage massive volumes of data with no discernible lag in performance.
Kafka, which is written in Scala and Java, accepts input from a large number of external Data Sources and organizes it into “Topics.” Events are written and read by two kinds of clients: “Producers” write events to Topics, and “Consumers” read and process them.
Key Features of Kafka
Take a look at the key characteristics that have contributed to Kafka’s enormous appeal.
- Integrations: Kafka includes a set of connectors that make moving data into and out of the system simple. Kafka Connect enables developers to connect to a large number of event sources and sinks, including AWS S3, PostgreSQL, MySQL, Elasticsearch, and others.
- Ease of Use: Kafka is straightforward to get started with and doesn’t require a lot of programming experience. A wealth of resources, including documentation, tutorials, videos, and sample projects, helps developers learn Kafka and build applications using the Kafka CLI.
- Fault-Tolerant: Kafka keeps data safe by storing it durably and replicating it across the cluster. Kafka is highly dependable, and it also lets you build custom connectors to meet your specific requirements.
- Scalability: Kafka can handle massive data streams and billions of messages each day. Organizations can scale production clusters up to a thousand brokers.
- High Availability: Kafka is fast and designed for high availability, so your data stays accessible when individual brokers fail. Kafka replicates your data across several brokers to protect against data loss.
Steps to Connect Snowflake to Kafka
You cannot directly export the data from Snowflake to Kafka. To export data from Snowflake to Kafka, first you will have to export data from Snowflake as CSV files and then load the CSV files into Kafka.
Step 1: Export Data from Snowflake as CSV
The first step in exporting data from Snowflake to Kafka is exporting data from Snowflake as CSV files. There are different ways to do so:
1) Using a BI Tool
You can use a BI Tool to start your Snowflake to Kafka journey. Connecting Snowflake to a BI tool that allows you to query your Snowflake instance directly and download the query results is by far the quickest and easiest way to export CSV files from Snowflake.
Metabase and Mode are two BI tools that work well for exporting CSV files, because most people find them easy to use for SQL analytics. Both allow you to export a CSV to your desktop with a single click.
The download button can be found in the bottom right corner of the SQL results lists in Metabase’s web UI. You can export CSVs from Mode using their API or add a button to a Notebook that allows users to export CSV files.
2) Using COPY Command and Storing CSV File in Cloud
You can also use the COPY command to begin your Snowflake to Kafka data transfer. To begin, you will need a SQL client that can connect to your Snowflake account. When combined with a Cloud platform like AWS, GCP, or Azure, the COPY command becomes extremely powerful: it allows you to save a table or query result to that platform's Cloud storage.
On AWS, that is an S3 bucket; on GCP, a Cloud Storage bucket; and on Azure, a Blob Storage container. You can then download the files to your local storage. If you’re particularly inventive, you can route them to other destinations from there. (However, before you do so, learn about reverse ETL.)
Let’s take a look at an AWS command to export data into S3 to get a sense of how a COPY command looks.
copy into 's3://mybucket/unload/' from mytable storage_integration = s3_int;
The COPY command operates similarly in GCP and Azure. The COPY command includes an export file path, table name, and connection information. Refer to the create statement below to define your storage integration parameter.
create storage integration s3_int
type = external_stage
storage_provider = 'S3'
storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'
enabled = true
storage_allowed_locations = ('s3://mybucket1/path1/', 's3://mybucket2/path2/');
The parameter names in this case are fairly straightforward and descriptive. The create syntax also follows similar rules for other Cloud platforms.
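Because Step 2 expects CSV files, ideally with a header row, it can help to spell out the file format when unloading. The following is a minimal Python sketch that only composes the COPY statement as a string; it does not connect to Snowflake. The function name `build_unload_statement` is hypothetical, and the `file_format` and `header` options are taken from Snowflake's COPY INTO <location> syntax, so check them against the documentation for your account before relying on them.

```python
def build_unload_statement(table, stage_url, integration, header=True):
    """Compose a COPY INTO <location> statement that unloads a table as CSV."""
    # The target URL is a SQL string literal, so double any single quotes in it.
    url = stage_url.replace("'", "''")
    return (
        f"copy into '{url}'\n"
        f"from {table}\n"
        f"storage_integration = {integration}\n"
        # Uncompressed CSV, with fields quoted where needed (for example,
        # values that contain commas).
        "file_format = (type = csv compression = none "
        "field_optionally_enclosed_by = '\"')\n"
        f"header = {'true' if header else 'false'};"
    )


# Table, bucket, and integration names are the ones used in the example above.
print(build_unload_statement("mytable", "s3://mybucket/unload/", "s3_int"))
```

You would paste the printed statement into your SQL client. Writing a header row here is what later allows the Kafka connector to pick up field names from the file.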
3) Using SnowSQL
SnowSQL can also be used to start your Snowflake to Kafka journey. SnowSQL is a command-line tool that connects to your Snowflake account and allows you to run DDL and DML queries.
DDL stands for Data Definition Language (queries that define how data is stored) and DML stands for Data Manipulation Language (queries that modify the data itself). SnowSQL can also be used to load and unload data.
You must first install SnowSQL; the installation process varies by operating system, so see the Snowflake documentation when you’re ready to dive in. Let’s focus on exporting CSVs with SnowSQL.
If you use Linux or macOS, the export syntax is as follows:
snowsql -c my_example_connection \
  -d sales_db \
  -s public \
  -q 'select * from mytable limit 10' \
  -o output_format=csv \
  -o header=false \
  -o timing=false > output_file.csv
If you wish to run it in a Windows terminal, replace the single quotes around the select query with double quotes. The command is written on a single line here, since line-continuation characters differ between Windows shells.
snowsql -c my_example_connection -d sales_db -s public -q "select * from mytable limit 10" -o output_format=csv > output_file.csv
You may easily export a query result on your operating system using SnowSQL instead of using a separate SQL client. If you’re a command-line wizard, you could even send it to your boss through email.
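If you want to script the export rather than type it, the same command can be assembled in Python. This is a minimal sketch, assuming SnowSQL is installed and the connection `my_example_connection` is already configured; the helper names `build_snowsql_args` and `export_csv` are hypothetical. Passing the arguments as a list sidesteps the single-versus-double-quote difference between operating systems. The `friendly=false` option, which suppresses SnowSQL's startup and exit messages, is an addition not used in the commands above.

```python
import subprocess


def build_snowsql_args(connection, database, schema, query, header=True):
    """Return the argument list for a SnowSQL CSV export."""
    return [
        "snowsql",
        "-c", connection,
        "-d", database,
        "-s", schema,
        "-q", query,
        "-o", "output_format=csv",
        "-o", f"header={'true' if header else 'false'}",
        "-o", "timing=false",
        "-o", "friendly=false",
    ]


def export_csv(args, output_path):
    """Run SnowSQL and write its standard output to output_path."""
    with open(output_path, "wb") as out:
        subprocess.run(args, stdout=out, check=True)


args = build_snowsql_args("my_example_connection", "sales_db", "public",
                          "select * from mytable limit 10")
print(" ".join(args))
# To actually run the export: export_csv(args, "output_file.csv")
```

The sketch defaults to `header=true`, because a header row lets the Kafka connector in Step 2 derive field names from the file.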
With the data available as a CSV file, the first step of the Snowflake to Kafka data transfer is complete.
Step 2: Load CSV Data into Kafka
The second step in exporting data from Snowflake to Kafka is importing CSV data into Kafka.
The Kafka Connect SpoolDir connector supports a number of flat file formats, including CSV. Get it from Confluent Hub. Once you’ve installed it in your Kafka Connect worker, you’ll need to restart the worker for the plugin to be picked up. Run the following command to confirm that the connector classes are loaded:
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector"
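The same check can be done without jq. The sketch below is illustrative only: it parses a response body from the `/connector-plugins` endpoint and keeps the SpoolDir classes. The function name `spooldir_plugins` is hypothetical, and the sample payload (including its version strings) is made-up placeholder data rather than real output.

```python
import json


def spooldir_plugins(plugins_json):
    """Return the SpoolDir connector class names found in a /connector-plugins response."""
    plugins = json.loads(plugins_json)
    return sorted(p["class"] for p in plugins if "SpoolDir" in p.get("class", ""))


# Placeholder payload shaped like the REST response: a list of plugin objects.
sample = json.dumps([
    {"class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
     "type": "source", "version": "0.0.0"},
    {"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
     "type": "source", "version": "0.0.0"},
])
print(spooldir_plugins(sample))
```

Against a running worker, you could fetch the real body with `urllib.request.urlopen("http://localhost:8083/connector-plugins").read()` and pass it to the same function. An empty list would suggest the plugin was not loaded.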
1) Load data from CSV to Kafka
If you have a header row containing field names, the connector can use these to create the schema during ingestion.
Create the connector:
curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
  -d '{
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "orders_spooldir_00",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv",
    "schema.generation.enabled":"true",
    "csv.first.row.as.header":"true"
  }'
Now use a Kafka consumer to look at the data. kafkacat is used here.
$ docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_00 \
    -C -o-1 -J \
    -s key=s -s value=avro -r http://schema-registry:8081 | \
    jq '.payload'
{
"order_id": {
"string": "500"
},
"customer_id": {
"string": "424"
},
"order_total_usd": {
"string": "160312.42"
},
"make": {
"string": "Chevrolet"
},
"model": {
"string": "Suburban 1500"
},
"delivery_city": {
"string": "London"
},
"delivery_company": {
"string": "Predovic LLC"
},
"delivery_address": {
"string": "2 Sundown Drive"
}
}
Furthermore, metadata about the source file itself is included in the headers of the Kafka message.
2) Setting the message key
Set schema.generation.key.fields to the name of the field(s) you’d like to use for the Kafka message key, assuming you have a header row to supply field names. If you’re running this after the first example, keep in mind that the connector moves your file, so you’ll need to return it to the input.path location for it to be processed again.
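As an illustration of the key setting described above, here is a minimal Python sketch that builds the connector configuration and the corresponding REST request. It reuses the paths and options from the first example; the connector name `source-csv-spooldir-01` is an assumption that follows the naming pattern used in this post, and the helper names are hypothetical. The request is only constructed, not sent.

```python
import json
import urllib.request


def spooldir_csv_config(topic, key_fields=None):
    """Build a SpoolDir CSV source config, optionally setting message key fields."""
    config = {
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": topic,
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": r".*\.csv",
        "schema.generation.enabled": "true",
        "csv.first.row.as.header": "true",
    }
    if key_fields:
        # Comma-separated header names to use for the Kafka message key.
        config["schema.generation.key.fields"] = ",".join(key_fields)
    return config


def build_put_request(connect_url, name, config):
    """Create (but do not send) the PUT request that creates or updates a connector."""
    return urllib.request.Request(
        f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="PUT",
    )


config = spooldir_csv_config("orders_spooldir_01", key_fields=["order_id"])
request = build_put_request("http://localhost:8083", "source-csv-spooldir-01", config)
print(request.get_method(), request.full_url)
# To submit it to a running Kafka Connect worker: urllib.request.urlopen(request)
```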
The message key in the resulting Kafka message is order_id:
docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_01 -o-1 \
    -C -J \
    -s key=s -s value=avro -r http://schema-registry:8081 | \
    jq '{"key":.key,"payload": .payload}'
{
"key": "Struct{order_id=3}",
"payload": {
"order_id": {
"string": "3"
},
"customer_id": {
"string": "695"
},
"order_total_usd": {
"string": "155664.90"
},
"make": {
"string": "Toyota"
},
"model": {
"string": "Avalon"
},
"delivery_city": {
"string": "Brighton"
},
"delivery_company": {
"string": "Jacobs, Ebert and Dooley"
},
"delivery_address": {
"string": "4 Loomis Crossing"
}
}
}
3) Changing Schema Field Types
Although the connector does a good job of inferring the schema, you may want to override it. You can declare the entire schema using the value.schema configuration, but you might be happy for the connector to infer everything except a few fields. To change just those fields, you can use a Single Message Transform:
curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-02/config \
  -d '{
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "orders_spooldir_02",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv",
    "schema.generation.enabled":"true",
    "schema.generation.key.fields":"order_id",
    "csv.first.row.as.header":"true",
    "transforms":"castTypes",
    "transforms.castTypes.type":"org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.castTypes.spec":"order_id:int32,customer_id:int32,order_total_usd:float32"
  }'
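To make the effect of the cast spec above concrete, the following Python sketch mimics it on a single row. It is a conceptual illustration only, not how Kafka Connect implements the transform: the function names and the `CASTERS` table are hypothetical, and Python's `float` is double precision, so it only approximates `float32`.

```python
def parse_cast_spec(spec):
    """Turn 'field:type,field:type' into a {field: type} mapping."""
    return dict(pair.split(":", 1) for pair in spec.split(","))


# Python stand-ins for the Connect types named in the spec (an approximation).
CASTERS = {"int32": int, "float32": float, "string": str}


def cast_row(row, spec):
    """Return a copy of row with the fields named in spec converted; others are untouched."""
    casts = parse_cast_spec(spec)
    return {
        name: CASTERS[casts[name]](value) if name in casts else value
        for name, value in row.items()
    }


# Every value arrives from the CSV file as a string.
row = {"order_id": "500", "customer_id": "424",
       "order_total_usd": "160312.42", "make": "Chevrolet"}
print(cast_row(row, "order_id:int32,customer_id:int32,order_total_usd:float32"))
```

Fields that are not listed in the spec, such as `make`, stay as strings, which matches the idea of letting the connector infer everything except a few fields.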
You can confirm that the field data types were set as specified by looking at the schema that was created and stored in the Schema Registry:
➜ curl --silent --location --request GET 'http://localhost:8081/subjects/orders_spooldir_02-value/versions/latest' |jq '.schema|fromjson'
{
"type": "record", "name": "Value", "namespace": "com.github.jcustenborder.kafka.connect.model",
"fields": [
{ "name": "order_id", "type": [ "null", "int" ], "default": null },
{ "name": "customer_id", "type": [ "null", "int" ], "default": null },
{ "name": "order_total_usd", "type": [ "null", "float" ], "default": null },
{ "name": "make", "type": [ "null", "string" ], "default": null },
{ "name": "model", "type": [ "null", "string" ], "default": null },
{ "name": "delivery_city", "type": [ "null", "string" ], "default": null },
{ "name": "delivery_company", "type": [ "null", "string" ], "default": null },
{ "name": "delivery_address", "type": [ "null", "string" ], "default": null }
],
"connect.name": "com.github.jcustenborder.kafka.connect.model.Value"
}
If you only need the raw CSV lines in Kafka, you can do that too. Note that a different connector class is used here, and the values are written with org.apache.kafka.connect.storage.StringConverter.
curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-03/config \
  -d '{
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "topic": "orders_spooldir_03",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv"
  }'
The end result here is just CSV.
➜ docker exec kafkacat \
    kafkacat -b kafka:29092 -t orders_spooldir_03 -o-5 -C -u -q
496,456,80466.80,Volkswagen,Touareg,Leeds,Hilpert-Williamson,96 Stang Junction
497,210,57743.67,Dodge,Neon,London,Christiansen Group,7442 Algoma Hill
498,88,211171.02,Nissan,370Z,York,"King, Yundt and Skiles",3 1st Plaza
499,343,126072.73,Chevrolet,Camaro,Sheffield,"Schiller, Ankunding and Schumm",8920 Hoffman Place
500,424,160312.42,Chevrolet,Suburban 1500,London,Predovic LLC,2 Sundown Drive
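With raw CSV in the topic, every consumer has to parse the lines itself. The sketch below, using Python's standard `csv` module, shows why a plain split on commas is not enough: some values, such as the delivery company in order 498 above, contain commas inside quotes. The field names are taken from the schema shown earlier; the helper name `parse_order_line` is hypothetical.

```python
import csv

# Field order as it appears in the CSV file and in the schema shown earlier.
FIELDS = ["order_id", "customer_id", "order_total_usd", "make", "model",
          "delivery_city", "delivery_company", "delivery_address"]


def parse_order_line(line):
    """Parse one raw CSV message value into a dict keyed by field name."""
    values = next(csv.reader([line]))
    return dict(zip(FIELDS, values))


line = '498,88,211171.02,Nissan,370Z,York,"King, Yundt and Skiles",3 1st Plaza'
order = parse_order_line(line)
print(order["delivery_company"])  # King, Yundt and Skiles
print(len(line.split(",")))       # 9 pieces from a naive split, not 8 fields
```

Note that every parsed value is still a string, and the field names live in the consumer's code rather than with the data.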
4) Managing Schema
So far, you’ve read CSV data into Kafka but that isn’t the end of the story. Here’s how we use ksqlDB to declare the orders topic we wrote to as a stream:
ksql> CREATE STREAM ORDERS_02 WITH (KAFKA_TOPIC='orders_spooldir_02',VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
After that, and because a schema was generated at the time of ingestion, you can see all of the fields that are available:
ksql> DESCRIBE ORDERS_02;
Name : ORDERS_02
Field | Type
-------------------------------------------
ROWKEY | VARCHAR(STRING) (key)
ORDER_ID | INTEGER
CUSTOMER_ID | INTEGER
ORDER_TOTAL_USD | DOUBLE
MAKE | VARCHAR(STRING)
MODEL | VARCHAR(STRING)
DELIVERY_CITY | VARCHAR(STRING)
DELIVERY_COMPANY | VARCHAR(STRING)
DELIVERY_ADDRESS | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
and execute queries against the data in Kafka:
ksql> SELECT DELIVERY_CITY, COUNT(*) AS ORDER_COUNT, MAX(CAST(ORDER_TOTAL_USD AS DECIMAL(9,2))) AS BIGGEST_ORDER_USD FROM ORDERS_02 GROUP BY DELIVERY_CITY EMIT CHANGES;
+---------------+-------------+---------------------+
|DELIVERY_CITY |ORDER_COUNT |BIGGEST_ORDER_USD |
+---------------+-------------+---------------------+
|Bradford |13 |189924.47 |
|Edinburgh |13 |199502.66 |
|Bristol |16 |213830.34 |
|Sheffield |74 |216233.98 |
|London |160 |219736.06 |
Now consider the data you ingested as raw CSV into a separate topic, without a schema:
ksql> CREATE STREAM ORDERS_03 WITH (KAFKA_TOPIC='orders_spooldir_03',VALUE_FORMAT='DELIMITED');
No columns supplied.
No columns were supplied, because the topic carries no schema from which ksqlDB could derive them. If you wish to query the data in SQL, stream it to a data lake, or do anything else with it, you’ll need to declare that schema at some point. This is why CSV, as a schemaless serialization format, is a poor way to share data between systems.
You can use your CSV data in ksqlDB if you really want to; all you have to do is enter the schema, which is error-prone and time-consuming. You must enter it each time you use the data, and every other data consumer must do it as well.
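To show what entering the schema by hand involves, here is a small Python sketch that generates a ksqlDB CREATE STREAM statement with explicit columns for the raw CSV topic. The column names and types mirror the DESCRIBE output shown earlier; the function name `create_stream_statement` is hypothetical, and the generated statement is a sketch that has not been run against a ksqlDB server.

```python
# Columns in the same order as the fields in the CSV lines.
COLUMNS = [
    ("ORDER_ID", "INT"),
    ("CUSTOMER_ID", "INT"),
    ("ORDER_TOTAL_USD", "DOUBLE"),
    ("MAKE", "VARCHAR"),
    ("MODEL", "VARCHAR"),
    ("DELIVERY_CITY", "VARCHAR"),
    ("DELIVERY_COMPANY", "VARCHAR"),
    ("DELIVERY_ADDRESS", "VARCHAR"),
]


def create_stream_statement(stream, topic, columns):
    """Build a CREATE STREAM statement that declares every column explicitly."""
    cols = ", ".join(f"{name} {ksql_type}" for name, ksql_type in columns)
    return (f"CREATE STREAM {stream} ({cols}) "
            f"WITH (KAFKA_TOPIC='{topic}', VALUE_FORMAT='DELIMITED');")


print(create_stream_statement("ORDERS_03", "orders_spooldir_03", COLUMNS))
```

Every consumer of the topic would need to keep its own copy of a list like `COLUMNS` in sync with the file layout, which is the maintenance burden described above.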
It makes a lot more sense to declare the schema once at ingestion and then make it available to everyone. You have now successfully completed the Snowflake to Kafka data transfer!
Conclusion
In this article, you have learned how to effectively connect Snowflake to Kafka using CSV files. You also learned how to map data such that Schema Integrity is maintained.
Creating & managing multiple connections between Snowflake, Google BigQuery, Google Sheets, and all the other applications in your business is a time-consuming & resource-intensive task.
You can opt for a more economical & effortless approach by automating your workflow via a Cloud-Based Reverse ETL Tool like Hevo Activate!