The dagster_openai library provides utilities for using OpenAI with Dagster. A good place to start with dagster_openai is the guide.
( experimental ) > This API may break in future versions, even between dot releases.
This wrapper can be used on any endpoint of the openai library <https://github.com/openai/openai-python> to log the OpenAI API usage metadata in the asset metadata.
Examples
from dagster import (
AssetExecutionContext,
AssetKey,
AssetSelection,
AssetSpec,
Definitions,
EnvVar,
MaterializeResult,
asset,
define_asset_job,
multi_asset,
)
from dagster_openai import OpenAIResource, with_usage_metadata
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.fine_tuning.jobs.create = with_usage_metadata(
context=context, output_name="some_output_name", func=client.fine_tuning.jobs.create
)
client.fine_tuning.jobs.create(model="gpt-3.5-turbo", training_file="some_training_file")
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
@multi_asset(
specs=[
AssetSpec("my_asset1"),
AssetSpec("my_asset2"),
]
)
def openai_multi_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context, asset_key=AssetKey("my_asset1")) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
# The materialization of `my_asset1` will include both OpenAI usage metadata
# and the metadata added when calling `MaterializeResult`.
return (
MaterializeResult(asset_key="my_asset1", metadata={"foo": "bar"}),
MaterializeResult(asset_key="my_asset2", metadata={"baz": "qux"}),
)
openai_multi_asset_job = define_asset_job(
name="openai_multi_asset_job", selection=AssetSelection.assets(openai_multi_asset)
)
defs = Definitions(
assets=[openai_asset, openai_multi_asset],
jobs=[openai_asset_job, openai_multi_asset_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
( experimental ) > This API may break in future versions, even between dot releases.
This resource is wrapper over the openai library.
By configuring this OpenAI resource, you can interact with OpenAI API and log its usage metadata in the asset metadata.
Examples
import os
from dagster import AssetExecutionContext, Definitions, EnvVar, asset, define_asset_job
from dagster_openai import OpenAIResource
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
defs = Definitions(
assets=[openai_asset],
jobs=[openai_asset_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
Yields an openai.Client
for interacting with the OpenAI API.
By default, in an asset context, the client comes with wrapped endpoints for three API resources, Completions, Embeddings and Chat, allowing to log the API usage metadata in the asset metadata.
Note that the endpoints are not and cannot be wrapped to automatically capture the API usage metadata in an op context.
context – The context
object for computing the op or asset in which get_client
is called.
Examples
from dagster import (
AssetExecutionContext,
Definitions,
EnvVar,
GraphDefinition,
OpExecutionContext,
asset,
define_asset_job,
op,
)
from dagster_openai import OpenAIResource
@op
def openai_op(context: OpExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_op_job = GraphDefinition(name="openai_op_job", node_defs=[openai_op]).to_job()
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
defs = Definitions(
assets=[openai_asset],
jobs=[openai_asset_job, openai_op_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)
Yields an openai.Client
for interacting with the OpenAI.
When using this method, the OpenAI API usage metadata is automatically
logged in the asset materializations associated with the provided asset_key
.
By default, the client comes with wrapped endpoints for three API resources, Completions, Embeddings and Chat, allowing to log the API usage metadata in the asset metadata.
This method can only be called when working with assets,
i.e. the provided context
must be of type AssetExecutionContext
.
context – The context
object for computing the asset in which get_client
is called.
asset_key – the asset_key
of the asset for which a materialization should include the metadata.
Examples
from dagster import (
AssetExecutionContext,
AssetKey,
AssetSpec,
Definitions,
EnvVar,
MaterializeResult,
asset,
define_asset_job,
multi_asset,
)
from dagster_openai import OpenAIResource
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client_for_asset(context, context.asset_key) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
@multi_asset(specs=[AssetSpec("my_asset1"), AssetSpec("my_asset2")], compute_kind="OpenAI")
def openai_multi_asset(context: AssetExecutionContext, openai_resource: OpenAIResource):
with openai_resource.get_client_for_asset(context, asset_key=AssetKey("my_asset1")) as client:
client.chat.completions.create(
model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
)
return (
MaterializeResult(asset_key="my_asset1", metadata={"some_key": "some_value1"}),
MaterializeResult(asset_key="my_asset2", metadata={"some_key": "some_value2"}),
)
openai_multi_asset_job = define_asset_job(
name="openai_multi_asset_job", selection="openai_multi_asset"
)
defs = Definitions(
assets=[openai_asset, openai_multi_asset],
jobs=[openai_asset_job, openai_multi_asset_job],
resources={
"openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
},
)