dbt (dagster-dbt)

This library provides a Dagster integration with dbt (data build tool), created by dbt Labs.

Ops

dbt Core Ops

dagster_dbt provides a set of pre-built ops that work with either the CLI or RPC interfaces. For more advanced use cases, we suggest building your own ops which directly interact with these resources.

dagster_dbt.dbt_run_op = <dagster.core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes. Default: True

Default Value: True

asset_key_prefix (List[String], optional)

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: ['dbt']

This op executes a dbt run command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_run_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_run_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_run_op()
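The two config fields above can be supplied through run config when a job containing this op is launched. The sketch below only builds the dictionary. The `build_op_run_config` helper is hypothetical (it is not part of dagster_dbt), and the sketch assumes the op keeps its default name `dbt_run_op` and that the job accepts the usual `ops` run-config layout.

```python
from typing import Any, Dict


def build_op_run_config(op_name: str, **op_config: Any) -> Dict[str, Any]:
    """Hypothetical helper: nest per-op config under the `ops` key of a run config."""
    return {"ops": {op_name: {"config": dict(op_config)}}}


run_config = build_op_run_config(
    "dbt_run_op",
    yield_materializations=True,
    asset_key_prefix=["analytics", "dbt"],  # illustrative prefix, not a library default
)
```

A dictionary of this shape could then be passed as the run config when executing a job such as my_dbt_cli_job.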
dagster_dbt.dbt_compile_op(context)[source]

This op executes a dbt compile command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_compile_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_compile_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_compile_op()
dagster_dbt.dbt_ls_op(context)[source]

This op executes a dbt ls command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_ls_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_ls_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_ls_op()
dagster_dbt.dbt_test_op(context)[source]

This op executes a dbt test command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_test_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_test_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_test_op()
dagster_dbt.dbt_snapshot_op(context)[source]

This op executes a dbt snapshot command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_snapshot_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_snapshot_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_snapshot_op()
dagster_dbt.dbt_seed_op(context)[source]

This op executes a dbt seed command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_seed_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_seed_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_seed_op()
dagster_dbt.dbt_docs_generate_op(context)[source]

This op executes a dbt docs generate command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource) or over RPC (using the dbt_rpc_sync_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_docs_generate_op, dbt_cli_resource, dbt_rpc_sync_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_docs_generate_op()

@job(resource_defs={"dbt":dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    dbt_docs_generate_op()

dbt Cloud Ops

dagster_dbt.dbt_cloud_run_op = <dagster.core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
job_id (Int)

The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

poll_interval (Float, optional)

The time (in seconds) that will be waited between successive polls.

Default Value: 10

poll_timeout (Union[Float, None], optional)

The maximum time (in seconds) that will be waited before this operation is timed out. By default, this will never time out.

Default Value: None

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes.

Default Value: True

asset_key_prefix (List[String], optional)

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: ['dbt']

Initiates a run for a dbt Cloud job, then polls until the run completes. If the job fails or is otherwise stopped before succeeding, a dagster.Failure exception will be raised, and this op will fail.

It requires the use of a ‘dbt_cloud’ resource, which is used to connect to the dbt Cloud API.

Config Options:

job_id (int)

The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

poll_interval (float)

The time (in seconds) that will be waited between successive polls. Defaults to 10.

poll_timeout (float)

The maximum time (in seconds) that will be waited before this operation is timed out. By default, this will never time out.

yield_materializations (bool)

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes. Defaults to True.

asset_key_prefix (List[str])

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys. Defaults to ["dbt"].

Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {"auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"}, "account_id": 77777}
)
run_dbt_nightly_sync = dbt_cloud_run_op.configured(
    {"job_id": 54321}, name="run_dbt_nightly_sync"
)

@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def dbt_cloud():
    run_dbt_nightly_sync()
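The interplay of poll_interval and poll_timeout can be pictured with a small polling loop. This is a minimal sketch of the general pattern, not the op's actual implementation: `poll_until_done`, the status names, and the injectable `sleep` and `clock` callables are all hypothetical, and a plain TimeoutError stands in for the dagster.Failure the op raises.

```python
import time
from typing import Callable, Optional


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval: float = 10,
    poll_timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until it reports a terminal state or the timeout elapses."""
    terminal_states = {"success", "error", "cancelled"}  # hypothetical status names
    start = clock()
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        # poll_timeout=None means "never time out", matching the documented default.
        if poll_timeout is not None and clock() - start > poll_timeout:
            raise TimeoutError(f"run did not finish within {poll_timeout} seconds")
        sleep(poll_interval)


# Usage with a fake status source, so nothing is requested and nothing sleeps.
statuses = iter(["queued", "running", "success"])
final_status = poll_until_done(lambda: next(statuses), poll_interval=0, sleep=lambda _: None)
```

Injecting `sleep` and `clock` keeps the loop easy to exercise in tests without waiting in real time.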

Resources

CLI Resources

class dagster_dbt.DbtCliResource(executable, default_flags, warn_error, ignore_handled_error, target_path, logger=None, docs_url=None)[source]

A resource that allows you to execute dbt cli commands. For the most up-to-date documentation on the specific parameters available to you for each command, check out the dbt docs:

https://docs.getdbt.com/reference/commands/run

To use this as a dagster resource, we recommend using dbt_cli_resource.

build(select=None, **kwargs)[source]

Run the build command on a dbt project. kwargs are passed in as additional parameters.

Parameters

select (List[str], optional) – the models/resources to include in the run.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

cli(command, **kwargs)[source]
Executes a dbt CLI command. Params passed in as keyword arguments will be merged with the default flags that were configured on resource initialization (if any), overriding the default values if necessary.

Parameters

command (str) – The command you wish to run (e.g. ‘run’, ‘test’, ‘docs generate’, etc.)

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput
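The merge described for cli (per-call keyword arguments layered over the resource's default flags) behaves like a dictionary update. The sketch below illustrates that idea only; `merge_flags` is a hypothetical helper and the flag values are made up, so it should not be read as the resource's internal code.

```python
from typing import Any, Dict


def merge_flags(default_flags: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    """Combine resource-level defaults with per-call keyword arguments.

    Keys present in both take the per-call value; the defaults are not mutated.
    """
    return {**default_flags, **kwargs}


defaults = {"project_dir": ".", "target": "dev"}  # illustrative values
merged = merge_flags(defaults, target="prod", models=["my_model"])
```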

compile(models=None, exclude=None, **kwargs)[source]

Run the compile command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • models (List[str], optional) – the models to include in compilation.

  • exclude (List[str], optional) – the models to exclude from compilation.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

property default_flags

A set of params populated from resource config that are passed as flags to each dbt CLI command.

freshness(select=None, **kwargs)[source]

Run the source snapshot-freshness command on a dbt project. kwargs are passed in as additional parameters.

Parameters

select (List[str], optional) – the sources to include in the run.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

generate_docs(compile_project=False, **kwargs)[source]

Run the docs generate command on a dbt project. kwargs are passed in as additional parameters.

Parameters

compile_project (bool, optional) – If true, compile the project before generating a catalog.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

get_manifest_json(**kwargs)[source]

Get a parsed version of the manifest.json file for the relevant dbt project.

Returns

A dictionary containing the parsed contents of the manifest.json file for this dbt project.

Return type

Dict[str, Any]

get_run_results_json(**kwargs)[source]

Get a parsed version of the run_results.json file for the relevant dbt project.

Returns

A dictionary containing the parsed contents of the run_results.json file for this dbt project.

Return type

Dict[str, Any]

ls(select=None, models=None, exclude=None, **kwargs)[source]

Run the ls command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • select (List[str], optional) – the resources to include in the output.

  • models (List[str], optional) – the models to include in the output.

  • exclude (List[str], optional) – the resources to exclude from the output.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

run(models=None, exclude=None, **kwargs)[source]

Run the run command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • models (List[str], optional) – the models to include in the run.

  • exclude (List[str], optional) – the models to exclude from the run.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

run_operation(macro, args=None, **kwargs)[source]

Run the run-operation command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • macro (str) – the dbt macro to invoke.

  • args (Dict[str, Any], optional) – the keyword arguments to be supplied to the macro.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

seed(show=False, select=None, exclude=None, **kwargs)[source]

Run the seed command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • show (bool, optional) – If True, then show a sample of the seeded data in the response. Defaults to False.

  • select (List[str], optional) – the seeds to include in the run.

  • exclude (List[str], optional) – the seeds to exclude from the run.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

snapshot(select=None, exclude=None, **kwargs)[source]

Run the snapshot command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • select (List[str], optional) – the snapshots to include in the run.

  • exclude (List[str], optional) – the snapshots to exclude from the run.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

property strict_flags

A set of flags that should not be auto-populated from the default flags unless they are arguments to the associated function.

test(models=None, exclude=None, data=True, schema=True, **kwargs)[source]

Run the test command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • models (List[str], optional) – the models to include in testing.

  • exclude (List[str], optional) – the models to exclude from testing.

  • data (bool, optional) – If True (default), then run data tests.

  • schema (bool, optional) – If True (default), then run schema tests.

Returns

An instance of DbtCliOutput containing parsed log output as well as the contents of run_results.json (if applicable).

Return type

DbtCliOutput

class dagster_dbt.DbtCliOutput(command, return_code, raw_output, logs, result, docs_url=None)[source]

The results of executing a dbt command, along with additional metadata about the dbt CLI process that was run.

Note that users should not construct instances of this class directly. This class is intended to be constructed from the JSON output of dbt commands.

command

The full shell command that was executed.

Type

str

return_code

The return code of the dbt CLI process.

Type

int

raw_output

The raw output (stdout) of the dbt CLI process.

Type

str

logs

List of parsed JSON logs produced by the dbt command.

Type

List[Dict[str, Any]]

result

Dictionary containing dbt-reported result information contained in run_results.json. Some dbt commands do not produce results, and will therefore have result = None.

Type

Optional[Dict[str, Any]]

docs_url

Hostname where dbt docs are being served for this project.

Type

Optional[str]
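Because result may be None for commands that do not write run_results.json, code that reads it usually needs a guard. The following sketch counts node statuses in a dictionary shaped like run_results.json. The `summarize_result` helper is hypothetical, the sample data is made up, and the key names ("results", "unique_id", "status") are assumed from the dbt artifact format rather than guaranteed by this class.

```python
from collections import Counter
from typing import Any, Dict, Optional


def summarize_result(result: Optional[Dict[str, Any]]) -> Dict[str, int]:
    """Count node statuses in a run_results.json-shaped dict; empty when result is None."""
    if result is None:
        return {}
    statuses = (node.get("status", "unknown") for node in result.get("results", []))
    return dict(Counter(statuses))


# Made-up sample shaped like run_results.json (assumed keys, illustrative node names).
sample_result = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "model.my_project.customers", "status": "success"},
        {"unique_id": "test.my_project.not_null_orders_id", "status": "fail"},
    ]
}
summary = summarize_result(sample_result)
```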

dagster_dbt.dbt_cli_resource ResourceDefinition[source]

Config Schema:
project_dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: '.'

profiles_dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {'my_variable': 'my_value'}

bypass_cache (Bool, optional)

If set, bypass the adapter-level cache of database state

Default Value: False

warn_error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt

Default Value: 'dbt'

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target_path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: 'target'

docs_url (dagster.StringSource, optional)

The url for where dbt docs are being served for this project.
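The ignore_handled_error option hinges on the dbt CLI's exit code. The sketch below shows one way such a decision could be expressed. It assumes dbt's convention that exit code 0 means success and 1 means a handled error; treating every other non-zero code as a failure is an assumption of this sketch, and `should_raise` is a hypothetical helper rather than part of the resource.

```python
def should_raise(return_code: int, ignore_handled_error: bool = False) -> bool:
    """Decide whether a dbt CLI return code should surface as an exception."""
    if return_code == 0:
        return False  # success
    if return_code == 1 and ignore_handled_error:
        return False  # handled error, explicitly tolerated by config
    return True  # handled error not tolerated, or any other non-zero code
```

With the default ignore_handled_error=False, a return code of 1 is treated as a failure; setting it to True lets the calling op continue and inspect the output instead.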

This resource defines a dbt CLI interface.

To configure this resource, we recommend using the configured method.

Examples:

from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op

custom_dbt_cli_resource = dbt_cli_resource.configured({"project_dir": "path/to/my/dbt_project"})

@job(resource_defs={"dbt": custom_dbt_cli_resource})
def dbt_cli_job():
    # Run ops with `required_resource_keys={"dbt", ...}`, for example:
    dbt_run_op()

You may configure this resource as follows:

resources:
  dbt_cli_resource:
    config:
      project_dir: "."
      # Optional[str]: Which directory to look in for the dbt_project.yml file. Default is
      # the current working directory and its parents.
      profiles_dir: $DBT_PROFILES_DIR or $HOME/.dbt
      # Optional[str]: Which directory to look in for the profiles.yml file.
      profile: ""
      # Optional[str]: Which profile to load. Overrides setting in dbt_project.yml.
      target: ""
      # Optional[str]: Which target to load for the given profile.
      vars: {}
      # Optional[Permissive]: Supply variables to the project. This argument overrides
      # variables defined in your dbt_project.yml file. This argument should be a
      # dictionary, eg. "{'my_variable': 'my_value'}"
      bypass_cache: False
      # Optional[bool]: If set, bypass the adapter-level cache of database state.

RPC Resources

class dagster_dbt.DbtRpcResource(host='0.0.0.0', port=8580, jsonrpc_version='2.0', logger=None, **_)[source]

A client for a dbt RPC server.

To use this as a dagster resource, we recommend using dbt_rpc_resource.

build(select=None, **kwargs)[source]

Run the build command on a dbt project. kwargs are passed in as additional parameters.

Parameters

select (List[str], optional) – the models/resources to include in the run.

Returns

object containing parsed output from dbt

Return type

DbtOutput

cli(command, **kwargs)[source]

Sends a request with CLI syntax to the dbt RPC server, and returns the response. For more details, see the dbt docs for running CLI commands via RPC.

Parameters

command (str) – a dbt command in CLI syntax.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

compile(models=None, exclude=None, **kwargs)[source]

Sends a request with the method compile to the dbt RPC server, and returns the response. For more details, see the dbt docs for compiling projects via RPC.

Parameters
  • models (List[str], optional) – the models to include in compilation.

  • exclude (List[str], optional) – the models to exclude from compilation.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

compile_sql(sql, name)[source]

Sends a request with the method compile_sql to the dbt RPC server, and returns the response. For more details, see the dbt docs for compiling SQL via RPC.

Parameters
  • sql (str) – the SQL to compile in base-64 encoding.

  • name (str) – a name for the compiled SQL.

Returns

the HTTP response from the dbt RPC server.

Return type

Response
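Since compile_sql expects its sql argument in base-64 encoding, the SQL text has to be encoded before the call. A minimal sketch using the standard library follows. The helper names are hypothetical, and the choice of UTF-8 with the standard base64 alphabet is an assumption.

```python
import base64


def encode_sql(sql: str) -> str:
    """Base64-encode a SQL string so it can be passed as the `sql` argument."""
    return base64.b64encode(sql.encode("utf-8")).decode("ascii")


def decode_sql(encoded: str) -> str:
    """Inverse of encode_sql, handy for checking what was sent."""
    return base64.b64decode(encoded).decode("utf-8")


encoded = encode_sql("select 1")
```

The same encoding step applies to the sql argument of run_sql.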

generate_docs(compile_project=False, **kwargs)[source]

Sends a request with the method docs.generate to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method docs.generate.

Parameters

compile_project (bool, optional) – If true, compile the project before generating a catalog.

get_manifest_json(**kwargs)[source]

Get a parsed version of the manifest.json file for the relevant dbt project.

Returns

A dictionary containing the parsed contents of the manifest.json file for this dbt project.

Return type

Dict[str, Any]

get_run_results_json(**kwargs)[source]

Get a parsed version of the run_results.json file for the relevant dbt project.

Returns

A dictionary containing the parsed contents of the run_results.json file for this dbt project.

Return type

Dict[str, Any]

property host

The IP address of the host of the dbt RPC server.

Type

str

property jsonrpc_version

The JSON-RPC version to send in RPC requests.

Type

str

kill(task_id)[source]

Sends a request with the method kill to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method kill.

Parameters

task_id (str) – the ID of the task to terminate.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

property logger

A property for injecting a logger dependency.

Type

logging.Logger

ls(select=None, models=None, exclude=None, **kwargs)[source]

Sends a request with the method list to the dbt RPC server, and returns the response. For more details, see the dbt docs for list.

Parameters
  • select (List[str], optional) – the resources to include in the output.

  • models (List[str], optional) – the models to include in the output.

  • exclude (List[str], optional) – the resources to exclude from the output.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

poll(request_token, logs=False, logs_start=0)[source]

Sends a request with the method poll to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method poll.

Parameters
  • request_token (str) – the token to poll responses for.

  • logs (bool) – Whether logs should be returned in the response. Defaults to False.

  • logs_start (int) – The zero-indexed log line to fetch logs from. Defaults to 0.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

property port

The port of the dbt RPC server.

Type

int

ps(completed=False)[source]

Sends a request with the method ps to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method ps.

Parameters

completed (bool) – If True, then also return completed tasks. Defaults to False.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

run(models=None, exclude=None, **kwargs)[source]

Sends a request with the method run to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method run.

Parameters
  • models (List[str], optional) – the models to include in the run.

  • exclude (List[str], optional) – the models to exclude from the run.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

run_operation(macro, args=None, **kwargs)[source]

Sends a request with the method run-operation to the dbt RPC server, and returns the response. For more details, see the dbt docs for the command run-operation.

Parameters
  • macro (str) – the dbt macro to invoke.

  • args (Dict[str, Any], optional) – the keyword arguments to be supplied to the macro.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

run_sql(sql, name)[source]

Sends a request with the method run_sql to the dbt RPC server, and returns the response. For more details, see the dbt docs for running SQL via RPC.

Parameters
  • sql (str) – the SQL to run in base-64 encoding.

  • name (str) – a name for the compiled SQL.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

seed(show=False, select=None, exclude=None, **kwargs)[source]

Sends a request with the method seed to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method seed.

Parameters
  • show (bool, optional) – If True, then show a sample of the seeded data in the response. Defaults to False.

  • select (List[str], optional) – the seeds to include in the run.

  • exclude (List[str], optional) – the seeds to exclude from the run.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

snapshot(select=None, exclude=None, **kwargs)[source]

Sends a request with the method snapshot to the dbt RPC server, and returns the response. For more details, see the dbt docs for the command snapshot.

Parameters
  • select (List[str], optional) – the snapshots to include in the run.

  • exclude (List[str], optional) – the snapshots to exclude from the run.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

snapshot_freshness(select=None, **kwargs)[source]

Sends a request with the method snapshot-freshness to the dbt RPC server, and returns the response. For more details, see the dbt docs for the command source snapshot-freshness.

Parameters

select (List[str], optional) – the sources to include in calculating snapshot freshness.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

status()[source]

Sends a request with the method status to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method status.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

test(models=None, exclude=None, data=True, schema=True, **kwargs)[source]

Sends a request with the method test to the dbt RPC server, and returns the response. For more details, see the dbt docs for the RPC method test.

Parameters
  • models (List[str], optional) – the models to include in testing.

  • exclude (List[str], optional) – the models to exclude from testing.

  • data (bool, optional) – If True (default), then run data tests.

  • schema (bool, optional) – If True (default), then run schema tests.

Returns

the HTTP response from the dbt RPC server.

Return type

Response

property url

The URL for sending dbt RPC requests.

Type

str
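The host, port, jsonrpc_version, and url properties together describe where and how requests are sent. The sketch below assembles a JSON-RPC request body with the standard library. The `/jsonrpc` path and both helper functions are assumptions made for illustration; only the four envelope fields (jsonrpc, method, id, params) come from the JSON-RPC 2.0 format itself.

```python
import json
import uuid
from typing import Any, Dict


def build_rpc_url(host: str = "0.0.0.0", port: int = 8580) -> str:
    """Assumed endpoint layout: http://<host>:<port>/jsonrpc."""
    return f"http://{host}:{port}/jsonrpc"


def build_rpc_request(method: str, jsonrpc_version: str = "2.0", **params: Any) -> Dict[str, Any]:
    """Build a JSON-RPC request body with a fresh request id."""
    return {
        "jsonrpc": jsonrpc_version,
        "method": method,
        "id": str(uuid.uuid4()),
        "params": params,
    }


url = build_rpc_url()
body = json.dumps(build_rpc_request("status"))
```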

class dagster_dbt.DbtRpcSyncResource(host='0.0.0.0', port=8580, jsonrpc_version='2.0', logger=None, poll_interval=1, **_)[source]
class dagster_dbt.DbtRpcOutput(response)[source]

The output from executing a dbt command via the dbt RPC server.

result

The parsed contents of the “result” field of the JSON response from the rpc server (if any).

Type

Dict[str, Any]

response_dict

The entire contents of the JSON response from the rpc server.

Type

Dict[str, Any]

response

The original Response from which this output was generated.

Type

requests.Response

dagster_dbt.local_dbt_rpc_resource ResourceDefinition

This resource defines a dbt RPC client for an RPC server running on 0.0.0.0:8580.

dagster_dbt.dbt_rpc_resource ResourceDefinition[source]

Config Schema:
host (dagster.StringSource)

port (dagster.IntSource, optional)

Default Value: 8580

This resource defines a dbt RPC client.

To configure this resource, we recommend using the configured method.

Examples:

from dagster import job
from dagster_dbt import dbt_rpc_resource

custom_dbt_rpc_resource = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@job(resource_defs={"dbt_rpc": custom_dbt_rpc_resource})
def dbt_rpc_job():
    # Run ops with `required_resource_keys={"dbt_rpc", ...}`.
    ...
dagster_dbt.dbt_rpc_sync_resource ResourceDefinition[source]

Config Schema:
host (dagster.StringSource)

port (dagster.IntSource, optional)

Default Value: 8580

poll_interval (dagster.IntSource, optional)

Default Value: 1

This resource defines a synchronous dbt RPC client, which sends requests to a dbt RPC server, and waits for the request to complete before returning.

To configure this resource, we recommend using the configured method.

Examples:

from dagster import job
from dagster_dbt import dbt_rpc_sync_resource

custom_dbt_rpc_sync_resource = dbt_rpc_sync_resource.configured({"host": "80.80.80.80", "port": 8080})

@job(resource_defs={"dbt_rpc": custom_dbt_rpc_sync_resource})
def dbt_rpc_sync_job():
    # Run ops with `required_resource_keys={"dbt_rpc", ...}`.
    ...

dbt Cloud Resources

class dagster_dbt.DbtCloudResourceV2(auth_token, account_id, disable_schedule_on_trigger=True, request_max_retries=3, request_retry_delay=0.25, dbt_cloud_host='https://cloud.getdbt.com/', log=<Logger dagster.builtin (DEBUG)>, log_requests=False)[source]

This class exposes methods on top of the dbt Cloud REST API v2.

For complete documentation on the dbt Cloud Administrative REST API, including expected response JSON schemas, see the dbt Cloud API Docs.

cancel_run(run_id)[source]

Cancels a dbt Cloud run.

Parameters

run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

Returns

A dictionary containing the parsed contents of the dbt Cloud run details.

See: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById for schema.

Return type

Dict[str, Any]

get_job(job_id)[source]

Gets details about a given dbt job from the dbt Cloud API.

Parameters

job_id (int) – The ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

Returns

Parsed json data from the response to this request

Return type

Dict[str, Any]
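As noted above, the job ID is the final number in the job details URL. If only the URL is at hand, the ID can be pulled out with a regular expression. This is a sketch under that stated URL layout; `job_id_from_url` is a hypothetical helper and the account, project, and job numbers are illustrative.

```python
import re


def job_id_from_url(url: str) -> int:
    """Return the trailing job ID from a dbt Cloud job details URL."""
    match = re.search(r"/jobs/(\d+)/?$", url)
    if match is None:
        raise ValueError(f"no job ID found in {url!r}")
    return int(match.group(1))


# Illustrative URL following the documented pattern.
job_id = job_id_from_url("https://cloud.getdbt.com/#/accounts/77777/projects/1/jobs/54321/")
```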

get_manifest(run_id, step=None)[source]

The parsed contents of a manifest.json file created by a completed run.

Parameters
  • run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

  • step (int) – The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this endpoint will return the artifacts compiled for the last step in the run.

Returns

Parsed contents of the manifest.json file

Return type

Dict[str, Any]

get_run(run_id, include_related=None)[source]

Gets details about a specific job run.

Parameters
  • run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

  • include_related (List[str]) – List of related fields to pull with the run. Valid values are “trigger”, “job”, and “debug_logs”.

Returns

A dictionary containing the parsed contents of the dbt Cloud run details.

See: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById for schema.

Return type

Dict[str, Any]

get_run_artifact(run_id, path, step=None)[source]

The string contents of a run artifact from a dbt Cloud run.

Parameters
  • run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

  • path (str) – The path to this run artifact (e.g. ‘run/my_new_project/models/example/my_first_dbt_model.sql’)

  • step (int) – The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this endpoint will return the artifacts compiled for the last step in the run.

Returns

The string contents of the requested run artifact

Return type

str

get_run_results(run_id, step=None)[source]

The parsed contents of a run_results.json file created by a completed run.

Parameters
  • run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

  • step (int) – The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this endpoint will return the artifacts compiled for the last step in the run.

Returns

Parsed contents of the run_results.json file

Return type

Dict[str, Any]
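The returned dictionary can be inspected like any other parsed JSON. The sketch below tallies results by status; it assumes the dictionary has a top-level "results" list whose entries carry a "status" field, as in dbt's run_results.json artifact (exact fields may vary by dbt version). The helper name status_counts is hypothetical.

```python
from collections import Counter
from typing import Any, Dict


def status_counts(run_results: Dict[str, Any]) -> Dict[str, int]:
    """Count entries of run_results["results"] by their "status" value."""
    results = run_results.get("results", [])
    return dict(Counter(r.get("status", "unknown") for r in results))


# Hand-written stand-in for the output of get_run_results(run_id).
sample_run_results = {
    "results": [
        {"unique_id": "model.my_new_project.my_first_dbt_model", "status": "success"},
        {"unique_id": "model.my_new_project.my_second_dbt_model", "status": "error"},
        {"unique_id": "model.my_new_project.my_third_dbt_model", "status": "success"},
    ]
}
print(status_counts(sample_run_results))
```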

get_run_steps(run_id)[source]

Gets the steps of an initialized dbt Cloud run.

Parameters

run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

Returns

List of commands for each step of the run.

Return type

List[str]

get_runs(include_related=None, job_id=None, order_by='-id', offset=0, limit=100)[source]

Returns a list of runs from dbt Cloud. The list can optionally be filtered to a specific job using job_id. The method supports pagination using offset and limit, and can be configured to load a variety of related information about the runs.

Parameters
  • include_related (Optional[List[str]]) – A list of resources to include in the response from dbt Cloud. This is technically a required field according to the API, but it can be passed as an empty list, in which case only the default run information is loaded. Valid values are “trigger”, “job”, “repository”, and “environment”.

  • job_id (Optional[int]) – If provided, only runs for the job with this ID are loaded. If omitted, runs for every job are pulled.

  • order_by (Optional[str]) – The field, as designated by dbt Cloud, by which to sort the results before returning them. Useful when combined with offset and limit to load runs for a job. Defaults to “-id”, where “-” designates reverse order and “id” is the key to sort on.

  • offset (int) – An offset to apply when listing runs. Can be used to paginate results when combined with order_by and limit. Defaults to 0.

  • limit (int) – Limits the number of rows returned by the API. Defaults to 100.

Returns

A list of dictionaries containing the runs and any included related information.

Return type

List[Dict[str, Any]]
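The offset and limit parameters can be combined to walk through every run page by page. The following is a minimal sketch of that pattern. iter_runs and FakeDbtCloud are hypothetical names; the fake class only stands in for a configured dbt Cloud resource so the example runs on its own.

```python
from typing import Any, Dict, Iterator, Optional


def iter_runs(client: Any, job_id: Optional[int] = None, page_size: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield runs page by page until a short (or empty) page is returned."""
    offset = 0
    while True:
        page = client.get_runs(include_related=[], job_id=job_id, offset=offset, limit=page_size)
        yield from page
        if len(page) < page_size:
            break
        offset += page_size


class FakeDbtCloud:
    """Stand-in for the real resource; serves runs sorted by descending id."""

    def __init__(self, n_runs: int):
        self._runs = [{"id": i} for i in range(n_runs, 0, -1)]

    def get_runs(self, include_related=None, job_id=None, order_by="-id", offset=0, limit=100):
        return self._runs[offset : offset + limit]


all_runs = list(iter_runs(FakeDbtCloud(250), page_size=100))
print(len(all_runs))
```

Stopping on the first page shorter than page_size avoids an extra request in most cases, at the cost of one empty request when the total is an exact multiple of the page size.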

list_run_artifacts(run_id, step=None)[source]

Lists the paths of the available run artifacts from a completed dbt Cloud run.

Parameters
  • run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

  • step (int) – The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this endpoint will return the artifacts compiled for the last step in the run.

Returns

List of the paths of the available run artifacts

Return type

List[str]
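list_run_artifacts and get_run_artifact are naturally used together: list the available paths, then fetch the ones of interest. The sketch below loads every JSON artifact of a run. load_json_artifacts and FakeDbtCloud are hypothetical; the fake class mimics only the two method signatures documented here.

```python
import json
from typing import Any, Dict, Optional


def load_json_artifacts(client: Any, run_id: int, step: Optional[int] = None) -> Dict[str, Any]:
    """Fetch and parse every artifact whose path ends in ".json"."""
    paths = client.list_run_artifacts(run_id, step=step)
    return {
        path: json.loads(client.get_run_artifact(run_id, path, step=step))
        for path in paths
        if path.endswith(".json")
    }


class FakeDbtCloud:
    """Stand-in for the real resource, backed by an in-memory file map."""

    _files = {
        "manifest.json": '{"nodes": {}}',
        "run/my_new_project/models/example/my_first_dbt_model.sql": "select 1",
    }

    def list_run_artifacts(self, run_id, step=None):
        return list(self._files)

    def get_run_artifact(self, run_id, path, step=None):
        return self._files[path]


artifacts = load_json_artifacts(FakeDbtCloud(), run_id=12345)
print(sorted(artifacts))
```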

make_request(method, endpoint, data=None, return_text=False)[source]

Creates and sends a request to the desired dbt Cloud API endpoint.

Parameters
  • method (str) – The http method to use for this request (e.g. “POST”, “GET”, “PATCH”).

  • endpoint (str) – The dbt Cloud API endpoint to send this request to.

  • data (Optional[str]) – JSON-formatted data string to be included in the request.

  • return_text (bool) – Override default behavior and return unparsed {“text”: response.text} blob instead of json.

Returns

Parsed json data from the response to this request

Return type

Dict[str, Any]

poll_run(run_id, poll_interval=10, poll_timeout=None, href=None)[source]

Polls a dbt Cloud job run until it completes. Will raise a dagster.Failure exception if the run does not complete successfully.

Parameters
  • run_id (int) – The ID of the relevant dbt Cloud run. You can find this value by going to the details page of your run in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/

  • poll_interval (float) – The time (in seconds) that should be waited between successive polls of the dbt Cloud API.

  • poll_timeout (float) – The maximum time (in seconds) that should be waited for this run to complete. If this threshold is exceeded, the run will be cancelled and an exception will be thrown. By default, this will poll forever.

  • href (str) – For internal use, generally should not be set manually.

Returns

A dictionary containing the parsed contents of the dbt Cloud run details.

See: https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById for schema.

Return type

Dict[str, Any]
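To make the roles of poll_interval and poll_timeout concrete, here is a generic polling loop. It is a sketch of the general technique and not the library's implementation: it raises a plain TimeoutError rather than cancelling the run and raising dagster.Failure, and the names poll_until_done, get_status, and is_done are hypothetical.

```python
import time
from typing import Callable, Optional


def poll_until_done(
    get_status: Callable[[], str],
    is_done: Callable[[str], bool],
    poll_interval: float = 10.0,
    poll_timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_status repeatedly until is_done(status) is true or the timeout passes."""
    start = clock()
    while True:
        status = get_status()
        if is_done(status):
            return status
        # With poll_timeout=None this check never fires, so the loop polls forever.
        if poll_timeout is not None and clock() - start > poll_timeout:
            raise TimeoutError(f"run did not finish within {poll_timeout} seconds")
        sleep(poll_interval)


# Simulated sequence of statuses; the status strings are illustrative only.
statuses = iter(["Queued", "Running", "Success"])
final_status = poll_until_done(
    get_status=lambda: next(statuses),
    is_done=lambda s: s in {"Success", "Error", "Cancelled"},
    poll_interval=0,
)
print(final_status)
```

Passing sleep and clock as parameters keeps the loop easy to exercise without real waiting.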

run_job(job_id, **kwargs)[source]

Initializes a run for a job. Overrides for specific properties can be set by passing in values to the kwargs. A full list of overridable properties can be found here: https://docs.getdbt.com/dbt-cloud/api-v2#operation/triggerRun

Parameters
  • job_id (int) – The ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

  • kwargs – Passed in as the properties to be overridden.

Returns

Parsed json data from the response to this request

Return type

Dict[str, Any]

run_job_and_poll(job_id, poll_interval=10, poll_timeout=None)[source]

Runs a dbt Cloud job and polls until it completes. Will raise a dagster.Failure exception if the run does not complete successfully.

Parameters
  • job_id (int) – The ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

  • poll_interval (float) – The time (in seconds) that should be waited between successive polls of the dbt Cloud API.

  • poll_timeout (float) – The maximum time (in seconds) that should be waited for this run to complete. If this threshold is exceeded, the run will be cancelled and an exception will be thrown. By default, this will poll forever.

Returns

Class containing details about the specific job run and the parsed run results.

Return type

DbtCloudOutput

update_job(job_id, **kwargs)[source]

Updates specific properties of a dbt job. Documentation on the full set of potential parameters can be found here: https://docs.getdbt.com/dbt-cloud/api-v2#operation/updateJobById

Parameters
  • job_id (int) – The ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

  • kwargs – Passed in as the properties to be changed.

Returns

Parsed json data from the response to this request

Return type

Dict[str, Any]

Examples:

# disable schedule for job with id=12345
my_dbt_cloud_resource.update_job(12345, triggers={"schedule": False})
dagster_dbt.dbt_cloud_resource ResourceDefinition[source]

Config Schema:
auth_token (dagster.StringSource)

dbt Cloud API Token. User tokens can be found in the [dbt Cloud UI](https://cloud.getdbt.com/#/profile/api/), or see the [dbt Cloud Docs](https://docs.getdbt.com/docs/dbt-cloud/dbt-cloud-api/service-tokens) for instructions on creating a Service Account token.

account_id (Int)

dbt Cloud Account ID. This value can be found in the url of a variety of views in the dbt Cloud UI, e.g. https://cloud.getdbt.com/#/accounts/{account_id}/settings/.

disable_schedule_on_trigger (Bool, optional)

Specifies if you would like any job that is triggered using this resource to automatically disable its schedule.

Default Value: True

request_max_retries (Int, optional)

The maximum number of times requests to the dbt Cloud API should be retried before failing.

Default Value: 3

request_retry_delay (Float, optional)

Time (in seconds) to wait between each request retry.

Default Value: 0.25

dbt_cloud_host (dagster.StringSource, optional)

The hostname where dbt Cloud is being hosted (e.g. https://my_org.cloud.getdbt.com/).

Default Value: https://cloud.getdbt.com/

This resource allows users to programmatically interface with the dbt Cloud Administrative REST API (v2) to launch jobs and monitor their progress. This currently implements only a subset of the functionality exposed by the API.

For a complete set of documentation on the dbt Cloud Administrative REST API, including expected response JSON schemas, see the dbt Cloud API Docs.

To configure this resource, we recommend using the configured method.

Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": 30000,
    }
)

@job(resource_defs={"dbt_cloud":my_dbt_cloud_resource})
def my_dbt_cloud_job():
    ...
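The request_max_retries and request_retry_delay settings describe a simple retry-with-delay policy. The sketch below shows one way such a policy can be written. It is not the resource's actual code: call_with_retries is a hypothetical helper, and it assumes that one initial attempt is followed by at most max_retries further attempts and that only ConnectionError is retried.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_retries: int = 3,
    retry_delay: float = 0.25,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to max_retries times with a fixed delay between attempts."""
    retries = 0
    while True:
        try:
            return fn()
        except ConnectionError:
            if retries == max_retries:
                raise  # retries exhausted: surface the last error
            retries += 1
            sleep(retry_delay)


calls = {"count": 0}


def flaky_request() -> str:
    """Fails twice, then succeeds; simulates a transient network problem."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = call_with_retries(flaky_request, max_retries=3, retry_delay=0)
print(result, calls["count"])
```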

Assets

dagster_dbt.load_assets_from_dbt_project(project_dir, profiles_dir=None, target_dir=None, select=None, runtime_metadata_fn=None, io_manager_key=None, node_info_to_asset_key=<function _get_node_asset_key>, use_build_command=False)[source]

Loads a set of dbt models from a dbt project into Dagster assets.

Creates one Dagster asset for each dbt model. All assets will be re-materialized using a single dbt run command.

Parameters
  • project_dir (Optional[str]) – The directory containing the dbt project to load.

  • profiles_dir (Optional[str]) – The profiles directory to use for loading the dbt project. Defaults to a directory called “config” inside the project_dir.

  • target_dir (Optional[str]) – The target directory where dbt will place compiled artifacts. Defaults to “target” underneath the project_dir.

  • select (str) – A dbt selection string for the models in a project that you want to include. Defaults to “*”.

  • runtime_metadata_fn – (Optional[Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]): A function that will be run after any of the assets are materialized and returns metadata entries for the asset, to be displayed in the asset catalog for that run.

  • io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.

  • node_info_to_asset_key – (Mapping[str, Any] -> AssetKey): A function that takes a dictionary of dbt node info and returns the AssetKey that you want to represent that node. By default, the asset key will simply be the name of the dbt model.

  • use_build_command – (bool): Flag indicating if you want to use dbt build as the core computation for this asset, rather than dbt run.
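A custom node_info_to_asset_key function only needs to map a node dictionary to a key. The sketch below computes schema-qualified key components from a node dictionary. It assumes the node info exposes "schema" and "name" fields, as dbt manifest nodes do; the function name schema_qualified_key_path is hypothetical, and the wrapping into AssetKey is shown only as a comment so the example runs without Dagster installed.

```python
from typing import Any, List, Mapping


def schema_qualified_key_path(node_info: Mapping[str, Any]) -> List[str]:
    """Build asset key components as [schema, model name]."""
    return [node_info["schema"], node_info["name"]]


# In a Dagster project, this could be wrapped as, for example:
#   node_info_to_asset_key=lambda info: AssetKey(schema_qualified_key_path(info))

# Hand-written node dictionary with made-up values.
sample_node = {"name": "my_first_dbt_model", "schema": "analytics", "resource_type": "model"}
print(schema_qualified_key_path(sample_node))
```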

dagster_dbt.load_assets_from_dbt_manifest(manifest_json, runtime_metadata_fn=None, io_manager_key=None, selected_unique_ids=None, select=None, node_info_to_asset_key=<function _get_node_asset_key>, use_build_command=False)[source]

Loads a set of dbt models, described in a manifest.json, into Dagster assets.

Creates one Dagster asset for each dbt model. All assets will be re-materialized using a single dbt run command.

Parameters
  • manifest_json (Optional[Mapping[str, Any]]) – The contents of a dbt manifest.json, which contains a set of models to load into assets.

  • runtime_metadata_fn – (Optional[Callable[[SolidExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]): A function that will be run after any of the assets are materialized and returns metadata entries for the asset, to be displayed in the asset catalog for that run.

  • io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.

  • selected_unique_ids (Optional[Set[str]]) – The set of dbt unique_ids that you want to load as assets.

  • node_info_to_asset_key – (Mapping[str, Any] -> AssetKey): A function that takes a dictionary of dbt node info and returns the AssetKey that you want to represent that node. By default, the asset key will simply be the name of the dbt model.

  • use_build_command – (bool): Flag indicating if you want to use dbt build as the core computation for this asset, rather than dbt run.
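The manifest_json and selected_unique_ids arguments can both be derived from a parsed manifest.json. The sketch below picks out the unique_ids of models whose names share a prefix. It assumes the manifest has a top-level "nodes" mapping keyed by unique_id, with "resource_type" and "name" on each node, as in dbt's manifest artifact; model_unique_ids and the sample manifest are hypothetical.

```python
import json
from typing import Any, Mapping, Set


def model_unique_ids(manifest: Mapping[str, Any], name_prefix: str = "") -> Set[str]:
    """Return unique_ids of model nodes whose name starts with name_prefix."""
    return {
        unique_id
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") == "model" and node.get("name", "").startswith(name_prefix)
    }


# Stand-in for the text of a target/manifest.json file.
manifest_text = json.dumps(
    {
        "nodes": {
            "model.my_new_project.stg_orders": {"resource_type": "model", "name": "stg_orders"},
            "model.my_new_project.orders": {"resource_type": "model", "name": "orders"},
            "test.my_new_project.not_null_orders_id": {"resource_type": "test", "name": "not_null_orders_id"},
        }
    }
)
manifest_json = json.loads(manifest_text)
selected_unique_ids = model_unique_ids(manifest_json, name_prefix="stg_")
print(selected_unique_ids)
```

The resulting manifest_json and selected_unique_ids values have the shapes that the parameters above describe.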

Types

class dagster_dbt.DbtOutput(result)[source]

Base class for both DbtCliOutput and DbtRPCOutput. Contains a single field, result, which represents the dbt-formatted result of the command that was run (if any).

Used internally, should not be instantiated directly by the user.

class dagster_dbt.DbtResource(logger=None)[source]

Base class for a resource allowing users to interface with dbt

abstract build(select=None, **kwargs)[source]

Run the build command on a dbt project. kwargs are passed in as additional parameters.

Parameters

select (List[str], optional) – the models/resources to include in the run.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract compile(models=None, exclude=None, **kwargs)[source]

Run the compile command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • models (List[str], optional) – the models to include in compilation.

  • exclude (List[str], optional) – the models to exclude from compilation.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract generate_docs(compile_project=False, **kwargs)[source]

Run the docs generate command on a dbt project. kwargs are passed in as additional parameters.

Parameters

compile_project (bool, optional) – If true, compile the project before generating a catalog.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract get_manifest_json(**kwargs)[source]

Get a parsed version of the manifest.json file for the relevant dbt project.

Returns

dictionary containing the parsed contents of the manifest json file for this dbt project.

Return type

Dict[str, Any]

abstract get_run_results_json(**kwargs)[source]

Get a parsed version of the run_results.json file for the relevant dbt project.

Returns

dictionary containing the parsed contents of the run_results json file for this dbt project.

Return type

Dict[str, Any]

property logger

A property for injecting a logger dependency.

Type

logging.Logger

abstract ls(select=None, models=None, exclude=None, **kwargs)[source]

Run the ls command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • select (List[str], optional) – the resources to include in the output.

  • models (List[str], optional) – the models to include in the output.

  • exclude (List[str], optional) – the resources to exclude from the output.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract run(models=None, exclude=None, **kwargs)[source]

Run the run command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • models (List[str], optional) – the models to include in the run.

  • exclude (List[str], optional) – the models to exclude from the run.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract run_operation(macro, args=None, **kwargs)[source]

Run the run-operation command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • macro (str) – the dbt macro to invoke.

  • args (Dict[str, Any], optional) – the keyword arguments to be supplied to the macro.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract seed(show=False, select=None, exclude=None, **kwargs)[source]

Run the seed command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • show (bool, optional) – If True, then show a sample of the seeded data in the response. Defaults to False.

  • select (List[str], optional) – the seeds to include in the run.

  • exclude (List[str], optional) – the seeds to exclude from the run.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract snapshot(select=None, exclude=None, **kwargs)[source]

Run the snapshot command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • select (List[str], optional) – the snapshots to include in the run.

  • exclude (List[str], optional) – the snapshots to exclude from the run.

Returns

object containing parsed output from dbt

Return type

DbtOutput

abstract test(models=None, exclude=None, data=True, schema=True, **kwargs)[source]

Run the test command on a dbt project. kwargs are passed in as additional parameters.

Parameters
  • models (List[str], optional) – the models to include in testing.

  • exclude (List[str], optional) – the models to exclude from testing.

  • data (bool, optional) – If True (default), then run data tests.

  • schema (bool, optional) – If True (default), then run schema tests.

Returns

object containing parsed output from dbt

Return type

DbtOutput

Errors

exception dagster_dbt.DagsterDbtError(description=None, metadata_entries=None, metadata=None)[source]

The base exception of the dagster-dbt library.

exception dagster_dbt.DagsterDbtCliRuntimeError(description, logs, raw_output)[source]

Represents an error while executing a dbt CLI command.

exception dagster_dbt.DagsterDbtCliFatalRuntimeError(logs, raw_output)[source]

Represents a fatal error in the dbt CLI (return code 2).

exception dagster_dbt.DagsterDbtCliHandledRuntimeError(logs, raw_output)[source]

Represents a model error reported by the dbt CLI at runtime (return code 1).

exception dagster_dbt.DagsterDbtCliOutputsNotFoundError(path)[source]

Represents a problem in finding the target/run_results.json artifact when executing a dbt CLI command.

For more details on target/run_results.json, see https://docs.getdbt.com/reference/dbt-artifacts#run_resultsjson.

exception dagster_dbt.DagsterDbtCliUnexpectedOutputError(invalid_line_nos)[source]

Represents an error when parsing the output of a dbt CLI command.

exception dagster_dbt.DagsterDbtRpcUnexpectedPollOutputError(description=None, metadata_entries=None, metadata=None)[source]

Represents an unexpected response when polling the dbt RPC server.
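The CLI errors above are distinguished by the dbt return code. The sketch below maps a return code to the name of the matching exception, using only the codes stated in this section (1 for handled errors, 2 for fatal errors) and the ignore_handled_error option described in the CLI config schema. classify_dbt_return_code is a hypothetical helper, and treating any other non-zero code as the generic DagsterDbtCliRuntimeError is an assumption, not documented behavior.

```python
from typing import Optional


def classify_dbt_return_code(return_code: int, ignore_handled_error: bool = False) -> Optional[str]:
    """Return the name of the exception that corresponds to a dbt CLI return code."""
    if return_code == 0:
        return None  # success: nothing to raise
    if return_code == 1:
        # Handled (model-level) errors can be suppressed via ignore_handled_error.
        return None if ignore_handled_error else "DagsterDbtCliHandledRuntimeError"
    if return_code == 2:
        return "DagsterDbtCliFatalRuntimeError"
    # Assumption: fall back to the generic CLI error for any other code.
    return "DagsterDbtCliRuntimeError"


for code in (0, 1, 2):
    print(code, classify_dbt_return_code(code))
```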

Utils

dagster_dbt.utils.generate_materializations(dbt_output, asset_key_prefix=None)[source]

This function yields dagster.AssetMaterialization events for each model updated by a dbt command.

The information is parsed from a DbtOutput object.

Note that this will not work with output from the dbt_rpc_resource, because this resource does not wait for a response from the RPC server before returning. Instead, use the dbt_rpc_sync_resource, which will wait for execution to complete.

Examples:

from dagster import job, op, Output
from dagster_dbt.utils import generate_materializations
from dagster_dbt import dbt_cli_resource, dbt_rpc_sync_resource

@op(required_resource_keys={"dbt"})
def my_custom_dbt_run(context):
    dbt_output = context.resources.dbt.run()
    for materialization in generate_materializations(dbt_output):
        # you can modify the materialization object to add extra metadata, if desired
        yield materialization
    # pass the dbt output downstream as the op's output
    yield Output(dbt_output)

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    my_custom_dbt_run()

@job(resource_defs={"dbt": dbt_rpc_sync_resource})
def my_dbt_rpc_job():
    my_custom_dbt_run()

Solids [Legacy]

dagster_dbt provides a set of solids that may be used in legacy pipelines.

CLI Solids

dagster_dbt.dbt_cli_compile = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
project-dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles-dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {‘my_variable’: ‘my_value’}

bypass-cache (Bool, optional)

If set, bypass the adapter-level cache of database state

Default Value: False

warn-error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target-path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (dagster.StringSource, optional)

The url for where dbt docs are being served for this project.

parse-only (Bool, optional)

Default Value: False

threads (Union[Int, None], optional)

Specify number of threads to use while executing models. Overrides settings in profiles.yml.

Default Value: None

no-version-check (Bool, optional)

Skip the check that dbt’s version matches the one specified in the dbt_project.yml file (‘require-dbt-version’)

Default Value: False

models (Union[List[String], None], optional)

The dbt models to run.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

selector (Union[List[String], None], optional)

The selector name to use, as defined in your selectors.yml

Default Value: None

state (Union[List[String], None], optional)

If set, use the given directory as the source for json files to compare with this project.

Default Value: None

full-refresh (Bool, optional)

If specified, dbt will drop incremental models and fully recalculate the incremental table from the model definition. (--full-refresh)

Default Value: False

This solid executes dbt compile via the dbt CLI.

dagster_dbt.dbt_cli_run = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
project-dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles-dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {‘my_variable’: ‘my_value’}

bypass-cache (Bool, optional)

If set, bypass the adapter-level cache of database state

Default Value: False

warn-error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target-path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (dagster.StringSource, optional)

The url for where dbt docs are being served for this project.

threads (Union[Int, None], optional)

Specify number of threads to use while executing models. Overrides settings in profiles.yml.

Default Value: None

models (Union[List[String], None], optional)

The dbt models to run.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

full-refresh (Bool, optional)

If specified, dbt will drop incremental models and fully recalculate the incremental table from the model definition. (--full-refresh)

Default Value: False

fail-fast (Bool, optional)

Stop execution upon a first failure. (--fail-fast)

Default Value: False

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

asset_key_prefix (List[String], optional)

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: []

This solid executes dbt run via the dbt CLI. See the solid definition for available parameters.
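As an illustration of how the config schema above is supplied, the following sketch builds a run config dictionary for a legacy pipeline containing this solid. It assumes the solid keeps its default name, dbt_cli_run, inside the pipeline; the project path and model name are placeholders.

```python
# Run config for a legacy pipeline that includes the dbt_cli_run solid.
# Keys under "config" come from the config schema listed above.
run_config = {
    "solids": {
        "dbt_cli_run": {
            "config": {
                "project-dir": "path/to/dbt_project",  # placeholder path
                "models": ["my_first_dbt_model"],  # placeholder model name
                "full-refresh": True,
                "fail-fast": True,
                "yield_materializations": True,
                "asset_key_prefix": ["dbt"],
            }
        }
    }
}

solid_config = run_config["solids"]["dbt_cli_run"]["config"]
print(sorted(solid_config))
```

Note that the dbt-derived options use hyphens (project-dir, full-refresh), while the Dagster-specific options use underscores (yield_materializations, asset_key_prefix).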

dagster_dbt.dbt_cli_run_operation = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
project-dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles-dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {‘my_variable’: ‘my_value’}

bypass-cache (Bool, optional)

If set, bypass the adapter-level cache of database state

Default Value: False

warn-error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target-path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (dagster.StringSource, optional)

The url for where dbt docs are being served for this project.

macro (dagster.StringSource)

Specify the macro to invoke. dbt will call this macro with the supplied arguments and then exit.

args (permissive dict, optional)

Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the selected macro. This argument should be a dictionary, e.g. {‘my_variable’: ‘my_value’}

This solid executes dbt run-operation via the dbt CLI.

dagster_dbt.dbt_cli_snapshot = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
project-dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles-dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {‘my_variable’: ‘my_value’}

bypass-cache (Bool, optional)

If set, bypass the adapter-level cache of database state

Default Value: False

warn-error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target-path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (dagster.StringSource, optional)

The url for where dbt docs are being served for this project.

threads (Union[Int, None], optional)

Specify number of threads to use while executing models. Overrides settings in profiles.yml.

Default Value: None

select (Union[List[String], None], optional)

The dbt models to include.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

This solid executes dbt snapshot via the dbt CLI.

dagster_dbt.dbt_cli_snapshot_freshness = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
project-dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles-dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {‘my_variable’: ‘my_value’}

bypass-cache (Bool, optional)

If set, bypass the adapter-level cache of database state

Default Value: False

warn-error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target-path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (dagster.StringSource, optional)

The url for where dbt docs are being served for this project.

select (Union[List[String], None], optional)

Specify the sources to snapshot freshness.

Default Value: None

output (dagster.StringSource, optional)

Specify the output path for the json report. By default, outputs to target/sources.json

threads (Union[Int, None], optional)

Specify number of threads to use while executing models. Overrides settings in profiles.yml.

Default Value: None

This solid executes dbt source snapshot-freshness via the dbt CLI.

dagster_dbt.dbt_cli_test = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
project-dir (dagster.StringSource, optional)

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: '.'

profiles-dir (dagster.StringSource, optional)

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (dagster.StringSource, optional)

Which profile to load. Overrides setting in dbt_project.yml.

target (dagster.StringSource, optional)

Which target to load for the given profile.

vars (permissive dict, optional)

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {'my_variable': 'my_value'}

bypass-cache (Bool, optional)

If set, bypass the adapter-level cache of database state.

Default Value: False

warn-error (Bool, optional)

If dbt would normally warn, instead raise an exception. Examples include a --models selection that matches nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (dagster.StringSource, optional)

Path to the dbt executable. Default is dbt.

Default Value: 'dbt'

ignore_handled_error (Bool, optional)

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target-path (dagster.StringSource, optional)

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: 'target'

docs_url (dagster.StringSource, optional)

The URL where dbt docs are served for this project.

data (Bool, optional)

Run data tests defined in the "tests" directory.

Default Value: False

schema (Bool, optional)

Run constraint validations from schema.yml files.

Default Value: False

fail-fast (Bool, optional)

Stop execution upon a first test failure.

Default Value: False

threads (Union[Int, None], optional)

Specify number of threads to use while executing models. Overrides settings in profiles.yml.

Default Value: None

models (Union[List[String], None], optional)

The dbt models to run.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

This solid executes dbt test via the dbt CLI. See the solid definition for available parameters.
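
As a rough illustration of how the fields above fit together, the sketch below builds a run config dictionary for this solid. It assumes the solid is used under its default name, dbt_cli_test, and that the usual solids / <solid name> / config run config layout applies. The paths, variable, and model name are placeholders, not values taken from this reference.

```python
# Hypothetical run config for the dbt_cli_test solid.
# Keys mirror the config schema above. Note that some keys use hyphens
# (project-dir, fail-fast) while others use underscores (dbt_executable).
dbt_cli_test_config = {
    "project-dir": "path/to/dbt_project",  # placeholder path
    "profiles-dir": "path/to/profiles",    # placeholder path
    "vars": {"my_variable": "my_value"},   # overrides vars in dbt_project.yml
    "schema": True,                        # run schema.yml constraint validations
    "fail-fast": True,                     # stop at the first failing test
    "threads": 4,
    "models": ["my_model"],                # placeholder model selector
}

# Solid config is nested under the solid's name in the run config.
run_config = {"solids": {"dbt_cli_test": {"config": dbt_cli_test_config}}}
```

Fields left out of the dictionary fall back to the default values listed in the schema.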

RPC Solids

dagster_dbt.create_dbt_rpc_run_sql_solid(name, output_def=None, **kwargs)[source]

This function is a factory which constructs a solid that will copy the results of a SQL query run within the context of a dbt project to a pandas DataFrame.

Any kwargs passed to this function will be passed along to the underlying @solid decorator. However, note that overriding config_schema, input_defs, and required_resource_keys is not allowed and will throw a DagsterInvalidDefinitionError.

If you would like to configure this solid with different config fields, consider wrapping it with @composite_solid.

Parameters
  • name (str) – The name of this solid.

  • output_def (OutputDefinition, optional) – The OutputDefinition for the solid. This value should always be a representation of a pandas DataFrame. If not specified, the solid will default to an OutputDefinition named "df" with a DataFrame dagster type.

Returns

Returns the constructed solid definition.

Return type

SolidDefinition

dagster_dbt.dbt_rpc_compile_sql = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
name (String)

interval (Int, optional)

The interval (in seconds) at which to poll the dbt rpc process.

Default Value: 10

logs (Bool, optional)

Whether or not to return logs from the process.

Default Value: True

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

This solid sends the dbt compile command to a dbt RPC server and returns the request token.

This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to poll the progress of the running dbt process.
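
The asynchronous pattern described here (submit a command, receive a request token, poll with that token later) can be sketched with an in-memory stand-in. FakeRpcServer, its submit and poll methods, and the "running" / "success" state names are all hypothetical and exist only for illustration. They are not part of dagster_dbt or the dbt RPC API.

```python
import itertools


class FakeRpcServer:
    """In-memory stand-in for a dbt RPC server (hypothetical, for illustration)."""

    def __init__(self, polls_until_done=2):
        self._polls_until_done = polls_until_done
        self._remaining = {}
        self._ids = itertools.count(1)

    def submit(self, command):
        # Returns immediately with a token; the work is not finished yet.
        token = f"token-{next(self._ids)}"
        self._remaining[token] = self._polls_until_done
        return token

    def poll(self, token):
        # Each poll reports the current state of the request behind the token.
        if self._remaining[token] > 0:
            self._remaining[token] -= 1
            return "running"
        return "success"


server = FakeRpcServer()
token = server.submit("compile")                  # asynchronous: only a token comes back
states = [server.poll(token) for _ in range(3)]   # later requests reuse the token
```

The point of the sketch is only that the submitting step does not block: the caller holds a token and decides when to check on progress.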

dagster_dbt.dbt_rpc_run = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
models (Union[List[String], None], optional)

The dbt models to run.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

This solid sends the dbt run command to a dbt RPC server and returns the request token.

This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to poll the progress of the running dbt process.

dagster_dbt.dbt_rpc_run_and_wait = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
models (Union[List[String], None], optional)

The dbt models to run.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

full_refresh (Bool, optional)

Whether or not to perform a --full-refresh.

Default Value: False

fail_fast (Bool, optional)

Whether or not to --fail-fast.

Default Value: False

warn_error (Bool, optional)

Whether or not to --warn-error.

Default Value: False

interval (Int, optional)

The interval (in seconds) at which to poll the dbt rpc process.

Default Value: 10

logs (Bool, optional)

Whether or not to return logs from the process.

Default Value: True

task_tags (permissive dict, optional)

Default Value: {}

max_retries (Int, optional)

Default Value: 5

retry_interval (Int, optional)

Default Value: 120

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

This solid sends the dbt run command to a dbt RPC server and returns the result of the executed dbt process.

This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt process is completed.
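
The synchronous behaviour (poll at a fixed interval until the process completes) can be sketched as a simple loop. This is a minimal illustration under stated assumptions, not the library's implementation: wait_for_completion, the poll callable, and the "running" / "success" state names are hypothetical. The interval argument plays the role of the interval config field above.

```python
import time


def wait_for_completion(poll, interval=10, sleep=time.sleep):
    """Call poll() until it stops reporting 'running', sleeping between polls."""
    while True:
        state = poll()
        if state != "running":
            return state
        sleep(interval)  # wait `interval` seconds before asking again


# Simulated server responses; a real poll would query the RPC server.
responses = iter(["running", "running", "success"])
slept = []  # records requested sleeps instead of actually sleeping
result = wait_for_completion(lambda: next(responses), interval=10, sleep=slept.append)
```

Injecting the sleep function keeps the sketch fast to run; with the default time.sleep the loop would pause for interval seconds between polls.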

dagster_dbt.dbt_rpc_run_operation = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
macro (String)

The dbt macro to invoke as a run operation.

args (Union[permissive dict, None], optional)

Arguments to supply to the invoked macro.

Default Value: None

This solid sends the dbt run-operation command to a dbt RPC server and returns the request token.

This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to poll the progress of the running dbt process.
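
A run config for this solid might look like the sketch below. It assumes the solid is used under its default name, dbt_rpc_run_operation, and the usual solids / <solid name> / config layout. The macro name and argument are placeholders, and configuration of the RPC resource the solid depends on is omitted.

```python
# Hypothetical run config for the dbt_rpc_run_operation solid.
run_config = {
    "solids": {
        "dbt_rpc_run_operation": {
            "config": {
                "macro": "my_macro",               # required; placeholder macro name
                "args": {"my_argument": "value"},  # optional; defaults to None
            }
        }
    }
}
```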

dagster_dbt.dbt_rpc_run_operation_and_wait = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
macro (String)

The dbt macro to invoke as a run operation.

args (Union[permissive dict, None], optional)

Arguments to supply to the invoked macro.

Default Value: None

interval (Int, optional)

The interval (in seconds) at which to poll the dbt rpc process.

Default Value: 10

logs (Bool, optional)

Whether or not to return logs from the process.

Default Value: True

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

This solid sends the dbt run-operation command to a dbt RPC server and returns the result of the executed dbt process.

This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt process is completed.

dagster_dbt.dbt_rpc_snapshot = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
select (Union[List[String], None], optional)

The dbt snapshot files to snapshot.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt snapshot files to exclude from the snapshot.

Default Value: None

This solid sends the dbt snapshot command to a dbt RPC server and returns the request token.

This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to poll the progress of the running dbt process.

dagster_dbt.dbt_rpc_snapshot_and_wait = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
select (Union[List[String], None], optional)

The dbt snapshot files to snapshot.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt snapshot files to exclude from the snapshot.

Default Value: None

interval (Int, optional)

The interval (in seconds) at which to poll the dbt rpc process.

Default Value: 10

logs (Bool, optional)

Whether or not to return logs from the process.

Default Value: True

task_tags (permissive dict, optional)

Default Value: {}

max_retries (Int, optional)

Default Value: 5

retry_interval (Int, optional)

Default Value: 120

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

This solid sends the dbt snapshot command to a dbt RPC server and returns the result of the executed dbt process.

This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt process is completed.
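
To make the defaults above concrete, the sketch below lists them as a dictionary and overlays a few overrides. The snapshot name and the chosen interval are placeholders, and the merge is only a way to visualise the effective configuration; in practice only the overrides would be supplied in the run config (assuming the default solid name dbt_rpc_snapshot_and_wait).

```python
# Defaults as listed in the config schema above.
DEFAULTS = {
    "select": None,
    "exclude": None,
    "interval": 10,
    "logs": True,
    "task_tags": {},
    "max_retries": 5,
    "retry_interval": 120,
    "yield_materializations": True,
}

# Hypothetical overrides: snapshot one file and poll every 30 seconds.
overrides = {"select": ["my_snapshot"], "interval": 30}

# Effective values: overrides win, everything else keeps its default.
effective = {**DEFAULTS, **overrides}

# Only the overrides need to appear in the run config itself.
run_config = {"solids": {"dbt_rpc_snapshot_and_wait": {"config": overrides}}}
```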

dagster_dbt.dbt_rpc_snapshot_freshness = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
select (Union[List[String], None], optional)

The dbt sources for which to snapshot freshness.

Default Value: None

warn_error (Bool, optional)

Whether or not to --warn-error.

Default Value: False

This solid sends the dbt source snapshot-freshness command to a dbt RPC server and returns the request token.

This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to poll the progress of the running dbt process.

dagster_dbt.dbt_rpc_snapshot_freshness_and_wait = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
select (Union[List[String], None], optional)

The dbt sources for which to snapshot freshness.

Default Value: None

warn_error (Bool, optional)

Whether or not to --warn-error.

Default Value: False

interval (Int, optional)

The interval (in seconds) at which to poll the dbt rpc process.

Default Value: 10

logs (Bool, optional)

Whether or not to return logs from the process.

Default Value: True

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

This solid sends the dbt source snapshot-freshness command to a dbt RPC server and returns the result of the executed dbt process.

This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt process is completed.

dagster_dbt.dbt_rpc_test = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
models (Union[List[String], None], optional)

The dbt models to test.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

data (Bool, optional)

Whether or not to run custom data tests.

Default Value: True

schema (Bool, optional)

Whether or not to run schema tests.

Default Value: True

This solid sends the dbt test command to a dbt RPC server and returns the request token.

This dbt RPC solid is asynchronous. The request token can be used in subsequent RPC requests to poll the progress of the running dbt process.

dagster_dbt.dbt_rpc_test_and_wait = <dagster.core.definitions.solid_definition.SolidDefinition object>[source]

Config Schema:
models (Union[List[String], None], optional)

The dbt models to test.

Default Value: None

exclude (Union[List[String], None], optional)

The dbt models to exclude.

Default Value: None

data (Bool, optional)

Whether or not to run custom data tests.

Default Value: True

schema (Bool, optional)

Whether or not to run schema tests.

Default Value: True

interval (Int, optional)

The interval (in seconds) at which to poll the dbt rpc process.

Default Value: 10

logs (Bool, optional)

Whether or not to return logs from the process.

Default Value: True

yield_materializations (Bool, optional)

If True, materializations corresponding to the results of the dbt operation will be yielded when the solid executes. Default: True

Default Value: True

This solid sends the dbt test command to a dbt RPC server and returns the result of the executed dbt process.

This dbt RPC solid is synchronous, and will periodically poll the dbt RPC server until the dbt process is completed.
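
Note that data and schema both default to True for this solid, whereas the same fields on dbt_cli_test default to False. The sketch below shows a run config that restricts the run to schema tests. It assumes the default solid name dbt_rpc_test_and_wait and the usual solids / <solid name> / config layout; the model name is a placeholder.

```python
# Hypothetical run config: schema tests only, for a single model.
run_config = {
    "solids": {
        "dbt_rpc_test_and_wait": {
            "config": {
                "models": ["my_model"],  # placeholder model selector
                "data": False,           # turn off custom data tests (default: True)
                "schema": True,          # keep schema tests (default: True)
                "interval": 5,           # poll every 5 seconds instead of 10
            }
        }
    }
}
```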