Azure (dagster-azure)

Utilities for using Azure Storage Accounts with Dagster. This is mostly aimed at Azure Data Lake Storage Gen 2 (ADLS2) but also contains some utilities for Azure Blob Storage.

Resources

dagster_azure.adls2.ADLS2Resource ResourceDefinition[source]

Config Schema:
storage_account (dagster.StringSource):

The storage account name.

credential (selector):
Config Schema:
sas (strict dict):
Config Schema:
token (dagster.StringSource):

key (strict dict):
Config Schema:
key (dagster.StringSource):

default_azure_credential (strict dict):
Config Schema:
kwargs (dict):

Resource containing clients to access Azure Data Lake Storage Gen2.

Contains a client for both the Data Lake and Blob APIs, to work around the limitations of each.
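
Example usage (a minimal sketch): attach the resource to a Definitions object and request it from an asset by type annotation. The ADLS2Key credential helper and the adls2_client property are assumed to match your installed dagster-azure version; the account name and environment variable below are placeholders.

from dagster import Definitions, EnvVar, asset
from dagster_azure.adls2 import ADLS2Key, ADLS2Resource

@asset
def file_systems(adls2: ADLS2Resource) -> list:
    # adls2_client wraps the Data Lake APIs (a DataLakeServiceClient); the
    # resource also exposes a separate client for Blob-only operations.
    return [fs.name for fs in adls2.adls2_client.list_file_systems()]

defs = Definitions(
    assets=[file_systems],
    resources={
        "adls2": ADLS2Resource(
            storage_account="my-storage-account",  # placeholder
            credential=ADLS2Key(key=EnvVar("AZURE_DATA_LAKE_STORAGE_KEY")),
        )
    },
)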

dagster_azure.adls2.FakeADLS2Resource ResourceDefinition[source]

Config Schema:
account_name (dagster.StringSource):

storage_account (Union[dagster.StringSource, None], optional):

Default Value: None

Stateful mock of an ADLS2Resource for testing.

Wraps a mock.MagicMock. Containers are implemented using an in-memory dict.
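
Example usage (a minimal sketch): swap the fake in for the real resource when testing jobs, so that no network calls are made. The op and job names below are hypothetical.

from dagster import job, op
from dagster_azure.adls2 import FakeADLS2Resource

@op(required_resource_keys={"adls2"})
def op_using_adls2(context):
    # In production this op would call context.resources.adls2.adls2_client;
    # the fake substitutes an in-memory client, so tests run without Azure access.
    ...

@job(resource_defs={"adls2": FakeADLS2Resource(account_name="test-account")})
def my_test_job():
    op_using_adls2()

def test_my_test_job():
    assert my_test_job.execute_in_process().success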

class dagster_azure.blob.AzureBlobComputeLogManager(storage_account, container, secret_credential=None, local_dir=None, inst_data=None, prefix='dagster', upload_interval=None, default_azure_credential=None, show_url_only=True)[source]

Logs op compute function stdout and stderr to Azure Blob Storage.

This is also compatible with Azure Data Lake Storage.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_azure.blob.compute_log_manager
  class: AzureBlobComputeLogManager
  config:
    storage_account: my-storage-account
    container: my-container
    secret_credential:
      client_id: my-client-id
      client_secret: my-client-secret
      tenant_id: my-tenant-id
    default_azure_credential:
      exclude_environment_credential: true
    prefix: "dagster-test-"
    local_dir: "/tmp/cool"
    upload_interval: 30
Parameters:
  • storage_account (str) – The storage account name to which to log.

  • container (str) – The container (or ADLS2 filesystem) to which to log.

  • secret_credential (Optional[dict]) – Secret credential for the storage account. This should be a dictionary with keys client_id, client_secret, and tenant_id.

  • default_azure_credential (Optional[dict]) – Use and configure DefaultAzureCredential. Cannot be used with sas token or secret key config.

  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster._seven.get_system_temp_directory().

  • prefix (Optional[str]) – Prefix for the log file keys.

  • upload_interval (Optional[int]) – Interval in seconds at which to upload partial log files to blob storage. By default, logs are only uploaded when the capture is complete.

  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when newed up from config.

I/O Manager

dagster_azure.adls2.ADLS2PickleIOManager IOManagerDefinition[source]

Config Schema:
adls2_file_system (dagster.StringSource):

ADLS Gen2 file system name.

adls2_prefix (Union[dagster.StringSource, None], optional):

ADLS Gen2 file system prefix to write to.

Default Value: ‘dagster’

lease_duration (Union[dagster.IntSource, None], optional):

Lease duration in seconds. Must be between 15 and 60 seconds or -1 for infinite.

Default Value: 60

Persistent IO manager using Azure Data Lake Storage Gen2 for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for ADLS and the backing container.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.

Example usage:

  1. Attach this IO manager to a set of assets.

from dagster import Definitions, asset
from dagster_azure.adls2 import ADLS2PickleIOManager, adls2_resource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return df[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": ADLS2PickleIOManager(
            adls2_file_system="my-cool-fs",
            adls2_prefix="my-cool-prefix"
        ),
        "adls2": adls2_resource,
    },
)
  2. Attach this IO manager to your job to make it available to your ops.

from dagster import job
from dagster_azure.adls2 import ADLS2PickleIOManager, adls2_resource

@job(
    resource_defs={
        "io_manager": ADLS2PickleIOManager(
            adls2_file_system="my-cool-fs",
            adls2_prefix="my-cool-prefix"
        ),
        "adls2": adls2_resource,
    },
)
def my_job():
    ...

File Manager (Experimental)

dagster_azure.adls2.adls2_file_manager ResourceDefinition[source]

Config Schema:
storage_account (dagster.StringSource):

The storage account name.

credential (selector):

The credentials with which to authenticate.

Config Schema:
sas (dagster.StringSource):

SAS token for the account.

key (dagster.StringSource):

Shared Access Key for the account.

DefaultAzureCredential (permissive dict, optional):
Default Value:
{}
adls2_file_system (dagster.StringSource):

ADLS Gen2 file system name

adls2_prefix (dagster.StringSource, optional):

Default Value: ‘dagster’

FileManager that provides abstract access to ADLS2.

Implements the FileManager API.
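
Example usage (a minimal sketch): attach the file manager to a job and write bytes from an op through the FileManager API. The storage account, file system, and environment variable names are placeholders.

from dagster import job, op
from dagster_azure.adls2 import adls2_file_manager

@op(required_resource_keys={"file_manager"})
def write_greeting(context):
    # write_data stores the bytes in ADLS2 and returns an ADLS2FileHandle.
    handle = context.resources.file_manager.write_data(b"hello, dagster")
    context.log.info(f"Wrote file at {handle.path_desc}")

@job(
    resource_defs={
        "file_manager": adls2_file_manager.configured(
            {
                "storage_account": "my-storage-account",  # placeholder
                "credential": {"key": {"env": "AZURE_DATA_LAKE_STORAGE_KEY"}},
                "adls2_file_system": "my-file-system",  # placeholder
            }
        )
    }
)
def my_file_manager_job():
    write_greeting()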

class dagster_azure.adls2.ADLS2FileHandle(account, file_system, key)[source]

A reference to a file on ADLS2.

Legacy

dagster_azure.adls2.ConfigurablePickledObjectADLS2IOManager IOManagerDefinition[source]

Config Schema:
adls2_file_system (dagster.StringSource):

ADLS Gen2 file system name.

adls2_prefix (Union[dagster.StringSource, None], optional):

ADLS Gen2 file system prefix to write to.

Default Value: ‘dagster’

lease_duration (Union[dagster.IntSource, None], optional):

Lease duration in seconds. Must be between 15 and 60 seconds or -1 for infinite.

Default Value: 60

deprecated This API will be removed in version 2.0.

Please use ADLS2PickleIOManager instead.

Renamed to ADLS2PickleIOManager. See ADLS2PickleIOManager for documentation.

dagster_azure.adls2.adls2_resource ResourceDefinition[source]

Config Schema:
storage_account (dagster.StringSource):

The storage account name.

credential (selector):

The credentials with which to authenticate.

Config Schema:
sas (dagster.StringSource):

SAS token for the account.

key (dagster.StringSource):

Shared Access Key for the account.

DefaultAzureCredential (permissive dict, optional):
Default Value:
{}

Resource that gives ops access to Azure Data Lake Storage Gen2.

The underlying client is a DataLakeServiceClient.

Attach this resource definition to a JobDefinition in order to make it available to your ops.

Example

from dagster import job, op
from dagster_azure.adls2 import adls2_resource

@op(required_resource_keys={'adls2'})
def example_adls2_op(context):
    return list(context.resources.adls2.adls2_client.list_file_systems())

@job(resource_defs={"adls2": adls2_resource})
def my_job():
    example_adls2_op()

Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.

You may pass credentials to this resource using a SAS token, a shared access key, or a DefaultAzureCredential configuration.

resources:
  adls2:
    config:
      storage_account: my_storage_account
      # str: The storage account name.
      credential:
        # Provide exactly one of the following options.
        sas: my_sas_token
        # str: The SAS token for the account.
        key:
          env: AZURE_DATA_LAKE_STORAGE_KEY
        # str: The shared access key for the account.
        DefaultAzureCredential: {}
        # dict: Keyword arguments for DefaultAzureCredential,
        # or leave the object empty for no arguments. For example:
        # DefaultAzureCredential:
        #   exclude_environment_credential: true
dagster_azure.adls2.adls2_pickle_io_manager IOManagerDefinition[source]

Config Schema:
adls2_file_system (dagster.StringSource):

ADLS Gen2 file system name.

adls2_prefix (Union[dagster.StringSource, None], optional):

ADLS Gen2 file system prefix to write to.

Default Value: ‘dagster’

lease_duration (Union[dagster.IntSource, None], optional):

Lease duration in seconds. Must be between 15 and 60 seconds or -1 for infinite.

Default Value: 60

Persistent IO manager using Azure Data Lake Storage Gen2 for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for ADLS and the backing container.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of “/my/base/path”, an asset with key AssetKey([“one”, “two”, “three”]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.

Example usage:

  1. Attach this IO manager to a set of assets.

from dagster import Definitions, asset
from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return df[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": adls2_pickle_io_manager.configured(
            {"adls2_file_system": "my-cool-fs", "adls2_prefix": "my-cool-prefix"}
        ),
        "adls2": adls2_resource,
    },
)
  2. Attach this IO manager to your job to make it available to your ops.

from dagster import job
from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

@job(
    resource_defs={
        "io_manager": adls2_pickle_io_manager.configured(
            {"adls2_file_system": "my-cool-fs", "adls2_prefix": "my-cool-prefix"}
        ),
        "adls2": adls2_resource,
    },
)
def my_job():
    ...