
Instant Data APIs using Snowflake table streams and Dozer

· 12 min read
Sagar

This blog post walks through building long-running, near real-time web APIs on top of a data source that is updated frequently. For this we will use a GitHub statistics dataset available on the Snowflake Marketplace. The guide also explains how to use the Dozer Snowflake connector to fetch data and how to automate data updates on Snowflake using stored procedures and tasks.

Why customer-facing APIs shouldn't be exposed directly to Snowflake

  • Security: Direct exposure carries a constant risk of SQL injection and unauthorized access to backend data.

  • Cost: Compute usage becomes frequent and variable, leading to unpredictable and higher costs.

  • Performance: End users can issue inefficient queries against the warehouse.

  • Flexibility & Maintenance: Middleware ensures that the APIs are not tightly coupled with the database. This makes it easier to change the data without affecting the APIs, while also ensuring API version consistency.

  • Unified data: Middleware allows seamless merging of data from multiple sources.

  • Graceful errors: Errors can be handled gracefully without being exposed raw to the end user.

Why use Dozer as middleware?

Leveraging Dozer as middleware for applications that consume data directly from Snowflake has many advantages.

  • Streamlined integration: Dozer's innate ability to connect with Snowflake table streams ensures that your APIs always access the latest data. This eliminates the need to write custom code to fetch data from Snowflake table streams.

  • Cache simplified: Say goodbye to dealing with TTLs and cache invalidation woes! Dozer automatically keeps the cache updated via a polling mechanism, sidestepping the complexity often faced with tools like Redis.

  • Dynamic transformations on Deltas: Dozer shines by applying transformations directly on the deltas (the fresh updates in Snowflake streams). Instead of sifting through the entire dataset, Dozer smartly processes just the changes, marrying efficiency with accuracy.

  • Performance & Scalability: With real-time transformations and delta-focused processing, Dozer ensures your APIs are not only near real-time but scalable.

How does Dozer enable near real-time processing?

Snowflake table streams are a feature that enables you to track changes made to a table, such as inserts, updates, and deletes. Table streams provide a way to capture these changes in real-time, making it easier to keep downstream systems or applications in sync with the latest data.

Dozer fetches data from Snowflake table streams to keep its cache fresh, using a polling mechanism. At regular intervals, Dozer checks the table streams, fetches the modifications, and promptly updates the cache so that the data stays fresh.
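
For reference, a table stream can also be created and inspected by hand in a worksheet. The sketch below uses a hypothetical stream named GITHUB_STARS_STREAM on the GITHUB_STARS table we create in Part 1; each change captured since the last read appears as a row with metadata columns such as METADATA$ACTION and METADATA$ISUPDATE. Dozer's connector consumes these change records for you.

--hypothetical example: track changes on the local stars table
CREATE OR REPLACE STREAM GITHUB_STARS_STREAM ON TABLE GITHUB_STARS;

--rows inserted/updated/deleted since the last consumption, with change metadata
SELECT * FROM GITHUB_STARS_STREAM;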

Prerequisites

Before we begin, make sure you have the following:

  • A Snowflake account set up; refer here
  • Snowflake ODBC driver installed; refer to the ODBC installation instructions
  • Dozer installed. For installation instructions, visit the Dozer documentation.
  • Basic knowledge of SQL for writing data transformation queries.

Project Structure: Instant Data APIs using Snowflake table streams and Dozer

This guide is divided into five parts, which are meant to be followed in order:

Part 1: Setting up local GitHub dataset on Snowflake

Snowflake enables the sharing of databases via shared databases, which are imported and used by data consumers. Local databases, on the other hand, can be modified and deleted and, most importantly, provide the permissions the Dozer Snowflake connector needs to fetch data!

Snowflake Marketplace contains many datasets maintained by individuals, organisations, and others. We are going to use the GitHub dataset, which can be imported from here. Once imported, this will create a shared database with read access. This dataset matches the update frequency of the original dataset.

But we need a local database for the Dozer Snowflake connector to connect to! This can be created by running the following commands in a Snowflake worksheet. Make sure to create a database named GITHUB_EVENTS and set the worksheet to run inside GITHUB_EVENTS/PUBLIC. To run a particular command, place the cursor over it and click the Run button.

CREATE TABLE GITHUB_STARS LIKE CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS;
INSERT INTO GITHUB_STARS SELECT * FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS LIMIT 1000000;
CREATE TABLE GITHUB_REPOS LIKE CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_REPOS;
INSERT INTO GITHUB_REPOS SELECT * FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_REPOS LIMIT 1000000;

The tables in the local database GITHUB_EVENTS are now created and populated with data from the shared database CYBERSYN_GITHUB_ARCHIVE. Each table should now contain 1 million records.
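
As an optional sanity check, the row counts can be verified directly in the same worksheet:

--verify that both local tables were populated
SELECT 'GITHUB_STARS' AS TABLE_NAME, COUNT(*) AS ROW_COUNT FROM GITHUB_STARS
UNION ALL
SELECT 'GITHUB_REPOS', COUNT(*) FROM GITHUB_REPOS;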

GITHUB_STARS

Stars schema

GITHUB_REPOS

Repos schema

Part 2: Automate data updates from the shared GitHub dataset

In Part 1 we saw how to create a basic table structure and import data from a shared database into a local database. In this part, let us automate the process of data ingestion from the shared database, so that the local database is always up to date.

Let us truncate the records we previously inserted into the GITHUB_STARS and GITHUB_REPOS tables and start anew.

TRUNCATE GITHUB_REPOS;
TRUNCATE GITHUB_STARS;

Since we are importing data into our local database, we can run a few SQL queries on it to make it more suitable for our use case. Let us assume the end goal is to find the top 500 repositories with the most stars overall and the top 500 repositories with the most stars obtained on a single day.

Query to insert records into GITHUB_STARS

INSERT INTO GITHUB_STARS (REPO_ID, DATE, COUNT)
SELECT gs.REPO_ID, gs.DATE, gs.COUNT
FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS gs
WHERE gs.REPO_ID IN (
    SELECT TOP 500 gs_inner.REPO_ID
    FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS gs_inner
    GROUP BY gs_inner.REPO_ID
    ORDER BY SUM(gs_inner.COUNT) DESC
)
AND NOT EXISTS (
    SELECT 1
    FROM GITHUB_STARS gst
    WHERE gs.REPO_ID = gst.REPO_ID AND gs.DATE = gst.DATE
);

This query selects all the records for the top 500 repositories with the most stars and inserts them into the GITHUB_STARS table. The NOT EXISTS clause ensures that the records are not already present in the local table, so the query leaves pre-existing records untouched and inserts only new records into GITHUB_STARS.
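
To convince yourself that re-running the insert stays idempotent, an optional check for duplicate (REPO_ID, DATE) pairs in the local table should return no rows:

--any duplicates here would indicate the NOT EXISTS guard is not working
SELECT REPO_ID, DATE, COUNT(*) AS OCCURRENCES
FROM GITHUB_STARS
GROUP BY REPO_ID, DATE
HAVING COUNT(*) > 1;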

Query to insert records into GITHUB_REPOS

Since we know there will only be 500 records in total here, we can simply truncate the GITHUB_REPOS table and re-insert the new records, since doing so will not significantly increase compute costs.

TRUNCATE GITHUB_REPOS;
INSERT INTO GITHUB_REPOS (repo_id, repo_name, first_seen)
WITH CTE AS (
    SELECT
        REPO_ID,
        REPO_NAME,
        FIRST_SEEN,
        ROW_NUMBER() OVER (PARTITION BY REPO_ID ORDER BY FIRST_SEEN DESC) AS row_num
    FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_REPOS
)
SELECT repo_id, repo_name, first_seen
FROM CTE
WHERE repo_id IN (
    SELECT repo_id
    FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_REPOS
    WHERE REPO_ID IN (
        SELECT TOP 500 gs_inner.REPO_ID
        FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS gs_inner
        GROUP BY gs_inner.REPO_ID
        ORDER BY SUM(gs_inner.COUNT) DESC
    )
) AND row_num = 1;

Let us put these queries into stored procedures. Stored procedures are named blocks of code, written here in JavaScript, that can be stored, compiled, and executed on the Snowflake platform. They encapsulate a series of SQL statements, providing a way to modularize and reuse code within Snowflake.

In the same worksheet we can also define a single task that calls these stored procedures. Tasks are scheduled operations that automate the execution of one or more SQL statements or stored procedures at specified intervals. They are used to perform routine and repetitive work without manual intervention.

Snowflake Worksheet with Stored Procedures and Tasks

--this procedure updates records from shared table stars to local table stars
CREATE OR REPLACE PROCEDURE update_stars()
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
var command_1 = `
    INSERT INTO GITHUB_STARS (REPO_ID, DATE, COUNT)
    SELECT gs.REPO_ID, gs.DATE, gs.COUNT
    FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS gs
    WHERE gs.REPO_ID IN (
        SELECT TOP 500 gs_inner.REPO_ID
        FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS gs_inner
        GROUP BY gs_inner.REPO_ID
        ORDER BY SUM(gs_inner.COUNT) DESC
    )
    AND NOT EXISTS (
        SELECT 1
        FROM GITHUB_STARS gst
        WHERE gs.REPO_ID = gst.REPO_ID AND gs.DATE = gst.DATE
    );
`;

try {
    var statement1 = snowflake.createStatement({ sqlText: command_1 });
    var result_set1 = statement1.execute();
    return "Successfully Executed";
} catch (err) {
    return "Error: " + err.message;
}
$$
;

--this procedure truncates the table repos
create or replace procedure truncate_repos()
returns string not null
language javascript
as
$$
var command_1="TRUNCATE GITHUB_REPOS";
var statement1 = snowflake.createStatement( {sqlText: command_1} );
var result_set1 = statement1.execute();
return "Successfully Executed"
$$
;

--this procedure inserts 500 records from shared table repos to local table repos
CREATE OR REPLACE PROCEDURE insert_repos()
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
var command_1 = `
    INSERT INTO GITHUB_REPOS (repo_id, repo_name, first_seen)
    WITH CTE AS (
        SELECT
            REPO_ID,
            REPO_NAME,
            FIRST_SEEN,
            ROW_NUMBER() OVER (PARTITION BY REPO_ID ORDER BY FIRST_SEEN DESC) AS row_num
        FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_REPOS
    )
    SELECT repo_id, repo_name, first_seen
    FROM CTE
    WHERE repo_id IN (
        SELECT repo_id
        FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_REPOS
        WHERE REPO_ID IN (
            SELECT TOP 500 gs_inner.REPO_ID
            FROM CYBERSYN_GITHUB_ARCHIVE.CYBERSYN.GITHUB_STARS gs_inner
            GROUP BY gs_inner.REPO_ID
            ORDER BY SUM(gs_inner.COUNT) DESC
        )
    ) AND row_num = 1;
`;

try {
    var statement1 = snowflake.createStatement({ sqlText: command_1 });
    var result_set1 = statement1.execute();
    return "Successfully Executed";
} catch (err) {
    return "Error: " + err.message;
}
$$
;

--This procedure executes all the previous procedures in order
create or replace procedure wrapper_proc()
returns string not null
language javascript
as
$$
snowflake.execute({sqlText: 'CALL truncate_repos();'});
snowflake.execute({sqlText: 'CALL insert_repos();'});
snowflake.execute({sqlText: 'CALL update_stars();'});
return "Successfully Executed"
$$
;

--This task schedules the wrapper procedure
create or replace task github_update
WAREHOUSE = <YOUR_WAREHOUSE>
SCHEDULE = '10 MINUTE'
AS call wrapper_proc()
;

--To resume/start the task
ALTER TASK github_update RESUME;

This worksheet is used to create and run the stored procedures and the task. The github_update task automates the data update process and is scheduled to run every 10 minutes.
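
If you do not want to wait for the schedule, the wrapper procedure can be called manually, and recent task runs can be inspected through Snowflake's task history (a quick sketch; the exact columns you care about may vary):

--run the whole refresh once, immediately
CALL wrapper_proc();

--inspect recent runs of the scheduled task
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'GITHUB_UPDATE'))
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;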

Part 3: Setting up Dozer with the Snowflake feature enabled

For Dozer to work with Snowflake, the Snowflake feature needs to be enabled. This can be done by building the Dozer binary from source using the following command:

cargo install --git https://github.com/getdozer/dozer dozer-cli --features snowflake --locked

Part 4: Configuration file

Now, with the Snowflake feature enabled, all we need is a single configuration file to run Dozer. The configuration file specifies the Snowflake connection with the required credentials, the sources containing the tables in the connection, the SQL transformations to be performed, and the names and paths of the endpoints.

version: 1
app_name: github_stats
connections:
  - config: !Snowflake
      server: "<YOUR_SERVER>"
      port: 443
      user: "<YOUR_USERNAME>"
      password: "<YOUR_PASSWORD>"
      database: "<YOUR_DATABASE>"
      schema: PUBLIC
      warehouse: "<YOUR_WAREHOUSE>"
      driver: "SnowflakeDSIIDriver"
      role: "<YOUR_ROLE>"
    name: sn_data

sources:
  - name: github_stars
    connection: !Ref sn_data
    table_name: GITHUB_STARS

  - name: github_repos
    connection: !Ref sn_data
    table_name: GITHUB_REPOS

sql: |

  --Most stars obtained overall
  SELECT gr.REPO_ID, gr.REPO_NAME, SUM(gs.COUNT) AS TOTAL_STARS
  INTO gh_top_starred_ovr
  FROM github_stars gs
  JOIN github_repos gr ON gs.REPO_ID = gr.REPO_ID
  GROUP BY gr.REPO_ID, gr.REPO_NAME;

  --Most stars on one single day
  WITH MaxCounts AS (
    SELECT REPO_ID, MAX(COUNT) AS MAX_COUNT
    FROM github_stars
    GROUP BY REPO_ID
  )
  SELECT gs.REPO_ID, gr.REPO_NAME, gs.DATE, gs.COUNT AS DAILY_STARS
  INTO gh_top_starred_day
  FROM github_stars gs
  JOIN MaxCounts mc ON gs.REPO_ID = mc.REPO_ID AND gs.COUNT = mc.MAX_COUNT
  JOIN github_repos gr ON gs.REPO_ID = gr.REPO_ID;

endpoints:
  - name: gh_top_starred_ovr
    path: /gh_top_starred_ovr
    table_name: gh_top_starred_ovr

  - name: gh_top_starred_day
    path: /gh_top_starred_day
    table_name: gh_top_starred_day

Part 5: Running Dozer and querying the endpoints

Running with Dozer

Once the configuration file is set up, we can start Dozer by running the following command in the terminal:

dozer run

Running this command spins up Dozer, initiating data ingestion from the configured Snowflake database and populating the cache. Logs can be observed in the console during execution. The results of the SQL queries are then accessible at the configured endpoints.

Dozer automatically generates REST and gRPC APIs based on the endpoint configuration provided in the Dozer config. We can now query the Dozer endpoints to get the results of our SQL queries. The cache can be queried using gRPC (both the typed and common services) or REST. In the queries below we order the data in descending order of DAILY_STARS and TOTAL_STARS respectively, using the $order_by filter.

gRPC Common

grpcurl -d '{"endpoint": "gh_top_starred_day", "query": "{\"$order_by\": {\"DAILY_STARS\": \"desc\"}}"}' \
    -plaintext localhost:50051 \
    dozer.common.CommonGrpcService/query

REST

curl -X POST http://localhost:8080/gh_top_starred_ovr/query \
    --header 'Content-Type: application/json' \
    --data-raw '{"$order_by": {"TOTAL_STARS": "desc"}}'

Running with Dozer Live

Dozer Live can be used for visualizing data pipelines, debugging SQL, sources, and endpoints, and executing API calls, all within a single interface. To run Dozer Live, run the following command in the terminal:

dozer live

This will open up the Dozer Live interface in your browser.

Dozer live

After clicking RUN, the REST and gRPC APIs can be queried from the Dozer Live interface itself!

gRPC Common

gRPC Common

gRPC Typesafe

gRPC Typed

REST

REST

Testing out the near real-time claim of Dozer

Since 10 minutes is a long time to wait for the Snowflake tables to update automatically, feel free to manually add data to them using INSERT statements. Keep Dozer running throughout the process and watch how it ingests the data and updates the endpoint data effortlessly using its polling mechanism.
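
For example, a couple of handcrafted rows like the ones below (the REPO_ID value is hypothetical; adjust the columns to the actual schema shown in Part 1) should appear at the endpoints within the next polling cycle:

--hypothetical repository and star count for testing
INSERT INTO GITHUB_REPOS (REPO_ID, REPO_NAME, FIRST_SEEN)
VALUES (999999001, 'example-org/example-repo', CURRENT_DATE);

INSERT INTO GITHUB_STARS (REPO_ID, DATE, COUNT)
VALUES (999999001, CURRENT_DATE, 1234);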

Conclusion

As you can see, Dozer makes it easy to ingest frequently updated data from Snowflake and expose it as queryable APIs. With just a simple configuration, you can write powerful SQL queries that produce meaningful data analysis and instantly get near real-time data APIs. This makes Dozer a powerful tool for quickly building data products.

For more information and examples, check out the Dozer GitHub repository and dozer-samples repository. Happy coding, Happy Data APIng! 🚀👩‍💻👨‍💻