Ask AI

Testing partitioned config and jobs#

This guide is applicable to both asset definitions and ops.

In this guide, we'll cover a few ways to test your partitioned config and jobs.


Prerequisites#

Before continuing, you should be familiar with Dagster partitions and with testing Dagster assets, ops, and jobs.


Testing partitioned config#

Calling a PartitionedConfig object directly calls the decorated function, so a test can pass in partition bounds and assert on the run config that is returned.

If you want to check whether the generated run config is valid for a particular job, pass the job and the run config to the validate_run_config function.

from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    """Build the run config for one daily partition.

    Args:
        start: First datetime argument; its date is written into the config.
        _end: Second datetime argument; not used by this function.

    Returns:
        A run config dict that sets the ``date`` config value of the
        ``process_data_for_date`` op to ``start`` formatted as ``YYYY-MM-DD``.
    """
    partition_date = start.strftime("%Y-%m-%d")
    op_config = {"config": {"date": partition_date}}
    return {"ops": {"process_data_for_date": op_config}}


def test_my_partitioned_config():
    """Check the config function's output and its validity for the job."""
    window_start = datetime(2020, 1, 3)
    window_end = datetime(2020, 1, 4)
    expected = {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # Calling the PartitionedConfig object runs the decorated function.
    run_config = my_partitioned_config(window_start, window_end)
    assert run_config == expected

    # The generated run config must also be valid configuration for
    # partitioned_op_job.
    assert validate_run_config(partitioned_op_job, run_config)

If you want to test that a PartitionedConfig creates the partitions you expect, use its get_partition_keys and get_run_config_for_partition_key methods:

from dagster import Config, OpExecutionContext


@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    """Build the run config for one daily partition of the offset schedule.

    Args:
        start: First datetime argument; written to the ``start`` config value.
        _end: Second datetime argument; written to the ``end`` config value.

    Returns:
        A run config dict that sets the ``start`` and ``end`` config values of
        the ``process_data`` op, both formatted as ``YYYY-MM-DD-HH:MM``.
    """
    # Both bounds share one timestamp format.
    timestamp_format = "%Y-%m-%d-%H:%M"
    window = {
        "start": start.strftime(timestamp_format),
        "end": _end.strftime(timestamp_format),
    }
    return {"ops": {"process_data": {"config": window}}}


# Config schema accepted by the process_data op: the two string values that
# my_offset_partitioned_config places under "ops" -> "process_data" -> "config".
class ProcessDataConfig(Config):
    # Start timestamp string; my_offset_partitioned_config formats it as
    # "%Y-%m-%d-%H:%M".
    start: str
    # End timestamp string, produced with the same format.
    end: str


@op
def process_data(context: OpExecutionContext, config: ProcessDataConfig):
    # Logs the configured start/end strings for this run; nothing is returned.
    s, e = config.start, config.end
    message = f"processing data for {s} - {e}"
    context.log.info(message)


# Job whose config is supplied by my_offset_partitioned_config.
@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    # The job body invokes the single process_data op.
    process_data()


def test_my_offset_partitioned_config():
    """Check partition keys, run-config validity, and run-config contents."""
    keys = my_offset_partitioned_config.get_partition_keys()

    # The first two partition keys should be the first two days.
    for index, expected_key in enumerate(("2020-01-01", "2020-01-02")):
        assert keys[index] == expected_key

    # The run config generated for the first partition must be valid for the
    # do_more_stuff_partitioned job.
    first_key = keys[0]
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(
        first_key
    )
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # With minute_offset=15, both bounds of the first partition fall at 00:15.
    expected_window = {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
    assert run_config == {"ops": {"process_data": {"config": expected_window}}}

Testing partitioned jobs#

To run a partitioned job in-process on a particular partition, supply a value for the partition_key argument of JobDefinition.execute_in_process. For example:

def test_partitioned_op_job():
    """Run partitioned_op_job in-process for one partition and expect success."""
    result = partitioned_op_job.execute_in_process(partition_key="2020-01-01")
    assert result.success