In this guide, we'll walk you through running ops on a schedule. To do this for asset definitions, refer to the Automating assets using schedules guide.
By the end of this guide, you'll be able to:
Definitions
objectTo follow this guide, you'll need:
The first step in creating a schedule is to build a job that executes some ops.
Let's assume we already have a few ops in our project. To create a job that executes the ops, we'll use the @job
decorator to define the job:
@op def count_orders(): return 5 @op def count_users(arg): return arg + 1 @job def ecommerce_job(): count_users(count_orders())
To create the job, we:
job
@job
decorator and name it ecommerce_job
@job
function's body, we used function calls to define dependencies between the count_orders
and count_users
opsRefer to the Op jobs documentation for more info and examples.
Next, we'll construct the schedule using ScheduleDefinition
and attach it to the job we created in Step 1.
ecommerce_schedule = ScheduleDefinition( job=ecommerce_job, cron_schedule="15 5 * * 1-5", default_status=DefaultScheduleStatus.RUNNING, )
To build the schedule, we:
Imported DefaultScheduleStatus and ScheduleDefinition
from dagster
Created a schedule using ScheduleDefinition
that:
ecommerce_job
job15 5 * * 1-5
, which translates to Every Monday through Friday of every month at 5:15AM
default_status
). We'll discuss this more in Step 4.Next, we'll update our project's Definitions
object to include the new job and schedule. This ensures the job and schedule are available to Dagster processes, such as the Dagster UI.
defs = Definitions( jobs=[ecommerce_job], schedules=[ecommerce_schedule], )
At this point, your code should look like the following:
from dagster import job, op, DefaultScheduleStatus, Definitions, ScheduleDefinition @op def count_orders(): return 5 @op def count_users(arg): return arg + 1 @job def ecommerce_job(): count_users(count_orders()) ecommerce_schedule = ScheduleDefinition( job=ecommerce_job, cron_schedule="15 5 * * 1-5", default_status=DefaultScheduleStatus.RUNNING, ) defs = Definitions( jobs=[ecommerce_job], schedules=[ecommerce_schedule], )
turned the schedule on by using the default_status
parameter in its ScheduleDefinition
, but there are a few other ways to do this:
To turn on a schedule in the Dagster UI, navigate to Overview > Schedules:
After the schedule is started, it will begin executing immediately if the dagster-daemon process is running. This process starts automatically when dagster dev
is run.
Name | Description | |
---|---|---|
@op | A decorator used to define ops. Returns an OpDefinition . The decorated function is called the "compute function". | |
@job | The decorator used to define a job. | |
ScheduleDefinition | A class that defines a schedule and attaches it to a job. | |
Definitions | The object that contains all the definitions defined within a code location. Definitions include assets, jobs, resources, schedules, and sensors. |