Observing multiple Airflow instances

At this point, you should have completed the setup step: the example code is set up with a fresh virtual environment, and two Airflow instances are running locally. Now, we can start writing Dagster code.

Observing the Airflow instances

We'll start by creating asset representations of our DAGs in Dagster.

Create a new shell and navigate to the root of the tutorial directory. You will need to install the dagster-airlift package, along with dagster-webserver and dagster, in your Dagster environment:

source .venv/bin/activate
uv pip install 'dagster-airlift[core]' dagster-webserver dagster

Observing the warehouse Airflow instance

Next, we'll declare a reference to our warehouse Airflow instance, which is running at http://localhost:8081.

from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

Now, we can use the load_airflow_dag_asset_specs function to create asset representations of the DAGs in the warehouse Airflow instance:

from dagster_airlift.core import load_airflow_dag_asset_specs

assets = load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
)
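
If you'd like to sanity-check what was loaded, the return value is a list of Dagster AssetSpec objects that you can inspect directly. A minimal sketch, for local debugging only (not part of the tutorial code):

# Optional sanity check: print the asset key derived from each Airflow DAG
for spec in assets:
    print(spec.key)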

Now, let's add these assets to a Definitions object:

from dagster import Definitions

defs = Definitions(assets=assets)

Let's set up some environment variables, then run dagster dev pointed at our definitions file so Dagster picks up the assets created from our Airflow instance:

# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py

If we navigate to the Dagster UI (running at http://localhost:3000), we should see the assets created from the warehouse Airflow instance.

Assets from the warehouse Airflow instance in the Dagster UI

There are a lot of DAGs in this instance, and we only want to focus on the load_customers DAG. Let's filter the loaded specs down to just that DAG; since load_airflow_dag_asset_specs returns a sequence, we use next(iter(...)) to pull out the single matching spec:

load_customers = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
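
As an aside, dag_selector_fn accepts an arbitrary predicate over the DAG, so you can match more than one DAG at a time. A hypothetical sketch that selects every DAG whose ID starts with a load_ prefix (the prefix is an assumption for illustration, not part of this tutorial's DAGs):

# Hypothetical: select all DAGs sharing a "load_" prefix (illustrative only)
load_dag_assets = load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
    dag_selector_fn=lambda dag: dag.dag_id.startswith("load_"),
)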

Let's replace the full list of assets in our Definitions object with just this one asset:

defs = Definitions(assets=[load_customers])

Now, our Dagster environment only includes the load_customers DAG from the warehouse Airflow instance.

Assets from the warehouse Airflow instance in the Dagster UI

Finally, we'll use a sensor to poll the warehouse Airflow instance for new runs. This way, whenever we get a successful run of the load_customers DAG, we'll see a materialization in the Dagster UI:

from dagster_airlift.core import build_airflow_polling_sensor

warehouse_sensor = build_airflow_polling_sensor(
mapped_assets=[load_customers],
airflow_instance=warehouse_airflow_instance,
)

Now, we can add this sensor to our Definitions object:

defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor])

You can test this by navigating to the Airflow UI at http://localhost:8081 and triggering a run of the load_customers DAG. When the run completes, you should see a materialization in the Dagster UI.
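
As an alternative to clicking through the UI, you can trigger the same run from the command line. A minimal sketch, assuming Airflow 2's stable REST API is enabled and using the admin/admin basic-auth credentials configured earlier:

# Trigger a run of load_customers via Airflow's REST API (assumes admin/admin auth)
curl -X POST "http://localhost:8081/api/v1/dags/load_customers/dagRuns" \
  --user "admin:admin" \
  -H "Content-Type: application/json" \
  -d '{}'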

Materialization of the load_customers DAG in the Dagster UI

Observing the metrics Airflow instance

We can repeat the same process for the customer_metrics DAG in the metrics Airflow instance, which runs at http://localhost:8082. We'll leave this as an exercise to test your understanding.

When complete, your code should look like this:

from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)

Adding lineage between load_customers and customer_metrics

Now that we have both DAGs loaded into Dagster, we can observe the cross-DAG lineage between them. To do this, we'll use the replace_attributes method to make the customer_metrics asset depend on the load_customers asset:

customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    deps=[load_customers_dag_asset],
)

Now, let's add the updated customer_metrics_dag_asset to our Definitions object:
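
# Same Definitions as before; the reassigned customer_metrics_dag_asset now carries the dependency
defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)

With this change, we should see the lineage between the two DAGs in the Dagster UI.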

Lineage between load_customers and customer_metrics in the Dagster UI

Next steps

Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along here.