Ask AI

External Assets#

An external asset is an asset that is visible in Dagster but executed by an external process. For example, you have a process that loads data from Kafka into Amazon S3 every day. You want the S3 asset to be visible alongside your other data assets, but not triggered by Dagster.

In this case, you could use an external asset to leverage Dagster's event log and tooling without using the orchestrator. This allows you to maintain data lineage, observability, and data quality without unnecessary migrations.


Uses and limitations#

Using external assets, you can:

  • Attach metadata to asset definitions for documentation, tracking ownership, and so on
  • Track the assets' data quality and version in Dagster
  • Use asset sensors or auto-materialize policies to update downstream assets based on updates to external assets

Limitations#

The following aren't currently supported when using external assets:

  • Scheduling the execution of an external asset
  • Backfilling an external asset using Dagster
  • Using the Dagster UI or GraphQL API to instigate ad hoc executions

Relevant APIs#

NameDescription
AssetSpecAn object that represents the metadata of a particular asset without representing how it's computed.

Defining external assets#

External assets are defined using the AssetSpec class. An AssetSpec represents metadata about an asset without describing how it's computed. The following code declares a single external asset that represents a file in S3 and passes it to a Definitions object:

Click the Asset in the Dagster UI tab to see how this asset would be rendered in the Dagster UI.

from dagster import AssetSpec, Definitions

defs = Definitions(assets=[AssetSpec("file_in_s3")])

External assets with dependencies#

Dependencies are defined by using the deps argument of AssetSpec. This enables Dagster to model entire graphs of assets scheduled and orchestrated by other systems.

In the following example, we have two assets: raw_logs and processed_logs. The processed_logs asset is produced by a scheduled computation in another orchestration system. Using external assets allows you to model both assets in Dagster.

Click the Assets in the Dagster UI tab to see how these assets would be rendered in the Dagster UI.

from dagster import AssetSpec, Definitions

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])

defs = Definitions(assets=[raw_logs, processed_logs])

Dagster-native assets with external asset dependencies#

Fully-managed assets can depend on external assets. In this example, the aggregated_logs asset depends on processed_logs, which is an external asset:

Click the Assets in the Dagster UI tab to see how these assets would be rendered in the Dagster UI.

from dagster import AssetSpec, Definitions, asset

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])


@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_log" into memory and performs some aggregation
    ...


defs = Definitions(assets=[aggregated_logs, raw_logs, processed_logs])

Updating external asset metadata#

As Dagster doesn't control scheduling or materializing external assets, it's up to you to keep their metadata updated. This also means that materialization for external assets will be disabled in the Dagster UI.

To keep your external assets updated, you can use any of the following approaches:

Using the REST API#

Whether you're using Dagster OSS or Dagster+, you can use a REST endpoint for reporting asset materializations, asset check evaluations, and asset observations. Refer to the External assets REST API reference for more information and examples.

Using sensors#

By using the asset_events parameter of SensorResult, you can generate events to attach to external assets and then provide them directly to sensors. For example:

import datetime

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    SensorEvaluationContext,
    SensorResult,
    sensor,
)


def utc_now_str() -> str:
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")


@sensor()
def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
    # Materialization happened in external system, but is recorded here
    return SensorResult(
        asset_events=[
            AssetMaterialization(
                asset_key="external_asset_a",
                metadata={
                    "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
                },
            )
        ]
    )


defs = Definitions(
    assets=[AssetSpec("external_asset_a")],
    sensors=[keep_external_asset_a_up_to_date],
)

Using the Python API#

You can insert events to attach to external assets directly from Dagster's Python API. Specifically, the API is report_runless_asset_event on DagsterInstance.

For example, this would be useful when writing a Python script to backfill metadata:

from dagster import AssetMaterialization

# instance is a DagsterInstance. Get using DagsterInstance.get()
instance.report_runless_asset_event(
    AssetMaterialization(
        "asset_one", metadata={"nrows": 10, "source": "From this script."}
    )
)

Logging events using ops#

You can log an AssetMaterialization from an op. In this case, use the log_event method of OpExecutionContext to report an asset materialization of an external asset. For example:

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    OpExecutionContext,
    job,
    op,
)


@op
def an_op(context: OpExecutionContext) -> None:
    context.log_event(AssetMaterialization(asset_key="external_asset"))


@job
def a_job() -> None:
    an_op()


defs = Definitions(assets=[AssetSpec("external_asset")], jobs=[a_job])