Ask AI

Snowflake (dagster-snowflake)

This library provides an integration with the Snowflake data warehouse.

To use this library, you should first ensure that you have an appropriate Snowflake user configured to access your data warehouse.

Related Guides:

I/O Manager

dagster_snowflake.SnowflakeIOManager IOManagerDefinition[source]

Config Schema:
database (dagster.StringSource):

Name of the database to use.

account (dagster.StringSource):

Your Snowflake account name. For more details, see the Snowflake documentation.

user (dagster.StringSource):

User login name.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Default Value: None

password (Union[dagster.StringSource, None], optional):

User password.

Default Value: None

warehouse (Union[dagster.StringSource, None], optional):

Name of the warehouse to use.

Default Value: None

role (Union[dagster.StringSource, None], optional):

Name of the role to use.

Default Value: None

private_key (Union[dagster.StringSource, None], optional):

Raw private key to use. See the Snowflake documentation for details. To avoid issues with newlines in the keys, you can base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat rsa_key.p8 | base64

Default Value: None

private_key_path (Union[dagster.StringSource, None], optional):

Path to the private key. See the Snowflake documentation for details.

Default Value: None

private_key_password (Union[dagster.StringSource, None], optional):

The password of the private key. See the Snowflake documentation for details. Required for both private_key and private_key_path if the private key is encrypted. For unencrypted keys, this config can be omitted or set to None.

Default Value: None

store_timestamps_as_strings (Union[dagster.BoolSource, None], optional):

If using Pandas DataFrames, whether to convert time data to strings. If True, time data will be converted to strings when storing the DataFrame and converted back to time data when loading the DataFrame. If False, time data without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False.

Default Value: False

authenticator (Union[dagster.StringSource, None], optional):

Optional parameter to specify the authentication mechanism to use.

Default Value: None

Base class for an IO manager definition that reads inputs from and writes outputs to Snowflake.

Examples

from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler
from dagster import Definitions, EnvVar

class MySnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="my_database", account=EnvVar("SNOWFLAKE_ACCOUNT"), ...)
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table]
    resources={
        "io_manager" MySnowflakeIOManager(database="my_database", schema="my_schema", ...)
    }
)

On individual assets, you an also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata columns to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

Resource

dagster_snowflake.SnowflakeResource ResourceDefinition[source]

Config Schema:
account (Union[dagster.StringSource, None], optional):

Your Snowflake account name. For more details, see the Snowflake documentation.

Default Value: None

user (dagster.StringSource):

User login name.

password (Union[dagster.StringSource, None], optional):

User password.

Default Value: None

database (Union[dagster.StringSource, None], optional):

Name of the default database to use. After login, you can use USE DATABASE to change the database.

Default Value: None

schema (Union[dagster.StringSource, None], optional):

Name of the default schema to use. After login, you can use USE SCHEMA to change the schema.

Default Value: None

role (Union[dagster.StringSource, None], optional):

Name of the default role to use. After login, you can use USE ROLE to change the role.

Default Value: None

warehouse (Union[dagster.StringSource, None], optional):

Name of the default warehouse to use. After login, you can use USE WAREHOUSE to change the role.

Default Value: None

private_key (Union[dagster.StringSource, None], optional):

Raw private key to use. See the Snowflake documentation for details. Alternately, set private_key_path and private_key_password. To avoid issues with newlines in the keys, you can base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat rsa_key.p8 | base64

Default Value: None

private_key_password (Union[dagster.StringSource, None], optional):

Raw private key password to use. See the Snowflake documentation for details. Required for both private_key and private_key_path if the private key is encrypted. For unencrypted keys, this config can be omitted or set to None.

Default Value: None

private_key_path (Union[dagster.StringSource, None], optional):

Raw private key path to use. See the Snowflake documentation for details. Alternately, set the raw private key as private_key.

Default Value: None

autocommit (Union[dagster.BoolSource, None], optional):

None by default, which honors the Snowflake parameter AUTOCOMMIT. Set to True or False to enable or disable autocommit mode in the session, respectively.

Default Value: None

client_prefetch_threads (Union[dagster.IntSource, None], optional):

Number of threads used to download the results sets (4 by default). Increasing the value improves fetch performance but requires more memory.

Default Value: None

client_session_keep_alive (Union[dagster.BoolSource, None], optional):

False by default. Set this to True to keep the session active indefinitely, even if there is no activity from the user. Make certain to call the close method to terminate the thread properly or the process may hang.

Default Value: None

login_timeout (Union[dagster.IntSource, None], optional):

Timeout in seconds for login. By default, 60 seconds. The login request gives up after the timeout length if the HTTP response is “success”.

Default Value: None

network_timeout (Union[dagster.IntSource, None], optional):

Timeout in seconds for all other operations. By default, none/infinite. A general request gives up after the timeout length if the HTTP response is not ‘success’.

Default Value: None

ocsp_response_cache_filename (Union[dagster.StringSource, None], optional):

URI for the OCSP response cache file. By default, the OCSP response cache file is created in the cache directory.

Default Value: None

validate_default_parameters (Union[dagster.BoolSource, None], optional):

If True, raise an exception if the warehouse, database, or schema doesn’t exist. Defaults to False.

Default Value: None

paramstyle (Union[dagster.StringSource, None], optional):

pyformat by default for client side binding. Specify qmark or numeric to change bind variable formats for server side binding.

Default Value: None

timezone (Union[dagster.StringSource, None], optional):

None by default, which honors the Snowflake parameter TIMEZONE. Set to a valid time zone (e.g. America/Los_Angeles) to set the session time zone.

Default Value: None

connector (Union[dagster.StringSource, None], optional):

Indicate alternative database connection engine. Permissible option is ‘sqlalchemy’ otherwise defaults to use the Snowflake Connector for Python.

Default Value: None

cache_column_metadata (Union[dagster.StringSource, None], optional):

Optional parameter when connector is set to sqlalchemy. Snowflake SQLAlchemy takes a flag cache_column_metadata=True such that all of column metadata for all tables are “cached”

Default Value: None

numpy (Union[dagster.BoolSource, None], optional):

Optional parameter when connector is set to sqlalchemy. To enable fetching NumPy data types, add numpy=True to the connection parameters.

Default Value: None

authenticator (Union[dagster.StringSource, None], optional):

Optional parameter to specify the authentication mechanism to use.

Default Value: None

A resource for connecting to the Snowflake data warehouse.

If connector configuration is not set, SnowflakeResource.get_connection() will return a snowflake.connector.Connection object. If connector=”sqlalchemy” configuration is set, then SnowflakeResource.get_connection() will return a SQLAlchemy Connection or a SQLAlchemy raw connection.

A simple example of loading data into Snowflake and subsequently querying that data is shown below:

Examples

from dagster import job, op
from dagster_snowflake import SnowflakeResource

@op
def get_one(snowflake_resource: SnowflakeResource):
    with snowflake_resource.get_connection() as conn:
        # conn is a snowflake.connector.Connection object
        conn.cursor().execute("SELECT 1")

@job
def my_snowflake_job():
    get_one()

my_snowflake_job.execute_in_process(
    resources={
        'snowflake_resource': SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD")
            database="MY_DATABASE",
            schema="MY_SCHEMA",
            warehouse="MY_WAREHOUSE"
        )
    }
)
class dagster_snowflake.SnowflakeConnection(config, log, snowflake_connection_resource)[source]

A connection to Snowflake that can execute queries. In general this class should not be directly instantiated, but rather used as a resource in an op or asset via the snowflake_resource().

Note that the SnowflakeConnection is only used by the snowflake_resource. The Pythonic SnowflakeResource does not use this SnowflakeConnection class.

execute_queries(sql_queries, parameters=None, fetch_results=False, use_pandas_result=False)[source]

Execute multiple queries in Snowflake.

Parameters:
  • sql_queries (str) – List of queries to be executed in series

  • parameters (Optional[Union[Sequence[Any], Mapping[Any, Any]]]) – Parameters to be passed to every query. See the Snowflake documentation for more information.

  • fetch_results (bool) – If True, will return the results of the queries as a list. Defaults to False. If True and use_pandas_result is also True, results will be returned as Pandas DataFrames.

  • use_pandas_result (bool) – If True, will return the results of the queries as a list of a Pandas DataFrames. Defaults to False. If fetch_results is False and use_pandas_result is True, an error will be raised.

Returns:

The results of the queries as a list if fetch_results or use_pandas_result is True, otherwise returns None

Examples

@op
def create_fresh_database(snowflake: SnowflakeResource):
    queries = ["DROP DATABASE IF EXISTS MY_DATABASE", "CREATE DATABASE MY_DATABASE"]
    snowflake.execute_queries(
        sql_queries=queries
    )
execute_query(sql, parameters=None, fetch_results=False, use_pandas_result=False)[source]

Execute a query in Snowflake.

Parameters:
  • sql (str) – the query to be executed

  • parameters (Optional[Union[Sequence[Any], Mapping[Any, Any]]]) – Parameters to be passed to the query. See the Snowflake documentation for more information.

  • fetch_results (bool) – If True, will return the result of the query. Defaults to False. If True and use_pandas_result is also True, results will be returned as a Pandas DataFrame.

  • use_pandas_result (bool) – If True, will return the result of the query as a Pandas DataFrame. Defaults to False. If fetch_results is False and use_pandas_result is True, an error will be raised.

Returns:

The result of the query if fetch_results or use_pandas_result is True, otherwise returns None

Examples

@op
def drop_database(snowflake: SnowflakeResource):
    snowflake.execute_query(
        "DROP DATABASE IF EXISTS MY_DATABASE"
    )
get_connection(raw_conn=True)[source]

Gets a connection to Snowflake as a context manager.

If using the execute_query, execute_queries, or load_table_from_local_parquet methods, you do not need to create a connection using this context manager.

Parameters:

raw_conn (bool) – If using the sqlalchemy connector, you can set raw_conn to True to create a raw connection. Defaults to True.

Examples

@op(
    required_resource_keys={"snowflake"}
)
def get_query_status(query_id):
    with context.resources.snowflake.get_connection() as conn:
        # conn is a Snowflake Connection object or a SQLAlchemy Connection if
        # sqlalchemy is specified as the connector in the Snowflake Resource config

        return conn.get_query_status(query_id)
load_table_from_local_parquet(src, table)[source]

Stores the content of a parquet file to a Snowflake table.

Parameters:
  • src (str) – the name of the file to store in Snowflake

  • table (str) – the name of the table to store the data. If the table does not exist, it will be created. Otherwise the contents of the table will be replaced with the data in src

Examples

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

@op
def write_parquet_file(snowflake: SnowflakeResource):
    df = pd.DataFrame({"one": [1, 2, 3], "ten": [11, 12, 13]})
    table = pa.Table.from_pandas(df)
    pq.write_table(table, "example.parquet')
    snowflake.load_table_from_local_parquet(
        src="example.parquet",
        table="MY_TABLE"
    )

Data Freshness

dagster_snowflake.fetch_last_updated_timestamps(*, snowflake_connection, schema, tables, database=None, ignore_missing_tables=False)[source]

Fetch the last updated times of a list of tables in Snowflake.

If the underlying query to fetch the last updated time returns no results, a ValueError will be raised.

Parameters:
  • snowflake_connection (Union[SqlDbConnection, SnowflakeConnection]) – A connection to Snowflake. Accepts either a SnowflakeConnection or a sqlalchemy connection object, which are the two types of connections emittable from the snowflake resource.

  • schema (str) – The schema of the tables to fetch the last updated time for.

  • tables (Sequence[str]) – A list of table names to fetch the last updated time for.

  • database (Optional[str]) – The database of the table. Only required if the connection has not been set with a database.

  • ignore_missing_tables (Optional[bool]) – If True, tables not found in Snowflake will be excluded from the result.

Returns:

A dictionary of table names to their last updated time in UTC.

Return type:

Mapping[str, datetime]

Ops

dagster_snowflake.snowflake_op_for_query(sql, parameters=None)[source]

This function is an op factory that constructs an op to execute a snowflake query.

Note that you can only use snowflake_op_for_query if you know the query you’d like to execute at graph construction time. If you’d like to execute queries dynamically during job execution, you should manually execute those queries in your custom op using the snowflake resource.

Parameters:
  • sql (str) – The sql query that will execute against the provided snowflake resource.

  • parameters (dict) – The parameters for the sql query.

Returns:

Returns the constructed op definition.

Return type:

OpDefinition

Legacy

dagster_snowflake.build_snowflake_io_manager IOManagerDefinition[source]

Config Schema:
database (dagster.StringSource):

Name of the database to use.

account (dagster.StringSource):

Your Snowflake account name. For more details, see the Snowflake documentation.

user (dagster.StringSource):

User login name.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Default Value: None

password (Union[dagster.StringSource, None], optional):

User password.

Default Value: None

warehouse (Union[dagster.StringSource, None], optional):

Name of the warehouse to use.

Default Value: None

role (Union[dagster.StringSource, None], optional):

Name of the role to use.

Default Value: None

private_key (Union[dagster.StringSource, None], optional):

Raw private key to use. See the Snowflake documentation for details. To avoid issues with newlines in the keys, you can base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat rsa_key.p8 | base64

Default Value: None

private_key_path (Union[dagster.StringSource, None], optional):

Path to the private key. See the Snowflake documentation for details.

Default Value: None

private_key_password (Union[dagster.StringSource, None], optional):

The password of the private key. See the Snowflake documentation for details. Required for both private_key and private_key_path if the private key is encrypted. For unencrypted keys, this config can be omitted or set to None.

Default Value: None

store_timestamps_as_strings (Union[dagster.BoolSource, None], optional):

If using Pandas DataFrames, whether to convert time data to strings. If True, time data will be converted to strings when storing the DataFrame and converted back to time data when loading the DataFrame. If False, time data without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False.

Default Value: False

authenticator (Union[dagster.StringSource, None], optional):

Optional parameter to specify the authentication mechanism to use.

Default Value: None

Builds an IO manager definition that reads inputs from and writes outputs to Snowflake.

Parameters:
  • type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between slices of Snowflake tables and an in-memory type - e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as teh default_load_type.

  • default_load_type (Type) – When an input has no type annotation, load it as this type.

Returns:

IOManagerDefinition

Examples

from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler
from dagster import Definitions

@asset(
    key_prefix=["my_prefix"]
    metadata={"schema": "my_schema"} # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_second_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()])

defs = Definitions(
    assets=[my_table, my_second_table],
    resources={
        "io_manager": snowflake_io_manager.configured({
            "database": "my_database",
            "account" : {"env": "SNOWFLAKE_ACCOUNT"}
            ...
        })
    }
)

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table]
    resources={"io_manager" snowflake_io_manager.configured(
        {"database": "my_database", "schema": "my_schema", ...} # will be used as the schema
    )}
)

On individual assets, you an also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata columns to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...
dagster_snowflake.snowflake_resource ResourceDefinition[source]

Config Schema:
account (Union[dagster.StringSource, None], optional):

Your Snowflake account name. For more details, see the Snowflake documentation.

Default Value: None

user (dagster.StringSource):

User login name.

password (Union[dagster.StringSource, None], optional):

User password.

Default Value: None

database (Union[dagster.StringSource, None], optional):

Name of the default database to use. After login, you can use USE DATABASE to change the database.

Default Value: None

schema (Union[dagster.StringSource, None], optional):

Name of the default schema to use. After login, you can use USE SCHEMA to change the schema.

Default Value: None

role (Union[dagster.StringSource, None], optional):

Name of the default role to use. After login, you can use USE ROLE to change the role.

Default Value: None

warehouse (Union[dagster.StringSource, None], optional):

Name of the default warehouse to use. After login, you can use USE WAREHOUSE to change the role.

Default Value: None

private_key (Union[dagster.StringSource, None], optional):

Raw private key to use. See the Snowflake documentation for details. Alternately, set private_key_path and private_key_password. To avoid issues with newlines in the keys, you can base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat rsa_key.p8 | base64

Default Value: None

private_key_password (Union[dagster.StringSource, None], optional):

Raw private key password to use. See the Snowflake documentation for details. Required for both private_key and private_key_path if the private key is encrypted. For unencrypted keys, this config can be omitted or set to None.

Default Value: None

private_key_path (Union[dagster.StringSource, None], optional):

Raw private key path to use. See the Snowflake documentation for details. Alternately, set the raw private key as private_key.

Default Value: None

autocommit (Union[dagster.BoolSource, None], optional):

None by default, which honors the Snowflake parameter AUTOCOMMIT. Set to True or False to enable or disable autocommit mode in the session, respectively.

Default Value: None

client_prefetch_threads (Union[dagster.IntSource, None], optional):

Number of threads used to download the results sets (4 by default). Increasing the value improves fetch performance but requires more memory.

Default Value: None

client_session_keep_alive (Union[dagster.BoolSource, None], optional):

False by default. Set this to True to keep the session active indefinitely, even if there is no activity from the user. Make certain to call the close method to terminate the thread properly or the process may hang.

Default Value: None

login_timeout (Union[dagster.IntSource, None], optional):

Timeout in seconds for login. By default, 60 seconds. The login request gives up after the timeout length if the HTTP response is “success”.

Default Value: None

network_timeout (Union[dagster.IntSource, None], optional):

Timeout in seconds for all other operations. By default, none/infinite. A general request gives up after the timeout length if the HTTP response is not ‘success’.

Default Value: None

ocsp_response_cache_filename (Union[dagster.StringSource, None], optional):

URI for the OCSP response cache file. By default, the OCSP response cache file is created in the cache directory.

Default Value: None

validate_default_parameters (Union[dagster.BoolSource, None], optional):

If True, raise an exception if the warehouse, database, or schema doesn’t exist. Defaults to False.

Default Value: None

paramstyle (Union[dagster.StringSource, None], optional):

pyformat by default for client side binding. Specify qmark or numeric to change bind variable formats for server side binding.

Default Value: None

timezone (Union[dagster.StringSource, None], optional):

None by default, which honors the Snowflake parameter TIMEZONE. Set to a valid time zone (e.g. America/Los_Angeles) to set the session time zone.

Default Value: None

connector (Union[dagster.StringSource, None], optional):

Indicate alternative database connection engine. Permissible option is ‘sqlalchemy’ otherwise defaults to use the Snowflake Connector for Python.

Default Value: None

cache_column_metadata (Union[dagster.StringSource, None], optional):

Optional parameter when connector is set to sqlalchemy. Snowflake SQLAlchemy takes a flag cache_column_metadata=True such that all of column metadata for all tables are “cached”

Default Value: None

numpy (Union[dagster.BoolSource, None], optional):

Optional parameter when connector is set to sqlalchemy. To enable fetching NumPy data types, add numpy=True to the connection parameters.

Default Value: None

authenticator (Union[dagster.StringSource, None], optional):

Optional parameter to specify the authentication mechanism to use.

Default Value: None

A resource for connecting to the Snowflake data warehouse. The returned resource object is an instance of SnowflakeConnection.

A simple example of loading data into Snowflake and subsequently querying that data is shown below:

Examples

from dagster import job, op
from dagster_snowflake import snowflake_resource

@op(required_resource_keys={'snowflake'})
def get_one(context):
    context.resources.snowflake.execute_query('SELECT 1')

@job(resource_defs={'snowflake': snowflake_resource})
def my_snowflake_job():
    get_one()

my_snowflake_job.execute_in_process(
    run_config={
        'resources': {
            'snowflake': {
                'config': {
                    'account': {'env': 'SNOWFLAKE_ACCOUNT'},
                    'user': {'env': 'SNOWFLAKE_USER'},
                    'password': {'env': 'SNOWFLAKE_PASSWORD'},
                    'database': {'env': 'SNOWFLAKE_DATABASE'},
                    'schema': {'env': 'SNOWFLAKE_SCHEMA'},
                    'warehouse': {'env': 'SNOWFLAKE_WAREHOUSE'},
                }
            }
        }
    }
)