This page is specific to asset definitions. Looking for ops? Refer to the Partitioned ops documentation.
An asset definition can represent a collection of partitions that can be tracked and materialized independently. In many ways, each partition functions like its own mini-asset, but they all share a common materialization function and dependencies. Typically, each partition will correspond to a separate file or a slice of a table in a database.
A common use is for each partition to represent all the records in a data set that fall within a particular time window, e.g. hourly, daily or monthly. Alternatively, each partition can represent a region, a customer, an experiment - any dimension along which you want to be able to materialize and monitor independently. An asset can also be partitioned along multiple dimensions, e.g. by region and by hour.
A graph of assets with the same partitions implicitly forms a partitioned data pipeline, and you can launch a run that selects multiple assets and materializes the same partition in each asset.
Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in the Dagster UI.
An asset definition can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. If the asset is stored in a filesystem or an object store, then each partition will typically correspond to a file or object. If the asset is stored in a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.
The following example demonstrates creating an asset that has a partition for each day since October 1st, 2023. Materializing partition 2023-11-13 of this asset would result in fetching data from the URL https://api.nasa.gov/planetary/apod?date=2023-11-13 and storing it at the path nasa/2023-11-13.csv. Note that api_key=DEMO_KEY is used, which allows only a limited number of calls:
```python
import os
import urllib.request

from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

# Create a new 'nasa' directory if needed
dir_name = "nasa"
if not os.path.exists(dir_name):
    os.makedirs(dir_name)


@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> None:
    partition_date_str = context.partition_key

    url = f"https://api.nasa.gov/planetary/apod?api_key=DEMO_KEY&date={partition_date_str}"
    target_location = f"nasa/{partition_date_str}.csv"

    urllib.request.urlretrieve(url, target_location)
```
In the following sections, we'll demonstrate a few additional ways to partition assets: multi-dimensional partitioning, where an asset is partitioned along more than one dimension, and dynamic partitioning, where the set of partitions can change at runtime.
Heads up! Multi-partitions definitions are currently limited to two dimensions.
The MultiPartitionsDefinition class accepts a mapping of dimension names to a PartitionsDefinition, creating a partition for each unique combination of dimension partitions.
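A sketch of what that might look like for a two-dimensional asset (the asset name multi_partitions_asset matches the materialize call below; the specific dimension definitions are illustrative):

```python
from dagster import (
    AssetExecutionContext,
    DailyPartitionsDefinition,
    MultiPartitionKey,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2022-01-01"),
            "color": StaticPartitionsDefinition(["red", "yellow", "blue"]),
        }
    )
)
def multi_partitions_asset(context: AssetExecutionContext):
    # In a multi-partitioned run, the partition key is a MultiPartitionKey,
    # which exposes the key for each dimension
    if isinstance(context.partition_key, MultiPartitionKey):
        context.log.info(context.partition_key.keys_by_dimension)
```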
In this example, the asset would contain a partition for each combination of color and date:
```
red|2022-01-01
yellow|2022-01-01
blue|2022-01-01
red|2022-01-02
... and so on
```
Additionally, notice that the code snippet above fetches the partition key from the asset context. Multi-dimensional partition keys are returned as MultiPartitionKey objects, which expose keys_by_dimension, returning the key for each dimension. A MultiPartitionKey can also be passed anywhere an execution parameter accepts a partition key:
```python
from dagster import MultiPartitionKey, materialize

result = materialize(
    [multi_partitions_asset],
    partition_key=MultiPartitionKey({"date": "2022-01-01", "color": "red"}),
)
```
Looking for example projects? Check out the Dynamic Partitions example for a look at a full project that uses dynamic asset partitions.
Sometimes you don't know the set of partitions ahead of time when defining assets. For example, maybe you want to add a new partition every time a new data file lands in a directory or every time you want to experiment with a new set of hyperparameters. In these cases, you can use a DynamicPartitionsDefinition.
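For instance, the sensor shown below works against a dynamic partitions definition along these lines (a minimal sketch; the names images_partitions_def and images are reused in the snippet that follows):

```python
from dagster import AssetExecutionContext, DynamicPartitionsDefinition, asset

# A partitions definition whose partition keys are added at runtime
images_partitions_def = DynamicPartitionsDefinition(name="images")


@asset(partitions_def=images_partitions_def)
def images(context: AssetExecutionContext):
    ...
```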
Partition keys can be added and removed for a given dynamic partition set. For example, the following code snippet demonstrates the usage of a sensor to detect the presence of a new partition and then trigger a run for that partition:
```python
import os

from dagster import (AssetSelection, RunRequest, SensorEvaluationContext,
                     SensorResult, define_asset_job, sensor)

images_job = define_asset_job(
    "images_job", AssetSelection.assets("images"), partitions_def=images_partitions_def
)


@sensor(job=images_job)
def image_sensor(context: SensorEvaluationContext):
    # Any file in the directory without a corresponding partition key is new
    new_images = [
        img_filename
        for img_filename in os.listdir(os.getenv("MY_DIRECTORY"))
        if not images_partitions_def.has_partition_key(
            img_filename, dynamic_partitions_store=context.instance
        )
    ]
    return SensorResult(
        run_requests=[
            RunRequest(partition_key=img_filename) for img_filename in new_images
        ],
        dynamic_partitions_requests=[images_partitions_def.build_add_request(new_images)],
    )
```
We recommend limiting the number of partitions for each asset to 25,000 or fewer. Assets with partition counts exceeding this limit will likely have slower load times in the UI.
Partitioned assets can depend on other partitioned assets. In this case, each partition in the downstream asset will depend on a partition or multiple partitions in the upstream asset.
A few rules govern default partition-to-partition dependencies:

- When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset will depend on the same partition in the upstream asset (see the sketch after this list).
- When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset will depend on all partitions in the upstream asset that intersect its time window. For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00.
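To make the first rule concrete, here's a minimal sketch (the asset names upstream and downstream are hypothetical) in which both assets share a DailyPartitionsDefinition, so each downstream partition maps one-to-one to the upstream partition for the same day:

```python
from dagster import DailyPartitionsDefinition, asset

daily = DailyPartitionsDefinition(start_date="2022-01-01")


@asset(partitions_def=daily)
def upstream():
    ...


# Same PartitionsDefinition, so partition 2022-04-12 of `downstream`
# depends on partition 2022-04-12 of `upstream`
@asset(partitions_def=daily, deps=[upstream])
def downstream():
    ...
```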
Default partition dependency rules can be overridden by providing a PartitionMapping when specifying a dependency on an asset. How this is accomplished depends on the type of dependency the asset has.
To override partition dependency rules for managed-loading asset dependencies, you can use a PartitionMapping to specify that each partition of an asset should depend on a partition in an upstream asset.
In the following code snippet, we used a TimeWindowPartitionMapping to specify that each partition of a daily-partitioned asset should depend on the prior day's partition in an upstream asset:
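A reconstructed sketch of that snippet (the asset names events and yesterday_event_stats are illustrative; the start_offset and end_offset parameters of TimeWindowPartitionMapping express the one-day shift):

```python
from dagster import AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events():
    ...


@asset(
    partitions_def=partitions_def,
    ins={
        "events": AssetIn(
            # Offsets of -1 shift the dependency window back by one partition (one day)
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    },
)
def yesterday_event_stats(events):
    ...
```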
To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.
In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:
When you materialize a partitioned asset, you choose which partitions to materialize and Dagster will launch a run for each partition. Note: If you choose more than one partition, the Dagster daemon needs to be running to queue the multiple runs.
The following image shows the Launch runs dialog on an asset's Details page, where you'll be prompted to select a partition to materialize:
After a partition has been successfully materialized, it will display as green in the partitions bar:
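Outside the UI, an individual partition can also be materialized in Python, which is handy in tests (a minimal sketch reusing my_daily_partitioned_asset from the earlier example):

```python
from dagster import materialize

# Launches an in-process run for just the 2023-11-13 partition
result = materialize([my_daily_partitioned_asset], partition_key="2023-11-13")
assert result.success
```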
Heads up! Familiarity with I/O managers is required for this section.
Asset functions can write data out to files, but they can also delegate the writing operation to an I/O manager. Dagster's built-in I/O managers support handling partitioned assets, but you can also write your own I/O manager if you want additional customization.
For example, the following demonstrates how to define an asset that relies on an I/O manager to store its output:
```python
import pandas as pd

from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date_str = context.partition_key
    return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")
```
If using the default I/O manager, materializing partition 2022-07-23 of this asset would store the output DataFrame in a pickle file at a path like my_daily_partitioned_asset/2022-07-23.
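A downstream asset with the same partitions definition can then rely on the I/O manager to load that stored partition back in; a sketch, where downstream_daily_asset is a hypothetical name and my_daily_partitioned_asset is the asset defined above:

```python
import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def downstream_daily_asset(my_daily_partitioned_asset: pd.DataFrame) -> pd.DataFrame:
    # For partition 2022-07-23, the default I/O manager unpickles the file
    # stored at my_daily_partitioned_asset/2022-07-23 and passes it in here
    return my_daily_partitioned_asset.dropna()
```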