Azure (dagster-azure)

Utilities for using Azure Storage Accounts with Dagster. This is mostly aimed at Azure Data Lake Storage Gen 2 (ADLS2) but also contains some utilities for Azure Blob Storage.


NOTE: This package is incompatible with dagster-snowflake! This is due to a conflict over the version of the underlying azure-storage-blob package: dagster-snowflake has a transitive dependency on an old version via snowflake-connector-python.

dagster_azure.adls2.adls2_resource ResourceDefinition[source]

Config Schema:
storage_account (dagster.StringSource)

The storage account name.

credential (selector)

The credentials with which to authenticate.

Config Schema:
sas (dagster.StringSource)

SAS token for the account.

key (dagster.StringSource)

Shared Access Key for the account.

Resource that gives ops access to Azure Data Lake Storage Gen2.

The underlying client is a DataLakeServiceClient.

Attach this resource definition to a JobDefinition in order to make it available to your ops.

Example

from dagster import job, op
from dagster_azure.adls2 import adls2_resource

@op(required_resource_keys={'adls2'})
def example_adls2_op(context):
    return list(context.resources.adls2.adls2_client.list_file_systems())

@job(resource_defs={"adls2": adls2_resource})
def my_job():
    example_adls2_op()

Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.

You may pass credentials to this resource using either a SAS token or a key, and either value may be read from an environment variable. Because credential is a selector, provide exactly one of sas or key:

resources:
  adls2:
    config:
      storage_account: my_storage_account
      # str: The storage account name.
      credential:
        sas: my_sas_token
        # str: The SAS token for the account.
        key:
          env: AZURE_DATA_LAKE_STORAGE_KEY
        # str: The shared access key for the account.
class dagster_azure.adls2.FakeADLS2Resource(account_name, credential='fake-creds')[source]

Stateful mock of an ADLS2Resource for testing.

Wraps a mock.MagicMock. Containers are implemented using an in-memory dict.
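
For example, you might swap this fake in for the real resource when testing a job. A minimal sketch, assuming the fake exposes the same adls2_client attribute as the real resource, backed by in-memory state (the job and op names below are illustrative):

from dagster import ResourceDefinition, job, op
from dagster_azure.adls2 import FakeADLS2Resource

@op(required_resource_keys={"adls2"})
def touch_adls2(context):
    # Access the (fake) DataLakeServiceClient; calls hit in-memory state
    # rather than a live storage account.
    return context.resources.adls2.adls2_client

@job(
    resource_defs={
        "adls2": ResourceDefinition.hardcoded_resource(FakeADLS2Resource("fake-account"))
    }
)
def my_test_job():
    touch_adls2()

# Executes entirely against the in-memory fake; no Azure credentials required.
result = my_test_job.execute_in_process()
assert result.success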

dagster_azure.adls2.adls2_file_cache ResourceDefinition[source]

Config Schema:
storage_account (dagster.StringSource)

The storage account name.

credential (selector)

The credentials with which to authenticate.

Config Schema:
sas (dagster.StringSource)

SAS token for the account.

key (dagster.StringSource)

Shared Access Key for the account.

prefix (dagster.StringSource)

The base path prefix to use in ADLS2

file_system (dagster.StringSource)

The storage account filesystem (aka container)

overwrite (Bool, optional)

Default Value: False
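
As with the other ADLS2 resources, configuration can be supplied in run config. A hypothetical example; the file_cache resource key and all values below are illustrative placeholders:

resources:
  file_cache:
    config:
      storage_account: my_storage_account
      credential:
        sas: my_sas_token
      prefix: my-file-cache-prefix
      file_system: my-file-system
      overwrite: false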

class dagster_azure.blob.AzureBlobComputeLogManager(storage_account, container, secret_key, local_dir=None, inst_data=None, prefix='dagster')[source]

Logs op compute function stdout and stderr to Azure Blob Storage.

This is also compatible with Azure Data Lake Storage.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_azure.blob.compute_log_manager
  class: AzureBlobComputeLogManager
  config:
    storage_account: my-storage-account
    container: my-container
    secret_key: my-secret-key
    prefix: "dagster-test-"
    local_dir: "/tmp/cool"
Parameters
  • storage_account (str) – The storage account name to which to log.

  • container (str) – The container (or ADLS2 filesystem) to which to log.

  • secret_key (str) – Secret key for the storage account. SAS tokens are not supported because we need a secret key to generate a SAS token for a download URL.

  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster.seven.get_system_temp_directory().

  • prefix (Optional[str]) – Prefix for the log file keys.

  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when newed up from config.

dagster_azure.adls2.adls2_file_manager ResourceDefinition[source]

Config Schema:
storage_account (dagster.StringSource)

The storage account name.

credential (selector)

The credentials with which to authenticate.

Config Schema:
sas (dagster.StringSource)

SAS token for the account.

key (dagster.StringSource)

Shared Access Key for the account.

adls2_file_system (dagster.StringSource)

ADLS Gen2 file system name

adls2_prefix (dagster.StringSource, optional)

Default Value: ‘dagster’

FileManager that provides abstract access to ADLS2.

Implements the FileManager API.
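
For example, an op can use the file manager to persist bytes and return a handle to them. A minimal sketch (the file_manager resource key and the configuration values are placeholders); write_data is part of the generic FileManager API and returns an ADLS2FileHandle here:

from dagster import job, op
from dagster_azure.adls2 import adls2_file_manager

@op(required_resource_keys={"file_manager"})
def write_bytes_op(context):
    # Uploads the bytes to ADLS2 and returns an ADLS2FileHandle pointing at them.
    return context.resources.file_manager.write_data(b"some data")

@job(
    resource_defs={
        "file_manager": adls2_file_manager.configured(
            {
                "storage_account": "my_storage_account",
                "credential": {"sas": "my_sas_token"},
                "adls2_file_system": "my-file-system",
            }
        )
    }
)
def file_manager_job():
    write_bytes_op()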

class dagster_azure.adls2.ADLS2FileHandle(account, file_system, key)[source]

A reference to a file on ADLS2.

property account

The name of the ADLS2 account.

Type

str

property adls2_path

The file’s ADLS2 URL.

Type

str

property file_system

The name of the ADLS2 file system.

Type

str

property key

The ADLS2 key.

Type

str

property path_desc

The file’s ADLS2 URL.

Type

str
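
A handle can be constructed directly for illustration, though in practice you will usually receive one from the ADLS2 file manager. The values below are placeholders:

from dagster_azure.adls2 import ADLS2FileHandle

handle = ADLS2FileHandle(
    account="my_storage_account",
    file_system="my-file-system",
    key="path/to/file.csv",
)

handle.account      # "my_storage_account"
handle.file_system  # "my-file-system"
handle.key          # "path/to/file.csv"
handle.adls2_path   # the file's ADLS2 URL; path_desc returns the same value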

dagster_azure.adls2.adls2_pickle_io_manager IOManagerDefinition[source]

Config Schema:
adls2_file_system (dagster.StringSource)

ADLS Gen2 file system name

adls2_prefix (dagster.StringSource, optional)

Default Value: ‘dagster’

Persistent IO manager using Azure Data Lake Storage Gen2 for storage.

Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for ADLS and the backing container.

Attach this resource definition to your job in order to make it available to all of your ops:

@job(resource_defs={
    'io_manager': adls2_pickle_io_manager,
    'adls2': adls2_resource,
    ...,
})
def my_job():
    ...

You may configure this storage as follows:

resources:
    io_manager:
        config:
            adls2_file_system: my-cool-file-system
            adls2_prefix: good/prefix-for-files-
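
With the IO manager attached, op outputs are pickled to ADLS2 and reloaded for downstream ops without any change to the op bodies. A minimal sketch; the resource configuration values and names are placeholders:

from dagster import job, op
from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

@op
def produce_numbers():
    # The return value is pickled and written to ADLS2 by the IO manager.
    return [1, 2, 3]

@op
def summed(numbers):
    # The input is loaded from ADLS2 and unpickled before this op runs.
    return sum(numbers)

@job(
    resource_defs={
        "io_manager": adls2_pickle_io_manager.configured(
            {"adls2_file_system": "my-cool-file-system", "adls2_prefix": "good/prefix-for-files-"}
        ),
        "adls2": adls2_resource.configured(
            {"storage_account": "my_storage_account", "credential": {"sas": "my_sas_token"}}
        ),
    }
)
def pickle_job():
    summed(produce_numbers())
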
dagster_azure.adls2.adls2_pickle_asset_io_manager IOManagerDefinition[source]

Config Schema:
adls2_file_system (dagster.StringSource)

ADLS Gen2 file system name

adls2_prefix (dagster.StringSource, optional)

Default Value: ‘dagster’

Persistent IO manager using Azure Data Lake Storage Gen2 for storage, meant for use with software-defined assets.

Each asset is assigned to a single filesystem path, so subsequent materializations of an asset will overwrite previous materializations of that asset.

Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for ADLS and the backing container.

Attach this resource definition to your AssetGroup in order to make it available to your assets:

asset_group = AssetGroup(
    assets=...,
    resource_defs={"io_manager": adls2_pickle_asset_io_manager, "adls2": adls2_resource, ...},
)

You may configure this storage as follows:

resources:
    io_manager:
        config:
            adls2_file_system: my-cool-file-system
            adls2_prefix: good/prefix-for-files