The dagster-aws integration library provides the PipesEMRClient resource, which can be used to launch AWS EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.
Prerequisites#
- AWS authentication credentials configured. If you don't have this set up already, refer to the boto3 quickstart.
- In AWS:
  - An existing AWS account
  - Prepared infrastructure such as S3 buckets, IAM roles, and other resources required for your EMR job
Step 1: Install the dagster-pipes module in your EMR environment#
There are several options for installing dagster-pipes in the EMR environment. For example, the following Dockerfile can be used to package all required dependencies into a single PEX file, which is in practice the most straightforward way to package Python dependencies for EMR jobs:
```dockerfile
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR
FROM amazonlinux:2 AS builder

RUN yum install -y python3

WORKDIR /build

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16
RUN uv venv .venv
RUN --mount=type=cache,target=/root/.cache/uv \
    uv pip install pex dagster-pipes boto3 pyspark

RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex

# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"

FROM scratch AS export
COPY --from=builder /output/venv.pex /venv.pex
```
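Once built, the venv.pex file has to be uploaded somewhere the EMR cluster can fetch it, typically S3. A minimal sketch using boto3, where the bucket name and key are hypothetical placeholders:

```python
import boto3

s3 = boto3.client("s3")

# "my-emr-artifacts" and the key are hypothetical placeholders; use a bucket
# readable by the IAM roles attached to your EMR cluster
s3.upload_file("venv.pex", "my-emr-artifacts", "artifacts/venv.pex")
```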
Step 2: Add dagster-pipes to the EMR job script#
Call open_dagster_pipes in the EMR script to create a context that can be used to send messages to Dagster:
```python
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3"))
    ) as pipes:
        pipes.log.info("Hello from AWS EMR!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()

        print("Hello from stdout!")


if __name__ == "__main__":
    main()
```
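When the job runs, PipesS3MessageWriter writes Pipes messages (logs, materializations, and so on) to S3, where a corresponding PipesS3MessageReader on the Dagster side picks them up; the bootstrap information telling the job where to write is injected by the PipesEMRClient at launch time (see Step 3).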
Step 3: Create an asset using the PipesEMRClient to launch the job#
In the Dagster asset/op code, use the PipesEMRClient resource to launch the job:
```python
import os

import boto3
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader

from dagster import AssetExecutionContext, asset


@asset
def emr_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
    return pipes_emr_client.run(
        context=context,
        # see the full reference here:
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr/client/run_job_flow.html#EMR.Client.run_job_flow
        run_job_flow_params={},
    ).get_materialize_result()
```
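For the asset to run, the pipes_emr_client resource has to be provided to Definitions. A minimal sketch continuing the snippet above, assuming the messages bucket (the one the job's PipesS3MessageWriter writes to) is supplied via a hypothetical DAGSTER_PIPES_BUCKET environment variable:

```python
from dagster import Definitions

defs = Definitions(
    assets=[emr_pipes_asset],
    resources={
        "pipes_emr_client": PipesEMRClient(
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"),
                # hypothetical env var holding the bucket that the EMR job's
                # PipesS3MessageWriter writes Pipes messages to
                bucket=os.environ["DAGSTER_PIPES_BUCKET"],
            )
        )
    },
)
```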
This will launch the AWS EMR job and wait for its completion. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
The stdout and stderr of EMR application steps will be forwarded to the Dagster process.
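If terminating the job on interruption isn't desirable, this behavior can be disabled; a sketch assuming the forward_termination flag exposed by dagster-aws Pipes clients:

```python
import boto3

from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader

# assumption: with forward_termination=False, interrupting the Dagster run
# leaves the EMR job flow running instead of terminating it
pipes_emr_client = PipesEMRClient(
    message_reader=PipesS3MessageReader(
        client=boto3.client("s3"),
        bucket="my-pipes-bucket",  # hypothetical bucket name
    ),
    forward_termination=False,
)
```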