Using resources in schedules
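This example shows how to use resources in schedules. To declare a resource dependency, add the resource as a type-annotated parameter of the schedule's decorated function. The parameter name must match the key under which the resource is registered in Definitions (date_formatter in the example below).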
Note: This article assumes familiarity with resources, code locations and definitions, and schedule testing. All Dagster definitions, including schedules and resources, must be attached to a Definitions call.
from dagster import (
    schedule,
    ScheduleEvaluationContext,
    ConfigurableResource,
    job,
    RunRequest,
    RunConfig,
    Definitions,
)
from datetime import datetime
from typing import List


class DateFormatter(ConfigurableResource):
    format: str

    def strftime(self, dt: datetime) -> str:
        return dt.strftime(self.format)


@job
def process_data(): ...


@schedule(job=process_data, cron_schedule="* * * * *")
def process_data_schedule(
    context: ScheduleEvaluationContext,
    date_formatter: DateFormatter,
):
    formatted_date = date_formatter.strftime(context.scheduled_execution_time)

    return RunRequest(
        run_key=None,
        tags={"date": formatted_date},
    )


defs = Definitions(
    jobs=[process_data],
    schedules=[process_data_schedule],
    resources={"date_formatter": DateFormatter(format="%Y-%m-%d")},
)
APIs in this guide
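| Name | Description |
|---|---|
| @dg.schedule | Decorator that defines a schedule that executes according to a given cron schedule. |
| ConfigurableResource | Base class for Dagster resources that use structured (Pydantic-based) config. |
| @dg.job | The decorator used to define a job. |
| RunRequest | A class that represents all the information required to launch a single run. |
| RunConfig | Container for all the configuration that can be passed to a run. |
| Definitions | A set of definitions explicitly available and loadable by Dagster tools. |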