Migrating from Spark step launchers to Dagster Pipes
In this guide, we’ll show you how to migrate from using step launchers to using Dagster Pipes in Dagster.
While step launchers were intended to support various runtime environments, in practice, they have only been implemented for Spark. Therefore, we will focus on Spark-related examples.
When deciding to migrate from step launchers to Dagster Pipes, consider the following:
- Step launchers are superseded by Dagster Pipes. While they are still available (and there are no plans for their removal), they are no longer the recommended way to launch external code from Dagster ops and assets, and they won't receive new features or active development.
- Dagster Pipes is a more lightweight and flexible framework, but it comes with a few drawbacks:
  - The Spark runtime and the executed code are no longer managed by Dagster for you.
  - Dagster Pipes is not compatible with Resources and IO Managers. If you rely heavily on these features, you might want to keep using step launchers.
  - Packaging the Python dependencies and scripts has to be automated with a CI/CD pipeline and run before deploying the Dagster code location.
- It's also possible to run Java or Scala Spark jobs with Dagster Pipes, but there is currently no official Pipes implementation for these languages, so forwarding Dagster events from such jobs is not yet officially supported (although it can be done with some custom code).
The goal is to keep the same observability and orchestration features while moving compute to an external script. Suppose you have existing code using step launchers similar to this:
from typing import Any

import dagster as dg
from dagster_aws.emr import emr_pyspark_step_launcher
from pyspark.sql import DataFrame

from my_lib import MyPysparkIOManager, calculate_metric, get_data_frame


# the upstream asset will serve as an example of writing a Spark DataFrame
@dg.asset(io_manager_key="pyspark_io_manager")
def upstream(pyspark_step_launcher: dg.ResourceParam[Any]) -> DataFrame:
    return get_data_frame()


# the downstream asset will serve as an example of reading a Spark DataFrame
# and logging metadata to Dagster
@dg.asset(io_manager_key="pyspark_io_manager")
def downstream(
    context: dg.AssetExecutionContext,
    upstream: DataFrame,
    pyspark_step_launcher: dg.ResourceParam[Any],
) -> None:
    my_metric = calculate_metric(upstream)
    context.add_output_metadata({"my_metric": my_metric})
    return


definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={
        "pyspark_step_launcher": emr_pyspark_step_launcher,
        "pyspark_io_manager": MyPysparkIOManager(),
    },
)
The corresponding Pipes code will instead have two components: the Dagster asset definition, and the external PySpark job.
Let's start with the PySpark job. The upstream asset will invoke the following script:
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes

from my_lib import get_data_frame


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
    ) as pipes:
        df = get_data_frame()
        # change according to your needs
        path = "s3://" + "<my-bucket>/" + pipes.asset_key + "/.parquet"
        # this was probably previously logged by the IOManager
        pipes.add_output_metadata({"path": path})
        df.write.parquet(path)


if __name__ == "__main__":
    main()
Now, we have to run this script from Dagster. First, let's factor the boilerplate EMR config into a reusable function:
def make_emr_params(script_path: str) -> dict:
    return {
        # very rough configuration, please adjust to your needs
        "Name": "MyJobFlow",
        "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
        "LogUri": "s3://your-bucket/emr/logs",
        "Steps": [
            {
                "Name": "MyStep",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--deploy-mode",
                        "cluster",
                        "--master",
                        "yarn",
                        "--files",
                        "s3://your-bucket/venv.pex",
                        "--conf",
                        "spark.pyspark.python=./venv.pex",
                        "--conf",
                        "spark.yarn.submit.waitAppCompletion=true",
                        script_path,
                    ],
                },
            },
        ],
    }
Now, the asset body will be as follows:
import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRClient

from my_lib import make_emr_params


@dg.asset(io_manager_key="s3_io_manager")
def upstream(
    context: dg.AssetExecutionContext, pipes_emr_client: PipesEMRClient
) -> str:
    result = pipes_emr_client.run(
        context=context,
        run_job_flow_params=make_emr_params(
            "s3://your-bucket/upstream_asset_script.py"
        ),
    ).get_materialize_result()
    return result.metadata["path"].value
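For reference, the `pipes_emr_client` and `s3_io_manager` resources used above also have to be provided to `Definitions`. Below is a minimal sketch of this wiring, not a definitive implementation: it assumes `PipesEMRClient` accepts a boto3 EMR client and a `PipesS3MessageReader` (to pick up the messages written by `PipesS3MessageWriter` in the script), and it uses `S3PickleIOManager` as a stand-in I/O manager; the bucket name is a placeholder.

import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
from dagster_aws.s3 import S3PickleIOManager, S3Resource

definitions = dg.Definitions(
    assets=[upstream],  # add the downstream asset here once it's migrated below
    resources={
        # launches the EMR job flow and reads Pipes messages from S3
        "pipes_emr_client": PipesEMRClient(
            client=boto3.client("emr"),
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"), bucket="<my-bucket>"
            ),
        ),
        # stores the Parquet path returned by the asset so downstream assets can load it
        "s3_io_manager": S3PickleIOManager(
            s3_resource=S3Resource(), s3_bucket="<my-bucket>"
        ),
    },
)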
Since the asset now returns the Parquet file path, it will be saved by the IOManager, and the downstream asset will be able to access it.
Now, let's migrate the second asset, downstream.
Since we can't use IO Managers in scripts launched by Pipes, we have to either add a CLI argument parser to the script or use the handy extras feature provided by Pipes to pass the "path" value to the job. We will demonstrate the latter approach. The downstream asset turns into:
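The following is a sketch of what this could look like. It assumes that `PipesEMRClient.run` accepts an `extras` mapping (as other Pipes clients do) and that a hypothetical downstream_asset_script.py has been uploaded to S3; adjust names and paths to your setup.

import dagster as dg
from dagster_aws.pipes import PipesEMRClient

from my_lib import make_emr_params


@dg.asset
def downstream(
    context: dg.AssetExecutionContext,
    upstream: str,  # the Parquet path loaded by the I/O manager
    pipes_emr_client: PipesEMRClient,
) -> dg.MaterializeResult:
    return pipes_emr_client.run(
        context=context,
        run_job_flow_params=make_emr_params(
            # hypothetical location of the downstream script
            "s3://your-bucket/downstream_asset_script.py"
        ),
        # pass the Parquet path to the external job via Pipes extras
        extras={"path": upstream},
    ).get_materialize_result()

On the EMR side, the script can read the extra back from the Pipes context. The sketch below reuses `calculate_metric` from my_lib and reports the metric as materialization metadata (again, a sketch under the same assumptions, not the only possible approach):

import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession

from my_lib import calculate_metric


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
    ) as pipes:
        spark = SparkSession.builder.getOrCreate()
        # read the Parquet path passed from the Dagster side via Pipes extras
        df = spark.read.parquet(pipes.get_extra("path"))
        my_metric = calculate_metric(df)
        # report the metric back to Dagster as materialization metadata
        pipes.report_asset_materialization(metadata={"my_metric": my_metric})


if __name__ == "__main__":
    main()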
In this guide, we have demonstrated how to migrate from using step launchers to using Dagster Pipes. We have shown how to launch PySpark jobs on AWS EMR using PipesEMRClient and how to pass small pieces of data between assets using Dagster's metadata and Pipes extras.
Heads up! As an alternative to storing paths with an `IOManager`, the following utility function can be used to retrieve logged metadata values from upstream assets:
def get_latest_output_metadata_value(
    context: dg.AssetExecutionContext, asset_key: dg.AssetKey, metadata_key: str
):
    # see https://github.com/dagster-io/dagster/issues/8521 for more details about
    # accessing upstream metadata from downstream assets and ops
    event_log_entry = (
        context.get_step_execution_context().instance.get_latest_materialization_event(
            asset_key
        )
    )
    metadata = (
        event_log_entry.dagster_event.event_specific_data.materialization.metadata
    )
    return metadata[metadata_key].value
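For example, the downstream asset could drop the `upstream` input and resolve the path at runtime instead (a sketch; the asset and metadata key names are taken from the examples above, and the dependency should then be declared with `deps` so lineage is preserved):

# inside the downstream asset body, instead of receiving `upstream` as an input:
path = get_latest_output_metadata_value(context, dg.AssetKey("upstream"), "path")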