An external asset is an asset that is visible in Dagster but executed by an external process. For example, you have a process that loads data from Kafka into Amazon S3 every day. You want the S3 asset to be visible alongside your other data assets, but not triggered by Dagster.
In this case, you could use an external asset to leverage Dagster's event log and tooling without using the orchestrator. This allows you to maintain data lineage, observability, and data quality without unnecessary migrations.
External assets are defined using the AssetSpec class. An AssetSpec represents metadata about an asset without describing how it's computed. The following code declares a single external asset that represents a file in S3 and passes it to a Definitions object:
Click the Asset in the Dagster UI tab to see how this asset would be rendered in the Dagster UI.
from dagster import AssetSpec, Definitions
defs = Definitions(assets=[AssetSpec("file_in_s3")])
Click the Asset definition tab to view how this asset is defined.
Note that the Materialize button is disabled, as external assets can't be executed by Dagster.
Dependencies are defined by using the deps argument of AssetSpec. This enables Dagster to model entire graphs of assets scheduled and orchestrated by other systems.
In the following example, we have two assets: raw_logs and processed_logs. The processed_logs asset is produced by a scheduled computation in another orchestration system. Using external assets allows you to model both assets in Dagster.
Click the Assets in the Dagster UI tab to see how these assets would be rendered in the Dagster UI.
As Dagster doesn't control scheduling or materializing external assets, it's up to you to keep their metadata updated. This also means that materialization for external assets will be disabled in the Dagster UI.
To keep your external assets updated, you can use any of the following approaches:
Whether you're using Dagster OSS or Dagster+, you can use a REST endpoint for reporting asset materializations, asset check evaluations, and asset observations. Refer to the External assets REST API reference for more information and examples.
By using the asset_events parameter of SensorResult, you can generate events to attach to external assets and then provide them directly to sensors. For example:
import datetime
from dagster import(
AssetMaterialization,
AssetSpec,
Definitions,
SensorEvaluationContext,
SensorResult,
sensor,)defutc_now_str()->str:return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")@sensor()defkeep_external_asset_a_up_to_date(context: SensorEvaluationContext)-> SensorResult:# Materialization happened in external system, but is recorded herereturn SensorResult(
asset_events=[
AssetMaterialization(
asset_key="external_asset_a",
metadata={"source":f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'},)])
defs = Definitions(
assets=[AssetSpec("external_asset_a")],
sensors=[keep_external_asset_a_up_to_date],)
You can insert events to attach to external assets directly from Dagster's Python API. Specifically, the API is report_runless_asset_event on DagsterInstance.
For example, this would be useful when writing a Python script to backfill metadata:
from dagster import AssetMaterialization
# instance is a DagsterInstance. Get using DagsterInstance.get()
instance.report_runless_asset_event(
AssetMaterialization("asset_one", metadata={"nrows":10,"source":"From this script."}))
You can log an AssetMaterialization from an op. In this case, use the log_event method of OpExecutionContext to report an asset materialization of an external asset. For example: