Ask AI

Defining and executing asset checks#

After creating some asset definitions, you may want to automate checks on the assets that test for data quality.

In this guide, we'll show you a few approaches to defining asset checks, how to use check results to include helpful information, and how to execute checks.


Defining asset checks#

There are a few ways you can define an asset check:

  • Separately from the assets the checks target - In this approach, asset materialization and asset checks are executed in their own separate operations. If using the multiprocess_executor, this allows you to launch runs that will use separate processes to materialize the asset and execute its check.
  • Together with assets - In this approach, checks execute in the same operation that materializes the asset.
  • Using an asset check factory - This approach allows you to define multiple, similar asset checks at once
  • Loading dbt tests into Dagster - This approach allows you to model your dbt tests as asset checks

Defining checks separately from assets#

In this example, we'll demonstrate how to define separate functions for an asset and the corresponding check.

The following code defines an asset named orders and an asset check named orders_id_has_no_nulls. When executed, the check verifies that all values in the orders asset's primary key column are unique.

import pandas as pd

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(
    assets=[orders],
    asset_checks=[orders_id_has_no_nulls],
)

The @asset_check decorator decorates the orders_id_has_no_nulls function which returns an AssetCheckResult object.

The orders_id_has_no_nulls check runs in its own op. That means that if you launch a run that does all of the following, the check will execute in a separate process from the process that materializes the asset:

  1. Materializes the orders asset,
  2. Executes the orders_id_has_no_nulls check, and
  3. You're using the multiprocess_executor

Defining multiple checks#

Using @multi_asset_check, you can define multiple checks that execute in a single operation without an asset. This approach avoids the overhead of running a separate operation for every check and may enable reusing computations across checks.

from typing import Iterable

from dagster import (
    AssetCheckExecutionContext,
    AssetCheckResult,
    AssetCheckSeverity,
    AssetCheckSpec,
    multi_asset_check,
)


@multi_asset_check(
    specs=[
        AssetCheckSpec(name="asset_check_one", asset="my_asset_one"),
        AssetCheckSpec(name="asset_check_two", asset="my_asset_two"),
    ]
)
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
    yield AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.WARN,
        description="The asset is over 0.5",
        asset_key="asset_check_one",
    )

    yield AssetCheckResult(
        passed=True,
        description="The asset is fresh.",
        asset_key="asset_check_two",
    )

You can enable subsetting of checks defined in a @multi_asset_check to exclude certain checks. Refer to the Subsetting asset checks guide for more information.

Defining checks and assets together#

Sometimes, it makes sense for a single function to materialize an asset and execute a check on it.

In this example, we'll demonstrate how to do this by using the check_specs argument. This argument is available when using the @asset or @multi_asset decorators. Each provided AssetCheckSpec declares a check that the decorated function should yield an AssetCheckResult for.

import pandas as pd

from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetExecutionContext,
    Definitions,
    Output,
    asset,
)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    orders_df.to_csv("orders")
    yield Output(value=None)

    # check it
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(assets=[orders])

Using a factory#

To define multiple, similar asset checks, use a factory pattern. In the following example, the factory accepts a list of SQL statements and turns them into asset checks:

from typing import Any, Mapping, Sequence
from unittest.mock import MagicMock

from dagster import (
    AssetCheckResult,
    AssetChecksDefinition,
    Definitions,
    asset,
    asset_check,
)


@asset
def orders(): ...


@asset
def items(): ...


def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
    @asset_check(
        name=check_blob["name"],
        asset=check_blob["asset"],
        required_resource_keys={"db_connection"},
    )
    def _check(context):
        rows = context.resources.db_connection.execute(check_blob["sql"])
        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(
    assets=[orders, items],
    asset_checks=[make_check(check_blob) for check_blob in check_blobs],
    resources={"db_connection": MagicMock()},
)

Loading dbt tests as asset checks#

Using the DagsterDbtTranslator, you can model your existing dbt tests as asset checks. Refer to the dbt integration reference for more information.


Using asset check results#

In this section, we'll show you how to use asset check results to:

Customizing the Dagster UI#

Using asset check results, you can display how check-related information displays in the Dagster UI.

Setting severity#

Using AssetCheckSeverity, you can define a severity on check results. The default severity is ERROR.

The severity determines how the check result will display in the UI. For example, if a check fails with ERROR severity, the asset will appear red in the lineage graph in the UI.

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    Definitions,
    asset,
    asset_check,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def my_check():
    is_serious = ...
    return AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN,
    )


defs = Definitions(assets=[my_asset], asset_checks=[my_check])

Adding metadata#

Including details about a check result can provide helpful context to others who view it in the UI. Using the metadata argument on AssetCheckResult, you can include information about why a check passed or failed.

In the following example, we added num_null_order_ids as metadata to the orders_id_has_no_nulls check:

import pandas as pd

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@asset_check(asset=orders, description="Check for null order_ids")
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
        metadata={
            "num_null_order_ids": int(num_null_order_ids),
        },
    )


defs = Definitions(
    assets=[orders],
    asset_checks=[orders_id_has_no_nulls],
)

There are a variety of types supported via the MetadataValue class. You can view the metadata on the Checks tab of the Asset details page.

Blocking downstream assets#

To block downstream assets from executing when checks fail, set the blocking argument to True in the @asset_check decorator. In the following example, check_upstream_asset will block downstream_asset from executing.

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def upstream_asset():
    pass


@asset_check(asset=upstream_asset, blocking=True)
def check_upstream_asset():
    return AssetCheckResult(passed=False)


@asset(deps=[upstream_asset])
def downstream_asset():
    pass


defs = Definitions(
    assets=[upstream_asset, downstream_asset], asset_checks=[check_upstream_asset]
)

When blocking is enabled, downstream assets will wait to execute until the check completes:

  • Downstream assets will not execute if the check returns a failing AssetCheckResult or raises an exception
  • Downstream assets will execute if the AssetCheckSeverity is set to WARN instead of ERROR

This feature has the following limitations:


Executing checks#

Via the UI#

Materializing an asset from the UI will also execute any checks defined for that asset. To execute a check without materializing the asset, use the Checks tab of the Asset's details page.

Via sensors and schedules#

To define jobs that execute sets of assets and checks, you can use define_asset_job and then trigger the jobs via sensors or schedules. By default, checks are included with the assets they check. You can also define jobs that include only checks, or only assets.

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    asset_check,
    define_asset_job,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def check_1(): ...


@asset_check(asset=my_asset)
def check_2(): ...


# includes my_asset and both checks
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))


# includes only my_asset
my_asset_only_job = define_asset_job(
    "my_asset_only_job",
    selection=AssetSelection.assets(my_asset).without_checks(),
)

# includes check_1 and check_2, but not my_asset
checks_only_job = define_asset_job(
    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
)

# includes only check_1
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))

# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

defs = Definitions(
    assets=[my_asset],
    asset_checks=[check_1, check_2],
    jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
    schedules=[basic_schedule],
)

Testing checks#

Refer to the Asset checks section of the Testing documentation for more information.


Alerting on failed checks (Dagster+ only)#

In Dagster+, you can set up alerts to notify you when assets checks fail. To alert on failed checks, create an alert policy with the following settings:

  • Alert policy type - Asset alert
  • Target - The asset keys or groups you've defined checks for. You can also target all assets.
  • Events - Under Asset checks, check the box for the severity you've defined for failed checks: Failed (WARN) or Failed (ERROR)

Refer to the Managing alert policies in Dagster+ for instructions on setting up alerts in the Dagster+ UI.


APIs in this guide#

NameDescription
@asset_checkA decorator used to define asset checks that execute in their own op.
@multi_asset_checkA decorator used to define a set of asset checks that execute together in the same op.
AssetCheckResultThe class returned by asset checks.
AssetCheckSeverityDefines the severity of a given asset check result.
AssetCheckSpecA class that's passed to asset decorators to define checks that execute in the same op as the asset.