Snowflake, a cloud-based data storage and analytics service, is a data-warehouse-as-a-service designed to cater to the needs and use cases of today's enterprises. One such use case, process automation, is what we will discuss in this tutorial article by going into the nitty-gritty of Snowflake triggers.
Changing product engagement trends are altering market and hiring realities. This matters in today's data-driven work environment, where developers are expected to support projects through the last mile and ensure a consistent quality of work is delivered.
Snowflake Triggers: What Are Streams and Tasks?
A Stream is a Snowflake object type, under the Snowflake triggers category, that provides Change Data Capture (CDC) capabilities. CDC tracks the delta in a table, that is, the rows that have been inserted, updated, or deleted since the changes were last consumed. In short, a stream lets developers query a table's change records and see exactly what changed, row by row, between two points in time.
A Task is also a Snowflake object type; it defines a recurring schedule. Tasks are recommended for executing SQL statements, including statements that call stored procedures. Moreover, Tasks can run continuously and be chained together, which is considered best practice for more complex, periodic processing.
Data pipelines are generally continuous; hence, Tasks work well with Streams, which offer a better way to continuously process new or changed data. Moreover, a Task can check whether a Stream contains changed data for a table and either consume that data or skip the current run if no changed data exists. That is why the use of Snowflake triggers is so prevalent.
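As a minimal sketch of how the two objects fit together (the table, column, warehouse, and task names here are hypothetical, not from the worked example later in this article):
-- Minimal sketch with hypothetical names (orders, orders_processed, my_warehouse):
-- a stream records changes to a table, and a task consumes them on a schedule.
create or replace stream orders_stream on table orders;

create or replace task process_orders
warehouse = my_warehouse
schedule = '5 minute'
when system$stream_has_data('ORDERS_STREAM')
as
insert into orders_processed
select order_id, amount, metadata$action
from orders_stream;
The full, step-by-step version of this pattern is what the rest of the tutorial builds.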
Hevo helps you directly transfer data from Data Warehouses such as Snowflake, Google BigQuery, etc., and 100+ Data Sources in a completely hassle-free & automated manner.
Check out why Hevo Data is the Best:
- Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
- Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
- Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
Sign up here for a 14-Day Free Trial!
How to Set Up a Stream in Snowflake?
In this section, we will walk through an example of how to set up a Stream, a part of the Snowflake triggers family, in Snowflake. The example builds a Type 2 Slowly Changing Dimension (SCD), which essentially means keeping a record of the full history of values: the data that is added, changed, or deleted. Let's begin.
You need a role with the privileges to create databases, streams, and tasks. For now, we will use the SYSADMIN role.
use role sysadmin;
Next, set up a database and schema to proceed and work in:
create database streams_and_tasks;
use database streams_and_tasks;
create schema scd;
use schema scd;
Create a table named NATION that will be part of the ETL process. Data in the NATION table will be inserted, updated, or deleted, so the table always holds the current view of the data. For every row that changes, the update_timestamp field is refreshed.
create or replace table nation (
n_nationkey number,
n_name varchar(25),
n_regionkey number,
n_comment varchar(152),
country_code varchar(2),
update_timestamp timestamp_ntz);
The NATION_HISTORY table keeps track of the changes made to the NATION table: every change in NATION results in a corresponding change to NATION_HISTORY.
create or replace table nation_history (
n_nationkey number,
n_name varchar(25),
n_regionkey number,
n_comment varchar(152),
country_code varchar(2),
start_time timestamp_ntz,
end_time timestamp_ntz,
current_flag int);
The next step is to create a stream, NATION_TABLE_CHANGES, on the NATION table. The NATION_TABLE_CHANGES stream will be used to process data changes in the NATION table.
create or replace stream nation_table_changes on table nation;
View the stream by running the command given below:
show streams;
The following image shows the details of the existing streams: the stream name, database name, schema name, owner, and table name.
Note: The stream is expected to be empty at this point because no values have been changed or added in the NATION table. Furthermore, to find out which type of DML operation changed data in the source table, refer to the following columns: METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID.
The below statement queries the NATION_TABLE_CHANGES stream.
select * from nation_table_changes;
Take note of the three METADATA columns in the image below — no data has been altered.
Before moving on to the NATION_HISTORY table, let's learn how these three columns change depending on the DML operation (insert, update, or delete) run on the NATION table.
- An insert operation generates a single row in the stream.
- An update operation generates two rows in total, as shown in the sketch below: a DELETE row carrying the old values and an INSERT row carrying the new values.
- A delete operation generates a single row again.
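As a hedged sketch of these cases (the sample values are hypothetical, and each statement assumes the changed row existed before the stream's current offset, since a stream only shows the net change since that offset):
-- Illustrative only: hypothetical values.
-- Insert: the stream shows one row with METADATA$ACTION = 'INSERT', METADATA$ISUPDATE = 'FALSE'.
insert into nation values (1, 'France', 3, 'Sample comment', 'FR', current_timestamp());
select metadata$action, metadata$isupdate, metadata$row_id from nation_table_changes;

-- Update of a pre-existing row: the stream shows two rows, a DELETE row with the old values
-- and an INSERT row with the new values, both with METADATA$ISUPDATE = 'TRUE'.
update nation set n_comment = 'Updated comment', update_timestamp = current_timestamp()
where n_nationkey = 1;

-- Delete of a pre-existing row: the stream shows a single DELETE row with METADATA$ISUPDATE = 'FALSE'.
delete from nation where n_nationkey = 1;

-- If you ran the statements above, reset the table and stream before continuing the walkthrough:
delete from nation;
create or replace stream nation_table_changes on table nation;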
Now that the DML operations are clear, let's use this information in a SQL statement that loads the NATION_HISTORY table with data reflecting the changes in the NATION table. The code is shown below:
create or replace view nation_change_data as
-- This subquery figures out what to do when data is inserted into the NATION table
-- An insert to the NATION table results in an INSERT to the NATION_HISTORY table
select n_nationkey, n_name, n_regionkey, n_comment,
country_code, start_time, end_time, current_flag, 'I' as dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
update_timestamp as start_time,
lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
case when end_time_raw is null then 1 else 0 end as current_flag
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp
from nation_table_changes
where metadata$action = 'INSERT'
and metadata$isupdate = 'FALSE'))
union
-- This subquery figures out what to do when data is updated in the NATION table
-- An update to the NATION table results in an update AND an insert to the NATION_HISTORY table
-- The subquery below generates two records, each with a different dml_type
select n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
update_timestamp as start_time,
lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
case when end_time_raw is null then 1 else 0 end as current_flag,
dml_type
from (-- Identify data to insert into nation_history table
select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp, 'I' as dml_type
from nation_table_changes
where metadata$action = 'INSERT'
and metadata$isupdate = 'TRUE'
union
-- Identify data in NATION_HISTORY table that needs to be updated
select n_nationkey, null, null, null, null, start_time, 'U' as dml_type
from nation_history
where n_nationkey in (select distinct n_nationkey
from nation_table_changes
where metadata$action = 'INSERT'
and metadata$isupdate = 'TRUE')
and current_flag = 1))
union
-- This subquery figures out what to do when data is deleted from the NATION table
-- A deletion from the NATION table results in an update to the NATION_HISTORY table
select nms.n_nationkey, null, null, null, null, nh.start_time, current_timestamp()::timestamp_ntz, null, 'D'
from nation_history nh
inner join nation_table_changes nms
on nh.n_nationkey = nms.n_nationkey
where nms.metadata$action = 'DELETE'
and nms.metadata$isupdate = 'FALSE'
and nh.current_flag = 1;
Execute the SQL statement given below. It will not return any data yet because the stream has not captured anything.
select * from nation_change_data;
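To see the view in action, you can insert a sample row into NATION (the values below are hypothetical) and query the view again; an 'I' record should now appear.
-- Hypothetical sample row to exercise the view
insert into nation values (7, 'Germany', 3, 'Sample row', 'DE', current_timestamp());
select * from nation_change_data;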
After creating the NATION_CHANGE_DATA view, include the MERGE statement as shown below.
merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- nation_change_data is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
on nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
set nh.end_time = m.end_time,
nh.current_flag = 0
when matched and m.dml_type = 'D' then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
set nh.end_time = m.end_time,
nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
(n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);
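If you inserted the hypothetical sample row above, running this MERGE consumes the stream and writes the change to NATION_HISTORY, which you can confirm with:
-- The MERGE advances the stream offset, so the stream should be empty afterwards
select * from nation_history;
select * from nation_table_changes;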
How to Create a Task in Snowflake?
A Task, part of the Snowflake triggers family, reduces the workload by automating the execution of the MERGE command. Using a Task in Snowflake, you can schedule the MERGE statement and run it on a recurring basis. In this section, using the same example from the stream section, we will execute the MERGE command with a Task whenever the NATION_TABLE_CHANGES stream has data.
Step 1: The steps below create the TASKADMIN role. Since we will be running the task as SYSADMIN, the TASKADMIN role is then granted to SYSADMIN.
--Set up TASKADMIN role
use role securityadmin;
create role taskadmin;
-- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
use role accountadmin;
grant execute task on account to role taskadmin;
-- Set the active role to SECURITYADMIN to show that this role can grant a role to another role
use role securityadmin;
grant role taskadmin to role sysadmin;
Note: Once the TASKADMIN role has been granted to SYSADMIN, you can start creating a Task to automate the MERGE command.
Step 2: Next, create a warehouse for the task (a warehouse is required to run Tasks):
create warehouse if not exists task_warehouse with warehouse_size = 'XSMALL' auto_suspend = 120;
Step 3: Run the SQL command given below to create a task:
-- Create a task to schedule the MERGE statement
create or replace task populate_nation_history
warehouse = task_warehouse
schedule = '1 minute'
when system$stream_has_data('nation_table_changes')
as
merge into nation_history nh
using nation_change_data m
on nh.n_nationkey = m.n_nationkey
and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update
set nh.end_time = m.end_time,
nh.current_flag = 0
when matched and m.dml_type = 'D' then update
set nh.end_time = m.end_time,
nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert
(n_nationkey, n_name, n_regionkey, n_comment,
country_code, start_time, end_time, current_flag)
values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment,
m.country_code, m.start_time, m.end_time, m.current_flag);
Step 4: Check the status of the Task using the command given below:
show tasks;
Step 5: By default, a Task is suspended after its creation. Hence, the figure below shows the Task is suspended.
Step 6: To resume the Task, run the command given below:
-- Resume the task
alter task populate_nation_history resume;
show tasks;
Step 7: The Figure below shows the Task has been started.
Step 8: The query given below will show you when the Task will run next:
select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state
from table(information_schema.task_history())
where state = 'SCHEDULED'
order by completed_time desc;
Step 9: The result of the query shows the Task will run in 32 seconds.
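When you are done testing, it is worth suspending the task so the one-minute schedule does not keep running (a housekeeping step, not part of the original walkthrough):
-- Suspend the task when it is no longer needed
alter task populate_nation_history suspend;
show tasks;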
Conclusion
To summarize what we have learned: in this tutorial article, we discussed a process automation technique using Snowflake triggers that developers can take advantage of to reduce their workload. Snowflake triggers make it possible to retain a history of data that is changed, deleted, or added. Previously, this process required manual developer effort, and the time spent building data pipelines could be better utilized elsewhere.
Curious about how Snowflake Change Data Capture works? Check out our detailed guide to learn how this feature can enhance your data integration and management.
To learn more about Snowflake, Snowflake triggers, and other functionalities, any of the articles cited below can help.
Furthermore, in today's data-driven work environment it is imperative to have products that make developers' lives easier, and Hevo is one such product.
Learn how to use the Snowflake INSERT command to add data into tables efficiently. This guide covers syntax, examples, and tips for managing data insertion in Snowflake. Explore the details at Snowflake Insert.
FAQ
What is a trigger in Snowflake?
Snowflake does not natively support traditional triggers (like in other databases such as MySQL or PostgreSQL), which automatically fire after certain database events (e.g., INSERT, UPDATE, DELETE). However, Snowflake offers alternatives using tasks and streams to automate similar workflows.
Can Snowflake tasks be triggered manually?
Yes, Snowflake tasks can be triggered manually. You can manually execute a task using the EXECUTE TASK command. Although tasks are primarily designed to be scheduled or triggered by dependent tasks, manual execution is useful for testing or executing an ad-hoc task.
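For example, assuming the task created earlier in this article, a manual run looks like this:
-- Run the task once, outside its schedule (requires the EXECUTE TASK privilege)
execute task populate_nation_history;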
What are the three Snowflake stage types?
Snowflake stages are storage locations where data is uploaded or staged before loading into tables. There are three types of stages:
User Stage
Table Stage
Internal and External Named Stages
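As a hedged sketch of how each stage type is referenced (the named stages below are hypothetical):
list @~;          -- user stage, one per user
list @%nation;    -- table stage, one per table
create stage if not exists my_internal_stage;   -- internal named stage
-- An external named stage points at cloud storage, for example an S3 bucket:
-- create stage my_external_stage url = 's3://my-bucket/path/';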
Yash is a Content Marketing professional with over three years of experience in data-driven marketing campaigns. He has expertise in strategic thinking, integrated marketing, and customer acquisition. Through comprehensive marketing communications and innovative digital strategies, he has driven growth for startups and established brands.