The dagster-pipes library is intended for inclusion in an external process that integrates with Dagster using the Pipes protocol. This could be in an environment like Databricks, Kubernetes, or Docker. Using this library, you can write code in the external process that streams metadata back to Dagster.
For a detailed look at the Pipes process, including how to customize it, refer to the Dagster Pipes details and customization guide.
Looking to set up a Pipes client in Dagster? Refer to the Dagster Pipes API reference.
Note: This library isn’t included with dagster and must be installed separately.
Initialize the Dagster Pipes context.
This function should be called near the entry point of a pipes process. It will load injected context information from Dagster and spin up the machinery for streaming messages back to Dagster.
If the process was not launched by Dagster, this function will emit a warning and return a MagicMock object. This should make all operations on the context no-ops and prevent your code from crashing.
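The fallback behavior can be illustrated with a minimal standard-library sketch. It is not the dagster-pipes implementation: the helper open_pipes_sketch and the class RealContextSketch are hypothetical, and detecting a Dagster launch by checking for a DAGSTER_PIPES_CONTEXT environment variable is an assumption made for the example.

```python
import os
import warnings
from typing import Any, Mapping, Optional
from unittest.mock import MagicMock


class RealContextSketch:
    """Hypothetical stand-in for a context built from params injected by Dagster."""

    def __init__(self, raw_params: str) -> None:
        self.raw_params = raw_params


def open_pipes_sketch(env: Optional[Mapping[str, str]] = None) -> Any:
    """Return a real context if Dagster injected params, otherwise a MagicMock.

    Assumption: a Dagster launch is detected by the presence of the
    DAGSTER_PIPES_CONTEXT environment variable.
    """
    env = os.environ if env is None else env
    raw = env.get("DAGSTER_PIPES_CONTEXT")
    if raw is None:
        warnings.warn(
            "This process was not launched by Dagster; returning a MagicMock "
            "so that all context operations are no-ops."
        )
        return MagicMock()
    return RealContextSketch(raw)


# Simulate running outside Dagster by passing an empty environment.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    context = open_pipes_sketch(env={})

# Calls on the mock succeed instead of crashing.
context.report_asset_materialization(metadata={"row_count": 42})
context.log.info("no-op outside Dagster")
```

A MagicMock accepts any attribute access or call and simply records it, which is why code written against the context keeps running when no Dagster orchestrator is present.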
context_loader (Optional[PipesContextLoader]) – The context loader to use. Defaults to PipesDefaultContextLoader.
message_writer (Optional[PipesMessageWriter]) – The message writer to use. Defaults to PipesDefaultMessageWriter.
params_loader (Optional[PipesParamsLoader]) – The params loader to use. Defaults to PipesEnvVarParamsLoader.
Returns: The initialized context. Return type: PipesContext.
The context for a Dagster Pipes process.
This class is analogous to OpExecutionContext on the Dagster side of the Pipes connection. It provides access to information such as the asset key(s) and partition key(s) in scope for the current step. It also provides methods for logging and emitting results that will be streamed back to Dagster.
This class should not be directly instantiated by the user. Instead, it should be initialized by calling open_dagster_pipes(), which will return the singleton instance of this class.
After open_dagster_pipes() has been called, the singleton instance can also be retrieved by calling PipesContext.get().
Close the Pipes connection. This will flush all buffered messages to the orchestration process and cause any further attempt to write a message to raise an error. This method is idempotent: subsequent calls after the first have no effect.
Get the singleton instance of the context. Raises an error if the context has not been initialized.
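The singleton access and idempotent close() semantics described above can be sketched as a small class. ContextSketch, set_instance, write_message, and the flushed list are hypothetical names used only for illustration; the real PipesContext streams messages through a message writer rather than collecting them in a list.

```python
from typing import List, Optional


class ContextSketch:
    """Hypothetical illustration of singleton access and idempotent close()."""

    _instance: Optional["ContextSketch"] = None

    def __init__(self) -> None:
        self._closed = False
        self._buffer: List[dict] = []  # messages not yet flushed
        self.flushed: List[dict] = []  # stands in for the orchestration process

    @classmethod
    def set_instance(cls, context: "ContextSketch") -> None:
        cls._instance = context

    @classmethod
    def get(cls) -> "ContextSketch":
        # Like PipesContext.get(): raise if the context was never initialized.
        if cls._instance is None:
            raise RuntimeError("The context has not been initialized.")
        return cls._instance

    @property
    def is_closed(self) -> bool:
        return self._closed

    def write_message(self, message: dict) -> None:
        if self._closed:
            raise RuntimeError("Cannot write a message after the context is closed.")
        self._buffer.append(message)

    def close(self) -> None:
        # Idempotent: only the first call flushes the buffer and closes.
        if self._closed:
            return
        self.flushed.extend(self._buffer)
        self._buffer.clear()
        self._closed = True


try:
    ContextSketch.get()
except RuntimeError as exc:
    print(f"before initialization: {exc}")

ContextSketch.set_instance(ContextSketch())
ctx = ContextSketch.get()
ctx.write_message({"method": "log", "params": {"message": "hello"}})
ctx.close()
ctx.close()  # second call has no effect

try:
    ctx.write_message({"method": "log", "params": {"message": "too late"}})
except RuntimeError as exc:
    print(f"after close: {exc}")
```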
Get the value of an extra provided by the user. Raises an error if the extra is not defined.
key (str) – The key of the extra.
Returns: The value of the extra. Return type: Any.
Report to Dagster that an asset check has been performed. Streams a payload containing check result information back to Dagster. If no assets or associated checks are in scope, raises an error.
check_name (str) – The name of the check.
passed (bool) – Whether the check passed.
severity (PipesAssetCheckSeverity) – The severity of the check. Defaults to “ERROR”.
metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the check. Defaults to None.
asset_key (Optional[str]) – The asset key for the check. If only a single asset is in scope, defaults to that asset’s key. If multiple assets are in scope, this must be set explicitly or an error will be raised.
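The asset_key defaulting rule can be expressed as a small helper. resolve_asset_key is a hypothetical name, and this is a sketch of the rule as described here, not the library's own code.

```python
from typing import Optional, Sequence


def resolve_asset_key(asset_keys_in_scope: Sequence[str], asset_key: Optional[str] = None) -> str:
    """Hypothetical helper applying the asset_key defaulting rule described above."""
    if not asset_keys_in_scope:
        raise ValueError("No assets are in scope for this step.")
    if asset_key is not None:
        return asset_key  # an explicit key always wins
    if len(asset_keys_in_scope) == 1:
        return asset_keys_in_scope[0]  # single asset in scope: use its key
    raise ValueError("Multiple assets are in scope; asset_key must be set explicitly.")


print(resolve_asset_key(["orders"]))
print(resolve_asset_key(["orders", "customers"], asset_key="customers"))
try:
    resolve_asset_key(["orders", "customers"])
except ValueError as exc:
    print(exc)
```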
Report to Dagster that an asset has been materialized. Streams a payload containing materialization information back to Dagster. If no assets are in scope, raises an error.
metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the materialized asset. Defaults to None.
data_version (Optional[str]) – The data version for the materialized asset. Defaults to None.
asset_key (Optional[str]) – The asset key for the materialized asset. If only a single asset is in scope, defaults to that asset’s key. If multiple assets are in scope, this must be set explicitly or an error will be raised.
Send a JSON-serializable payload back to the orchestration process, where it can be retrieved using get_custom_messages.
payload (Any) – JSON-serializable data.
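A minimal sketch of the JSON-serializability requirement: the payload is serialized with json.dumps before it is queued, so values JSON cannot represent are rejected up front. The function report_custom_message_sketch, the outbox list, and the {"method": ..., "params": ...} envelope are assumptions made for illustration, not the exact Pipes wire format.

```python
import json
from typing import Any, List

outbox: List[str] = []  # hypothetical stand-in for the channel back to Dagster


def report_custom_message_sketch(payload: Any) -> None:
    """Hypothetical sketch: serialize the payload and reject it if JSON can't represent it."""
    try:
        encoded = json.dumps({"method": "report_custom_message", "params": {"payload": payload}})
    except TypeError as exc:
        raise ValueError(f"Payload is not JSON serializable: {exc}") from exc
    outbox.append(encoded)


report_custom_message_sketch({"rows": 3, "tables": ["a", "b"]})
try:
    report_custom_message_sketch({"created": object()})  # object() has no JSON form
except ValueError as exc:
    print(exc)
```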
The asset key for the currently scoped asset. Raises an error if zero or multiple assets are in scope. Type: str.
The asset keys for the currently scoped assets. Raises an error if no assets are in scope. Type: Sequence[str].
The code version for the currently scoped asset. Raises an error if zero or multiple assets are in scope. Type: Optional[str].
Mapping of asset key to code version for the currently scoped assets. Raises an error if no assets are in scope. Type: Mapping[str, Optional[str]].
Key-value map for all extras provided by the user. Type: Mapping[str, Any].
Whether the current step targets assets. Type: bool.
Whether the context has been closed. Type: bool.
Whether the current step is scoped to one or more partitions. Type: bool.
The job name for the currently executing run. Returns None if the run is not derived from a job. Type: Optional[str].
A logger that streams log messages back to Dagster. Type: logging.Logger.
The partition key for the currently scoped partition. Raises an error if zero or multiple partitions are in scope. Type: str.
The partition key range for the currently scoped partition or partitions. Raises an error if no partitions are in scope. Type: PipesPartitionKeyRange.
The partition time window for the currently scoped partition or partitions. Returns None if the partitions in scope are not temporal. Raises an error if no partitions are in scope. Type: Optional[PipesTimeWindow].
The provenance for the currently scoped asset. Raises an error if zero or multiple assets are in scope. Type: Optional[PipesDataProvenance].
Mapping of asset key to provenance for the currently scoped assets. Raises an error if no assets are in scope. Type: Mapping[str, Optional[PipesDataProvenance]].
The retry number for the currently executing run. Type: int.
The run ID for the currently executing run. Type: str.
Most Pipes users won’t need to use the APIs in the following sections unless they are customizing the Pipes protocol.
Refer to the Dagster Pipes details and customization guide for more information.
Context loaders load the context payload from the location specified in the bootstrap payload.
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
Context loader that loads context data from either a file or directly from the provided params.
The location of the context data is configured by the params received by the loader. If the params include a "path" key, the context data will be loaded from a file at the specified path. If the params instead include a "data" key, the corresponding value should be a dict representing the context data.
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
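The path-or-data dispatch described for the default context loader can be sketched with contextlib.contextmanager. load_context_sketch is a hypothetical function, not the library's loader, and the context contents ({"run_id": ...}) are illustrative only.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Iterator, Mapping


@contextmanager
def load_context_sketch(params: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
    """Hypothetical loader: yield context data from params["path"] or params["data"]."""
    if "path" in params:
        # Context data lives in a JSON file at the given path.
        with open(params["path"], encoding="utf-8") as f:
            yield json.load(f)
    elif "data" in params:
        # Context data was passed inline as a dict.
        yield params["data"]
    else:
        raise ValueError('Params must contain either a "path" or a "data" key.')


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "context.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"run_id": "abc123"}, f)
    with load_context_sketch({"path": path}) as data:
        from_file = data

with load_context_sketch({"data": {"run_id": "abc123"}}) as data:
    inline = data
```

Using a context manager lets a file-backed loader keep its file handle open only for the duration of the with block.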
Context loader that reads context from a JSON file on DBFS.
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
Params loaders load the bootstrap payload from some globally accessible key-value store.
Object that loads params passed from the orchestration process by the context injector and message reader. These params are used to bootstrap the PipesContextLoader and PipesMessageWriter, respectively.
Whether this process has been provided with the information needed to create a PipesContext, or should instead return a mock.
Params loader that extracts params from environment variables.
Params loader that extracts params from known CLI arguments.
Whether this process has been provided with the information needed to create a PipesContext, or should instead return a mock.
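A minimal sketch of an environment-variable params loader. The variable names DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES, the class EnvVarParamsLoaderSketch, and its method names are assumptions for illustration. The sketch also assumes params are encoded as base64(zlib(JSON)), as described for the encoding functions at the end of this reference.

```python
import base64
import json
import os
import zlib
from typing import Any, Mapping, Optional

# Assumption: the orchestration side sets these two variables.
CONTEXT_ENV_VAR = "DAGSTER_PIPES_CONTEXT"
MESSAGES_ENV_VAR = "DAGSTER_PIPES_MESSAGES"


def _decode(value: str) -> Any:
    # from_json(decompress(base64_decode(value)))
    return json.loads(zlib.decompress(base64.b64decode(value)).decode("utf-8"))


def _encode(value: Any) -> str:
    # base64_encode(compress(to_json(value))), used here only to build example input
    return base64.b64encode(zlib.compress(json.dumps(value).encode("utf-8"))).decode("ascii")


class EnvVarParamsLoaderSketch:
    """Hypothetical params loader that reads bootstrap params from environment variables."""

    def __init__(self, env: Optional[Mapping[str, str]] = None) -> None:
        self._env = os.environ if env is None else env

    def is_dagster_pipes_process(self) -> bool:
        # Without injected context params, the caller should fall back to a mock.
        return CONTEXT_ENV_VAR in self._env

    def load_context_params(self) -> Any:
        return _decode(self._env[CONTEXT_ENV_VAR])

    def load_messages_params(self) -> Any:
        return _decode(self._env[MESSAGES_ENV_VAR])


fake_env = {
    CONTEXT_ENV_VAR: _encode({"path": "/tmp/context.json"}),
    MESSAGES_ENV_VAR: _encode({"stdio": "stderr"}),
}
loader = EnvVarParamsLoaderSketch(fake_env)
```

Passing the environment in as a mapping keeps the sketch testable without mutating os.environ.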
Message writers write messages to the location specified in the bootstrap payload.
Return arbitrary writer-specific information to be passed back to the orchestration process under the extras key of the initialization payload.
Returns: A dict of arbitrary data to be passed back to the orchestration process. Return type: PipesExtras.
Return a payload containing information about the external process to be passed back to the orchestration process. This should contain information that cannot be known before the external process is launched.
This method should not be overridden by users. Instead, users should override get_opened_extras to inject custom data.
A @contextmanager that initializes a channel for writing messages back to Dagster.
This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.
params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesMessageWriterChannel – Channel for writing messages back to Dagster.
Message writer that writes messages to either a file or the stdout or stderr stream.
The write location is configured by the params received by the writer. If the params include a "path" key, messages will be written to a file at the specified path. If the params instead include a "stdio" key, the corresponding value must be either "stderr" or "stdout", and messages will be written to the selected stream.
A @contextmanager that initializes a channel for writing messages back to Dagster.
This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.
params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesMessageWriterChannel – Channel for writing messages back to Dagster.
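The file-or-stdio selection can be sketched as follows. open_writer_sketch and StreamChannelSketch are hypothetical; the single stream-based channel stands in for both the file channel and the TextIO stream channel listed among the message writer channels, and the message envelope is illustrative.

```python
import json
import os
import sys
import tempfile
from contextlib import contextmanager
from typing import Any, Iterator, Mapping, TextIO


class StreamChannelSketch:
    """Hypothetical channel that writes one JSON-encoded message per line."""

    def __init__(self, stream: TextIO) -> None:
        self._stream = stream

    def write_message(self, message: Mapping[str, Any]) -> None:
        self._stream.write(json.dumps(message) + "\n")
        self._stream.flush()


@contextmanager
def open_writer_sketch(params: Mapping[str, Any]) -> Iterator[StreamChannelSketch]:
    """Hypothetical writer: pick a file or stdout/stderr based on the params."""
    if "path" in params:
        with open(params["path"], "a", encoding="utf-8") as f:
            yield StreamChannelSketch(f)
    elif "stdio" in params:
        if params["stdio"] not in ("stdout", "stderr"):
            raise ValueError('"stdio" must be either "stdout" or "stderr".')
        yield StreamChannelSketch(getattr(sys, params["stdio"]))
    else:
        raise ValueError('Params must contain either a "path" or a "stdio" key.')


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "messages.jsonl")
    with open_writer_sketch({"path": path}) as channel:
        channel.write_message({"method": "log", "params": {"message": "starting"}})
        channel.write_message({"method": "log", "params": {"message": "done"}})
    with open(path, encoding="utf-8") as f:
        lines = f.read().splitlines()

with open_writer_sketch({"stdio": "stdout"}) as channel:
    channel.write_message({"method": "log", "params": {"message": "to stdout"}})
```

Writing one JSON document per line lets the reading side parse messages incrementally as they arrive.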
Message writer that periodically uploads message chunks to some blob store endpoint.
Construct and yield a PipesBlobStoreMessageWriterChannel.
params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesBlobStoreMessageWriterChannel – Channel that periodically uploads message chunks to a blob store.
Message writer that writes messages by periodically writing message chunks to an S3 bucket.
client (Any) – A boto3.client("s3") object.
interval (float) – Interval in seconds between chunk uploads.
Message writer that writes messages by periodically writing message chunks to a directory on DBFS.
Message writer channels are objects that write messages back to the Dagster orchestration process.
Object that writes messages back to the Dagster orchestration process.
Message writer channel that periodically uploads message chunks to some blob store endpoint.
Message writer channel that periodically writes message chunks to an endpoint mounted on the filesystem.
interval (float) – Interval in seconds between chunk uploads.
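A simplified sketch of a buffered filesystem channel. BufferedFilesystemChannelSketch and the numbered <index>.json chunk naming are hypothetical. As a simplification, the interval is checked on each write instead of by a background timer, and close() uploads whatever remains in the buffer.

```python
import json
import os
import tempfile
import time
from typing import Any, List, Mapping


class BufferedFilesystemChannelSketch:
    """Hypothetical channel that buffers messages and writes them as numbered chunk files."""

    def __init__(self, directory: str, interval: float = 10.0) -> None:
        self._directory = directory
        self._interval = interval
        self._buffer: List[str] = []
        self._chunk_index = 0
        self._last_upload = time.monotonic()

    def write_message(self, message: Mapping[str, Any]) -> None:
        self._buffer.append(json.dumps(message))
        # Simplification: check the interval on write rather than in a background thread.
        if time.monotonic() - self._last_upload >= self._interval:
            self._upload_chunk()

    def _upload_chunk(self) -> None:
        if self._buffer:
            chunk_path = os.path.join(self._directory, f"{self._chunk_index}.json")
            with open(chunk_path, "w", encoding="utf-8") as f:
                f.write("\n".join(self._buffer))
            self._buffer.clear()
            self._chunk_index += 1
        self._last_upload = time.monotonic()

    def close(self) -> None:
        # Flush any messages still buffered.
        self._upload_chunk()


chunk_dir = tempfile.mkdtemp()
channel = BufferedFilesystemChannelSketch(chunk_dir, interval=0.0)  # upload on every write
channel.write_message({"method": "log", "params": {"message": "a"}})
channel.write_message({"method": "log", "params": {"message": "b"}})
channel.close()
chunk_files = sorted(os.listdir(chunk_dir))

batched_dir = tempfile.mkdtemp()
batched = BufferedFilesystemChannelSketch(batched_dir, interval=3600.0)  # effectively only on close
for i in range(3):
    batched.write_message({"method": "log", "params": {"message": str(i)}})
batched.close()
with open(os.path.join(batched_dir, "0.json"), encoding="utf-8") as f:
    batched_lines = f.read().splitlines()
```

A longer interval trades latency for fewer, larger uploads, which matters when each upload to a blob store has a fixed cost.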
Message writer channel that writes one message per line to a file.
Message writer channel that writes one message per line to a TextIO stream.
Message writer channel for writing messages by periodically writing message chunks to an S3 bucket.
client (Any) – A boto3.client("s3") object.
bucket (str) – The name of the S3 bucket to write to.
key_prefix (Optional[str]) – An optional prefix to use for the keys of written blobs.
interval (float) – Interval in seconds between chunk uploads.
Encode a value by serializing it to JSON, compressing it with zlib, and finally encoding it with base64. In function notation: base64_encode(compress(to_json(value))).
value (Any) – The value to encode. Must be JSON-serializable.
Returns: The encoded value. Return type: str.
Decode a value by decoding it from base64, decompressing it with zlib, and finally deserializing it from JSON. In function notation: from_json(decompress(base64_decode(value))).
value (Any) – The value to decode.
Returns: The decoded value. Return type: Any.
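The two transformations can be sketched with the standard library's json, zlib, and base64 modules. The function names are hypothetical, and the sketch assumes UTF-8 JSON and the standard base64 alphabet; the library's own encoding functions should be used when exchanging data with Dagster.

```python
import base64
import json
import zlib
from typing import Any


def encode_param_sketch(value: Any) -> str:
    # base64_encode(compress(to_json(value)))
    serialized = json.dumps(value).encode("utf-8")
    return base64.b64encode(zlib.compress(serialized)).decode("utf-8")


def decode_param_sketch(value: str) -> Any:
    # from_json(decompress(base64_decode(value)))
    compressed = base64.b64decode(value)
    return json.loads(zlib.decompress(compressed).decode("utf-8"))


params = {"path": "/tmp/dagster_pipes/context.json"}
encoded = encode_param_sketch(params)
decoded = decode_param_sketch(encoded)
```

Because decoding applies the inverse steps in reverse order, any JSON-serializable value survives the round trip, and the base64 output is safe to place in an environment variable or CLI argument.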