Migrating from Spark step launchers to Dagster Pipes

In this guide, we’ll show you how to migrate from step launchers to Dagster Pipes.

While step launchers were intended to support various runtime environments, in practice, they have only been implemented for Spark. Therefore, we will focus on Spark-related examples.


Considerations

When deciding to migrate from step launchers to Dagster Pipes, consider the following:

  • Step launchers are superseded by Dagster Pipes. While they are still available (and there are no plans for their removal), they are no longer the recommended method for launching external code from Dagster ops and assets. They are no longer under active development and won't receive new features.
  • Dagster Pipes is a more lightweight and flexible framework, but it does come with a few drawbacks:
    1. Spark runtime and the code executed will no longer be managed by Dagster for you.
    2. Resources and IO Managers cannot be used inside the external code launched by Dagster Pipes. If you rely heavily on these features, you might want to keep using step launchers.

Steps

To migrate from step launchers to Dagster Pipes, you will have to perform the following steps.

1. Implement new CI/CD pipelines to prepare your Spark runtime environment

The environment can also be prepared from Dagster jobs instead of a CI/CD pipeline, but either way, you will need to manage the Spark runtime yourself.

When running PySpark jobs, the following changes to Python dependencies should be considered:

  • drop dagster
  • add dagster-pipes

You can learn more about packaging Python dependencies for PySpark in the PySpark documentation or in the AWS EMR Pipes guide.

The process of packaging the Python dependencies and scripts should be automated with a CI/CD pipeline and run before deploying the Dagster code location.
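As a rough illustration of the dependency change listed above, a CI step could rewrite the job's requirements before building the environment. This is only a sketch: the helper name `to_pipes_requirements` and the sample package pins are hypothetical, and a real pipeline would more likely maintain a separate requirements file for the Spark environment.

```python
import re
from typing import List

# Matches the distribution name at the start of a requirement line,
# e.g. "dagster" in "dagster==1.7.0" or "dagster[extra]>=1.7".
_NAME_RE = re.compile(r"^\s*([A-Za-z0-9_.\-]+)")


def to_pipes_requirements(requirements: List[str]) -> List[str]:
    """Drop the `dagster` package and make sure `dagster-pipes` is listed."""
    result = []
    has_pipes = False
    for line in requirements:
        match = _NAME_RE.match(line)
        name = match.group(1).lower().replace("_", "-") if match else ""
        if name == "dagster":
            # The orchestrator itself is no longer needed inside the Spark job.
            continue
        if name == "dagster-pipes":
            has_pipes = True
        result.append(line)
    if not has_pipes:
        result.append("dagster-pipes")
    return result


if __name__ == "__main__":
    print(to_pipes_requirements(["pyspark==3.5.0", "dagster==1.7.0", "boto3"]))
```

Only the exact `dagster` distribution is removed; comment lines and other packages are passed through unchanged.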

It's also possible to run Java or Scala Spark jobs with Dagster Pipes, but currently there is no official Pipes implementation for these languages. Therefore, forwarding Dagster events from these jobs is not yet supported officially (although it can be done with some custom code).

2. Update your Dagster code

The goal is to keep the same observability and orchestration features while moving compute to an external script. Suppose you have existing code using step launchers similar to this:

from typing import Any

import dagster as dg
from dagster_aws.emr import emr_pyspark_step_launcher
from pyspark.sql import DataFrame

from my_lib import MyPysparkIOManager, calculate_metric, get_data_frame

# the upstream asset will serve as an example of writing a Spark DataFrame


@dg.asset(io_manager_key="pyspark_io_manager")
def upstream(pyspark_step_launcher: dg.ResourceParam[Any]) -> DataFrame:
    return get_data_frame()


# the downstream asset will serve as an example of reading a Spark DataFrame
# and logging metadata to Dagster


@dg.asset(io_manager_key="pyspark_io_manager")
def downstream(
    context: dg.AssetExecutionContext,
    upstream: DataFrame,
    pyspark_step_launcher: dg.ResourceParam[Any],
) -> None:
    my_metric = calculate_metric(upstream)
    context.add_output_metadata({"my_metric": my_metric})
    return


definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={
        "pyspark_step_launcher": emr_pyspark_step_launcher,
        "pyspark_io_manager": MyPysparkIOManager(),
    },
)

The corresponding Pipes code will instead have two components: the Dagster asset definition, and the external PySpark job.

Let's start with the PySpark job. The upstream asset will invoke the following script:

import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes

from my_lib import get_data_frame


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
    ) as pipes:
        df = get_data_frame()

        # change according to your needs
        path = "s3://" + "<my-bucket>/" + pipes.asset_key + "/.parquet"

        df.write.parquet(path)

        # this was probably previously logged by the IOManager;
        # the Dagster asset reads it back from the materialization metadata
        pipes.report_asset_materialization(metadata={"path": path})


if __name__ == "__main__":
    main()
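The output location in the script above is assembled by plain string concatenation. If several scripts need the same convention, it may be worth moving it into a small helper. The sketch below is one possible shape, assuming the asset key arrives as a string (as it does via `pipes.asset_key`); the function name `build_parquet_path` and the default file name are hypothetical choices, not part of Dagster.

```python
def build_parquet_path(
    bucket: str, asset_key: str, filename: str = "data.parquet"
) -> str:
    """Build an s3:// URI of the form s3://<bucket>/<asset_key>/<filename>."""
    if not bucket or "/" in bucket:
        raise ValueError(f"invalid bucket name: {bucket!r}")
    key = asset_key.strip("/")
    if not key:
        raise ValueError("asset_key must not be empty")
    return f"s3://{bucket}/{key}/{filename}"


if __name__ == "__main__":
    print(build_parquet_path("your-bucket", "upstream"))
```

Inside the job, the call would look like `build_parquet_path("your-bucket", pipes.asset_key)`.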

Now, we have to run this script from Dagster. First, let's factor the boilerplate EMR config into a reusable function:

def make_emr_params(script_path: str) -> dict:
    return {
        # very rough configuration, please adjust to your needs
        "Name": "MyJobFlow",
        "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
        "LogUri": "s3://your-bucket/emr/logs",
        "Steps": [
            {
                "Name": "MyStep",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--deploy-mode",
                        "cluster",
                        "--master",
                        "yarn",
                        "--files",
                        "s3://your-bucket/venv.pex",
                        "--conf",
                        "spark.pyspark.python=./venv.pex",
                        "--conf",
                        "spark.yarn.submit.waitAppCompletion=true",
                        script_path,
                    ],
                },
            },
        ],
    }
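If different jobs need different Spark settings, the `Args` list above could be generated rather than hard-coded. The following is a minimal sketch under that assumption; `build_spark_submit_args` and its parameters are hypothetical names introduced for illustration, and the defaults simply mirror the values used in `make_emr_params`.

```python
import posixpath
from typing import Dict, List, Optional


def build_spark_submit_args(
    script_path: str,
    pex_uri: str,
    extra_conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Assemble spark-submit arguments for a PEX-packaged PySpark script."""
    # The file passed via --files lands in the working directory under its base name.
    pex_name = posixpath.basename(pex_uri)
    conf = {
        "spark.pyspark.python": f"./{pex_name}",
        "spark.yarn.submit.waitAppCompletion": "true",
    }
    conf.update(extra_conf or {})

    args = [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--files", pex_uri,
    ]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    # The application script comes after all spark-submit options.
    args.append(script_path)
    return args


if __name__ == "__main__":
    print(build_spark_submit_args(
        "s3://your-bucket/upstream_asset_script.py",
        "s3://your-bucket/venv.pex",
    ))
```

With the default arguments this produces the same list as the hard-coded `Args` above, so it could be dropped into `make_emr_params` without changing behavior.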

Now, the asset body will be as follows:

import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRClient

from my_lib import make_emr_params


@dg.asset(io_manager_key="s3_io_manager")
def upstream(
    context: dg.AssetExecutionContext, pipes_emr_client: PipesEMRClient
) -> str:
    result = pipes_emr_client.run(
        context=context,
        run_job_flow_params=make_emr_params(
            "s3://your-bucket/upstream_asset_script.py"
        ),
    ).get_materialize_result()

    return result.metadata["path"].value

Since the asset now returns the Parquet file path, it will be saved by the IOManager, and the downstream asset will be able to access it.

Next, let's migrate the second asset, downstream.

Since we can't use IO Managers in scripts launched by Pipes, we would have to either make a CLI argument parser or use the handy extras feature provided by Pipes in order to pass this "path" value to the job. We will demonstrate the latter approach. The downstream asset turns into:

@dg.asset
def downstream(
    context: dg.AssetExecutionContext, upstream: str, pipes_emr_client: PipesEMRClient
):
    return pipes_emr_client.run(
        context=context,
        run_job_flow_params=make_emr_params(
            "s3://your-bucket/downstream_asset_script.py"
        ),
        extras={"path": upstream},
    ).get_materialize_result()

Now, let's access the path value in the PySpark job:

import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession

from my_lib import calculate_metric


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
    ) as pipes:
        spark = SparkSession.builder.getOrCreate()

        upstream_path = pipes.get_extra("path")

        df = spark.read.parquet(upstream_path)

        my_metric = calculate_metric(df)

        pipes.report_asset_materialization(metadata={"my_metric": my_metric})


if __name__ == "__main__":
    main()
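For comparison, the other option mentioned earlier, a CLI argument parser, could look roughly like the sketch below. The flag name `--upstream-path` and the function `parse_args` are hypothetical; the script would receive the value as an extra entry placed after the script path in the spark-submit `Args` list.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the job's command-line arguments (defaults to sys.argv[1:])."""
    parser = argparse.ArgumentParser(description="Downstream PySpark job")
    parser.add_argument(
        "--upstream-path",
        required=True,
        help="Location of the Parquet data written by the upstream asset",
    )
    return parser.parse_args(argv)


if __name__ == "__main__":
    # Example invocation with an explicit argument list instead of sys.argv.
    args = parse_args(["--upstream-path", "s3://your-bucket/upstream/.parquet"])
    print(args.upstream_path)
```

Compared with extras, this approach requires threading each value through the EMR step definition by hand, which is one reason the guide uses extras instead.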

Finally, provide the required resources to Definitions:

import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
from dagster_aws.s3 import S3PickleIOManager, S3Resource

definitions = dg.Definitions(
    assets=[upstream, downstream],
    resources={
        "pipes_emr_client": PipesEMRClient(
            client=boto3.client("emr"),
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"),
                bucket="your-bucket",
            ),
        ),
        "s3_io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="your-bucket",
            s3_prefix="your-prefix",
        ),
    },
)

Conclusion

In this guide, we have demonstrated how to migrate from using step launchers to using Dagster Pipes. We have shown how to launch PySpark jobs on AWS EMR using PipesEMRClient and how to pass small pieces of data between assets using Dagster's metadata and Pipes extras.

Supplementary

Heads up! As an alternative to storing paths with an `IOManager`, the following utility function can be used to retrieve logged metadata values from upstream assets:

def get_latest_output_metadata_value(
    context: dg.AssetExecutionContext, asset_key: dg.AssetKey, metadata_key: str
):
    # see https://github.com/dagster-io/dagster/issues/8521 for more details about accessing upstream metadata from downstream assets and ops

    event_log_entry = (
        context.get_step_execution_context().instance.get_latest_materialization_event(
            asset_key
        )
    )
    metadata = (
        event_log_entry.dagster_event.event_specific_data.materialization.metadata
    )
    return metadata[metadata_key].value
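The utility above assumes that the upstream asset has already been materialized and that the requested key exists. Since `get_latest_materialization_event` can return `None` when no materialization has been recorded, a more defensive variant of the lookup might be useful. The sketch below is duck-typed and uses stand-in objects rather than real Dagster classes, so it only illustrates the attribute chain from the function above; `extract_metadata_value` is a hypothetical name.

```python
from types import SimpleNamespace
from typing import Any, Optional


def extract_metadata_value(
    event_log_entry: Optional[Any], metadata_key: str, default: Any = None
) -> Any:
    """Return the metadata value for `metadata_key`, or `default` if unavailable."""
    if event_log_entry is None:
        # No materialization has been recorded for the asset yet.
        return default
    materialization = event_log_entry.dagster_event.event_specific_data.materialization
    entry = materialization.metadata.get(metadata_key)
    return default if entry is None else entry.value


# Stand-in objects mimicking the attribute chain (not real Dagster classes).
fake_entry = SimpleNamespace(
    dagster_event=SimpleNamespace(
        event_specific_data=SimpleNamespace(
            materialization=SimpleNamespace(
                metadata={
                    "path": SimpleNamespace(value="s3://your-bucket/upstream/.parquet")
                }
            )
        )
    )
)

if __name__ == "__main__":
    print(extract_metadata_value(fake_entry, "path"))
    print(extract_metadata_value(None, "path", default="<not materialized>"))
```

In a real asset, the first argument would be the result of the `get_latest_materialization_event(asset_key)` call shown above.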