# Observing multiple Airflow instances
At this point, you should have finished the setup step and have the example code set up with a fresh virtual environment, with two Airflow instances running locally. Now we can start writing Dagster code.
## Observing the Airflow instances
We'll start by creating asset representations of our DAGs in Dagster.
Create a new shell and navigate to the root of the tutorial directory. You will need to set up the `dagster-airlift` package in your Dagster environment:
```bash
source .venv/bin/activate
uv pip install 'dagster-airlift[core]' dagster-webserver dagster
```
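To confirm the installation, you can check that both packages import cleanly. This is just a quick sanity check, not part of the tutorial itself:

```python
# Sanity check: both packages should import without error.
import dagster
import dagster_airlift.core

print(dagster.__version__)
```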
## Observing the warehouse Airflow instance
Next, we'll declare a reference to our `warehouse` Airflow instance, which is running at `http://localhost:8081`:
```python
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)
```
Now, we can use the `load_airflow_dag_asset_specs` function to create asset representations of the DAGs in the `warehouse` Airflow instance:
```python
from dagster_airlift.core import load_airflow_dag_asset_specs

assets = load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
)
```
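If you want to sanity-check what was loaded before wiring anything up, you can print the asset keys. This is a minimal sketch, assuming the function returns a sequence of Dagster `AssetSpec` objects (one per DAG):

```python
# Each loaded spec corresponds to one Airflow DAG; print its asset key.
for spec in assets:
    print(spec.key)
```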
Now, let's add these assets to a `Definitions` object:
```python
from dagster import Definitions

defs = Definitions(assets=assets)
```
Let's set some environment variables, then launch `dagster dev` pointed at our definitions file to see the assets created from our Airflow instance:
```bash
# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py
```
If we navigate to the Dagster UI (running at `http://localhost:3000`), we should see the assets created from the `warehouse` Airflow instance.
There are a lot of DAGs in this instance, but we only want to focus on the `load_customers` DAG. Let's filter the assets to include only the `load_customers` DAG:
```python
# The selector matches exactly one DAG, so next(iter(...)) pulls the single
# asset spec out of the returned sequence.
load_customers = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
```
Let's add this asset to our `Definitions` object in place of the full list:
```python
defs = Definitions(assets=[load_customers])
```
Now, our Dagster environment only includes the `load_customers` DAG from the `warehouse` Airflow instance.
Finally, we'll use a sensor to poll the `warehouse` Airflow instance for new runs. This way, whenever we get a successful run of the `load_customers` DAG, we'll see a materialization in the Dagster UI:
```python
from dagster_airlift.core import build_airflow_polling_sensor

warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers],
    airflow_instance=warehouse_airflow_instance,
)
```
Now, we can add this sensor to our `Definitions` object:
```python
defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor])
```
You can test this by navigating to the Airflow UI at `localhost:8081` and triggering a run of the `load_customers` DAG. When the run completes, you should see a materialization in the Dagster UI.
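If you prefer to trigger the run programmatically, here is a minimal sketch using Airflow's stable REST API. It assumes the third-party `requests` library is installed and that the instance uses the basic-auth `admin`/`admin` credentials from this tutorial:

```python
# Sketch: trigger load_customers via Airflow's stable REST API.
# Assumes basic auth with the admin/admin credentials used above.
import requests

response = requests.post(
    "http://localhost:8081/api/v1/dags/load_customers/dagRuns",
    auth=("admin", "admin"),
    json={},  # an empty payload lets Airflow generate the run_id
)
response.raise_for_status()
print(response.json()["dag_run_id"])
```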
## Observing the metrics Airflow instance
We can repeat the same process for the `customer_metrics` DAG in the `metrics` Airflow instance, which runs at `http://localhost:8082`. We'll leave this as an exercise to test your understanding.
When complete, your code should look like this:
```python
from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

# References to the two Airflow instances running locally.
warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)
metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

# Asset representations of the two DAGs we care about.
load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)

# Sensors that poll each instance and emit materializations on successful runs.
warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)
```
## Adding lineage between `load_customers` and `customer_metrics`
Now that we have both DAGs loaded into Dagster, we can observe the cross-DAG lineage between them. To do this, we'll use the `replace_attributes` function to make the `customer_metrics` asset depend on the `load_customers` asset:
```python
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    deps=[load_customers_dag_asset],
)
```
Now, after adding the updated `customer_metrics_dag_asset` to our `Definitions` object, we should see the lineage between the two DAGs in the Dagster UI.
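For reference, using the variable names from the full example above, the updated code would look like this:

```python
# Make customer_metrics depend on load_customers, then register the updated asset.
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    deps=[load_customers_dag_asset],
)

defs = Definitions(
    assets=[load_customers_dag_asset, customer_metrics_dag_asset],
    sensors=[warehouse_sensor, metrics_sensor],
)
```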
## Next steps
Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along here.