This guide explains how you can run a dbt project as part of a Dagster job. This guide focuses on dbt Core, the open-source dbt offering. Dagster also offers an integration with the hosted version of dbt, dbt Cloud. Check out the API docs to learn more.
dbt (data build tool) helps engineers transform data in their warehouses by simply writing SELECT statements. dbt automatically builds a dependency graph for your transformations and turns these SELECT statements into tables and views in your data warehouse.
dbt not only runs your data transformations, but also can create data quality tests and generate documentation for your data, right out of the box. To learn more about dbt, visit the official dbt documentation website.
Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single job. Dagster also provides built-in operational and data observability capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.
dagster-dbt is an integration library that provides pre-built resources for using dbt together with Dagster. These are all designed to be configurable for any dbt project.
The resources that dagster_dbt provides are:

- dbt_cli_resource (DbtCliResource): for running dbt CLI commands
- dbt_rpc_resource (DbtRpcResource): for sending dbt commands to an RPC server
- dbt_rpc_sync_resource (DbtRpcSyncResource): for sending dbt commands to an RPC server and polling until they complete

Each resource derives from the same base class, DbtResource, which provides a consistent Python interface for interacting with dbt.
This library also provides pre-built ops that take advantage of these resources. These exist solely for convenience: they just run the underlying resource command and return the result. If you have more complex use cases that require some degree of customization, it's recommended that you build your own ops that use these resources. These ops are compatible with all of the aforementioned dbt resources:
- dbt_run_op
- dbt_compile_op
- dbt_ls_op
- dbt_test_op
- dbt_snapshot_op
- dbt_seed_op
- dbt_docs_generate_op

To view example jobs that take advantage of this dbt integration, check out either the Hacker News example repo or the dbt example repo.
dagster-dbt provides a dbt_cli_resource to make it easy to run commands through the dbt CLI.

When you supply this resource to an op, you can call any of the many provided methods to invoke a particular CLI command. You can check out a full list of functions (as well as their signatures) in the DbtCliResource API docs. All methods on the resource return a DbtCliOutput object.
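For example, here's a sketch of an op that inspects the output of a dbt run. The return_code attribute is taken from the DbtCliOutput API docs; the op name is illustrative:

from dagster import op

@op(required_resource_keys={"dbt"})
def run_and_inspect(context):
    dbt_output = context.resources.dbt.run()
    # DbtCliOutput carries the parsed result along with the raw CLI output
    context.log.info(f"dbt exited with return code {dbt_output.return_code}")
    return dbt_output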
To run dbt CLI commands, your dbt project directory must be on your local filesystem and you must have a dbt profile already set up to connect to your data warehouse. Visit the official dbt CLI documentation for more details.
When you are configuring the dbt_cli_resource, you have a number of options available. Here, you can specify any command line options that you'd want to pass into all of your dbt commands.

Typically, you'll want to configure your project_dir here, as in most cases you will only be working with a single dbt project in a given job and wouldn't want to pass this option into every function call. You might also want to configure your profiles_dir, or the specific profile you'll be using, for similar reasons.
from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {
        "project_dir": "path/to/dbt/project",
        "profiles_dir": "path/to/dbt/profiles",
    }
)
While the config schema doesn't have an option for every dbt flag (as some flags only work with certain commands), if you configure a flag that is not in the schema, it will still get passed into every CLI invocation, exactly the same as the pre-defined config options.
There are also a few options that are not associated with command line flags, which may be useful. These are:

- warn_error: will raise an error for issues that dbt would normally just warn on
- target_path: the dbt target path, if you set it to something other than the default
- dbt_executable: the name of the specific dbt executable you're using, if it's not just dbt
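For instance, a resource configured with a couple of these options might look like the following sketch (the values shown are illustrative):

from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {
        "project_dir": "path/to/dbt/project",
        # fail the run on issues dbt would normally just warn on
        "warn_error": True,
        # illustrative: only needed if your executable isn't named "dbt"
        "dbt_executable": "dbt-core",
    }
)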
Below are some examples of using the dagster_dbt library to run a dbt project via the CLI.
For these examples, we make heavy use of the dbt_cli_resource. This resource has methods corresponding to most dbt commands (such as run, seed, test, ls, etc.). The methods behave essentially identically to each other, so these examples will focus on dbt run for simplicity.
The dagster_dbt library provides ops that simplify cases where you simply wish to run a single one of these commands (with no special arguments) and return the result. This saves you from writing the boilerplate associated with creating an op from scratch. However, it's fairly common to have more complex needs; in those cases, you should create your own custom op that invokes the resource directly. The examples below mix and match these two approaches where appropriate.
Note that you can pass in any keyword to these functions that you wish, and it will get added as a flag to the underlying dbt command (e.g. my_flag_name='foo' will get converted to --my-flag-name foo). If there is a dbt option that you would like to set, but it is not reflected in the function signature, this is how you would do so.
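For example, the following sketch passes dbt's --full-refresh flag through as a keyword argument (full_refresh is a standard dbt flag; the op name is illustrative):

from dagster import op

@op(required_resource_keys={"dbt"})
def run_with_full_refresh(context):
    # full_refresh=True gets converted to the --full-refresh CLI flag
    context.resources.dbt.run(full_refresh=True)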
One common way to use this integration is to have a step in your job run all of the models in a dbt project. For this case, the easiest method is to configure the resource so it knows where your dbt project is, and import the dbt_run_op to use in your job.
from dagster_dbt import dbt_cli_resource, dbt_run_op
from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project"}
)

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_run_op()
Sometimes, you just want to run a select set of models in your dbt project, rather than the entire thing. The below examples show two ways of doing this, depending on your use case.

Note that in both cases, the models option takes in a list of strings. The string "tag:staging" uses dbt's node selection syntax to filter models with the tag "staging". For more details, visit the official dbt documentation on the node selection syntax.
If you know what models you want to select ahead of time, you might prefer specifying this while configuring your resource. Because you aren't specifying any specific arguments at runtime, you can use the prebuilt dbt_run_op, instead of writing your own.
from dagster_dbt import dbt_cli_resource, dbt_run_op
from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project", "models": ["tag:staging"]}
)

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_run_op()
If you want to change which models you select depending on what happens during execution, you should create your own op that invokes the dbt resource (rather than using a prebuilt op). This allows you to explicitly set the models parameter at runtime. This would look similar to the following:
from dagster import op

@op(required_resource_keys={"dbt"})
def run_models(context, some_condition: bool):
    if some_condition:
        context.resources.dbt.run(models=["tag:staging"])
    else:
        context.resources.dbt.run(models=["tag:other"])
Dagster supports creating multiple jobs from the same graph. dbt has a similar concept, profiles. You might want to run a dev version of your graph that targets the development-specific dbt profile, and then have a prod version that runs using the prod dbt profile. This example shows how to accomplish this.
from dagster_dbt import dbt_cli_resource, dbt_run_op
from dagster import graph

@graph
def my_dbt():
    dbt_run_op()

my_dbt_graph_dev = my_dbt.to_job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt/project", "profile": "dev"}
        )
    }
)

my_dbt_graph_prod = my_dbt.to_job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt/project", "profile": "prod"}
        )
    }
)
Sometimes, you'll want to run multiple different dbt commands in the same job. The dbt_cli_resource makes this convenient: you only need to configure your dbt resource once, and all of that configuration will already be set for any ops using the resource.
One common use case is to first run dbt run to update all of your models, and then run dbt test to check that they all work as expected, as seen below.
from dagster_dbt import dbt_cli_resource, dbt_run_op, dbt_test_op
from dagster import job

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project"}
)

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    dbt_test_op(start_after=dbt_run_op())
This integration provides two separate resources to help run commands against a dbt RPC server. The dbt_rpc_resource will fire off requests against the server and return immediately, while the dbt_rpc_sync_resource will poll the server until each request completes.
The prebuilt ops provided by dagster-dbt (such as dbt_run_op, dbt_test_op, etc.) are all compatible with both of these resources.
Your dbt RPC server can be running locally or remotely. To interact with the RPC server in your Dagster job, you will need to create a resource for your dbt RPC server. dagster_dbt.dbt_rpc_resource can be configured with your specific host and port.
from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})
For convenience during local development, you may also use dagster_dbt.local_dbt_rpc_resource, which is preconfigured for a dbt RPC server running on http://localhost:8580.
All of the prebuilt dbt ops in this library are compatible with the dbt_rpc_resource (which sends requests and then returns immediately) and the dbt_rpc_sync_resource (which sends requests and waits for the task to complete). Therefore, we can use the same ops as in the CLI examples, as long as we provide the correct resource to the job.
from dagster_dbt import dbt_run_op
from dagster import job

@job(resource_defs={"dbt": my_remote_rpc})
def my_dbt_job():
    dbt_run_op()
This is similar to having "params": {"models": "tag:staging"} in your dbt RPC request body.
from dagster_dbt import dbt_rpc_resource
from dagster import job, op

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@op(required_resource_keys={"dbt"})
def run_staging_models(context):
    context.resources.dbt.run(models=["tag:staging"])

@job(resource_defs={"dbt": my_remote_rpc})
def my_dbt_job():
    run_staging_models()
Note that the job above will NOT wait until the dbt RPC server has finished executing your request. Instead, it will return immediately with a request token from the dbt RPC server. If you want the op to wait until execution is finished, supply a dbt_rpc_sync_resource instead (seen below).
from dagster_dbt import dbt_rpc_sync_resource
from dagster import job, op

my_remote_sync_rpc = dbt_rpc_sync_resource.configured(
    {"host": "80.80.80.80", "port": 8080}
)

@op(required_resource_keys={"dbt_sync"})
def run_staging_models_and_wait(context):
    context.resources.dbt_sync.run(models=["tag:staging"])

@job(resource_defs={"dbt_sync": my_remote_sync_rpc})
def my_dbt_job():
    run_staging_models_and_wait()
For full documentation on all available config, visit the API docs for dagster-dbt.
Generating Asset Materializations when you run your dbt project
dagster_dbt is configured to automatically create Asset Materializations for each of your dbt models when you run the run command (via either the CLI or over RPC). These materializations are populated with metadata that is automatically parsed from the dbt response. The available metadata differs between dbt versions. If there's anything you would like to add to these materializations, you can explicitly invoke the underlying generate_materializations utility function and modify each of the resulting materializations, like so:
from dagster_dbt.utils import generate_materializations
from dagster import op

@op(required_resource_keys={"dbt"})
def dbt_run_with_custom_assets(context):
    dbt_result = context.resources.dbt.run()
    for materialization in generate_materializations(dbt_result):
        context.log_event(
            materialization._replace(
                metadata_entries=[...]  # insert whatever metadata you want here
            )
        )
    return dbt_result
dbt CLI: Set the dbt profile and target to load
from dagster_dbt import dbt_cli_resource
from dagster import job

config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...
dbt CLI: Set the path to the dbt executable
from dagster_dbt import dbt_cli_resource
from dagster import job

config = {"dbt_executable": "path/to/dbt/executable"}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...
dbt CLI: Select specific models to run
from dagster_dbt import dbt_cli_resource
from dagster import job

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...
For more details, visit the official documentation on dbt's node selection syntax.
dbt CLI: Exclude specific models
from dagster_dbt import dbt_cli_resource
from dagster import job

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...
For more details, visit the official documentation on dbt's node selection syntax.
dbt CLI: Set key-values for dbt vars
from dagster_dbt import dbt_cli_resource
from dagster import job

config = {"vars": {"key": "value"}}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...
For more details, visit the official documentation on using variables in dbt.
dbt RPC: Configure a remote dbt RPC resource
from dagster_dbt import dbt_rpc_resource

custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})
dbt RPC: Select specific models to run
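A minimal sketch, mirroring the op-based example above; the models keyword is sent as the "models" parameter of the RPC request:

from dagster_dbt import dbt_rpc_resource
from dagster import job, op

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@op(required_resource_keys={"dbt"})
def run_staging_models(context):
    # sent as "params": {"models": "tag:staging"} in the RPC request
    context.resources.dbt.run(models=["tag:staging"])

@job(resource_defs={"dbt": my_remote_rpc})
def my_job():
    run_staging_models()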
For more details, visit the official documentation on dbt's node selection syntax.
dbt RPC: Exclude specific models
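A minimal sketch, using the same keyword-passthrough mechanism with dbt's exclude option (the op name is illustrative):

from dagster_dbt import dbt_rpc_resource
from dagster import job, op

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@op(required_resource_keys={"dbt"})
def run_non_nightly_models(context):
    # sent as "params": {"exclude": "tag:nightly"} in the RPC request
    context.resources.dbt.run(exclude=["tag:nightly"])

@job(resource_defs={"dbt": my_remote_rpc})
def my_job():
    run_non_nightly_models()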
For more details, visit the official documentation on dbt's node selection syntax.
dbt RPC: Configure polling interval when using a dbt_rpc_*_and_wait op
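A minimal sketch, assuming the sync resource accepts a poll_interval config value (seconds between polls of the RPC server); check the dbt_rpc_sync_resource API docs for the exact option name:

from dagster_dbt import dbt_rpc_sync_resource

sync_rpc_with_polling = dbt_rpc_sync_resource.configured(
    # poll_interval is an assumption here: the number of seconds to wait
    # between polls of the dbt RPC server
    {"host": "80.80.80.80", "port": 8080, "poll_interval": 5}
)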
dbt RPC: Disable default asset materializations
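A minimal sketch, assuming the prebuilt ops expose a yield_materializations config option that can be set to False to skip the default Asset Materializations; check the op API docs to confirm:

from dagster_dbt import dbt_rpc_resource, dbt_run_op
from dagster import job

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@job(resource_defs={"dbt": my_remote_rpc})
def my_job():
    # yield_materializations is assumed to be part of the op's config schema
    dbt_run_op.configured(
        {"yield_materializations": False}, name="dbt_run_no_assets"
    )()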
If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions on using dbt with Dagster, we'd love to hear from you.