The dagster-aws integration library provides the PipesEMRServerlessClient resource, which can be used to launch AWS EMR Serverless jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.
Step 1: Install dagster-pipes in the EMR Serverless environment
Install the dagster-pipes module in the image used for your EMR job. For example, you can install the dependency with pip in your image Dockerfile:
# start from EMR image
FROM public.ecr.aws/emr-serverless/spark/emr-7.2.0:latest
USER root
RUN python -m pip install dagster-pipes
# copy the job script
COPY . .
USER hadoop
Step 2: Add dagster-pipes to the EMR Serverless job script
Call open_dagster_pipes in the EMR Serverless script to create a context that can be used to send messages to Dagster:
from dagster_pipes import open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes() as pipes:
        pipes.log.info("Hello from AWS EMR Serverless!")
        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )
        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])
        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )
        spark.stop()


if __name__ == "__main__":
    main()
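The metadata passed to report_asset_materialization in the script above is a plain dictionary whose values pair a raw_value with a type label. As a minimal sketch, the payload can be built and checked locally without Spark. The helper names typed_metadata and build_metadata and the extra row_count entry are hypothetical, and only a few scalar type labels are handled here:

```python
from statistics import fmean

# Same sample rows as the Spark job above: (id, name, age).
ROWS = [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)]


def typed_metadata(value):
    """Wrap a plain Python value in the {"raw_value", "type"} shape used above.

    Hypothetical helper: only a few scalar types are handled.
    """
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return {"raw_value": value, "type": "bool"}
    if isinstance(value, int):
        return {"raw_value": value, "type": "int"}
    if isinstance(value, float):
        return {"raw_value": value, "type": "float"}
    if isinstance(value, str):
        return {"raw_value": value, "type": "text"}
    raise TypeError(f"unsupported metadata value: {value!r}")


def build_metadata(rows):
    """Compute the average age locally and return the metadata payload."""
    avg_age = fmean(age for _, _, age in rows)
    return {
        "average_age": typed_metadata(avg_age),
        "row_count": typed_metadata(len(rows)),  # illustrative extra entry
    }
```

Inside the job, the result of build_metadata could be passed as the metadata argument in place of the inline dictionary. Keeping the payload construction in a pure function makes it easy to unit test outside EMR.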
Step 3: Create an asset using the PipesEMRServerlessClient to launch the job
In the Dagster asset/op code, use the PipesEMRServerlessClient resource to launch the job:
import os

import boto3
from dagster_aws.pipes import PipesEMRServerlessClient

from dagster import AssetExecutionContext, asset


@asset
def emr_serverless_asset(
    context: AssetExecutionContext,
    pipes_emr_serverless_client: PipesEMRServerlessClient,
):
    return pipes_emr_serverless_client.run(
        context=context,
        start_job_run_params={
            "applicationId": "<app-id>",
            "executionRoleArn": "<emr-role>",
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "configurationOverrides": {
                "monitoringConfiguration": {
                    "cloudWatchLoggingConfiguration": {"enabled": True}
                }
            },
        },
    ).get_results()
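The start_job_run_params dictionary follows the shape of the EMR Serverless StartJobRun request. The snippet above does not show which script to run. For a Spark job, that is typically given under jobDriver.sparkSubmit.entryPoint. The sketch below assembles such a dictionary in a plain function. The function name build_start_job_run_params and the placeholder values are assumptions for illustration, not part of dagster-aws:

```python
def build_start_job_run_params(
    application_id,
    execution_role_arn,
    run_id,
    entry_point,
    entry_point_args=None,
):
    """Assemble a StartJobRun-style parameter dict for a Spark job.

    Hypothetical helper: it only builds a dictionary and calls no AWS API.
    """
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        # Reusing the Dagster run ID as the idempotency token, as in the asset above.
        "clientToken": run_id,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": entry_point,
                "entryPointArguments": list(entry_point_args or []),
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                "cloudWatchLoggingConfiguration": {"enabled": True}
            }
        },
    }


# Placeholder values; substitute your own application, role, and script location.
params = build_start_job_run_params(
    application_id="<app-id>",
    execution_role_arn="<emr-role>",
    run_id="run-123",
    entry_point="<script-location>",
)
```

In the asset, the result could be passed as start_job_run_params with run_id taken from context.run_id.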
This will launch the AWS EMR Serverless job and wait for its completion. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
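The wait, fail, and terminate behavior described above can be pictured as a polling loop. The following is a conceptual sketch only, not the implementation in dagster-aws. FakeEMRServerlessClient is a hypothetical stand-in that mimics the get_job_run and cancel_job_run calls of the boto3 emr-serverless client, so the loop can run locally:

```python
import time

# Job-run states after which the run no longer changes.
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


class FakeEMRServerlessClient:
    """Stand-in for boto3.client("emr-serverless"), for local illustration only."""

    def __init__(self, states):
        self._states = list(states)
        self.cancelled = False

    def get_job_run(self, applicationId, jobRunId):
        # Return the next scripted state; repeat the last one when exhausted.
        state = self._states.pop(0) if len(self._states) > 1 else self._states[0]
        return {"jobRun": {"state": state}}

    def cancel_job_run(self, applicationId, jobRunId):
        self.cancelled = True
        return {"applicationId": applicationId, "jobRunId": jobRunId}


def wait_for_job(client, application_id, job_run_id, poll_interval=0.0):
    """Poll until the job run reaches a terminal state; raise unless it succeeded."""
    try:
        while True:
            response = client.get_job_run(
                applicationId=application_id, jobRunId=job_run_id
            )
            state = response["jobRun"]["state"]
            if state in TERMINAL_STATES:
                break
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        # Interrupted while the job is still running: cancel it remotely.
        client.cancel_job_run(applicationId=application_id, jobRunId=job_run_id)
        raise
    if state != "SUCCESS":
        raise RuntimeError(
            f"EMR Serverless job run {job_run_id} ended in state {state}"
        )
    return state
```

For example, wait_for_job(FakeEMRServerlessClient(["PENDING", "RUNNING", "SUCCESS"]), "app", "job") returns once the scripted states reach SUCCESS, while a sequence ending in FAILED raises RuntimeError.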
Dagster will now be able to launch the AWS EMR Serverless job from the emr_serverless_asset asset, and receive logs and events from the job. If using the default message_reader, PipesCloudwatchLogReader, driver logs will be forwarded to the Dagster process.