Change Data Capture using Snowflake Streams

Pooja Kelgaonkar
Published in Dev Genius
6 min read · Oct 20, 2022

Thanks for reading my earlier blogs! In this post we are going to learn how to implement slowly changing dimensions and Change Data Capture in Snowflake using Snowflake Streams. If you want to know more about Change Data Capture (CDC), you can read about it here — https://blog.devgenius.io/change-data-capture-in-dwbi-etl-elt-implementations-af08b54d8fb5

Let’s start learning more about Snowflake Streams with the help of some common questions —

What is a Stream?

A Stream is a Snowflake object that we can create, alter, or drop on top of other Snowflake objects — tables. We can create a stream on standard tables, directory tables, and external tables, as well as on the tables underlying a view. We create a stream on any table we want to track for slowly changing dimensions or CDC.

How does a Stream work?

Once we create a stream on a table, it tracks any DML changes to that table. The stream keeps track of any inserts, updates, or deletes happening on the source table and lists those operations as change records.

How does a Stream capture changes?

When a stream is defined on a table, it takes a point-in-time snapshot of the table — the current transactional version — as its offset. The stream compares subsequent changes against that snapshot, identifies what was done to the source table, and logs it as change records.
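For example (a minimal sketch; t1 is a hypothetical table), creating the stream records the table’s current version as its offset, and DESCRIBE STREAM lets you inspect it:

```sql
-- t1 is a hypothetical source table; the stream's offset is set to the
-- table's current transactional version at creation time
create or replace stream t1_stream on table t1;

-- Shows the source table, stream mode, and staleness information
describe stream t1_stream;
```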

What is captured in a Stream?

As we know by now, a stream tracks, compares, and logs any transactional (DML) changes to the source object. These changes are logged in the stream created on that table. The stream stores the before and after state of modified rows, along with the type of action — insert, update, or delete — in its metadata columns. A stream doesn’t hold any table data; it holds only a change log.

What is the format of a Stream? Does it have any columns used to log or track changes?

A stream does not store any table data; however, it has a table-like structure with all the columns of the source table plus the metadata columns below –

METADATA$ACTION — Indicates the DML operation (INSERT, DELETE) recorded.

METADATA$ISUPDATE — Indicates whether the operation was part of an UPDATE statement. Updates to rows in the source object are represented as a pair of DELETE and INSERT records in the stream, with the metadata column METADATA$ISUPDATE set to TRUE.

METADATA$ROW_ID — Specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time.
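Putting these together, querying a stream is just like querying a table, and the metadata columns can be filtered on. A sketch (my_stream and its id/name columns are hypothetical):

```sql
-- Select the metadata columns alongside the source table's columns;
-- here we look only at insert-side change records
select metadata$action, metadata$isupdate, metadata$row_id, id, name
from my_stream
where metadata$action = 'INSERT';
```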

What are the types of Streams?

Streams come in three types, as below –

1. Insert-only: Supported only for external tables. This stream tracks rows added by insert operations; it does not capture any delete operations.

2. Append-only: Supported for standard tables, directory tables, and views. This stream tracks inserted rows only; update, delete, and truncate operations are not tracked by this type of stream.

3. Standard: Supported for standard tables, directory tables, and views. As the standard stream, it tracks all types of DML on the source object — inserts, updates, and deletes.
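The stream type is chosen at creation time. A sketch of the corresponding DDL (the table names are assumed):

```sql
-- Insert-only stream: external tables only
create or replace stream s_ins on external table my_ext_table insert_only = true;

-- Append-only stream: standard tables, directory tables, or views
create or replace stream s_app on table my_table append_only = true;

-- Standard (delta) stream: tracks inserts, updates, and deletes
create or replace stream s_std on table my_table;
```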

Can we track CDC for geospatial data using Streams?

Yes, we can track CDC for geospatial data using streams; however, we cannot create a standard stream on geospatial data — CDC for geospatial data has to be done using append-only streams.

How long can a Stream track data? Is there any validity period for the change logs being maintained?

A stream can become stale, and this depends on the data retention period of the source table. (This does not apply to external tables or directory tables, as there is no data retention policy for them.) If the source table has a data retention period of 14 days, then the stream can also hold the change log for up to 14 days. If the change log is not consumed within the retention period, Snowflake temporarily extends the table’s retention, up to a maximum of 14 days by default, to allow the change log to be consumed and to avoid losing change records from the stream.
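Before reading a stream, we can check whether it still has unconsumed change records and whether it has gone stale; for example (using the member_check stream created below):

```sql
-- Returns TRUE if the stream contains change records to consume
select system$stream_has_data('member_check');

-- SHOW STREAMS reports a STALE flag and a STALE_AFTER timestamp per stream
show streams like 'member_check';
```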

How can we create Streams?

A stream is created using a DDL statement. We can create or alter a stream using the set of DDLs below. Let’s create a sample table as the source table for a stream –

-- Create a table to store the names and fees paid by members of a gym
create or replace table members (
id number(8) not null,
name varchar(255) default null,
fee number(3) null
);

-- Create a stream to track changes to data in the MEMBERS table
create or replace stream member_check on table members;

Here, member_check is the stream created on the source table members. Whenever any DML changes run against the source table, the stream member_check will capture them.

How can we use the changes tracked by Streams?

Now we have a source table, members, and a stream defined on it, member_check. Let’s create another table, signup, and run a few insert statements against members and signup.

create or replace table signup (
id number(8),
dt date
);

insert into members (id,name,fee)
values
(1,'Joe',0),
(2,'Jane',0),
(3,'George',0),
(4,'Betty',0),
(5,'Sally',0);

insert into signup
values
(1,'2018-01-01'),
(2,'2018-02-15'),
(3,'2018-05-01'),
(4,'2018-07-16'),
(5,'2018-08-21');

Now, let’s see how the stream logs the changes made to the source table members.

select * from member_check;

+----+--------+-----+-----------------+-------------------+------------------------------------------+
| ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
|----+--------+-----+-----------------+-------------------+------------------------------------------|
| 1 | Joe | 0 | INSERT | False | d200504bf3049a7d515214408d9a804fd03b46cd |
| 2 | Jane | 0 | INSERT | False | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e |
| 3 | George | 0 | INSERT | False | b98ad609fffdd6f00369485a896c52ca93b92b1f |
| 4 | Betty | 0 | INSERT | False | e554e6e68293a51d8e69d68e9b6be991453cc901 |
| 5 | Sally | 0 | INSERT | False | c94366cf8a4270cf299b049af68a04401c13976d |
+----+--------+-----+-----------------+-------------------+------------------------------------------+
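Note that a plain SELECT does not advance the stream’s offset; the change records are consumed only when the stream is read inside a DML statement. A sketch (member_changes is an assumed staging table with the same columns as the stream):

```sql
-- Repeated SELECTs keep returning the same pending changes
select count(*) from member_check;

-- Reading the stream inside a DML (here, an INSERT) consumes the records
insert into member_changes select * from member_check;

-- The stream is now empty until new DML runs against MEMBERS
select count(*) from member_check;
```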

How can we use the change log data from Streams and apply it to target tables?

As we have seen, member_check is a stream with the same columns/DDL as the source table, plus three metadata columns. We can apply changes based on the metadata action column. Let’s see one sample example using members, signup, and member_check. Suppose we offer a 30-day free trial and then charge a $90 monthly fee. We need to consider the members and signup details, along with the latest insert log from member_check, and calculate the FEE column in the target table. We can use SQL as below –

merge into members m
  using (
    select id, dt
    from signup s
    where datediff(day, '2018-08-15'::date, s.dt::date) < -30
  ) s
  on m.id = s.id
  when matched then update set m.fee = 90;

Once we run this SQL, the FEE column gets updated in the target table. Let’s see the records from the members table –

select * from members;

+----+--------+-----+
| ID | NAME   | FEE |
|----+--------+-----|
|  1 | Joe    |  90 |
|  2 | Jane   |  90 |
|  3 | George |  90 |
|  4 | Betty  |   0 |
|  5 | Sally  |   0 |
+----+--------+-----+

Using the MERGE statement, we can easily capture changes from streams and apply them to a target table. We can also implement SCD Type II using the same streams logic and a MERGE statement.
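As a rough SCD Type II sketch (not from this post; the members_hist table and its validity columns are assumed), we can close out changed rows and insert new versions. Both statements must run in one transaction so they see the same stream offset, which is advanced only at commit:

```sql
begin;

-- Close the current version of rows that were updated or deleted
update members_hist h
   set valid_to = current_timestamp()
 where h.valid_to is null
   and h.id in (select id from member_check
                 where metadata$action = 'DELETE');

-- Insert new versions for inserted and updated rows
insert into members_hist (id, name, fee, valid_from, valid_to)
select id, name, fee, current_timestamp(), null
  from member_check
 where metadata$action = 'INSERT';

commit;
```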

I hope this blog helps you understand streams and how to implement them to capture changes from a source table and apply them to a target table. We can also automate the process and capture changes to target tables in near real time. This automation can be done using Snowflake Tasks.
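As a sketch of that automation (the warehouse and target table names are assumed), a task can poll the stream on a schedule and apply the changes only when there is something to consume:

```sql
-- Runs every 5 minutes, but only when the stream has change records
create or replace task apply_member_changes
  warehouse = my_wh            -- assumed warehouse name
  schedule  = '5 minute'
when
  system$stream_has_data('member_check')
as
  merge into members_target t          -- assumed target table
  using member_check s on t.id = s.id
  when matched and s.metadata$action = 'DELETE' and not s.metadata$isupdate
    then delete
  when matched and s.metadata$action = 'INSERT' and s.metadata$isupdate
    then update set t.name = s.name, t.fee = s.fee
  when not matched and s.metadata$action = 'INSERT'
    then insert (id, name, fee) values (s.id, s.name, s.fee);

-- Tasks are created suspended; resume to start the schedule
alter task apply_member_changes resume;
```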

About Me:

I am a DWBI and Cloud Architect, currently working as a Senior Data Architect — GCP, Snowflake. I have worked with various legacy data warehouses, Big Data implementations, and cloud platforms/migrations. I am a SnowPro Core certified Data Architect as well as a Google certified Professional Cloud Architect. You can reach out to me on LinkedIn if you need any further help on certifications, data solutions, and implementations!


My words keep me going and motivate me to reach out to more and more readers!