This library provides an integration with the DuckDB database and the PySpark data processing library.
DuckDBPySparkIOManager (IOManagerDefinition): An I/O manager definition that reads inputs from and writes PySpark DataFrames to DuckDB. When using the DuckDBPySparkIOManager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.
Configuration:
database: Path to the DuckDB database.
connection_config: DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html. Default Value: {}
schema: Name of the schema to use. Default Value: None
Examples
from dagster_duckdb_pyspark import DuckDBPySparkIOManager
from dagster import Definitions, asset
import pyspark.sql

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb")},
)
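Downstream assets receive their inputs back from DuckDB through the same I/O manager. A minimal sketch of a hypothetical downstream asset (the name my_downstream_table and the limit(10) transformation are illustrative); it would also be added to the assets list of the Definitions above:
from dagster import AssetIn, asset
import pyspark.sql

@asset(
    ins={"my_table": AssetIn(key=["my_schema", "my_table"])}  # the asset defined above
)
def my_downstream_table(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # the I/O manager reads my_schema.my_table from DuckDB and passes it in as a PySpark DataFrame
    return my_table.limit(10)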
You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb", schema="my_schema")},
)
On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pyspark.sql.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
)
def my_other_table() -> pyspark.sql.DataFrame:
    ...
For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...
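The op can then be wired into a job that supplies the I/O manager resource. A minimal sketch, assuming the job name and database path used here (both illustrative):
from dagster import Out, job, op
from dagster_duckdb_pyspark import DuckDBPySparkIOManager
import pyspark.sql

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...

@job(resource_defs={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb")})
def make_my_table_job():
    # the op's output is written to my_schema.my_table by the I/O manager
    make_my_table()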
If none of these is provided, the schema will default to “public”.
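For example, an asset defined with neither a key_prefix nor schema metadata is written to the public schema; a minimal sketch (the asset name my_plain_table is illustrative):
from dagster import asset
import pyspark.sql

@asset
def my_plain_table() -> pyspark.sql.DataFrame:
    # no key_prefix and no "schema" metadata, so this asset is stored as public.my_plain_table
    ...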
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will just contain the data from column "a"
    ...
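The connection_config value documented above is passed through to the DuckDB connection used for reads and writes. A minimal sketch (memory_limit is just one illustrative setting from the linked DuckDB configuration reference):
from dagster_duckdb_pyspark import DuckDBPySparkIOManager

io_manager = DuckDBPySparkIOManager(
    database="my_db.duckdb",
    connection_config={"memory_limit": "2GB"},  # any option from the DuckDB configuration reference
)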
DuckDBPySparkTypeHandler: Stores PySpark DataFrames in DuckDB.
To use this type handler, return it from the type_handlers method of an I/O manager that inherits from DuckDBIOManager.
Example
from typing import Sequence

from dagster import Definitions, asset
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler
import pyspark.sql

class MyDuckDBIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DuckDBPySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")},
)
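Because type_handlers returns a sequence, one custom I/O manager can handle several DataFrame types. A minimal sketch, assuming dagster-duckdb-pandas is also installed so its DuckDBPandasTypeHandler can be combined with the PySpark handler (DbTypeHandler is referenced as in the example above):
from typing import Sequence

from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler
from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler

class MyMixedDuckDBIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        # pandas-annotated assets use the pandas handler; PySpark-annotated assets use the PySpark handler
        return [DuckDBPandasTypeHandler(), DuckDBPySparkTypeHandler()]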
duckdb_pyspark_io_manager (IOManagerDefinition): An I/O manager definition that reads inputs from and writes PySpark DataFrames to DuckDB; it is the function-style counterpart of the DuckDBPySparkIOManager resource above and is configured with configured, as shown in the examples below. When using the duckdb_pyspark_io_manager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.
Configuration:
database: Path to the DuckDB database.
connection_config: DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html. Default Value: {}
schema: Name of the schema to use. Default Value: None
Examples
from dagster_duckdb_pyspark import duckdb_pyspark_io_manager
from dagster import Definitions, asset
import pyspark.sql

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb"})},
)
You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": duckdb_pyspark_io_manager.configured(
            {"database": "my_db.duckdb", "schema": "my_schema"}
        )
    },
)
On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pyspark.sql.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
)
def my_other_table() -> pyspark.sql.DataFrame:
    ...
For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...
If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will just contain the data from column "a"
    ...
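As with the Pythonic resource above, the connection_config value documented for duckdb_pyspark_io_manager can also be supplied through configured; a minimal sketch (memory_limit is an illustrative DuckDB setting):
from dagster_duckdb_pyspark import duckdb_pyspark_io_manager

io_manager = duckdb_pyspark_io_manager.configured(
    {
        "database": "my_db.duckdb",
        "connection_config": {"memory_limit": "2GB"},  # any option from the DuckDB configuration reference
    }
)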