Using Google BigQuery with Dagster
This tutorial focuses on creating and interacting with BigQuery tables using Dagster's asset definitions.
The dagster-gcp
library provides two ways to interact with BigQuery tables:
- Resource: The resource allows you to directly run SQL queries against tables within an asset's compute function. Available resources:
BigQueryResource
- I/O manager: The I/O manager transfers the responsibility of storing and loading DataFrames as BigQuery tables to Dagster. Available I/O managers:
BigQueryPandasIOManager
,BigQueryPySparkIOManager
This tutorial is divided into two sections to demonstrate the differences between the BigQuery resource and the BigQuery I/O manager. Each section will create the same assets, but the first section will use the BigQuery resource to store data in BigQuery, whereas the second section will use the BigQuery I/O manager. When writing your own assets, you may choose one or the other (or both) approaches depending on your storage requirements.
In Option 1 you will:
- Set up and configure the BigQuery resource.
- Use the BigQuery resource to execute a SQL query to create a table.
- Use the BigQuery resource to execute a SQL query to interact with the table.
In Option 2 you will:
- Set up and configure the BigQuery I/O manager.
- Use Pandas to create a DataFrame, then delegate responsibility creating a table to the BigQuery I/O manager.
- Use the BigQuery I/O manager to load the table into memory so that you can interact with it using the Pandas library.
By the end of the tutorial, you will:
- Understand how to interact with a BigQuery database using the BigQuery resource.
- Understand how to use the BigQuery I/O manager to store and load DataFrames as BigQuery tables.
- Understand how to define dependencies between assets corresponding to tables in a BigQuery database.
Prerequisites
To complete this tutorial, you'll need:
-
To install the
dagster-gcp
anddagster-gcp-pandas
libraries:pip install dagster-gcp dagster-gcp-pandas
-
To gather the following information:
-
Google Cloud Project (GCP) project name: You can find this by logging into GCP and choosing one of the project names listed in the dropdown in the top left corner.
-
GCP credentials: You can authenticate with GCP two ways: by following GCP authentication instructions here, or by providing credentials directly to the BigQuery I/O manager.
In this guide, we assume that you have run one of the
gcloud auth
commands or have setGOOGLE_APPLICATION_CREDENTIALS
as specified in the linked instructions. For more information on providing credentials directly to the BigQuery resource and I/O manager, see Providing credentials as configuration in the BigQuery reference guide.
-
Option 1: Using the BigQuery resource
Step 1: Configure the BigQuery resource
To use the BigQuery resource, you'll need to add it to your Definitions
object. The BigQuery resource requires some configuration:
- A
project
- One method of authentication. You can follow the GCP authentication instructions here, or see Providing credentials as configuration in the BigQuery reference guide.
You can also specify a location
where computation should take place.
from dagster_gcp import BigQueryResource
from dagster import Definitions
defs = Definitions(
assets=[iris_data],
resources={
"bigquery": BigQueryResource(
project="my-gcp-project", # required
location="us-east5", # optional, defaults to the default location for the project - see https://cloud.google.com/bigquery/docs/locations for a list of locations
)
},
)
Step 2: Create tables in BigQuery
- Create BigQuery tables in Dagster
- Making Dagster aware of existing tables
Create BigQuery tables in Dagster
Using the BigQuery resource, you can create BigQuery tables using the BigQuery Python API:
import pandas as pd
from dagster_gcp import BigQueryResource
from dagster import asset
@asset
def iris_data(bigquery: BigQueryResource) -> None:
iris_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
with bigquery.get_client() as client:
job = client.load_table_from_dataframe(
dataframe=iris_df,
destination="iris.iris_data",
)
job.result()
In this example, you're defining an asset that fetches the Iris dataset as a Pandas DataFrame and renames the columns. Then, using the BigQuery resource, the DataFrame is stored in BigQuery as the iris.iris_data
table.
Now you can run dagster dev
and materialize the iris_data
asset from the Dagster UI.
Making Dagster aware of existing tables
If you already have existing tables in BigQuery and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by defining external assets for these tables.
from dagster import AssetSpec
iris_harvest_data = AssetSpec(key="iris_harvest_data")
In this example, you're creating an AssetSpec
for a pre-existing table called iris_harvest_data
.
Step 3: Define downstream assets
Once you have created an asset that represents a table in BigQuery, you will likely want to create additional assets that work with the data.
from dagster import asset
from .create_table import iris_data
# this example uses the iris_dataset asset from Step 2
@asset(deps=[iris_data])
def iris_setosa(bigquery: BigQueryResource) -> None:
job_config = bq.QueryJobConfig(destination="iris.iris_setosa")
sql = "SELECT * FROM iris.iris_data WHERE species = 'Iris-setosa'"
with bigquery.get_client() as client:
job = client.query(sql, job_config=job_config)
job.result()
In this asset, you're creating second table that only contains the data for the Iris Setosa species. This asset has a dependency on the iris_data
asset. To define this dependency, you provide the iris_data
asset as the deps
parameter to the iris_setosa
asset. You can then run the SQL query to create the table of Iris Setosa data.
Completed code example
When finished, your code should look like the following:
import pandas as pd
from dagster_gcp import BigQueryResource
from google.cloud import bigquery as bq
from dagster import AssetSpec, Definitions, asset
iris_harvest_data = AssetSpec(key="iris_harvest_data")
@asset
def iris_data(bigquery: BigQueryResource) -> None:
iris_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
with bigquery.get_client() as client:
job = client.load_table_from_dataframe(
dataframe=iris_df,
destination="iris.iris_data",
)
job.result()
@asset(deps=[iris_data])
def iris_setosa(bigquery: BigQueryResource) -> None:
job_config = bq.QueryJobConfig(destination="iris.iris_setosa")
sql = "SELECT * FROM iris.iris_data WHERE species = 'Iris-setosa'"
with bigquery.get_client() as client:
job = client.query(sql, job_config=job_config)
job.result()
defs = Definitions(
assets=[iris_data, iris_setosa, iris_harvest_data],
resources={
"bigquery": BigQueryResource(
project="my-gcp-project",
location="us-east5",
)
},
)
Option 2: Using the BigQuery I/O manager
While using an I/O manager is not required, you may want to use an I/O manager to handle storing DataFrames as tables in BigQuery and loading BigQuery tables as DataFrames in downstream assets. You may want to use an I/O manager if:
- You want your data to be loaded in memory so that you can interact with it using Python.
- You'd like to have Dagster manage how you store the data and load it as an input in downstream assets.
This section of the guide focuses on storing and loading Pandas DataFrames in BigQuery, but Dagster also supports using PySpark DataFrames with BigQuery. The concepts from this guide apply to working with PySpark DataFrames, and you can learn more about setting up and using the BigQuery I/O manager with PySpark DataFrames in the reference guide.
Step 1: Configure the BigQuery I/O manager
To use the BigQuery I/O manager, you'll need to add it to your Definitions
object. The BigQuery I/O manager requires some configuration to connect to your Bigquery instance:
- A
project
- One method of authentication. You can follow the GCP authentication instructions here, or see Providing credentials as configuration in the BigQuery reference guide.
You can also specify a location
where data should be stored and processed and dataset
that should hold the created tables. You can also set a timeout
when working with Pandas DataFrames.
from dagster_gcp_pandas import BigQueryPandasIOManager
from dagster import Definitions
defs = Definitions(
assets=[iris_data],
resources={
"io_manager": BigQueryPandasIOManager(
project="my-gcp-project", # required
location="us-east5", # optional, defaults to the default location for the project - see https://cloud.google.com/bigquery/docs/locations for a list of locations
dataset="IRIS", # optional, defaults to PUBLIC
timeout=15.0, # optional, defaults to None
)
},
)
With this configuration, if you materialized an asset called iris_data
, the BigQuery I/O manager would store the data in the IRIS.IRIS_DATA
table in the my-gcp-project
project. The BigQuery instance would be located in us-east5
.
Finally, in the Definitions
object, we assign the BigQueryPandasIOManager
to the io_manager
key. io_manager
is a reserved key to set the default I/O manager for your assets.
For more info about each of the configuration values, refer to the BigQueryPandasIOManager
API documentation.
Step 2: Create tables in BigQuery
The BigQuery I/O manager can create and update tables for your Dagster defined assets, but you can also make existing BigQuery tables available to Dagster.
- Create tables in BigQuery from Dagster assets
- Making Dagster aware of existing tables
Store a Dagster asset as a table in BigQuery
To store data in BigQuery using the BigQuery I/O manager, you can simply return a Pandas DataFrame from your asset. Dagster will handle storing and loading your assets in BigQuery.
import pandas as pd
from dagster import asset
@asset
def iris_data() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
In this example, you're defining an asset that fetches the Iris dataset as a Pandas DataFrame, renames the columns, then returns the DataFrame. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame
.
When Dagster materializes the iris_data
asset using the configuration from Step 1: Configure the BigQuery I/O manager, the BigQuery I/O manager will create the table IRIS.IRIS_DATA
if it does not exist and replace the contents of the table with the value returned from the iris_data
asset.
Making Dagster aware of existing tables
If you already have existing tables in BigQuery and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can define external assets for these tables. When using an I/O manager, defining an external asset for an existing table also allows you to tell Dagster how to find the table so it can be fetched for downstream assets.
from dagster import AssetSpec
iris_harvest_data = AssetSpec(key="iris_harvest_data")
In this example, you're creating a AssetSpec
for a pre-existing table - perhaps created by an external data ingestion tool - that contains data about iris harvests. To make the data available to other Dagster assets, you need to tell the BigQuery I/O manager how to find the data, so that the I/O manager can load the data into memory.
Because you already supplied the project and dataset in the I/O manager configuration in Step 1: Configure the BigQuery I/O manager, you only need to provide the table name. This is done with the key
parameter in AssetSpec
. When the I/O manager needs to load the iris_harvest_data
in a downstream asset, it will select the data in the IRIS.IRIS_HARVEST_DATA
table as a Pandas DataFrame and provide it to the downstream asset.
Step 3: Load BigQuery tables in downstream assets
Once you have created an asset that represents a table in BigQuery, you will likely want to create additional assets that work with the data. Dagster and the BigQuery I/O manager allow you to load the data stored in BigQuery tables into downstream assets.
import pandas as pd
from dagster import asset
# this example uses the iris_data asset from Step 2
@asset
def iris_setosa(iris_data: pd.DataFrame) -> pd.DataFrame:
return iris_data[iris_data["species"] == "Iris-setosa"]
In this asset, you're providing the iris_data
asset from the Store a Dagster asset as a table in BigQuery example to the iris_setosa
asset.
In this asset, you're providing the iris_data
asset as a dependency to iris_setosa
. By supplying iris_data
as a parameter to iris_setosa
, Dagster knows to use the BigQueryPandasIOManager
to load this asset into memory as a Pandas DataFrame and pass it as an argument to iris_setosa
. Next, a DataFrame that only contains the data for the Iris Setosa species is created and returned. Then the BigQueryPandasIOManager
will store the DataFrame as the IRIS.IRIS_SETOSA
table in BigQuery.
Completed code example
When finished, your code should look like the following:
import pandas as pd
from dagster_gcp_pandas import BigQueryPandasIOManager
from dagster import AssetSpec, Definitions, asset
iris_harvest_data = AssetSpec(key="iris_harvest_data")
@asset
def iris_data() -> pd.DataFrame:
return pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
@asset
def iris_setosa(iris_data: pd.DataFrame) -> pd.DataFrame:
return iris_data[iris_data["species"] == "Iris-setosa"]
defs = Definitions(
assets=[iris_data, iris_harvest_data, iris_setosa],
resources={
"io_manager": BigQueryPandasIOManager(
project="my-gcp-project",
location="us-east5",
dataset="IRIS",
timeout=15.0,
)
},
)
Related
For more BigQuery features, refer to the BigQuery reference.
For more information on asset definitions, see the Assets documentation.
For more information on I/O managers, see the I/O manager documentation.