Pipes (dagster-pipes)

The dagster-pipes library is intended for inclusion in an external process that integrates with Dagster using the Pipes protocol. This could be in an environment like Databricks, Kubernetes, or Docker. Using this library, you can write code in the external process that streams metadata back to Dagster.

For a detailed look at the Pipes process, including how to customize it, refer to the Dagster Pipes details and customization guide.

Looking to set up a Pipes client in Dagster? Refer to the Dagster Pipes API reference.

Note: This library isn’t included with dagster and must be installed separately.


Context

dagster_pipes.open_dagster_pipes(*, context_loader=None, message_writer=None, params_loader=None)[source]

Initialize the Dagster Pipes context.

This function should be called near the entry point of a pipes process. It will load injected context information from Dagster and spin up the machinery for streaming messages back to Dagster.

If the process was not launched by Dagster, this function will emit a warning and return a MagicMock object. This should make all operations on the context no-ops and prevent your code from crashing.

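Parameters:
  • context_loader (Optional[PipesContextLoader]) – The context loader to use. Defaults to PipesDefaultContextLoader.

  • message_writer (Optional[PipesMessageWriter]) – The message writer to use. Defaults to PipesDefaultMessageWriter.

  • params_loader (Optional[PipesParamsLoader]) – The params loader to use. Defaults to PipesEnvVarParamsLoader.

Returns: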

The initialized context.

Return type:

PipesContext
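
As a minimal sketch of an external script's entry point, assuming the process is launched by a Dagster Pipes client: the helper names summarize, run, and main below are hypothetical, and the import sits inside main so the helpers can be exercised without dagster-pipes installed.

```python
from typing import Any, Mapping, Sequence


def summarize(rows: Sequence[Mapping[str, Any]]) -> dict:
    """Build simple, JSON-serializable metadata for a batch of rows."""
    columns = {name for row in rows for name in row}
    return {"row_count": len(rows), "column_count": len(columns)}


def run(context: Any, rows: Sequence[Mapping[str, Any]]) -> None:
    """Log progress and report a materialization through the given context."""
    context.log.info(f"processing {len(rows)} rows")
    context.report_asset_materialization(metadata=summarize(rows))


def main() -> None:
    # Imported lazily so the helpers above do not require dagster-pipes.
    from dagster_pipes import open_dagster_pipes

    # Open the context near the entry point; leaving the block closes it.
    with open_dagster_pipes() as pipes:
        run(pipes, [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
```

Calling main() from the script's entry point is enough. When the script is run outside Dagster, open_dagster_pipes() warns and returns a mock, so run should become a no-op rather than fail.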

class dagster_pipes.PipesContext(params_loader, context_loader, message_writer)[source]

The context for a Dagster Pipes process.

This class is analogous to OpExecutionContext on the Dagster side of the Pipes connection. It provides access to information such as the asset key(s) and partition key(s) in scope for the current step. It also provides methods for logging and emitting results that will be streamed back to Dagster.

This class should not be directly instantiated by the user. Instead it should be initialized by calling open_dagster_pipes(), which will return the singleton instance of this class. After open_dagster_pipes() has been called, the singleton instance can also be retrieved by calling PipesContext.get().

close(exc=None)[source]

Close the pipes connection. This flushes all buffered messages to the orchestration process and causes any further attempt to write a message to raise an error. This method is idempotent: subsequent calls after the first have no effect.

classmethod get()[source]

Get the singleton instance of the context. Raises an error if the context has not been initialized.

get_extra(key)[source]

Get the value of an extra provided by the user. Raises an error if the extra is not defined.

Parameters:

key (str) – The key of the extra.

Returns:

The value of the extra.

Return type:

Any

classmethod is_initialized()[source]

bool: Whether the context has been initialized.

report_asset_check(check_name, passed, severity='ERROR', metadata=None, asset_key=None)[source]

Report to Dagster that an asset check has been performed. Streams a payload containing check result information back to Dagster. If no assets or associated checks are in scope, raises an error.

Parameters:
  • check_name (str) – The name of the check.

  • passed (bool) – Whether the check passed.

  • severity (PipesAssetCheckSeverity) – The severity of the check. Defaults to “ERROR”.

  • metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the check. Defaults to None.

  • asset_key (Optional[str]) – The asset key for the check. If only a single asset is in scope, defaults to that asset’s key. If multiple assets are in scope, this must be set explicitly or an error will be raised.
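
A small sketch of reporting a check result, assuming a check named no_nulls has been defined for the asset on the Dagster side. The function name, the check name, and the use of "WARN" as a severity value are assumptions made for illustration; only "ERROR" is named in this reference.

```python
from typing import Any, Optional, Sequence


def report_no_nulls(
    context: Any, values: Sequence[Any], asset_key: Optional[str] = None
) -> bool:
    """Report a hypothetical 'no_nulls' check and return whether it passed."""
    null_count = sum(1 for value in values if value is None)
    passed = null_count == 0
    context.report_asset_check(
        check_name="no_nulls",
        passed=passed,
        severity="WARN",  # assumption: a non-default severity value
        metadata={"null_count": null_count},
        asset_key=asset_key,  # None falls back to the single asset in scope
    )
    return passed
```

The context argument is expected to be the PipesContext returned by open_dagster_pipes(); with multiple assets in scope, asset_key has to be passed explicitly.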

report_asset_materialization(metadata=None, data_version=None, asset_key=None)[source]

Report to Dagster that an asset has been materialized. Streams a payload containing materialization information back to Dagster. If no assets are in scope, raises an error.

Parameters:
  • metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the materialized asset. Defaults to None.

  • data_version (Optional[str]) – The data version for the materialized asset. Defaults to None.

  • asset_key (Optional[str]) – The asset key for the materialized asset. If only a single asset is in scope, defaults to that asset’s key. If multiple assets are in scope, this must be set explicitly or an error will be raised.
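
When several assets are in scope, each materialization needs an explicit asset_key. The following sketch loops over context.asset_keys; the function name and the row_counts mapping are hypothetical.

```python
from typing import Any, Mapping


def report_each_asset(
    context: Any, row_counts: Mapping[str, int], data_version: str
) -> int:
    """Report one materialization per asset in scope; return how many were sent."""
    reported = 0
    for asset_key in context.asset_keys:
        context.report_asset_materialization(
            metadata={"row_count": row_counts[asset_key]},
            data_version=data_version,
            asset_key=asset_key,  # explicit, since several assets may be in scope
        )
        reported += 1
    return reported
```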

report_custom_message(payload)[source]

Send a JSON serializable payload back to the orchestration process. Can be retrieved there using get_custom_messages.

Parameters:

payload (Any) – JSON serializable data.
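
Because the payload must be JSON serializable, it can help to validate it before sending. This is a sketch of one possible guard using the standard json module, not something the library requires; send_custom_message is a hypothetical helper.

```python
import json
from typing import Any


def send_custom_message(context: Any, payload: Any) -> None:
    """Fail fast on non-serializable payloads, then forward to the context."""
    json.dumps(payload)  # raises TypeError for values such as sets
    context.report_custom_message(payload)
```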

classmethod set(context)[source]

Set the singleton instance of the context.

property asset_key

The AssetKey for the currently scoped asset. Raises an error if 0 or multiple assets are in scope.

Type:

str

property asset_keys

The AssetKeys for the currently scoped assets. Raises an error if no assets are in scope.

Type:

Sequence[str]

property code_version

The code version for the currently scoped asset. Raises an error if 0 or multiple assets are in scope.

Type:

Optional[str]

property code_version_by_asset_key

Mapping of asset key to code version for the currently scoped assets. Raises an error if no assets are in scope.

Type:

Mapping[str, Optional[str]]

property extras

Key-value map for all extras provided by the user.

Type:

Mapping[str, Any]
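
Since get_extra raises when a key is missing, optional extras can be read through the extras mapping instead. A minimal sketch, with extra_or_default as a hypothetical helper name:

```python
from typing import Any


def extra_or_default(context: Any, key: str, default: Any = None) -> Any:
    """Return an optional extra, falling back to a default instead of raising."""
    return context.extras.get(key, default)
```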

property is_asset_step

Whether the current step targets assets.

Type:

bool

property is_closed

Whether the context has been closed.

Type:

bool

property is_partition_step

Whether the current step is scoped to one or more partitions.

Type:

bool

property job_name

The job name for the currently executing run. Returns None if the run is not derived from a job.

Type:

Optional[str]

property log

A logger that streams log messages back to Dagster.

Type:

logging.Logger

property partition_key

The partition key for the currently scoped partition. Raises an error if 0 or multiple partitions are in scope.

Type:

str

property partition_key_range

The partition key range for the currently scoped partition or partitions. Raises an error if no partitions are in scope.

Type:

PipesPartitionKeyRange

property partition_time_window

The partition time window for the currently scoped partition or partitions. Returns None if partitions in scope are not temporal. Raises an error if no partitions are in scope.

Type:

Optional[PipesTimeWindow]
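
The partition properties can be combined as in the sketch below. It assumes that PipesPartitionKeyRange and PipesTimeWindow are mappings with "start" and "end" keys, which this reference does not state; describe_partitions is a hypothetical helper.

```python
from typing import Any


def describe_partitions(context: Any) -> str:
    """Summarize the partitions in scope without tripping the 'raises' cases."""
    if not context.is_partition_step:
        return "unpartitioned"
    key_range = context.partition_key_range
    window = context.partition_time_window  # None when partitions are not temporal
    text = f"partitions {key_range['start']}..{key_range['end']}"  # assumed keys
    if window is not None:
        text += f" covering {window['start']} to {window['end']}"  # assumed keys
    return text
```

Checking is_partition_step first avoids the errors raised by the partition properties when no partitions are in scope.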

property provenance

The provenance for the currently scoped asset. Raises an error if 0 or multiple assets are in scope.

Type:

Optional[PipesDataProvenance]

property provenance_by_asset_key

Mapping of asset key to provenance for the currently scoped assets. Raises an error if no assets are in scope.

Type:

Mapping[str, Optional[PipesDataProvenance]]

property retry_number

The retry number for the currently executing run.

Type:

int

property run_id

The run ID for the currently executing pipeline run.

Type:

str


Advanced

Most Pipes users won’t need to use the APIs in the following sections unless they are customizing the Pipes protocol.

Refer to the Dagster Pipes details and customization guide for more information.

Context loaders

Context loaders load the context payload from the location specified in the bootstrap payload.

class dagster_pipes.PipesContextLoader[source]

abstract load_context(params)[source]

A @contextmanager that loads context data injected by the orchestration process.

This method should read and yield the context data from the location specified by the passed in PipesParams.

Parameters:

params (PipesParams) – The params provided by the context injector in the orchestration process.

Yields:

PipesContextData – The context data.

class dagster_pipes.PipesDefaultContextLoader[source]

Context loader that loads context data from either a file or directly from the provided params.

The location of the context data is configured by the params received by the loader. If the params include a key path, then the context data will be loaded from a file at the specified path. If the params instead include a key data, then the corresponding value should be a dict representing the context data.
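
The path/data dispatch described above can be sketched with the standard library alone. This mirrors the documented behavior rather than the library's actual implementation, and it assumes the file at path contains JSON.

```python
import json
from contextlib import contextmanager
from typing import Any, Iterator, Mapping

FILE_PATH_KEY = "path"
DIRECT_KEY = "data"


@contextmanager
def load_context(params: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
    """Yield context data from a file or directly from the params."""
    if FILE_PATH_KEY in params:
        # Assumption: the file holds the context data serialized as JSON.
        with open(params[FILE_PATH_KEY], "r") as f:
            data = json.load(f)
        yield data
    elif DIRECT_KEY in params:
        yield params[DIRECT_KEY]
    else:
        raise ValueError(f"params must include {FILE_PATH_KEY!r} or {DIRECT_KEY!r}")
```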

load_context(params)[source]

A @contextmanager that loads context data injected by the orchestration process.

This method should read and yield the context data from the location specified by the passed in PipesParams.

Parameters:

params (PipesParams) – The params provided by the context injector in the orchestration process.

Yields:

PipesContextData – The context data.

DIRECT_KEY = 'data'
FILE_PATH_KEY = 'path'

class dagster_pipes.PipesDbfsContextLoader[source]

Context loader that reads context from a JSON file on DBFS.

load_context(params)[source]

A @contextmanager that loads context data injected by the orchestration process.

This method should read and yield the context data from the location specified by the passed in PipesParams.

Parameters:

params (PipesParams) – The params provided by the context injector in the orchestration process.

Yields:

PipesContextData – The context data.


Params loaders

Params loaders load the bootstrap payload from some globally accessible key-value store.

class dagster_pipes.PipesParamsLoader[source]

Object that loads params passed from the orchestration process by the context injector and message reader. These params are used to bootstrap the PipesContextLoader and PipesMessageWriter, respectively.

abstract is_dagster_pipes_process()[source]

Whether this process has been provided with the information needed to create a PipesContext. If not, a mock should be returned instead.

abstract load_context_params()[source]

PipesParams: Load params passed by the orchestration-side context injector.

abstract load_messages_params()[source]

PipesParams: Load params passed by the orchestration-side message reader.
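
The three methods above can be illustrated with a duck-typed loader that reads from any string-keyed mapping. This is a sketch only: the class name and the EXAMPLE_* key names are hypothetical, values are assumed to be plain JSON unless another decode function is supplied, and a real implementation would subclass PipesParamsLoader.

```python
import json
from typing import Any, Callable, Mapping


class MappingParamsLoader:
    """Loads bootstrap params from a mapping such as os.environ."""

    CONTEXT_KEY = "EXAMPLE_PIPES_CONTEXT"  # hypothetical key name
    MESSAGES_KEY = "EXAMPLE_PIPES_MESSAGES"  # hypothetical key name

    def __init__(
        self, store: Mapping[str, str], decode: Callable[[str], Any] = json.loads
    ) -> None:
        self._store = store
        self._decode = decode

    def is_dagster_pipes_process(self) -> bool:
        # Treat the process as a Pipes process only if both payloads are present.
        return self.CONTEXT_KEY in self._store and self.MESSAGES_KEY in self._store

    def load_context_params(self) -> Any:
        return self._decode(self._store[self.CONTEXT_KEY])

    def load_messages_params(self) -> Any:
        return self._decode(self._store[self.MESSAGES_KEY])
```

With dagster-pipes installed, passing decode=dagster_pipes.decode_env_var would read values that were written with encode_env_var.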

class dagster_pipes.PipesEnvVarParamsLoader[source]

Params loader that extracts params from environment variables.

class dagster_pipes.PipesCliArgsParamsLoader[source]

Params loader that extracts params from known CLI arguments.

is_dagster_pipes_process()[source]

Whether this process has been provided with the information needed to create a PipesContext. If not, a mock should be returned instead.

load_context_params()[source]

PipesParams: Load params passed by the orchestration-side context injector.

load_messages_params()[source]

PipesParams: Load params passed by the orchestration-side message reader.


Message writers

Message writers write messages to the location specified in the bootstrap payload.

class dagster_pipes.PipesMessageWriter[source]

get_opened_extras()[source]

Return arbitrary reader-specific information to be passed back to the orchestration process under the extras key of the initialization payload.

Returns:

A dict of arbitrary data to be passed back to the orchestration process.

Return type:

PipesExtras

final get_opened_payload()[source]

Return a payload containing information about the external process to be passed back to the orchestration process. This should contain information that cannot be known before the external process is launched.

This method should not be overridden by users. Instead, users should override get_opened_extras to inject custom data.

abstract open(params)[source]

A @contextmanager that initializes a channel for writing messages back to Dagster.

This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.

Parameters:

params (PipesParams) – The params provided by the message reader in the orchestration process.

Yields:

PipesMessageWriterChannel – Channel for writing messages back to Dagster.

class dagster_pipes.PipesDefaultMessageWriter[source]

Message writer that writes messages to either a file or the stdout or stderr stream.

The write location is configured by the params received by the writer. If the params include a key path, then messages will be written to a file at the specified path. If the params instead include a key stdio, then the corresponding value must specify either stderr or stdout, and messages will be written to the selected stream.
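
The destination selection described above might look like the following standard-library sketch. It is not the library's implementation: open_destination is a hypothetical name, append mode for the file is an assumption, and the buffered_stdio key is ignored.

```python
import sys
from contextlib import contextmanager
from typing import Any, Iterator, Mapping, TextIO


@contextmanager
def open_destination(params: Mapping[str, Any]) -> Iterator[TextIO]:
    """Yield a text stream chosen by the 'path' or 'stdio' param."""
    if "path" in params:
        # Assumption: messages are appended so earlier lines are preserved.
        with open(params["path"], "a") as f:
            yield f
    elif "stdio" in params:
        target = params["stdio"]
        if target == "stderr":
            yield sys.stderr
        elif target == "stdout":
            yield sys.stdout
        else:
            raise ValueError(f"stdio must be 'stderr' or 'stdout', got {target!r}")
    else:
        raise ValueError("params must include 'path' or 'stdio'")
```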

open(params)[source]

A @contextmanager that initializes a channel for writing messages back to Dagster.

This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.

Parameters:

params (PipesParams) – The params provided by the message reader in the orchestration process.

Yields:

PipesMessageWriterChannel – Channel for writing messages back to Dagster.

BUFFERED_STDIO_KEY = 'buffered_stdio'
FILE_PATH_KEY = 'path'
STDERR = 'stderr'
STDIO_KEY = 'stdio'
STDOUT = 'stdout'

class dagster_pipes.PipesBlobStoreMessageWriter(*, interval=10)[source]

Message writer that periodically uploads message chunks to some blob store endpoint.

abstract make_channel(params)[source]
open(params)[source]

Construct and yield a PipesBlobStoreMessageWriterChannel.

Parameters:

params (PipesParams) – The params provided by the message reader in the orchestration process.

Yields:

PipesBlobStoreMessageWriterChannel – Channel that periodically uploads message chunks to a blob store.

class dagster_pipes.PipesS3MessageWriter(client, *, interval=10)[source]

Message writer that writes messages by periodically writing message chunks to an S3 bucket.

Parameters:
  • client (Any) – A boto3.client(“s3”) object.

  • interval (float) – Interval in seconds between chunk uploads.

make_channel(params)[source]

class dagster_pipes.PipesDbfsMessageWriter(*, interval=10)[source]

Message writer that writes messages by periodically writing message chunks to a directory on DBFS.

get_opened_extras()[source]

Return arbitrary reader-specific information to be passed back to the orchestration process under the extras key of the initialization payload.

Returns:

A dict of arbitrary data to be passed back to the orchestration process.

Return type:

PipesExtras

make_channel(params)[source]

Message writer channels

Message writer channels are objects that write messages back to the Dagster orchestration process.

class dagster_pipes.PipesMessageWriterChannel[source]

Object that writes messages back to the Dagster orchestration process.

abstract write_message(message)[source]

Write a message to the orchestration process.

Parameters:

message (PipesMessage) – The message to write.

class dagster_pipes.PipesBlobStoreMessageWriterChannel(*, interval=10)[source]

Message writer channel that periodically uploads message chunks to some blob store endpoint.

buffered_upload_loop()[source]
flush_messages()[source]
abstract upload_messages_chunk(payload, index)[source]
write_message(message)[source]

Write a message to the orchestration process.

Parameters:

message (PipesMessage) – The message to write.
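
The buffer-then-upload pattern behind this channel can be sketched as below. This simplified version has no background loop or interval: flushing is explicit, chunks are written as numbered files in a local directory, and the class name, flush method, and string payload type are assumptions for illustration.

```python
import json
import os
from typing import Any, List, Mapping


class DirectoryChunkChannel:
    """Buffers messages and writes them out as numbered chunk files."""

    def __init__(self, directory: str) -> None:
        self._directory = directory
        self._buffer: List[Mapping[str, Any]] = []
        self._next_index = 1

    def write_message(self, message: Mapping[str, Any]) -> None:
        # Messages are only buffered here; nothing is written until flush().
        self._buffer.append(message)

    def upload_messages_chunk(self, payload: str, index: int) -> None:
        with open(os.path.join(self._directory, f"{index}.json"), "w") as f:
            f.write(payload)

    def flush(self) -> None:
        """Write all buffered messages as one chunk, one JSON object per line."""
        if not self._buffer:
            return
        payload = "\n".join(json.dumps(message) for message in self._buffer)
        self.upload_messages_chunk(payload, self._next_index)
        self._buffer.clear()
        self._next_index += 1
```

A blob store variant would presumably replace only upload_messages_chunk, which is the abstract method in the class documented above.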

class dagster_pipes.PipesBufferedFilesystemMessageWriterChannel(path, *, interval=10)[source]

Message writer channel that periodically writes message chunks to an endpoint mounted on the filesystem.

Parameters:

interval (float) – interval in seconds between chunk uploads

upload_messages_chunk(payload, index)[source]

class dagster_pipes.PipesFileMessageWriterChannel(path)[source]

Message writer channel that writes one message per line to a file.

write_message(message)[source]

Write a message to the orchestration process.

Parameters:

message (PipesMessage) – The message to write.

class dagster_pipes.PipesStreamMessageWriterChannel(stream)[source]

Message writer channel that writes one message per line to a TextIO stream.

write_message(message)[source]

Write a message to the orchestration process.

Parameters:

message (PipesMessage) – The message to write.
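
A one-message-per-line channel can be sketched in a few lines. This is an illustration of the idea, not the library's class: LineStreamChannel is a hypothetical name, and messages are treated as plain JSON-serializable dicts.

```python
import json
from typing import Any, Mapping, TextIO


class LineStreamChannel:
    """Writes each message as a single JSON line to a text stream."""

    def __init__(self, stream: TextIO) -> None:
        self._stream = stream

    def write_message(self, message: Mapping[str, Any]) -> None:
        # json.dumps escapes newlines, so each message stays on one line.
        self._stream.write(json.dumps(message) + "\n")
        self._stream.flush()
```

Passing sys.stderr or an open file object as the stream would cover both the stream and file cases described in this section.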

class dagster_pipes.PipesS3MessageWriterChannel(client, bucket, key_prefix, *, interval=10)[source]

Message writer channel for writing messages by periodically writing message chunks to an S3 bucket.

Parameters:
  • client (Any) – A boto3.client(“s3”) object.

  • bucket (str) – The name of the S3 bucket to write to.

  • key_prefix (Optional[str]) – An optional prefix to use for the keys of written blobs.

  • interval (float) – Interval in seconds between chunk uploads.

upload_messages_chunk(payload, index)[source]

Utilities

dagster_pipes.encode_env_var(value)

Encode value by serializing to JSON, compressing with zlib, and finally encoding with base64. base64_encode(compress(to_json(value))) in function notation.

Parameters:

value (Any) – The value to encode. Must be JSON-serializable.

Returns:

The encoded value.

Return type:

str

dagster_pipes.decode_env_var(value)

Decode a value by decoding from base64, decompressing with zlib, and finally deserializing from JSON. from_json(decompress(base64_decode(value))) in function notation.

Parameters:

value (Any) – The value to decode.

Returns:

The decoded value.

Return type:

Any
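
The encoding scheme described for these two functions can be reproduced with the standard library. The sketch below uses the hypothetical names encode_value and decode_value to make clear it is a reimplementation of the described steps, not the library's own code; UTF-8 is assumed for the string conversions.

```python
import base64
import json
import zlib
from typing import Any


def encode_value(value: Any) -> str:
    """JSON-serialize, zlib-compress, then base64-encode."""
    compressed = zlib.compress(json.dumps(value).encode("utf-8"))
    return base64.b64encode(compressed).decode("utf-8")


def decode_value(value: str) -> Any:
    """Base64-decode, zlib-decompress, then JSON-deserialize."""
    decompressed = zlib.decompress(base64.b64decode(value))
    return json.loads(decompressed.decode("utf-8"))
```

The result is an ASCII string, which makes it safe to place in an environment variable.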

class dagster_pipes.DagsterPipesError[source]
class dagster_pipes.DagsterPipesWarning[source]