Snowflake, a cloud-based data storage and analytics service provider, is a data-warehouse-as-a-service designed to meet the needs and use cases of today's enterprises. In this tutorial article, we discuss one such use case, process automation, by going into the nitty-gritty of Snowflake triggers.
Changing product engagement trends are altering market and hiring realities. In today's data-driven work environment, developers are often expected to support projects through the last mile so that the delivered work meets a certain quality bar.
More often than not, that support consists of creating jobs and tasks for data pipelines that automate loading data from a primary table into a second table that keeps a history of changes. Snowflake triggers let you automate this kind of pipeline and run it on a defined, recurring schedule. In this tutorial article, we use examples to learn about two Snowflake features, streams and tasks, that together automate the production of such data pipelines. This built-in automation is one reason Snowflake appeals to DevOps teams.
Table of Contents
- Snowflake Triggers: What Are Streams and Tasks?
- How to Set Up a Stream in Snowflake?
- How to Create a Task in Snowflake?
Snowflake Triggers: What Are Streams and Tasks?
Snowflake does not support traditional database triggers. Instead, streams and tasks are combined to get trigger-like behavior, and this article refers to them together as Snowflake triggers. A stream is a Snowflake object that provides Change Data Capture (CDC) capabilities. CDC tracks the delta of a table: the rows that were inserted, updated, or deleted since a given point in time. A stream records that delta relative to an offset, so developers can query it to see exactly which rows changed, and how, between two points in time. Each change row also carries metadata columns that describe the type of change.
A task is also a Snowflake object. It runs a SQL statement on a recurring schedule, defined either as an interval in minutes or as a CRON expression. That statement can be a call to a stored procedure, which lets a task run multi-statement logic. Tasks can also be chained so that one task runs after another completes, forming a tree of tasks, which suits more complex periodic processing.
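The following is a rough sketch of these points. It assumes a hypothetical warehouse `my_wh`, a hypothetical stored procedure `refresh_daily_report()`, and hypothetical tables `daily_report` and `report_archive`. The root task calls the procedure on a CRON schedule. The child task is declared with `AFTER` and runs once its parent finishes.

```sql
-- All object names below are hypothetical placeholders.
-- Root task: calls a stored procedure every day at 06:00 UTC.
create or replace task daily_report_task
  warehouse = my_wh
  schedule = 'USING CRON 0 6 * * * UTC'
as
  call refresh_daily_report();

-- Child task: it has no schedule of its own and runs after daily_report_task completes.
create or replace task archive_report_task
  warehouse = my_wh
  after daily_report_task
as
  insert into report_archive select * from daily_report;

-- Tasks are created suspended; resume the child before the root.
alter task archive_report_task resume;
alter task daily_report_task resume;
```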
Data pipelines are generally continuous, so tasks are often paired with streams to process new or changed data as it arrives. A task can use the SYSTEM$STREAM_HAS_DATA function in its WHEN clause to check whether a stream contains change data before running its statement. If the stream has no change data, that scheduled run is skipped and no work is done on an empty batch. This pairing is why streams and tasks are so commonly used together.
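The following is a minimal sketch of this pattern. It assumes a hypothetical source table `orders` with an `order_id` column, a hypothetical target table `orders_audit`, and a hypothetical warehouse `my_wh`. Because of the WHEN clause, scheduled runs are skipped while the stream is empty. The INSERT reads the stream inside a DML statement, which advances the stream's offset, so the same changes are not processed twice.

```sql
-- Hypothetical source table, target table, and warehouse.
create or replace stream orders_stream on table orders;

create or replace task process_orders_task
  warehouse = my_wh
  schedule = '5 minute'
  when system$stream_has_data('orders_stream')
as
  -- Reading the stream inside a DML statement consumes it (advances its offset).
  insert into orders_audit (order_id, change_type, changed_at)
  select order_id, metadata$action, current_timestamp()
  from orders_stream;
```

As with any new task, it stays suspended until you run `alter task process_orders_task resume;`.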
Simplify Data Analysis With Hevo’s No-code Data Pipeline!
Hevo Data is fully managed and completely automates the process of not only loading data from your desired source but also enriching the data and transforming it into an analysis-ready format without having to write a single line of code. Its fault-tolerant architecture ensures that the data is handled in a secured, consistent manner with zero data loss.
Check out why Hevo Data is the Best:
- Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
- Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
- Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
- Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
- Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
- Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
- Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.
How to Set Up a Stream in Snowflake?
In this section, we walk through an example of setting up a stream, one half of the Snowflake triggers pairing. The example builds a Type 2 slowly changing dimension (SCD): a table that keeps the full history of values for each record, including rows that were inserted, updated, or deleted. Let's begin.
You need a role with the privileges to create databases, streams, and tasks. For now, we will use the SYSADMIN role.
use role sysadmin;
Next, create a database and schema to work in:
create database streams_and_tasks;
use database streams_and_tasks;
create schema scd;
use schema scd;
Create a table named NATION, which will be part of the ETL process. Rows in NATION will be inserted, updated, or deleted, so the table always holds the current view of the data. Whenever a row changes, its update_timestamp field is updated.
create or replace table nation (
  n_nationkey number,
  n_name varchar(25),
  n_regionkey number,
  n_comment varchar(152),
  country_code varchar(2),
  update_timestamp timestamp_ntz
);
The NATION_HISTORY table keeps track of the changes made to the NATION table. Every change to NATION results in a corresponding insert or update in NATION_HISTORY.
create or replace table nation_history (
  n_nationkey number,
  n_name varchar(25),
  n_regionkey number,
  n_comment varchar(152),
  country_code varchar(2),
  start_time timestamp_ntz,
  end_time timestamp_ntz,
  current_flag int
);
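The following is a sketch of two queries you could run against NATION_HISTORY once the steps that follow have populated it. They show why a Type 2 history table is useful. The first returns the current version of each row, and the second returns the version that was valid at a given moment. The timestamp literal is an arbitrary example value.

```sql
-- Current version of every nation key.
select n_nationkey, n_name, n_comment, start_time
from nation_history
where current_flag = 1;

-- Versions valid at a given instant (example timestamp, chosen for illustration).
-- start_time is inclusive and end_time is exclusive. An updated row and its
-- replacement share the same boundary timestamp, so only one of them is returned.
select n_nationkey, n_name, n_comment, start_time, end_time
from nation_history
where start_time <= '2021-06-01 00:00:00'::timestamp_ntz
  and end_time   >  '2021-06-01 00:00:00'::timestamp_ntz;
```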
Next, create a stream, NATION_TABLE_CHANGES, on the NATION table. The stream captures data changes in NATION so they can be processed.
create or replace stream nation_table_changes on table nation;
View the stream by running the following command:
show streams;
The following image shows the output for NATION_TABLE_CHANGES. It lists the details of existing streams: the stream name, database name, schema name, owner, and table name.
Note: The stream should be empty at this point because no rows have been added to or changed in the NATION table yet. The stream's metadata columns show which type of DML operation changed a row in the source table. METADATA$ACTION is INSERT or DELETE. METADATA$ISUPDATE indicates whether the action was part of an UPDATE. METADATA$ROW_ID is a unique, immutable ID for the row.
The following statement queries the NATION_TABLE_CHANGES stream:
select * from nation_table_changes;
The image below shows the three METADATA columns. No rows are returned because no data has been altered yet.
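Before moving on to the NATION_HISTORY table, let's look at how these columns change depending on the DML operation (insert, update, or delete) run on the NATION table. A stream records the net change to each row since its offset:
- Inserting a new row generates a single row in the stream, with METADATA$ACTION = 'INSERT' and METADATA$ISUPDATE = 'FALSE'.
- Updating an existing row generates two rows, as shown below. The first is a DELETE row with the old values and the second is an INSERT row with the new values, and both have METADATA$ISUPDATE = 'TRUE'.
- Deleting an existing row generates a single row, with METADATA$ACTION = 'DELETE' and METADATA$ISUPDATE = 'FALSE'.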
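You can reproduce this behavior on a throwaway table and leave the NATION stream empty. The sketch below uses hypothetical names `demo_t` and `demo_s`. Rows 1 and 2 are inserted before the stream exists, so the update and delete act on rows that predate the stream's offset.

```sql
-- Scratch objects for illustration only (hypothetical names).
create or replace table demo_t (id number, val varchar);
insert into demo_t values (1, 'a'), (2, 'b');  -- these rows predate the stream

create or replace stream demo_s on table demo_t;  -- the stream's offset starts here

insert into demo_t values (3, 'c');         -- 1 stream row: INSERT, ISUPDATE = FALSE
update demo_t set val = 'a2' where id = 1;  -- 2 stream rows: DELETE (old) + INSERT (new), ISUPDATE = TRUE
delete from demo_t where id = 2;            -- 1 stream row: DELETE, ISUPDATE = FALSE

-- A plain SELECT does not consume the stream; only DML that reads it does.
select id, val, metadata$action, metadata$isupdate, metadata$row_id
from demo_s
order by id, metadata$action;

-- Clean up the scratch objects.
drop stream demo_s;
drop table demo_t;
```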
Now that the DML operations are clear, let's use this information to build a view, NATION_CHANGE_DATA. The view works out which rows to insert into or update in the NATION_HISTORY table based on the changes captured from the NATION table:
create or replace view nation_change_data as
-- This subquery figures out what to do when data is inserted into the NATION table
-- An insert to the NATION table results in an INSERT to the NATION_HISTORY table
select n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, 'I' as dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
             update_timestamp as start_time,
             lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
             case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
             case when end_time_raw is null then 1 else 0 end as current_flag
      from (select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp
            from nation_table_changes
            where metadata$action = 'INSERT'
              and metadata$isupdate = 'FALSE'))
union
-- This subquery figures out what to do when data is updated in the NATION table
-- An update to the NATION table results in an update AND an insert to the NATION_HISTORY table
-- The subquery below generates two records, each with a different dml_type
select n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
             update_timestamp as start_time,
             lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
             case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
             case when end_time_raw is null then 1 else 0 end as current_flag,
             dml_type
      from (-- Identify data to insert into nation_history table
            select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp, 'I' as dml_type
            from nation_table_changes
            where metadata$action = 'INSERT'
              and metadata$isupdate = 'TRUE'
            union
            -- Identify data in NATION_HISTORY table that needs to be updated
            select n_nationkey, null, null, null, null, start_time, 'U' as dml_type
            from nation_history
            where n_nationkey in (select distinct n_nationkey
                                  from nation_table_changes
                                  where metadata$action = 'INSERT'
                                    and metadata$isupdate = 'TRUE')
              and current_flag = 1))
union
-- This subquery figures out what to do when data is deleted from the NATION table
-- A deletion from the NATION table results in an update to the NATION_HISTORY table
select nms.n_nationkey, null, null, null, null, nh.start_time, current_timestamp()::timestamp_ntz, null, 'D'
from nation_history nh
inner join nation_table_changes nms
   on nh.n_nationkey = nms.n_nationkey
where nms.metadata$action = 'DELETE'
  and nms.metadata$isupdate = 'FALSE'
  and nh.current_flag = 1;
Query the view to check it. It returns no rows yet because the stream has not captured any changes:
select * from nation_change_data;
With the NATION_CHANGE_DATA view in place, the following MERGE statement applies the view's output to the NATION_HISTORY table:
merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- nation_change_data is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
   on nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
  and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
  set nh.end_time = m.end_time,
      nh.current_flag = 0
when matched and m.dml_type = 'D' then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
  set nh.end_time = m.end_time,
      nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
  (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
  values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);
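Before automating anything, you could test the view and the MERGE by hand. The sketch below uses made-up sample values for one NATION row. Run the MERGE statement above at each point marked in the comments.

```sql
-- 1. Insert a sample row (hypothetical values).
insert into nation values
  (1, 'ARGENTINA', 1, 'sample comment', 'AR', current_timestamp()::timestamp_ntz);

-- The view should now return one row with dml_type = 'I'.
select * from nation_change_data;

-- 2. Run the MERGE statement above. It consumes the stream. Then inspect the history table.
select * from nation_history order by n_nationkey, start_time;

-- 3. Update the row, stamping a new update_timestamp as the ETL process would.
update nation
set n_comment = 'updated comment',
    update_timestamp = current_timestamp()::timestamp_ntz
where n_nationkey = 1;

-- The view should now return an 'I' row for the new version and a 'U' row
-- that closes the current NATION_HISTORY row. Run the MERGE again afterwards.
select * from nation_change_data;
```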
How to Create a Task in Snowflake?
A task, the other half of the Snowflake triggers pairing, removes the manual step of running the MERGE statement. With a task, you can schedule the MERGE statement to run on a recurring basis. This section uses the same example as the stream section. We create a task that runs the MERGE statement whenever the NATION_TABLE_CHANGES stream has data.
The following statements create a TASKADMIN role that holds the EXECUTE TASK privilege. The task will be created and run as SYSADMIN, so the TASKADMIN role is then granted to SYSADMIN.
-- Set up the TASKADMIN role
use role securityadmin;
create role taskadmin;

-- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
use role accountadmin;
grant execute task on account to role taskadmin;

-- Set the active role to SECURITYADMIN to show that this role can grant a role to another role
use role securityadmin;
grant role taskadmin to role sysadmin;

-- Switch back to SYSADMIN to create the warehouse and the task
use role sysadmin;
Note: Once TASKADMIN has been granted to SYSADMIN, you can create a task as SYSADMIN to automate the MERGE statement.
Next, create a warehouse for the task to run on. This example uses a user-managed warehouse. Snowflake also supports serverless tasks, which run on Snowflake-managed compute instead.
create warehouse if not exists task_warehouse with warehouse_size = 'XSMALL' auto_suspend = 120;
Run the following SQL statement to create the task:
-- Create a task to schedule the MERGE statement
create or replace task populate_nation_history
  warehouse = task_warehouse
  schedule = '1 minute'
  when system$stream_has_data('nation_table_changes')
as
merge into nation_history nh
using nation_change_data m
   on nh.n_nationkey = m.n_nationkey
  and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update
  set nh.end_time = m.end_time,
      nh.current_flag = 0
when matched and m.dml_type = 'D' then update
  set nh.end_time = m.end_time,
      nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert
  (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
  values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);
Check the status of the task using the following command:
show tasks;
A newly created task is suspended by default, so the figure below shows the task in the suspended state.
To resume the Task, run the command given below:
-- Resume the task
alter task populate_nation_history resume;
show tasks;
The Figure below shows the Task has been started.
The following query shows when the task will run next:
select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state from table(information_schema.task_history()) where state = 'SCHEDULED' order by completed_time desc;
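To watch the task process a change, you could insert another row (the values below are hypothetical) and wait for the next scheduled run. Then inspect the task's recent runs. Runs where the stream was empty should appear with the state SKIPPED. When you are finished, suspend the task if you do not want it to keep being scheduled.

```sql
-- Sample row with hypothetical values; any committed change to NATION puts data in the stream.
insert into nation values
  (2, 'BRAZIL', 1, 'sample comment', 'BR', current_timestamp()::timestamp_ntz);

-- After the next run, list recent runs of this task, most recent first.
select name, state, scheduled_time, completed_time, error_message
from table(information_schema.task_history(task_name => 'POPULATE_NATION_HISTORY'))
order by scheduled_time desc;

-- Confirm that the new row reached the history table.
select * from nation_history where n_nationkey = 2;

-- Stop scheduling the task when you are done experimenting.
alter task populate_nation_history suspend;
```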
In the example result, the task was scheduled to run 32 seconds later.
To summarize, this tutorial article discussed a process automation technique that uses Snowflake triggers (streams and tasks) to help developers reduce their workload. Together, they maintain a history of data that is added, changed, or deleted. Previously, this process required manual effort from developers, and the time spent building such data pipelines could be better used elsewhere.
To learn more about Snowflake, streams, tasks, and other features, see any of the three articles below:
- Understanding Snowflake UI: An Easy Guide
- Change Tracking Using Table Streams
- Using Streams and Tasks in Snowflake
Furthermore, in today's data-driven work environment, it's important to have products that make developers' lives easier, and Hevo is one such product.
Hevo Data is a no-code data pipeline platform that helps new-age businesses integrate their data from multiple source systems into a data warehouse and plug this unified data into any BI tool or business application. The platform provides 100+ ready-to-use integrations with a range of data sources and is trusted by hundreds of data-driven organizations from 30+ countries.
Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite firsthand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.
Share your experience of reading today's article, Snowflake Triggers: How to Use Streams and Tasks?, in the comments below.