There are many strategies that can be employed when migrating an Airflow project to Dagster. Some organizations may prefer to migrate all pipelines at once, while others pick and choose workflows and let Airflow and Dagster co-exist during the transition. This guide takes a "choose your own adventure" approach, offering a variety of options to assist you in your migration from Apache Airflow.
If your organization makes heavy use of the PythonOperator, the Airflow TaskFlow API, or the KubernetesPodOperator, then migrating to Dagster may be relatively simple!
The Airflow pipelines that are simplest to migrate to Dagster are those that use Airflow's TaskFlow API.
With this approach, pipelines are constructed from functions decorated with Airflow's @task decorator, which map easily to functions decorated with Dagster's @asset decorator. For example, given the Airflow task below:
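import json

from airflow.decorators import task


@task()
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict

It can be migrated with little more than a decorator swap; the body below is the same extract task used in the full example later in this guide:

from dagster import asset


@asset
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict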
Now, let’s walk through the full tutorial_taskflow_api.py example DAG, and how it would be translated to Dagster assets.
import json

import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task
    def extract():
        """#### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """#### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        return {"total_order_value": total_order_value}

    @task
    def load(total_order_value: float):
        """#### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()
By converting each Airflow task to a Dagster @asset and the Airflow DAG to a Dagster job, the resulting code looks like the following.
import json

from dagster import AssetExecutionContext, Definitions, asset, define_asset_job


@asset
def extract():
    """Extract task
    A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting
    data is simulated by reading from a hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict


@asset
def transform(extract):
    """Transform task
    A simple Transform task which takes in the collection of order data and computes the total order
    value.
    """
    total_order_value = 0
    for value in extract.values():
        total_order_value += value
    return total_order_value


@asset
def load(context: AssetExecutionContext, transform):
    """Load task
    A simple Load task which takes in the result of the Transform task and instead of saving it to
    end user review, just prints it out.
    """
    context.log.info(f"Total order value is: {transform:.2f}")


airflow_taskflow_example = define_asset_job(
    name="airflow_taskflow_example", selection=[extract, transform, load]
)

defs = Definitions(
    assets=[extract, transform, load], jobs=[airflow_taskflow_example]
)
In this example, we use define_asset_job to define a job that materializes the selected assets. Using the function's selection parameter, we specify that the extract, transform, and load assets should be materialized. The lineage of dependencies between the assets is determined automatically by passing one asset as a parameter to another.
Finally, we create a Definitions object to register our assets and job so they can be loaded by Dagster tools.
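If you want to check the migrated assets without deploying anything, one option is to materialize them in-process with Dagster's materialize helper. This is a minimal sketch; the module name my_dagster_pipeline is hypothetical and stands in for wherever the code above lives:

from dagster import materialize

# Import the assets defined above; the module name is illustrative.
from my_dagster_pipeline import extract, transform, load

# Materialize the assets in-process, respecting their dependency order.
result = materialize([extract, transform, load])
assert result.success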
If you've elected to containerize your Airflow pipelines and run them with technologies like Kubernetes (using the KubernetesPodOperator) or Amazon Elastic Container Service (using the EcsRunTaskOperator), you'll need a different approach to migration.
In these cases, we recommend leveraging Dagster Pipes for running these external execution environments from Dagster. Refer to the Dagster Pipes documentation for more information.
Some benefits of containerizing your pipelines are as follows:
Dependencies are isolated between execution environments
Compute requirements can be easily modified per pipeline (CPU, memory, GPU requirements, and so on)
Pipelines can be language agnostic, allowing you to use R, Rust, Go, and so on
Vendor lock-in is limited, and pipelines can be easily migrated to other platforms
Pipelines can be versioned using tags on the image repository
Let’s walk through an example of how a containerized pipeline can be run from Airflow, and then how the same would be done in Dagster. Imagine you have a Dockerized pipeline deployed to your registry of choice with an image named example-data-pipeline. In Apache Airflow, you would run that image using the KubernetesPodOperator.
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from pendulum import datetime

with DAG(
    dag_id="example_kubernetes_dag", schedule_interval=None, catchup=False
) as dag:
    KubernetesPodOperator(
        image="example-data-pipeline:latest",
        name="example-kubernetes-task",
        task_id="example-kubernetes-task",
        get_logs=True,
    )
Now, let's look at how the same image would be run on Kubernetes using Dagster Pipes and the dagster_k8s wrapper.
from dagster import AssetExecutionContext, asset
from dagster_k8s import PipesK8sClient


@asset
def k8s_pipes_asset(context: AssetExecutionContext, k8s_pipes_client: PipesK8sClient):
    return k8s_pipes_client.run(
        context=context,
        image="example-data-pipeline:latest",
        base_pod_spec={
            "containers": [{"name": "data-processing-rs", "image": "data-processing-rs"}]
        },
    ).get_materialize_result()
The primary difference between Airflow and Dagster is how the Kubernetes pod specifications are exposed. In Airflow, they are passed as parameters to the KubernetesPodOperator, whereas in Dagster they are passed as a base_pod_spec dictionary to the k8s_pipes_client.run method. Additionally, in Airflow, get_logs is required to capture stdout; in Dagster, logs are automatically captured and shown on the stdout tab of the step output.
In the above example, we demonstrated how to run images on Kubernetes using the dagster_k8s library. One of the biggest benefits of Dagster Pipes, however, is that you can leverage the dagster_pipes library from within your containerized pipeline to access the full Dagster context, and emit events back to the Dagster UI.
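For example, the container's entry point could open a Pipes session with the dagster_pipes library to stream logs and report a materialization back to Dagster. This is a minimal sketch; the log message and metadata key are illustrative:

# Runs inside the example-data-pipeline container.
from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
    pipes.log.info("Starting data processing")
    # ... the pipeline's actual work goes here ...
    # Emit a materialization event that will appear in the Dagster UI.
    pipes.report_asset_materialization(metadata={"row_count": 42})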
A common pattern when building containerized pipelines is to accept a large number of command-line arguments using libraries like argparse. With Dagster, however, you can pass a dictionary of parameters on the Dagster context using the extras parameter. Then, in your pipeline code, you can access the context with PipesContext.get() if you are using Python.
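As a sketch of that pattern, the asset above could pass extras={"run_date": "2024-01-01"} as an additional argument to k8s_pipes_client.run (assuming the client's extras parameter; the key name is illustrative), and the container would read the value instead of parsing command-line arguments:

# Runs inside the container, replacing argparse-style argument handling.
from dagster_pipes import PipesContext, open_dagster_pipes

with open_dagster_pipes():
    pipes = PipesContext.get()
    # Read parameters passed from the orchestration side via `extras`.
    run_date = pipes.get_extra("run_date")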
For a step-by-step walkthrough of using Dagster Pipes, refer to the Dagster Pipes tutorial.