Testing against production with Dagster+ Branch Deployments#
This guide is applicable to Dagster+.
This guide details a workflow to test Dagster code in your cloud environment without impacting your production data. To highlight this functionality, we’ll leverage Dagster+ branch deployments and a Snowflake database to:
Execute code on a feature branch directly on Dagster+
Read and write to a unique per-branch clone of our Snowflake data
With these tools, we can merge changes with confidence in the impact on our data platform and with the assurance that our code will execute as intended.
Here’s an overview of the main concepts we’ll be using:
Assets - We'll define three assets that each persist a table to Snowflake.
Ops - We'll define two ops that query Snowflake: the first will clone a database, and the second will drop database clones.
Graphs - We'll build graphs that define the order our ops should run.
Jobs - We'll define jobs by binding our graphs to resources.
Resources - We'll use the SnowflakeResource to swap in different Snowflake connections to our jobs depending on environment.
I/O managers - We'll use a Snowflake I/O manager to persist asset outputs to Snowflake.
This guide is an extension of the Transitioning data pipelines from development to production guide, illustrating a workflow for staging deployments. We’ll use the examples from this guide to build a workflow atop Dagster+’s branch deployment feature.
To complete the steps in this guide, you'll need:
A Dagster+ account
An existing Branch Deployments setup that uses GitHub actions or Gitlab CI/CD. Your setup should contain a Dagster project set up for branch deployments containing:
Either a GitHub actions workflow file (e.g. .github/workflows/branch-deployments.yaml) or a Gitlab CI/CD file (e.g. .gitlab-ci.yml)
We have a PRODUCTION Snowflake database with a schema named HACKER_NEWS. In our production cloud environment, we’d like to write tables to Snowflake containing subsets of Hacker News data. These tables will be:
ITEMS - A table containing the entire dataset
COMMENTS - A table containing data about comments
STORIES - A table containing data about stories
To set up a branch deployment workflow to construct and test these tables, we will:
Configure our assets to write to Snowflake using a different connection (credentials and database name) for two environments: production and branch deployment.
Write a job that will clone the production database upon each branch deployment launch. Each clone will be named PRODUCTION_CLONE_<ID>, where <ID> is the pull request ID of the branch. Then we'll create a branch deployment and test our Hacker News assets against our newly cloned database.
Write a job that will delete the corresponding database clone upon closing the feature branch.
In production, we want to write three tables to Snowflake: ITEMS, COMMENTS, and STORIES. We can define these tables as assets as follows:
# assets.pyimport pandas as pd
import requests
from dagster import Config, asset
classItemsConfig(Config):
base_item_id:int@asset(
io_manager_key="snowflake_io_manager",)defitems(config: ItemsConfig)-> pd.DataFrame:"""Items from the Hacker News API: each is a story or a comment on a story."""
rows =[]
max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5).json()# Hacker News API is 1-indexed, so adjust range by 1for item_id inrange(max_id - config.base_item_id +1, max_id +1):
item_url =f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
rows.append(requests.get(item_url, timeout=5).json())# ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset
result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
result.rename(columns={"by":"user_id"}, inplace=True)return result
@asset(
io_manager_key="snowflake_io_manager",)defcomments(items: pd.DataFrame)-> pd.DataFrame:"""Comments from the Hacker News API."""return items[items["type"]=="comment"]@asset(
io_manager_key="snowflake_io_manager",)defstories(items: pd.DataFrame)-> pd.DataFrame:"""Stories from the Hacker News API."""return items[items["type"]=="story"]
As you can see, our assets use an I/O manager named snowflake_io_manager. Using I/O managers and other resources allow us to swap out implementations per environment without modifying our business logic.
Step 2: Configure our assets for each environment#
At runtime, we’d like to determine which environment our code is running in: branch deployment, or production. This information dictates how our code should execute, specifically with which credentials and with which database.
To ensure we can't accidentally write to production from within our branch deployment, we’ll use a different set of credentials from production and write to our database clone.
Dagster automatically sets certain environment variables containing deployment metadata, allowing us to read these environment variables to discern between deployments. We can access the DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT environment variable to determine the currently executing environment.
Because we want to configure our assets to write to Snowflake using a different set of credentials and database in each environment, we’ll configure a separate I/O manager for each environment:
Step 3: Create jobs to manage database cloning per branch deployment#
We’ll first need to define a job that clones our PRODUCTION database for each branch deployment. Later, in our GitHub actions workflow, we can trigger this job to run upon each redeploy. Each clone will be named PRODUCTION_CLONE_<ID> with <ID> representing the pull request ID, ensuring each branch deployment has a unique clone. This job will drop a database clone if it exists and then reclone from production, ensuring each redeployment has a fresh clone of PRODUCTION:
Why use ops and jobs instead of assets? We'll be writing ops to clone the production database for each branch deployment and drop the clone once the branch is merged. In this case, we chose to use ops since we are primarily interested in the task that's being performed: cloning or dropping the database. Additionally, we don't need asset-specific features for these tasks, like viewing them in the Global Asset Graph.
from dagster_snowflake import SnowflakeResource
from dagster import In, Nothing, graph, op
@opdefdrop_database_clone(snowflake: SnowflakeResource):with snowflake.get_connection()as conn:
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS"f" PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']}")@op(ins={"start": In(Nothing)})defclone_production_database(snowflake: SnowflakeResource):with snowflake.get_connection()as conn:
cur = conn.cursor()
cur.execute("CREATE DATABASE"f" PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']} CLONE"' "PRODUCTION"')@graphdefclone_prod():
clone_production_database(start=drop_database_clone())@graphdefdrop_prod_clone():
drop_database_clone()
We’ve defined drop_database_clone and clone_production_database to utilize the SnowflakeResource. The Snowflake resource will use the same configuration as the Snowflake I/O manager to generate a connection to Snowflake. However, while our I/O manager writes outputs to Snowflake, the Snowflake resource executes queries against Snowflake.
We now need to define resources that configure our jobs to the current environment. We can modify the resource mapping by environment as follows:
Step 4: Create our database clone upon opening a branch#
The branch_deployments.yml file located in .github/workflows/branch_deployments.yml defines a dagster_cloud_build_push job with a series of steps that launch a branch deployment. Because we want to queue a run of clone_prod within each deployment after it launches, we'll add an additional step at the end dagster_cloud_build_push. This job is triggered on multiple pull request events: opened, synchronize, reopen, and closed. This means that upon future pushes to the branch, we'll trigger a run of clone_prod. The if condition below ensures that clone_prod will not run if the pull request is closed:
Opening a pull request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the clone_prod job has run:
Alternatively, the logs for the branch deployment workflow can be found in the Actions tab on the GitHub pull request.
We can also view our database in Snowflake to confirm that a clone exists for each branch deployment. When we materialize our assets within our branch deployment, we’ll now be writing to our clone of PRODUCTION. Within Snowflake, we can run queries against this clone to confirm the validity of our data:
The .gitlab-ci.yaml script contains a deploy job that defines a series of steps that launch a branch deployment. Because we want to queue a run of clone_prod within each deployment after it launches, we'll add an additional step at the end of deploy. This job is triggered on when a merge request is created or updated. This means that upon future pushes to the branch, we'll trigger a run of clone_prod.
Opening a merge request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the clone_prod job has run:
We can also view our database in Snowflake to confirm that a clone exists for each branch deployment. When we materialize our assets within our branch deployment, we’ll now be writing to our clone of PRODUCTION. Within Snowflake, we can run queries against this clone to confirm the validity of our data:
Step 5: Delete our database clone upon closing a branch#
Finally, we can add a step to our branch_deployments.yml file that queues a run of our drop_prod_clone job:
After merging our branch, viewing our Snowflake database will confirm that our branch deployment step has successfully deleted our database clone.
We’ve now built an elegant workflow that enables future branch deployments to automatically have access to their own clones of our production database that are cleaned up upon merge!