Ask AI

MLflow (dagster-mlflow)

dagster_mlflow.mlflow_tracking ResourceDefinition[source]

Config Schema:
experiment_name (dagster.StringSource):

MLflow experiment name.

mlflow_tracking_uri (Union[dagster.StringSource, None], optional):

MLflow tracking server URI.

Default Value: None

parent_run_id (Union[String, None], optional):

MLflow run ID of the parent run, if this is a nested run.

Default Value: None

mlflow_run_id (Union[String, None], optional):

MLflow run ID to use for this run.

Default Value: None

env (permissive dict, optional):

Environment variables for MLflow setup.

Default Value:
{}
env_to_tag (Union[List[Any], None], optional):

List of environment variables to log as tags in MLflow.

Default Value: None

extra_tags (permissive dict, optional):

Any extra key-value tags to log to MLflow.

Default Value:
{}

This resource initializes an MLflow run that’s used for all steps within a Dagster run.

This resource provides access to all of MLflow’s methods as well as the MLflow tracking client’s methods.

Usage:

  1. Add the mlflow resource to any ops in which you want to invoke mlflow tracking APIs.

  2. Add the end_mlflow_on_run_finished hook to your job to end the MLflow run when the Dagster run is finished.

Examples

from dagster_mlflow import end_mlflow_on_run_finished, mlflow_tracking

@op(required_resource_keys={"mlflow"})
def mlflow_op(context):
    """Log parameters and register a model through the MLflow APIs.

    The op declares "mlflow" as a required resource key, so the job that
    runs it must bind a resource under that key.
    """
    # NOTE(review): `op`, `mlflow`, `some_params` and `some_model_name` are not
    # imported or defined in this snippet -- presumably placeholders the reader
    # is expected to supply; confirm before copying verbatim.
    mlflow.log_params(some_params)
    mlflow.tracking.MlflowClient().create_registered_model(some_model_name)

@end_mlflow_on_run_finished
@job(resource_defs={"mlflow": mlflow_tracking})
def mlf_example():
    """Example job that runs ``mlflow_op`` with MLflow tracking enabled.

    ``mlflow_tracking`` is bound to the "mlflow" resource key, and the
    ``end_mlflow_on_run_finished`` hook is applied to the job so that the
    MLflow run is ended when the Dagster run is finished.
    """
    mlflow_op()

# Example: execute the job in-process against an MLflow instance that uses
# S3 storage.
# NOTE(review): my_experiment, an_existing_mlflow_run_id, my_s3_endpoint,
# my_aws_key_id and my_secret are not defined in this snippet -- presumably
# placeholders to be replaced with real values.
mlf_example.execute_in_process(run_config={
    "resources": {
        "mlflow": {
            "config": {
                "experiment_name": my_experiment,
                "mlflow_tracking_uri": "http://localhost:5000",

                # if you want to run a nested run, provide parent_run_id
                # NOTE(review): the same placeholder name is used for
                # parent_run_id and mlflow_run_id below; presumably these
                # would be different run IDs in practice -- confirm.
                "parent_run_id": an_existing_mlflow_run_id,

                # if you want to resume a run or avoid creating a new run in the resource init,
                # provide mlflow_run_id
                "mlflow_run_id": an_existing_mlflow_run_id,

                # environment variables to pass to MLflow
                "env": {
                    "MLFLOW_S3_ENDPOINT_URL": my_s3_endpoint,
                    "AWS_ACCESS_KEY_ID": my_aws_key_id,
                    "AWS_SECRET_ACCESS_KEY": my_secret,
                },

                # environment variables you want to log as MLflow tags
                "env_to_tag": ["DOCKER_IMAGE_TAG"],

                # extra key-value tags to log to MLflow
                "extra_tags": {"super": "experiment"},
            }
        }
    }
})
dagster_mlflow.end_mlflow_on_run_finished HookDefinition