Dagster & dlt
This integration allows you to use dlt to ingest and replicate data between systems, with Dagster orchestrating the pipelines.
Note: This integration is currently experimental.
Installation
pip install dagster-dlt
Example
import dlt
from dagster_dlt import DagsterDltResource, dlt_assets
from dlt_sources.github import github_reactions

import dagster as dg


@dlt_assets(
    dlt_source=github_reactions("dagster-io", "dagster"),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
    ),
    name="github",
    group_name="github",
)
def github_issues_to_snowflake_assets(
    context: dg.AssetExecutionContext, dlt: DagsterDltResource
):
    yield from dlt.run(context=context)


defs = dg.Definitions(
    assets=[
        github_issues_to_snowflake_assets,
    ],
    resources={
        "dlt": DagsterDltResource(),
    },
)
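Once these definitions are loaded (for example, with dagster dev -f your_definitions_file.py), the decorated function should appear as a group of assets, one per dlt resource in the source, which you can materialize from the Dagster UI.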
Note: If you are using the sql_database source, consider setting defer_table_reflect=True to reduce database reads. By default, the Dagster daemon refreshes definitions roughly every minute, and each refresh queries the database to reflect table schemas.
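As a minimal sketch of what that looks like in practice, the example below wires dlt's sql_database source into dlt_assets with deferred reflection. The connection string and table names are illustrative assumptions, and the exact import path for sql_database can vary across dlt versions:

import dlt
import dagster as dg
from dagster_dlt import DagsterDltResource, dlt_assets
from dlt.sources.sql_database import sql_database  # import path may differ by dlt version


@dlt_assets(
    dlt_source=sql_database(
        credentials="postgresql://user:password@localhost:5432/mydb",  # hypothetical DSN
        table_names=["orders", "customers"],  # hypothetical tables
        defer_table_reflect=True,  # reflect table schemas at run time, not at definition time
    ),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="postgres_to_snowflake",
        dataset_name="public",
        destination="snowflake",
    ),
    name="postgres",
    group_name="postgres",
)
def postgres_to_snowflake_assets(
    context: dg.AssetExecutionContext, dlt: DagsterDltResource
):
    yield from dlt.run(context=context)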
About dlt
Data Load Tool (dlt) is an open source library for creating efficient data pipelines. It offers features like secret management, data structure conversion, incremental updates, and pre-built sources and destinations, simplifying the process of loading messy data into well-structured datasets.
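For a feel of dlt on its own, independent of Dagster, here is a minimal sketch that loads a small resource into DuckDB with a merge write disposition (the pipeline name, dataset, and data are invented for illustration):

import dlt


@dlt.resource(primary_key="id", write_disposition="merge")
def users():
    # On each run, rows are merged by primary key rather than appended,
    # which is how dlt supports incremental updates.
    yield [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]


pipeline = dlt.pipeline(
    pipeline_name="quickstart",
    destination="duckdb",
    dataset_name="raw",
)
print(pipeline.run(users()))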