# Using Snowflake with Dagster resources

This tutorial focuses on how to store and load data in Snowflake from Dagster asset definitions by using Dagster's `SnowflakeResource`. A resource allows you to run SQL queries directly against tables within an asset's compute function.
By the end of the tutorial, you will:

- Configure a Snowflake resource
- Use the Snowflake resource to execute a SQL query that creates a table
- Load Snowflake tables in downstream assets
- Add the assets and Snowflake resource to a `Definitions` object
Prefer to use an I/O manager? Unlike resources, an I/O manager transfers the responsibility of storing and loading DataFrames as Snowflake tables to Dagster. Refer to the Snowflake I/O manager guide for more info.
## Prerequisites

To complete this tutorial, you'll need:

- To install the following libraries:

  ```shell
  pip install dagster dagster-snowflake pandas
  ```

- To gather the following information, which is required to use the Snowflake resource:

  - Snowflake account name: You can find this by logging into Snowflake and getting the account name from the URL.

  - Snowflake credentials: You can authenticate with Snowflake in two ways: with a username and password, or with a username and private key.

    The Snowflake resource can read these authentication values from environment variables. In this guide, we use password authentication and store the username and password as `SNOWFLAKE_USER` and `SNOWFLAKE_PASSWORD`, respectively:

    ```shell
    export SNOWFLAKE_USER=<your username>
    export SNOWFLAKE_PASSWORD=<your password>
    ```

    Refer to the Using environment variables and secrets guide for more info. For more information on authenticating with a private key, see Authenticating with a private key in the Snowflake reference guide.
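If you'd rather use key-pair authentication, the sketch below shows the general shape. It's a minimal, hedged example: the `SNOWFLAKE_PRIVATE_KEY` variable name is our own choice, and you should confirm the exact key format against the Snowflake reference guide.

```python
# A minimal sketch of key-pair authentication, assuming the private key is
# available in an environment variable named SNOWFLAKE_PRIVATE_KEY (our own
# illustrative name).
from dagster_snowflake import SnowflakeResource

from dagster import EnvVar

snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),
    user=EnvVar("SNOWFLAKE_USER"),
    private_key=EnvVar("SNOWFLAKE_PRIVATE_KEY"),  # or private_key_path=...
)
```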
## Step 1: Configure the Snowflake resource

To connect to Snowflake, we'll use the dagster-snowflake `SnowflakeResource`. The `SnowflakeResource` requires some configuration:

- The `account` and `user` values are required.
- One method of authentication is required: either a password or a private key.
- Optional: Using the `warehouse`, `schema`, and `role` attributes, you can specify where data should be stored and a role for the resource to use.
```python
from dagster_snowflake import SnowflakeResource

from dagster import EnvVar

snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),  # required
    user=EnvVar("SNOWFLAKE_USER"),  # required
    password=EnvVar("SNOWFLAKE_PASSWORD"),  # password or private key required
    database="FLOWERS",
    warehouse="PLANTS",
    schema="IRIS",
    role="WRITER",
)
```
With this configuration, if you materialized an asset named `iris_dataset`, the `SnowflakeResource` would use the role `WRITER` and store the data in the `FLOWERS.IRIS.IRIS_DATASET` table using the `PLANTS` warehouse.

For more info about each of the configuration values, refer to the `SnowflakeResource` API documentation.
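Before building assets against this configuration, it can help to confirm the resource connects and uses the context you expect. The throwaway asset below is a hedged sketch (the asset name is our own choice, not part of the tutorial):

```python
# A hedged sketch: a throwaway asset that logs the session context so you can
# confirm the role, warehouse, and schema the resource actually uses.
from dagster_snowflake import SnowflakeResource

from dagster import asset


@asset
def snowflake_session_check(snowflake: SnowflakeResource) -> None:
    with snowflake.get_connection() as conn:
        cur = conn.cursor()
        cur.execute("select current_role(), current_warehouse(), current_schema()")
        print(cur.fetchone())
```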
## Step 2: Create tables in Snowflake

### Create tables in Snowflake from Dagster assets
Using the Snowflake resource, you can create Snowflake tables with the `write_pandas` function from the Snowflake Connector for Python:
```python
import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import MaterializeResult, asset


@asset
def iris_dataset(snowflake: SnowflakeResource):
    # Fetch the Iris dataset as a pandas DataFrame.
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with snowflake.get_connection() as conn:
        table_name = "iris_dataset"
        database = "flowers"
        schema = "iris"
        # Write the DataFrame to Snowflake, creating or replacing the table.
        success, number_chunks, rows_inserted, output = write_pandas(
            conn,
            iris_df,
            table_name=table_name,
            database=database,
            schema=schema,
            auto_create_table=True,
            overwrite=True,
            quote_identifiers=False,
        )

    return MaterializeResult(
        metadata={"rows_inserted": rows_inserted},
    )
```
In this example, we've defined an asset that fetches the Iris dataset as a pandas DataFrame. Then, using the Snowflake resource, the DataFrame is stored in Snowflake as the `FLOWERS.IRIS.IRIS_DATASET` table.
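If you want to test the asset locally before deploying it, one option is Dagster's in-process `materialize` helper. This is a hedged sketch that assumes your Snowflake environment variables are set in the shell you run it from:

```python
# A minimal local test: execute the asset in-process with the resource bound.
from dagster import materialize

if __name__ == "__main__":
    result = materialize([iris_dataset], resources={"snowflake": snowflake})
    assert result.success
```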
### Make existing tables available in Dagster

If you have existing tables in Snowflake and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables allows you to track the full data lineage in Dagster. You can accomplish this by defining external assets for these tables. For example:

```python
from dagster import AssetSpec

iris_harvest_data = AssetSpec(key="iris_harvest_data")
```

In this example, we created an `AssetSpec` for a pre-existing table called `iris_harvest_data`.

Since we supplied the database and the schema in the resource configuration in Step 1, we only need to provide the table name. We did this by using the `key` parameter in our `AssetSpec`. When the `iris_harvest_data` asset needs to be loaded in a downstream asset, the data in the `FLOWERS.IRIS.IRIS_HARVEST_DATA` table will be selected and provided to the asset.
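To illustrate how the external asset participates in lineage, the hedged sketch below defines a downstream asset that reads the pre-existing table through the resource. The `iris_harvest_summary` name and its query are our own illustrative choices, not part of the tutorial:

```python
# A hedged sketch: a downstream asset that depends on the external asset
# (iris_harvest_data, defined above) and reads the pre-existing table
# through the Snowflake resource.
from dagster_snowflake import SnowflakeResource

from dagster import asset


@asset(deps=[iris_harvest_data])
def iris_harvest_summary(snowflake: SnowflakeResource) -> None:
    with snowflake.get_connection() as conn:
        conn.cursor().execute(
            "create or replace table iris.iris_harvest_summary as "
            "(select count(*) as row_count from iris.iris_harvest_data)"
        )
```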
## Step 3: Define downstream assets

Once you've created an asset that represents a table in Snowflake, you may want to create additional assets that work with the data. In the following example, we've defined an asset that creates a second table, which contains only the data for the Iris Setosa species:
```python
from dagster_snowflake import SnowflakeResource

from dagster import asset


@asset(deps=["iris_dataset"])
def iris_setosa(snowflake: SnowflakeResource) -> None:
    query = """
        create or replace table iris.iris_setosa as (
            SELECT *
            FROM iris.iris_dataset
            WHERE species = 'Iris-setosa'
        );
    """

    with snowflake.get_connection() as conn:
        # cursor() is a method on the connection; calling it returns a cursor.
        conn.cursor().execute(query)
```
To accomplish this, we defined a dependency on the `iris_dataset` asset using the `deps` parameter. Then, the SQL query runs and creates the table of Iris Setosa data.
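If you'd rather load the upstream table into memory and transform it in Python, the Snowflake connector's pandas support offers `fetch_pandas_all`. This is a hedged variation on the step above, with an illustrative asset name of our own:

```python
# A hedged variation: load the upstream table into a pandas DataFrame and
# work with it in Python instead of pure SQL.
import pandas as pd
from dagster_snowflake import SnowflakeResource

from dagster import asset


@asset(deps=["iris_dataset"])
def iris_species_counts(snowflake: SnowflakeResource) -> None:
    with snowflake.get_connection() as conn:
        df: pd.DataFrame = (
            conn.cursor()
            .execute("select * from iris.iris_dataset")
            .fetch_pandas_all()
        )
    # Column names come back as Snowflake stores them (uppercase here, since
    # the table was created with quote_identifiers=False).
    print(df["SPECIES"].value_counts())
```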
## Step 4: Definitions object

The last step is to add the `SnowflakeResource` and the assets to the project's `Definitions` object:
```python
from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset, iris_setosa], resources={"snowflake": snowflake}
)
```
This makes the resource and assets available to Dagster tools like the UI and CLI.
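For example, you can load the definitions in the Dagster UI with `dagster dev`. This assumes the code above lives in a file named `definitions.py`, a filename of our own choosing:

```shell
dagster dev -f definitions.py
```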
## Completed code example

When finished, your code should look like the following:
```python
import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import Definitions, EnvVar, MaterializeResult, asset

snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),  # required
    user=EnvVar("SNOWFLAKE_USER"),  # required
    password=EnvVar("SNOWFLAKE_PASSWORD"),  # password or private key required
    database="FLOWERS",
    warehouse="PLANTS",
    schema="IRIS",
    role="WRITER",
)


@asset
def iris_dataset(snowflake: SnowflakeResource):
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with snowflake.get_connection() as conn:
        table_name = "iris_dataset"
        database = "flowers"
        schema = "iris"
        success, number_chunks, rows_inserted, output = write_pandas(
            conn,
            iris_df,
            table_name=table_name,
            database=database,
            schema=schema,
            auto_create_table=True,
            overwrite=True,
            quote_identifiers=False,
        )

    return MaterializeResult(
        metadata={"rows_inserted": rows_inserted},
    )


@asset(deps=["iris_dataset"])
def iris_setosa(snowflake: SnowflakeResource) -> None:
    query = """
        create or replace table iris.iris_setosa as (
            SELECT *
            FROM iris.iris_dataset
            WHERE species = 'Iris-setosa'
        );
    """

    with snowflake.get_connection() as conn:
        conn.cursor().execute(query)


defs = Definitions(
    assets=[iris_dataset, iris_setosa], resources={"snowflake": snowflake}
)
```