Creating testable and verifiable data pipelines is one of the focuses of Dagster. We believe ensuring data quality is critical for managing the complexity of data systems. Here, we'll cover how to write unit tests for individual assets, as well as for graphs of assets together.
Let's go back to the assets we defined in the prior section, and ensure that they work as expected by writing some unit tests.
We'll start by writing a test for the nabisco_cereals
asset definition, which filters the larger list of cereals down to the those that were manufactured by Nabisco. To run the function that derives an asset from its upstream dependencies, we can invoke it directly, as if it's a regular Python function:
def test_nabisco_cereals():
cereals = [
{"name": "cereal1", "mfr": "N"},
{"name": "cereal2", "mfr": "K"},
]
result = nabisco_cereals(cereals)
assert len(result) == 1
assert result == [{"name": "cereal1", "mfr": "N"}]
We'll also write a test for all the assets together. To do that, we need to combine them into an AssetGroup
. Then, we can invoke AssetGroup.materialize_in_process
, which returns an ExecuteInProcessResult
, whose methods let us investigate, in detail, the success or failure of execution, the values produced by the computation, and (as we'll see later) other events associated with execution.
from dagster import AssetGroup
def test_cereal_asset_group():
group = AssetGroup(
[
nabisco_cereals,
cereals,
cereal_protein_fractions,
highest_protein_nabisco_cereal,
]
)
result = group.materialize()
assert result.success
assert result.output_for_node("highest_protein_nabisco_cereal") == "100% Bran"
Now you can use pytest, or your test runner of choice, to run the unit tests.
pytest test_complex_asset_graph.py
Dagster is written to make testing easy in a domain where it has historically been very difficult. You can learn more about Testing in Dagster by reading the Testing page.
π Congratulations! Having reached this far, you now have a working, testable, and maintainable group of software-defined assets.