Snowflake Triggers: How To Use Streams & Tasks?

on Snowflake, Snowflake Triggers • January 17th, 2022


Snowflake, a cloud-based data storage and analytics service provider, offers a data warehouse as a service designed for the needs and use cases of today's enterprises. In this tutorial, we look at one such use case, process automation, by going into the nitty-gritty of Snowflake triggers.

Changing product-engagement trends are reshaping market and hiring realities. In today's data-driven work environment, developers are often asked to help with the last mile of a project to make sure the delivered work meets a certain quality bar.

More often than not, that help amounts to creating jobs and tasks for data pipelines that automate loading data from a primary table into a new table holding the full history of changes. Snowflake triggers let us automate these pipelines and run them on a defined, recurring schedule. In this tutorial, we will use examples to learn about two of them, streams and tasks, and see how they automate the production of data pipelines. This kind of automation is one reason Snowflake is so popular in the DevOps community.

Table of Contents

  1. Snowflake Triggers, What are Streams and Tasks?
  2. How to Set Up a Stream in Snowflake?
  3. How to Create a Task in Snowflake?
  4. Conclusion

Snowflake Triggers, What are Streams and Tasks?

A stream is a Snowflake object type, in the Snowflake triggers family, that provides Change Data Capture (CDC). CDC tracks the delta of a table: the rows that were inserted, updated, or deleted between two points in time. In short, a stream lets developers query the row-level changes made to a table since the stream was created or last consumed, instead of rescanning the whole table.

A task is also a Snowflake object type; it defines a recurring schedule for running SQL. A task can execute a SQL statement, including a call to a stored procedure, and tasks can be chained so that one runs after another, which supports more complex, periodic processing.

Data pipelines are generally continuous, so tasks are often combined with streams to process new or changed data as it arrives. A task can check whether a stream contains change data for a table (with the SYSTEM$STREAM_HAS_DATA function); if it does not, the task skips the current run instead of executing its SQL. That is why streams and tasks are so often used together as Snowflake triggers.

Simplify Data Analysis With Hevo’s No-code Data Pipeline!

Hevo helps you directly transfer data from Data Warehouses such as Snowflake, Google BigQuery, etc., and 100+ Data Sources in a completely hassle-free & automated manner.

Hevo Data is fully managed and completely automates the process of not only loading data from your desired source but also enriching the data and transforming it into an analysis-ready format without having to write a single line of code. Its fault-tolerant architecture ensures that the data is handled in a secure, consistent manner with zero data loss.


Check out why Hevo Data is the Best:

  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
  • Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.

How to Set Up a Stream in Snowflake?

In this section, we will set up a stream, part of the Snowflake triggers family, using an example. The example builds a Type 2 slowly changing dimension (SCD): a table that holds the full history of each record, including every inserted, changed, and deleted version, together with the period in which each version was current. Let's begin.

You need a role that can create databases, streams, and tasks. This example uses the SYSADMIN role.

use role sysadmin;

Next, create a database and schema to work in:

create database streams_and_tasks;
use database streams_and_tasks;
create schema scd;
use schema scd;

Create a table named NATION, which will be the source table of the ETL process. Rows in NATION will be inserted, updated, or deleted, so the table always holds the current view of the data. Whenever a row changes, its update_timestamp field is updated as well (by the process that loads the table; Snowflake does not set it automatically).

create or replace table nation (
     n_nationkey number,
     n_name varchar(25),
     n_regionkey number,
     n_comment varchar(152),
     country_code varchar(2),
     update_timestamp timestamp_ntz);

The NATION_HISTORY table keeps track of the changes made to the NATION table: every change made in NATION is reflected in NATION_HISTORY.

create or replace table nation_history (
    n_nationkey number,
    n_name varchar(25),
    n_regionkey number,
    n_comment varchar(152),
    country_code varchar(2),
    start_time timestamp_ntz,
    end_time timestamp_ntz,
    current_flag int);
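To make the Type 2 SCD semantics of NATION_HISTORY concrete, here is a minimal Python sketch (not Snowflake code) of how its start_time, end_time, and current_flag columns can be read. It assumes each version is valid on the half-open interval [start_time, end_time), which matches how the view later in this article sets a closed row's end_time to the next version's start_time, and it reuses that view's '9999-12-31' sentinel for rows that are still current. The HistoryRow class, the helper functions, and the sample rows are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime

# Same sentinel the article's view uses for "still current" rows ('9999-12-31').
OPEN_END = datetime(9999, 12, 31)

@dataclass
class HistoryRow:
    n_nationkey: int
    n_name: str
    start_time: datetime
    end_time: datetime
    current_flag: int

def version_as_of(history, key, as_of):
    """Return the version of `key` valid at `as_of`, or None.

    Assumes each version is valid on [start_time, end_time).
    """
    for row in history:
        if row.n_nationkey == key and row.start_time <= as_of < row.end_time:
            return row
    return None

def current_version(history, key):
    """Return the row flagged current_flag = 1 for `key`, or None."""
    return next((r for r in history
                 if r.n_nationkey == key and r.current_flag == 1), None)

# Hypothetical history: key 1 was renamed on 2022-01-05.
history = [
    HistoryRow(1, "ARGENTINA", datetime(2022, 1, 1), datetime(2022, 1, 5), 0),
    HistoryRow(1, "ARGENTINE REPUBLIC", datetime(2022, 1, 5), OPEN_END, 1),
]

print(version_as_of(history, 1, datetime(2022, 1, 3)).n_name)  # ARGENTINA
print(current_version(history, 1).n_name)                       # ARGENTINE REPUBLIC
```

The half-open interval means that, at the exact moment of a change, only the newer version is returned, so two versions never overlap.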

The next step is to create a stream named NATION_TABLE_CHANGES on the NATION table. This stream will be used to process the data changes made to NATION.

create or replace stream nation_table_changes on table nation;

View the stream by running the command given below:

show streams;

As the image below shows, the output lists the details of each existing stream, including the stream name, database name, schema name, owner, and table name.

[Figure: SHOW STREAMS output listing NATION_TABLE_CHANGES]

Note: The stream is empty at this point because no rows have been added to or changed in the NATION table. To find out which type of DML operation changed a row in the source table, look at the stream's metadata columns: METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID.

The following statement queries the NATION_TABLE_CHANGES stream:

select * from nation_table_changes;

Note the three METADATA$ columns in the image below; the result is empty because no data has been changed yet.

[Figure: empty query result showing the METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID columns]

Before moving on to the NATION_HISTORY table, let's look at how these three columns change depending on the DML operation (insert, update, or delete) run on the NATION table.

  • An insert generates a single row in the stream, with METADATA$ACTION = 'INSERT' and METADATA$ISUPDATE = FALSE.
[Figure: stream contents after an insert]
  • An update generates two rows: a DELETE row holding the old values and an INSERT row holding the new values, both with METADATA$ISUPDATE = TRUE.
[Figure: stream contents after an update]
  • A delete brings the count back to a single row, with METADATA$ACTION = 'DELETE' and METADATA$ISUPDATE = FALSE.
[Figure: stream contents after a delete]
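The three cases above can be modeled with a small Python sketch that compares the table as of the stream's offset with its current state and emits the rows a standard stream would report. This is a simplified, hypothetical model: it keys rows by n_nationkey (Snowflake actually tracks rows with METADATA$ROW_ID), and it only reports the net change between the two snapshots.

```python
def stream_rows(before, after):
    """Return net change rows between two snapshots of a table.

    before/after map a key (here n_nationkey) to a tuple of column values.
    Each result is (key, values, METADATA$ACTION, METADATA$ISUPDATE).
    """
    rows = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old is None:                      # row exists only now: insert
            rows.append((key, new, "INSERT", False))
        elif new is None:                    # row disappeared: delete
            rows.append((key, old, "DELETE", False))
        elif old != new:                     # row changed: update = two rows
            rows.append((key, old, "DELETE", True))
            rows.append((key, new, "INSERT", True))
    return rows

base = {1: ("ARGENTINA", 1)}
print(stream_rows({}, base))
# [(1, ('ARGENTINA', 1), 'INSERT', False)]
print(stream_rows(base, {1: ("ARGENTINE REPUBLIC", 1)}))
# [(1, ('ARGENTINA', 1), 'DELETE', True), (1, ('ARGENTINE REPUBLIC', 1), 'INSERT', True)]
print(stream_rows(base, {}))
# [(1, ('ARGENTINA', 1), 'DELETE', False)]
```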

Now that the DML operations are clear, we can use this information to load the NATION_HISTORY table with data that reflects the changes in the NATION table. The first step is a view, NATION_CHANGE_DATA, that translates the stream's rows into insert (I), update (U), and delete (D) actions:

create or replace view nation_change_data as
-- This subquery figures out what to do when data is inserted into the NATION table
-- An insert to the NATION table results in an INSERT to the NATION_HISTORY table
select n_nationkey, n_name, n_regionkey, n_comment, 
country_code, start_time, end_time, current_flag, 'I' as dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
             update_timestamp as start_time,
             lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
             case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
             case when end_time_raw is null then 1 else 0 end as current_flag
      from (select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp
            from nation_table_changes
            where metadata$action = 'INSERT'
            and metadata$isupdate = 'FALSE'))
union
-- This subquery figures out what to do when data is updated in the NATION table
-- An update to the NATION table results in an update AND an insert to the NATION_HISTORY table
-- The subquery below generates two records, each with a different dml_type
select n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
             update_timestamp as start_time,
             lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
             case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
             case when end_time_raw is null then 1 else 0 end as current_flag, 
             dml_type
      from (-- Identify data to insert into nation_history table
            select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp, 'I' as dml_type
            from nation_table_changes
            where metadata$action = 'INSERT'
            and metadata$isupdate = 'TRUE'
            union
            -- Identify data in NATION_HISTORY table that needs to be updated
            select n_nationkey, null, null, null, null, start_time, 'U' as dml_type
            from nation_history
            where n_nationkey in (select distinct n_nationkey 
                                  from nation_table_changes
                                  where metadata$action = 'INSERT'
                                  and metadata$isupdate = 'TRUE')
     and current_flag = 1))
union
-- This subquery figures out what to do when data is deleted from the NATION table
-- A deletion from the NATION table results in an update to the NATION_HISTORY table
select nms.n_nationkey, null, null, null, null, nh.start_time, current_timestamp()::timestamp_ntz, null, 'D'
from nation_history nh
inner join nation_table_changes nms
   on nh.n_nationkey = nms.n_nationkey
where nms.metadata$action = 'DELETE'
and   nms.metadata$isupdate = 'FALSE'
and   nh.current_flag = 1;
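The view's logic is easier to follow when restated step by step. The Python sketch below mirrors its three branches on in-memory rows: plain inserts become 'I' records, updates become a new 'I' version plus a 'U' record that closes the open history row, and deletes become 'D' records stamped with the current time. with_end_times() imitates lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc). The dictionary row format, the function names, and the use of n_name as a stand-in for the other NATION columns are assumptions for illustration.

```python
from datetime import datetime

OPEN_END = datetime(9999, 12, 31)  # the view's '9999-12-31' sentinel

def with_end_times(rows):
    """Imitate the view's lag() window: each row ends where the next-newer
    version of the same key starts; the newest row stays open and current."""
    by_key = {}
    for r in rows:
        by_key.setdefault(r["n_nationkey"], []).append(r)
    out = []
    for group in by_key.values():
        newer_start = None  # start_time of the next-newer version, if any
        for r in sorted(group, key=lambda row: row["start_time"], reverse=True):
            out.append({**r,
                        "end_time": OPEN_END if newer_start is None else newer_start,
                        "current_flag": 1 if newer_start is None else 0})
            newer_start = r["start_time"]
    return out

def change_data(stream, history, now):
    """Translate stream rows into I/U/D records, like NATION_CHANGE_DATA."""
    def new_version(s):
        return {"n_nationkey": s["n_nationkey"], "n_name": s["n_name"],
                "start_time": s["update_timestamp"], "dml_type": "I"}

    open_rows = [h for h in history if h["current_flag"] == 1]

    # Branch 1: brand-new rows (INSERT, isupdate FALSE) become 'I' records.
    inserts = [new_version(s) for s in stream
               if s["action"] == "INSERT" and not s["isupdate"]]

    # Branch 2: an update yields the new version ('I') plus a 'U' record
    # that closes the open history row for the same key.
    updated = [s for s in stream if s["action"] == "INSERT" and s["isupdate"]]
    updated_keys = {s["n_nationkey"] for s in updated}
    updates = [new_version(s) for s in updated] + [
        {"n_nationkey": h["n_nationkey"], "n_name": None,
         "start_time": h["start_time"], "dml_type": "U"}
        for h in open_rows if h["n_nationkey"] in updated_keys]

    # Branch 3: a delete closes the open history row at the current time.
    deleted_keys = {s["n_nationkey"] for s in stream
                    if s["action"] == "DELETE" and not s["isupdate"]}
    deletes = [{"n_nationkey": h["n_nationkey"], "n_name": None,
                "start_time": h["start_time"], "end_time": now,
                "current_flag": None, "dml_type": "D"}
               for h in open_rows if h["n_nationkey"] in deleted_keys]

    return with_end_times(inserts) + with_end_times(updates) + deletes

# Hypothetical example: key 1 was renamed at t1 (an update captured by the stream).
t0, t1 = datetime(2022, 1, 1), datetime(2022, 1, 5)
history = [{"n_nationkey": 1, "n_name": "ARGENTINA", "start_time": t0,
            "end_time": OPEN_END, "current_flag": 1}]
stream = [
    {"n_nationkey": 1, "n_name": "ARGENTINA", "update_timestamp": t0,
     "action": "DELETE", "isupdate": True},
    {"n_nationkey": 1, "n_name": "ARGENTINE REPUBLIC", "update_timestamp": t1,
     "action": "INSERT", "isupdate": True},
]
for rec in change_data(stream, history, now=datetime(2022, 1, 6)):
    print(rec["dml_type"], rec["start_time"].date(), rec["end_time"].date(), rec["current_flag"])
# I 2022-01-05 9999-12-31 1
# U 2022-01-01 2022-01-05 0
```

Note that the DELETE row with METADATA$ISUPDATE = TRUE is ignored by every branch, just as in the SQL view; the update is handled entirely through its INSERT half.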

Run the following query to check the view. It returns no rows yet because the stream has not captured any changes.

select * from nation_change_data;

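With the NATION_CHANGE_DATA view in place, the MERGE statement below applies its output to the NATION_HISTORY table. Because the MERGE reads the stream through the view, running it in a committed transaction consumes the stream and advances its offset, so the same changes are not processed twice.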

merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- nation_change_data is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
   on  nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
   and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type = 'D' then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
           (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);
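The MERGE can likewise be sketched in Python to show how each dml_type is handled. This hypothetical merge_history() matches on (n_nationkey, start_time), closes matched rows for 'U' and 'D' records, and inserts unmatched 'I' records. Like a SQL MERGE, it matches every change record against the target as it was before the statement started. It does not model stream offsets.

```python
from datetime import datetime

OPEN_END = datetime(9999, 12, 31)
COLUMNS = ("n_nationkey", "n_name", "start_time", "end_time", "current_flag")

def merge_history(history, changes):
    """Apply I/U/D change records to history, following the MERGE's rules."""
    # Index the target as it was before the merge, keyed on the MERGE's ON clause.
    target = {(h["n_nationkey"], h["start_time"]): h for h in history}
    inserted = []
    for m in changes:
        match = target.get((m["n_nationkey"], m["start_time"]))
        if match is not None and m["dml_type"] in ("U", "D"):
            # when matched: stamp end_time and mark the row as no longer current
            match["end_time"] = m["end_time"]
            match["current_flag"] = 0
        elif match is None and m["dml_type"] == "I":
            # when not matched: insert the new version
            inserted.append({c: m[c] for c in COLUMNS})
    history.extend(inserted)
    return history

# Hypothetical change records for a rename of key 1 at t1.
t0, t1 = datetime(2022, 1, 1), datetime(2022, 1, 5)
history = [{"n_nationkey": 1, "n_name": "ARGENTINA", "start_time": t0,
            "end_time": OPEN_END, "current_flag": 1}]
changes = [
    {"n_nationkey": 1, "n_name": "ARGENTINE REPUBLIC", "start_time": t1,
     "end_time": OPEN_END, "current_flag": 1, "dml_type": "I"},
    {"n_nationkey": 1, "n_name": None, "start_time": t0,
     "end_time": t1, "current_flag": 0, "dml_type": "U"},
]
merge_history(history, changes)
for h in history:
    print(h["n_name"], h["end_time"].date(), h["current_flag"])
# ARGENTINA 2022-01-05 0
# ARGENTINE REPUBLIC 9999-12-31 1
```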

How to Create a Task in Snowflake?

A task, also part of the Snowflake triggers family, reduces manual work by automating the execution of the MERGE statement. With a task, you can schedule the MERGE statement to run on a recurring schedule. In this section, we continue the stream example and use a task to run the MERGE against the changes captured by the NATION_TABLE_CHANGES stream.

The statements below create the TASKADMIN role, grant it the account-level EXECUTE TASK privilege, and then grant TASKADMIN to SYSADMIN, because the task in this example will be created and run as SYSADMIN.

--Set up TASKADMIN role
use role securityadmin;
create role taskadmin;
-- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
use role accountadmin;
grant execute task on account to role taskadmin;

-- Set the active role to SECURITYADMIN to show that this role can grant a role to another role 
use role securityadmin;
grant role taskadmin to role sysadmin;
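Granting TASKADMIN to SYSADMIN works because privileges flow up a role hierarchy: a role can use the privileges of every role granted to it. The following Python sketch is a deliberately simplified model of that inheritance, containing only the two roles and the one privilege from this example. GRANTED_ROLES, PRIVILEGES, and effective_privileges() are hypothetical names, and real Snowflake roles hold many more privileges than shown here.

```python
# Hypothetical, simplified model: roles granted to each role, and the
# account-level privileges each role holds directly.
GRANTED_ROLES = {"SYSADMIN": {"TASKADMIN"}, "TASKADMIN": set()}
PRIVILEGES = {"TASKADMIN": {"EXECUTE TASK"}, "SYSADMIN": set()}

def effective_privileges(role, granted=GRANTED_ROLES, privileges=PRIVILEGES):
    """Collect privileges held directly by `role` or inherited via granted roles."""
    seen, stack, result = set(), [role], set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # guard against visiting a role twice
        seen.add(current)
        result |= privileges.get(current, set())
        stack.extend(granted.get(current, set()))
    return result

print("EXECUTE TASK" in effective_privileges("SYSADMIN"))  # True
```

Without the `grant role taskadmin to role sysadmin` step (an empty grant set for SYSADMIN in this model), SYSADMIN would not inherit EXECUTE TASK.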

Note: Once TASKADMIN has been granted to SYSADMIN, you can create a task (as SYSADMIN) to automate the MERGE statement.

Next, create a warehouse for the task to run on. This example uses a dedicated extra-small warehouse that suspends itself after 120 seconds of inactivity:

create warehouse if not exists task_warehouse with warehouse_size = 'XSMALL' auto_suspend = 120;
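To see what auto_suspend = 120 means for a task scheduled every minute, here is a rough Python model of warehouse uptime. It is an assumption-laden simplification rather than Snowflake's billing logic: the warehouse resumes when a task run starts while it is suspended, and it suspends 120 seconds after the last run finishes. Only runs that actually execute are passed in, since a run skipped by the task's WHEN condition does not need the warehouse. active_seconds() and its inputs are hypothetical.

```python
def active_seconds(run_windows, auto_suspend=120):
    """Estimate how many seconds the warehouse stays running.

    run_windows: list of (start_second, end_second) tuples for executed runs.
    Simplified model: resume on a run while suspended; suspend
    `auto_suspend` seconds after the most recent run finishes.
    """
    total = 0
    resumed_at = suspend_at = None
    for start, end in sorted(run_windows):
        if suspend_at is not None and start <= suspend_at:
            # Warehouse still up from an earlier run: push the idle timer out.
            suspend_at = max(suspend_at, end + auto_suspend)
        else:
            if resumed_at is not None:
                total += suspend_at - resumed_at  # close the previous up period
            resumed_at, suspend_at = start, end + auto_suspend
    if resumed_at is not None:
        total += suspend_at - resumed_at
    return total

# Ten 5-second runs, one per minute: the warehouse never goes idle for 120 s.
print(active_seconds([(s, s + 5) for s in range(0, 600, 60)]))  # 665
# Two runs an hour apart: the warehouse suspends in between.
print(active_seconds([(0, 5), (3600, 3605)]))                    # 250
```

In this model, a one-minute schedule with frequent changes keeps the warehouse up continuously, while sparse changes (and therefore skipped runs) let it suspend.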

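Run the following command to create the task. It is scheduled every minute, but its WHEN clause lets it run only when SYSTEM$STREAM_HAS_DATA reports that the stream contains changes: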

-- Create a task to schedule the MERGE statement
create or replace task populate_nation_history warehouse = task_warehouse schedule = '1 minute' when system$stream_has_data('nation_table_changes')
as   
merge into nation_history nh
using nation_change_data m
   on nh.n_nationkey = m.n_nationkey
   and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type = 'D' then update
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert
           (n_nationkey, n_name, n_regionkey, n_comment, 
country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, 
m.country_code, m.start_time, m.end_time, m.current_flag);
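The effect of the WHEN system$stream_has_data('nation_table_changes') clause can be illustrated with a toy Python simulation. Each tick stands for one scheduled run, and the arrivals list (hypothetical numbers of change rows captured before each tick) decides whether the run executes or is skipped. The sketch assumes each executed run consumes everything in the stream, as the MERGE does by advancing the stream offset.

```python
def simulate_task(arrivals):
    """Return "RUN" or "SKIPPED" for each scheduled tick of a stream-guarded task.

    arrivals[i] is a hypothetical count of change rows captured by the stream
    between tick i-1 and tick i.
    """
    pending = 0   # change rows sitting in the stream, not yet consumed
    states = []
    for new_rows in arrivals:
        pending += new_rows
        if pending > 0:                 # WHEN system$stream_has_data(...) is true
            states.append("RUN")        # the MERGE runs and consumes the stream,
            pending = 0                 # so the offset moves past these rows
        else:
            states.append("SKIPPED")    # condition false: no MERGE this tick
    return states

print(simulate_task([0, 3, 0, 1]))  # ['SKIPPED', 'RUN', 'SKIPPED', 'RUN']
```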

Check the status of the Task using the command given below:

show tasks;

A newly created task is suspended by default, which is why the figure below shows the task as suspended.

[Figure: SHOW TASKS output with the task in the suspended state]

To resume the Task, run the command given below:

-- Resume the task
alter task populate_nation_history resume;
show tasks;

The figure below shows that the task has been started.

[Figure: SHOW TASKS output with the task in the started state]

The following query shows when the task will run next:

select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state 
from table(information_schema.task_history()) where state = 'SCHEDULED' order by completed_time desc;
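The next_run column is just the gap between the current time and the scheduled time. As a sketch of that arithmetic, the Python below computes the same value for SCHEDULED rows of a task history. It assumes TIMESTAMPDIFF(second, ...) counts whole-second boundaries (fractional seconds are dropped rather than rounded), and the function names and sample timestamps are made up for illustration.

```python
from datetime import datetime

def seconds_until(scheduled_time, now):
    """Seconds from now to scheduled_time, counting whole-second boundaries
    (fractional seconds are dropped from both timestamps before subtracting)."""
    return int((scheduled_time.replace(microsecond=0)
                - now.replace(microsecond=0)).total_seconds())

def next_runs(task_history, now):
    """(name, seconds until run) for every row whose state is SCHEDULED."""
    return [(row["name"], seconds_until(row["scheduled_time"], now))
            for row in task_history if row["state"] == "SCHEDULED"]

# Made-up task history rows (not real Snowflake output).
history = [
    {"name": "POPULATE_NATION_HISTORY", "state": "SUCCEEDED",
     "scheduled_time": datetime(2022, 1, 17, 10, 0, 0)},
    {"name": "POPULATE_NATION_HISTORY", "state": "SCHEDULED",
     "scheduled_time": datetime(2022, 1, 17, 10, 1, 0)},
]
print(next_runs(history, now=datetime(2022, 1, 17, 10, 0, 28, 500000)))
# [('POPULATE_NATION_HISTORY', 32)]
```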

In this example, the result shows that the task will run in 32 seconds.

[Figure: task history query result showing the next run in 32 seconds]

Conclusion

On an ending note, let's summarize what we have learned. In this tutorial, we discussed a process automation technique that uses Snowflake triggers (streams and tasks) to help developers reduce their workload. Together, they keep a history of data that has been changed, deleted, or added. Previously, this process required developers' manual assistance, and the time spent creating data pipelines could have been better used elsewhere.


Furthermore, in today's data-driven work environment, it's important to have products that make developers' lives easier, and Hevo is one such product.

Hevo Data is a no-code data pipeline platform that helps new-age businesses integrate their data from multiple source systems into a data warehouse and plug this unified data into any BI tool or business application. The platform provides 100+ ready-to-use integrations with a range of data sources and is trusted by hundreds of data-driven organizations from 30+ countries.


Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Don't forget to share your experience of reading today's article, Snowflake Triggers: How to Use Streams and Tasks?, in the comments below.
