Observe assets
Previously, we completed the "Peering" stage of the Airflow migration process by peering the Airflow instance with a Dagster code location.
The next step is to represent our Airflow workflows more richly by observing the data assets that are produced by our tasks. Similar to the peering step, this stage does not require any changes to Airflow code.
In order to do this, we must define the relevant assets in the Dagster code location.
In our example, we have three sequential tasks:
- load_raw_customers loads a CSV file of raw customer data into DuckDB.
- build_dbt_models builds a series of dbt models (from jaffle shop) combining customer, order, and payment data.
- export_customers exports a CSV representation of the final customers table from DuckDB to disk.
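For orientation, the Airflow DAG that chains these three tasks together might look roughly like the sketch below. This is a hypothetical reconstruction: the operator types, commands, and scheduling details are illustrative assumptions rather than the tutorial's actual DAG code, and none of it needs to change for this step.

# Hypothetical sketch of the Airflow side; the real tutorial DAG may differ.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="rebuild_customers_list",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # schedule_interval on older Airflow versions
    catchup=False,
) as dag:
    load_raw_customers = BashOperator(
        task_id="load_raw_customers",
        bash_command="python load_raw_customers.py",  # illustrative placeholder
    )
    build_dbt_models = BashOperator(
        task_id="build_dbt_models",
        bash_command="dbt build --project-dir $TUTORIAL_DBT_PROJECT_DIR",  # illustrative placeholder
    )
    export_customers = BashOperator(
        task_id="export_customers",
        bash_command="python export_customers.py",  # illustrative placeholder
    )

    load_raw_customers >> build_dbt_models >> export_customers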
We will first create a set of asset specs that correspond to the assets produced by these tasks. We will then annotate these asset specs so that Dagster can associate them with the Airflow tasks that produce them.
The first and third tasks involve a single table each, so we can manually construct specs for them. Dagster provides the assets_with_task_mappings utility to annotate asset specs with the tasks that produce them. Assets that are properly annotated will be materialized by the Airlift sensor once the corresponding task completes. These annotated specs are then provided to the defs argument of build_defs_from_airflow_instance.
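As a minimal sketch of what this mapping looks like in isolation (the keys, DAG ID, and task ID below are taken from the full listing later in this section):

from dagster import AssetSpec
from dagster_airlift.core import assets_with_task_mappings

# Map a manually constructed asset spec to the Airflow task that produces it.
mapped = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        # Airflow task ID -> list of Dagster assets that task produces
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
    },
)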
We will also create a set of dbt asset definitions for the build_dbt_models task. We can use the dagster-dbt-supplied decorator @dbt_assets to generate these definitions using Dagster's dbt integration.
First, you need to install the extra that has the dbt factory:
uv pip install 'dagster-airlift[dbt]'
Then, we will construct our assets:
import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    Definitions,
    asset_check,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


# Asset check carried over from the peering step; for now it still targets the
# DAG-level asset created by Airlift.
@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


# dbt assets for the build_dbt_models task, generated from the project's manifest.
@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Annotate each asset (or set of assets) with the Airflow task that produces it.
mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)


defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
Viewing observed assets
Once your assets are set up, you should be able to reload your Dagster definitions and see a full representation of the dbt project and the other data assets in your code location.
Kicking off a run of the DAG in Airflow, you should see the newly created assets materialize in Dagster as each task completes.
Note: There will be some delay between task completion and assets materializing in Dagster, managed by the sensor. This sensor runs every 30 seconds by default (you can reduce this to as little as one second via the minimum_interval_seconds argument to the sensor).
Moving the asset check
Now that we've introduced an asset explicitly for the customers.csv file output by the DAG, we should move the asset check constructed during the peering step onto the customers_csv asset instead. Simply change the asset targeted by the @asset_check decorator to AssetKey(["customers_csv"]). Doing this ensures that the asset check will live on even after we delete the DAG.
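For reference, the only line that changes is the decorator itself (taken from the full listing below):

@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult: ...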
When done, our code will look like this:
import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    Definitions,
    asset_check,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
Adding partitions
If your Airflow tasks produce time-partitioned assets, Airlift can automatically associate your materializations with the relevant partitions. In the case of rebuild_customers_list, data in each created table is partitioned daily and the Airflow DAG runs on a @daily cron schedule. We can likewise add a DailyPartitionsDefinition to each of our assets.
import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    DailyPartitionsDefinition,
    Definitions,
    asset_check,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [
            AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF)
        ],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [
            AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF)
        ],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
Now, every time the sensor synthesizes a materialization for an asset, it will automatically have a partition associated with it.
Let's try this out by kicking off an Airflow backfill for today:
airflow dags backfill rebuild_customers_list --start-date $(date +"%Y-%m-%d")
After this DAG run completes, you should see a partitioned materialization appear in Dagster.
Let's clear our Airflow runs so that we can kick off this backfill again for testing in the future.
airflow db clean
In order for partitioned assets to work out of the box with dagster-airlift, the following must be true:
- The asset can only be time-window partitioned. This means static, dynamic, and multi-partitioned definitions will require custom functionality.
- The partitioning scheme must match up with the logical_date / execution_date of the corresponding Airflow runs. That is, each logical_date should correspond exactly to a partition in Dagster (see the sketch after this list).
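As an illustrative sanity check of the second requirement (the start date and example date below are hypothetical, not taken from the tutorial):

from dagster import DailyPartitionsDefinition

# Hypothetical daily partitions definition whose keys line up with the
# logical_date of @daily Airflow runs.
partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

# An Airflow run with logical_date 2024-06-01T00:00:00+00:00 should correspond
# to exactly one Dagster partition key, "2024-06-01".
# (Assumes the current date is after 2024-06-01, since daily partitions only
# exist up to the present.)
assert "2024-06-01" in partitions_def.get_partition_keys()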
Next steps
Next, it's time to begin migrating our Airflow DAG code to Dagster. Follow along with the Migrate step here.