This library provides Dagster integration with Polars. It allows using Polars eager or lazy DataFrames as inputs and outputs with Dagster's @asset and @op. Type annotations are used to control whether an eager or lazy DataFrame is loaded. LazyFrames can be sunk as outputs. Multiple serialization formats (Parquet, Delta Lake, BigQuery) and filesystems (local, S3, GCS, ...) are supported.
A comprehensive list of dagster-polars behavior for supported type annotations can be found in the Type Annotations section.
pip install dagster-polars
Some IOManagers (like PolarsDeltaIOManager) may require additional dependencies, which are provided with extras like dagster-polars[delta]. Please check the documentation for each IOManager for more details.
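For example, to install with the Delta Lake extra:
pip install 'dagster-polars[delta]'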
Highlights of the features common to filesystem-based IOManagers, using PolarsParquetIOManager as an example (see BasePolarsUPathIOManager for the full list of features provided by dagster-polars):
Type annotations are not required. By default, an eager pl.DataFrame will be loaded:
from dagster import asset
import polars as pl


@asset(io_manager_key="polars_parquet_io_manager")
def upstream():
    return pl.DataFrame({"foo": [1, 2, 3]})


@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream) -> pl.LazyFrame:
    assert isinstance(upstream, pl.DataFrame)
    return upstream.lazy()  # the LazyFrame will be sunk
A lazy pl.LazyFrame can be scanned by annotating the input with pl.LazyFrame, and returning a pl.LazyFrame will sink it:
@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream: pl.LazyFrame) -> pl.LazyFrame:
    assert isinstance(upstream, pl.LazyFrame)
    return upstream
The same logic applies to partitioned assets:
from typing import Dict


@asset
def downstream(partitioned_upstream: Dict[str, pl.LazyFrame]):
    assert isinstance(partitioned_upstream, dict)
    assert isinstance(partitioned_upstream["my_partition"], pl.LazyFrame)
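For context, a minimal partitioned upstream that would produce such a dictionary might look like the sketch below (the partition names are illustrative):

from dagster import asset, StaticPartitionsDefinition

@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=StaticPartitionsDefinition(["my_partition", "other_partition"]),
)
def partitioned_upstream() -> pl.DataFrame:
    ...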
Optional inputs and outputs are supported:
from typing import Optional


@asset
def upstream() -> Optional[pl.DataFrame]:
    if has_data:
        return pl.DataFrame({"foo": [1, 2, 3]})  # type check will pass
    else:
        return None  # type check will pass and `dagster_polars` will skip writing the output completely


@asset
def downstream(upstream: Optional[pl.LazyFrame]):  # upstream will be None if it doesn't exist in storage
    ...
By default, all the IOManagers store separate partitions in physically separate locations, such as:
/my/asset/key/partition_0.extension
/my/asset/key/partition_1.extension
This mode is useful for, e.g., snapshotting.
Some IOManagers (like PolarsDeltaIOManager) support reading and writing partitions in a storage-native format in the same location. This mode can typically be enabled by setting the "partition_by" metadata value. For example, PolarsDeltaIOManager would store different partitions in the same /my/asset/key.delta directory, which will be properly partitioned. This mode should be preferred for true partitioning.
Type aliases like DataFrameWithPartitions are provided by dagster_polars.types for convenience.
Type annotation | Type Alias | Behavior
---|---|---
DataFrame | | read/write a DataFrame
LazyFrame | | read/sink a LazyFrame
Optional[DataFrame] | | read/write a DataFrame. Do nothing if no data is found in storage or the output is None
Optional[LazyFrame] | | read a LazyFrame. Do nothing if no data is found in storage
Dict[str, DataFrame] | DataFrameWithPartitions | read multiple DataFrames as Dict[str, DataFrame]. Raises an error for missing partitions, unless "allow_missing_partitions" input metadata is set to True
Dict[str, LazyFrame] | LazyFramePartitions | read multiple LazyFrames as Dict[str, LazyFrame]. Raises an error for missing partitions, unless "allow_missing_partitions" input metadata is set to True
Generic builtins (like tuple[…] instead of Tuple[…]) are supported for Python >= 3.9.
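For example, the aliases can be used directly in annotations (using the dagster_polars.types module named above):

from dagster_polars.types import DataFrameWithPartitions

@asset
def downstream(partitioned_upstream: DataFrameWithPartitions):
    ...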
Configuration:
- base_dir: Base directory for storing files. Default Value: None
- cloud_storage_options: Storage authentication for cloud object store. Default Value: None
Base class for dagster-polars IOManagers. It doesn't define a specific storage format. To implement a specific storage format (Parquet, CSV, etc.), inherit from this class and implement the write_df_to_path, sink_df_to_path and scan_df_from_path methods (see the sketch after the feature list below).
Features:
- all the features of UPathIOManager: works with local and remote filesystems (like S3), supports loading multiple partitions with respect to PartitionMapping, and more
- loads the correct type (polars.DataFrame, polars.LazyFrame, or other types defined in dagster_polars.types) based on the input type annotation (or dagster.DagsterType's typing_type)
- can sink lazy pl.LazyFrame DataFrames
- handles Nones with Optional types by skipping loading missing inputs or saving None outputs
- logs various metadata about the DataFrame: size, schema, sample, stats, ...
- the "columns" input metadata value can be used to select a subset of columns to load
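As an illustration, a minimal CSV-backed IOManager could look like the sketch below. The method signatures are assumptions based on the description above; check BasePolarsUPathIOManager in your installed version, since they may differ between releases.

import polars as pl
from dagster import InputContext, OutputContext
from dagster_polars import BasePolarsUPathIOManager
from upath import UPath


class PolarsCSVIOManager(BasePolarsUPathIOManager):
    extension: str = ".csv"  # file extension used for stored assets

    def write_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: UPath):
        # eager write of a pl.DataFrame
        with path.open("wb") as f:
            df.write_csv(f)

    def sink_df_to_path(self, context: OutputContext, df: pl.LazyFrame, path: UPath):
        # streaming write of a pl.LazyFrame (shown here for local paths)
        df.sink_csv(str(path))

    def scan_df_from_path(self, path: UPath, context: InputContext) -> pl.LazyFrame:
        # lazy scan; dagster-polars collects it if the downstream
        # annotation asks for an eager pl.DataFrame
        return pl.scan_csv(str(path))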
Configuration:
- extension: Default Value: '.parquet'
- base_dir: Base directory for storing files. Default Value: None
- cloud_storage_options: Storage authentication for cloud object store. Default Value: None
(experimental) This API may break in future versions, even between dot releases.
Implements reading and writing Polars DataFrames in Apache Parquet format.
All features provided by BasePolarsUPathIOManager.
All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).
Supports reading partitioned Parquet datasets (for example, often produced by Spark).
Supports reading/writing custom metadata in the Parquet file's schema as JSON-serialized bytes at the "dagster_polars_metadata" key.
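For illustration, such metadata could be read back outside of Dagster with pyarrow (this is not a dagster-polars API, and the path is hypothetical):

import json

import pyarrow.parquet as pq

schema = pq.read_schema("path/to/my_asset.parquet")  # hypothetical path
custom_metadata = json.loads(schema.metadata[b"dagster_polars_metadata"])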
Examples
from dagster import asset, Definitions
from dagster_polars import PolarsParquetIOManager
import polars as pl


@asset(
    io_manager_key="polars_parquet_io_manager",
    key_prefix=["my_dataset"]
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.parquet
    ...


defs = Definitions(
    assets=[my_asset],
    resources={
        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="s3://my-bucket/my-dir")
    }
)
Reading partitioned Parquet datasets:
from dagster import SourceAsset

my_asset = SourceAsset(
    key=["path", "to", "dataset"],
    io_manager_key="polars_parquet_io_manager",
    metadata={
        "partition_by": ["year", "month", "day"]
    }
)
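A downstream asset can then depend on this SourceAsset by key, for example (a sketch):

from dagster import asset, AssetIn
import polars as pl

@asset(ins={"dataset": AssetIn(key=["path", "to", "dataset"])})
def downstream(dataset: pl.LazyFrame) -> pl.DataFrame:
    ...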
Configuration:
- extension: Default Value: '.delta'
- base_dir: Base directory for storing files. Default Value: None
- cloud_storage_options: Storage authentication for cloud object store. Default Value: None
- mode: Default Value: 'overwrite'
- overwrite_schema: Default Value: False
- version: Default Value: None
(experimental) This API may break in future versions, even between dot releases.
Implements writing and reading DeltaLake tables.
All features provided by BasePolarsUPathIOManager.
All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).
Supports native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table. To enable this behavior, set the partition_by metadata value or config parameter and use a non-dict type annotation when loading the asset. The partition_by value will be passed to delta_write_options of pl.DataFrame.write_delta and to pyarrow_options of pl.scan_delta. When using a one-dimensional PartitionsDefinition, it should be a single string like "column". When using a MultiPartitionsDefinition, it should be a dict mapping dimension names to column names, like {"dimension": "column"}.
Install dagster-polars[delta] to use this IOManager.
Examples
from dagster import asset, Definitions
from dagster_polars import PolarsDeltaIOManager
import polars as pl


@asset(
    io_manager_key="polars_delta_io_manager",
    key_prefix=["my_dataset"]
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.delta
    ...


defs = Definitions(
    assets=[my_asset],
    resources={
        "polars_delta_io_manager": PolarsDeltaIOManager(base_dir="s3://my-bucket/my-dir")
    }
)
Appending to a DeltaLake table:
@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "mode": "append"
    },
)
def my_table() -> pl.DataFrame:
    ...
Using native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table:
from dagster import AssetExecutionContext, StaticPartitionsDefinition


@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": "partition_col"
    },
    partitions_def=StaticPartitionsDefinition(["a", "b", "c"])
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...

    # the column with the partition_key must match the `partition_by` metadata value
    return df.with_columns(pl.lit(context.partition_key).alias("partition_col"))


@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...
When using a MultiPartitionsDefinition, the partition_by metadata value should be a dictionary mapping dimension names to column names.
from dagster import (
    AssetExecutionContext,
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
)


@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": {"time": "date", "clients": "client"}  # dimension -> column mapping
    },
    partitions_def=MultiPartitionsDefinition(
        {
            "time": DailyPartitionsDefinition(...),
            "clients": StaticPartitionsDefinition(...)
        }
    )
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...

    partition_keys_by_dimension = context.partition_key.keys_by_dimension
    return df.with_columns(
        pl.lit(partition_keys_by_dimension["time"]).alias("date"),  # the "time" dimension maps to the "date" column
        pl.lit(partition_keys_by_dimension["clients"]).alias("client")  # the "clients" dimension maps to the "client" column
    )


@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...
Configuration:
- project: The GCP project to use.
- dataset: Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used. Default Value: None
- location: The GCP location. Note: when using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration. Default Value: None
- gcp_credentials: GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64. Default Value: None
- temporary_gcs_bucket: When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be written directly to BigQuery. Default Value: None
- timeout: When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables). Default Value: None
Implements reading and writing Polars DataFrames from/to BigQuery.
Features:
- all DBIOManager features
- supports writing partitioned tables (the "partition_expr" input metadata key must be specified; see the sketch after this list)
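A sketch of a partitioned asset setting "partition_expr" as definition metadata (the column name my_timestamp_column is illustrative):

from dagster import asset, DailyPartitionsDefinition
import polars as pl

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    metadata={"partition_expr": "my_timestamp_column"},
)
def partitioned_table() -> pl.DataFrame:
    ...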
Examples
from dagster import asset, Definitions, EnvVar
from dagster_polars import PolarsBigQueryIOManager
import polars as pl


@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...


defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": PolarsBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)
You can tell Dagster in which dataset to create tables by setting the "dataset" configuration value. If you do not provide a dataset as configuration to the I/O manager, Dagster will determine a dataset based on the assets and ops using the I/O manager. For assets, the dataset will be determined from the asset key, as shown in the above example. The final prefix before the asset name will be used as the dataset. For example, if the asset "my_table" had the key prefix ["gcp", "bigquery", "my_dataset"], the dataset "my_dataset" will be used. For ops, the dataset can be specified by including a "schema" entry in output metadata. If "schema" is not provided via config or on the asset/op, "public" will be used for the dataset.
from dagster import op, Out


@op(
    out={"my_table": Out(metadata={"schema": "my_dataset"})}
)
def make_my_table() -> pl.DataFrame:
    # the returned value will be stored at my_dataset.my_table
    ...
To only use specific columns of a table as input to a downstream op or asset, add the "columns" metadata to the In or AssetIn.
from dagster import asset, AssetIn


@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
    # my_table will just contain the data from column "a"
    ...
If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the "gcp_credentials" configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
The "write_disposition" metadata key can be used to set the write_disposition parameter of bigquery.JobConfig. For example, set it to "WRITE_APPEND" to append to an existing table instead of overwriting it (see the sketch below).
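A minimal sketch, assuming "write_disposition" is set as asset definition metadata like the other options above:

@asset(
    metadata={"write_disposition": "WRITE_APPEND"}
)
def my_appended_table() -> pl.DataFrame:
    ...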
Install dagster-polars[gcp] to use this IOManager.