As of Dagster 0.13.0, we recommend Ops as an alternative to Solids. They can generally be used interchangeably.
@dagster.solid(name=None, description=None, input_defs=None, output_defs=None, config_schema=None, required_resource_keys=None, tags=None, version=None, retry_policy=None)
Create a solid with the specified parameters from the decorated function.
This shortcut simplifies the core SolidDefinition API by exploding arguments into kwargs of the decorated compute function and omitting additional parameters when they are not needed.
Input and output definitions will be inferred from the type signature of the decorated function if not explicitly provided.
The decorated function will be used as the solid’s compute function. The signature of the decorated function is more flexible than that of the compute_fn in the core API; it may:
1. Return a value. This value will be wrapped in an Output and yielded by the compute function.
2. Return an Output. This output will be yielded by the compute function.
3. Yield Output or other event objects. Same as default compute behavior.
Note that options 1) and 2) are incompatible with yielding other events. If you would like to decorate a function that yields events, it must also wrap its eventual output in an Output and yield it.
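The three accepted styles can be illustrated with a small stand-alone sketch. It does not import Dagster: `Output` below is a hypothetical stand-in dataclass and `coerce_to_generator` is an illustrative helper, not the library's implementation. It only shows how a plain return value, a returned Output, and a generator could all be normalized into a stream of yielded events.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Iterator


@dataclass
class Output:
    """Hypothetical stand-in for dagster.Output (illustration only)."""
    value: Any
    output_name: str = "result"


def coerce_to_generator(fn: Callable[..., Any]) -> Callable[..., Iterator[Any]]:
    """Wrap fn so that calling the wrapper always yields event objects."""
    def compute(*args: Any, **kwargs: Any) -> Iterator[Any]:
        if inspect.isgeneratorfunction(fn):
            # Option 3: the function already yields Output or other events.
            yield from fn(*args, **kwargs)
            return
        result = fn(*args, **kwargs)
        if isinstance(result, Output):
            # Option 2: a returned Output is yielded unchanged.
            yield result
        else:
            # Option 1: a bare value is wrapped in an Output.
            yield Output(result)
    return compute


def returns_value():
    return {"foo": "bar"}


def returns_output():
    return Output({"foo": "bar"})


def yields_output():
    yield Output({"foo": "bar"})


# All three styles produce the same single event.
events = [
    list(coerce_to_generator(f)())
    for f in (returns_value, returns_output, yields_output)
]
```

In this sketch a function that mixes a plain `return` with yielded events has no way to surface its return value as an Output, which is one way to picture why the decorated function must yield its Output explicitly in that case.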
@solid supports async def functions as well, including async generators when yielding multiple events or outputs. Note that async solids will generally be run on their own unless using a custom Executor implementation that supports running them together.
name (Optional[str]) – Name of the solid. Must be unique within any PipelineDefinition using the solid.
description (Optional[str]) – Human-readable description of this solid. If not provided, and the decorated function has a docstring, that docstring will be used as the description.
input_defs (Optional[List[InputDefinition]]) – Information about the inputs to the solid. Information provided here will be combined with what can be inferred from the function signature, with these explicit InputDefinitions taking precedence.
output_defs (Optional[List[OutputDefinition]]) – Information about the solid’s outputs. Information provided here will be combined with what can be inferred from the return type signature if there is only one OutputDefinition and the function does not use yield.
config_schema (Optional[ConfigSchema]) – The schema for the config. If set, Dagster will check that config provided for the solid matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the solid.
required_resource_keys (Optional[Set[str]]) – Set of resource handles required by this solid.
tags (Optional[Dict[str, Any]]) – Arbitrary metadata for the solid. Frameworks may expect and require certain metadata to be attached to a solid. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
version (Optional[str]) – (Experimental) The version of the solid’s compute_fn. Two solids should have the same version if and only if they deterministically produce the same outputs when provided the same inputs.
retry_policy (Optional[RetryPolicy]) – The retry policy for this solid.
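The round-trip criterion stated for tags can be checked with the standard library alone. The helper name `is_valid_tag_value` is hypothetical and this is only a sketch of the stated rule, not Dagster's own validation code.

```python
import json
from typing import Any


def is_valid_tag_value(value: Any) -> bool:
    """Return True if value is a string or survives a JSON round trip unchanged."""
    if isinstance(value, str):
        return True
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Not JSON-serializable at all (for example, a set).
        return False


checks = {
    "string": is_valid_tag_value("nightly"),
    "list": is_valid_tag_value([1, 2, 3]),
    "tuple": is_valid_tag_value((1, 2, 3)),    # comes back as a list
    "int_keys": is_valid_tag_value({1: "a"}),  # keys come back as strings
    "set": is_valid_tag_value({1, 2}),         # cannot be serialized
}
```

Tuples and dicts with non-string keys serialize without error but do not compare equal after decoding, so they fail the criterion even though `json.dumps` accepts them.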
Examples
from dagster import Field, InputDefinition, Output, OutputDefinition, solid

@solid
def hello_world():
    print('hello')

@solid
def hello_world():
    return {'foo': 'bar'}

@solid
def hello_world():
    return Output(value={'foo': 'bar'})

@solid
def hello_world():
    yield Output(value={'foo': 'bar'})

@solid
def hello_world(foo):
    return foo

@solid(
    input_defs=[InputDefinition(name="foo", dagster_type=str)],
    output_defs=[OutputDefinition(str)]
)
def hello_world(foo):
    # explicitly type and name inputs and outputs
    return foo

@solid
def hello_world(foo: str) -> str:
    # same as above inferred from signature
    return foo

@solid
def hello_world(context, foo):
    context.log.info('log something')
    return foo

@solid(
    config_schema={'str_value': Field(str)}
)
def hello_world(context, foo):
    # context.solid_config is a dictionary with 'str_value' key
    return foo + context.solid_config['str_value']
dagster.SolidDefinition(name, input_defs, compute_fn, output_defs, config_schema=None, description=None, tags=None, required_resource_keys=None, version=None, retry_policy=None)
The definition of a Solid that performs a user-defined computation.
For more details on what a solid is, refer to the Solid Overview.
End users should prefer the @solid and @lambda_solid decorators. SolidDefinition is generally intended to be used by framework authors.
name (str) – Name of the solid. Must be unique within any PipelineDefinition using the solid.
input_defs (List[InputDefinition]) – Inputs of the solid.
compute_fn (Callable) – The core of the solid, the function that does the actual computation. The signature of this function is determined by input_defs, and optionally, an injected first argument, context, a collection of information provided by the system.
This function will be coerced into a generator or an async generator, which must yield one Output for each of the solid’s output_defs, and additionally may yield other types of Dagster events, including Materialization and ExpectationResult.
output_defs (List[OutputDefinition]) – Outputs of the solid.
config_schema (Optional[ConfigSchema]) – The schema for the config. If set, Dagster will check that config provided for the solid matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the solid.
description (Optional[str]) – Human-readable description of the solid.
tags (Optional[Dict[str, Any]]) – Arbitrary metadata for the solid. Frameworks may expect and require certain metadata to be attached to a solid. Users should generally not set metadata directly. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
required_resource_keys (Optional[Set[str]]) – Set of resource handles required by this solid.
version (Optional[str]) – (Experimental) The version of the solid’s compute_fn. Two solids should have the same version if and only if they deterministically produce the same outputs when provided the same inputs.
retry_policy (Optional[RetryPolicy]) – The retry policy for this solid.
Examples
from dagster import InputDefinition, Int, Output, OutputDefinition, SolidDefinition

def _add_one(_context, inputs):
    yield Output(inputs["num"] + 1)

SolidDefinition(
    name="add_one",
    input_defs=[InputDefinition("num", Int)],
    output_defs=[OutputDefinition(Int)],  # default name ("result")
    compute_fn=_add_one,
)
configured(config_or_config_fn, name, config_schema=None, description=None)
Wraps this object in an object of the same type that provides configuration to the inner object.
config_or_config_fn (Union[Any, Callable[[Any], Any]]) – Either (1) Run configuration that fully satisfies this object’s config schema or (2) A function that accepts run configuration and returns run configuration that fully satisfies this object’s config schema. In the latter case, config_schema must be specified. When passing a function, it’s easiest to use configured().
name (str) – Name of the new definition. This is a required argument, as this definition type has a name uniqueness constraint.
config_schema (ConfigSchema) – If config_or_config_fn is a function, the config schema that its input must satisfy.
description (Optional[str]) – Description of the new definition. If not specified, inherits the description of the definition being configured.
Returns (ConfigurableDefinition): A configured version of this object.
dagster.InputDefinition(name=None, dagster_type=None, description=None, default_value=<class 'dagster.core.definitions.utils.NoValueSentinel'>, root_manager_key=None, metadata=None, asset_key=None, asset_partitions=None)
Defines an argument to a solid’s compute function.
Inputs may flow from previous solids’ outputs, or be stubbed using config. They may optionally be typed using the Dagster type system.
name (str) – Name of the input.
dagster_type (Optional[Union[Type, DagsterType]]) – The type of this input. Users should provide the Python type of the objects that they expect to be passed for this input, or a DagsterType that defines a runtime check that they want to be run on this input. Defaults to Any.
description (Optional[str]) – Human-readable description of the input.
default_value (Optional[Any]) – The default value to use if no input is provided.
root_manager_key (Optional[str]) – (Experimental) The resource key for the RootInputManager used for loading this input when it is not connected to an upstream output.
metadata (Optional[Dict[str, Any]]) – A dict of metadata for the input.
asset_key (Optional[Union[AssetKey, InputContext -> AssetKey]]) – (Experimental) An AssetKey (or function that produces an AssetKey from the InputContext) which should be associated with this InputDefinition. Used for tracking lineage information through Dagster.
asset_partitions (Optional[Union[Set[str], InputContext -> Set[str]]]) – (Experimental) A set of partitions of the given asset_key (or a function that produces this list of partitions from the InputContext) which should be associated with this InputDefinition.
dagster.OutputDefinition(dagster_type=None, name=None, description=None, is_required=True, io_manager_key=None, metadata=None, asset_key=None, asset_partitions=None, asset_partitions_def=None)
Defines an output from a solid’s compute function.
Solids can have multiple outputs, in which case outputs cannot be anonymous.
Many solids have only one output, in which case the user can provide a single output definition that will be given the default name, “result”.
Output definitions may be typed using the Dagster type system.
dagster_type (Optional[Union[Type, DagsterType]]) – The type of this output. Users should provide the Python type of the objects that they expect the solid to yield for this output, or a DagsterType that defines a runtime check that they want to be run on this output. Defaults to Any.
name (Optional[str]) – Name of the output. (default: “result”)
description (Optional[str]) – Human-readable description of the output.
is_required (Optional[bool]) – Whether the presence of this field is required. (default: True)
io_manager_key (Optional[str]) – The resource key of the IOManager used for storing this output and loading it in downstream steps (default: “io_manager”).
metadata (Optional[Dict[str, Any]]) – A dict of the metadata for the output. For example, users can provide a file path if the data object will be stored in a filesystem, or provide information of a database table when it is going to load the data into the table.
asset_key (Optional[AssetKey]) – (Experimental) An AssetKey which should be associated with this OutputDefinition. Used for tracking lineage information through Dagster.
asset_partitions (Optional[Union[Set[str], OutputContext -> Set[str]]]) – (Experimental) A set of partitions of the given asset_key (or a function that produces this list of partitions from the OutputContext) which should be associated with this OutputDefinition.
dagster.RetryPolicy(max_retries=1, delay=None, backoff=None, jitter=None)
A declarative policy for when to request retries when an exception occurs during op execution.
max_retries (int) – The maximum number of retries to attempt. Defaults to 1.
delay (Optional[Union[int,float]]) – The time in seconds to wait between the retry being requested and the next attempt being started. This unit of time can be modulated as a function of attempt number with backoff and randomly with jitter.
backoff (Optional[Backoff]) – A modifier for delay as a function of retry attempt number.
jitter (Optional[Jitter]) – A randomizing modifier for delay, applied after backoff calculation.
dagster.Backoff(value)
A modifier for delay as a function of attempt number.
LINEAR: attempt_num * delay
EXPONENTIAL: ((2 ^ attempt_num) - 1) * delay
dagster.Jitter(value)
A randomizing modifier for delay, applied after backoff calculation.
FULL: between 0 and the calculated delay based on backoff: random() * backoff_delay
PLUS_MINUS: +/- the delay: backoff_delay + ((2 * (random() * delay)) - delay)
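The backoff and jitter formulas above combine as in the following stand-alone sketch. The enums here are local stand-ins, and `retry_delay` with its injectable `rand` parameter is a hypothetical helper written for illustration, not Dagster's implementation. It reads `2 ^ attempt_num` as exponentiation and assumes `attempt_num` is 1 for the first retry.

```python
import random
from enum import Enum
from typing import Callable, Optional


class Backoff(Enum):
    """Local stand-in for dagster.Backoff."""
    LINEAR = "LINEAR"
    EXPONENTIAL = "EXPONENTIAL"


class Jitter(Enum):
    """Local stand-in for dagster.Jitter."""
    FULL = "FULL"
    PLUS_MINUS = "PLUS_MINUS"


def retry_delay(
    attempt_num: int,
    delay: float,
    backoff: Optional[Backoff] = None,
    jitter: Optional[Jitter] = None,
    rand: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before the given attempt, following the formulas above."""
    # Step 1: scale the base delay by the attempt number.
    if backoff is Backoff.LINEAR:
        backoff_delay = attempt_num * delay
    elif backoff is Backoff.EXPONENTIAL:
        backoff_delay = ((2 ** attempt_num) - 1) * delay
    else:
        backoff_delay = delay

    # Step 2: randomize the result, after the backoff calculation.
    if jitter is Jitter.FULL:
        return rand() * backoff_delay
    if jitter is Jitter.PLUS_MINUS:
        return backoff_delay + ((2 * (rand() * delay)) - delay)
    return backoff_delay
```

Passing a fixed `rand` makes the jitter deterministic, which is convenient in tests: `retry_delay(3, 2.0, Backoff.EXPONENTIAL, Jitter.FULL, rand=lambda: 0.5)` evaluates to half of the 14-second exponential delay.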
dagster.execute_solid(solid_def, mode_def=None, input_values=None, tags=None, run_config=None, raise_on_error=True)
Execute a single solid in an ephemeral pipeline.
Intended to support unit tests. Input values may be passed directly, and no pipeline need be specified – an ephemeral pipeline will be constructed.
solid_def (SolidDefinition) – The solid to execute.
mode_def (Optional[ModeDefinition]) – The mode within which to execute the solid. Use this if, e.g., custom resources, loggers, or executors are desired.
input_values (Optional[Dict[str, Any]]) – A dict of input names to input values, used to pass inputs to the solid directly. You may also use the run_config to configure any inputs that are configurable.
tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.
run_config (Optional[dict]) – The configuration that parameterized this execution, as a dict.
raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True, since this is the most useful behavior in test.
Returns: The result of executing the solid.
dagster.execute_solid_within_pipeline(pipeline_def, solid_name, inputs=None, run_config=None, mode=None, preset=None, tags=None, instance=None)
Execute a single solid within an existing pipeline.
Intended to support tests. Input values may be passed directly.
pipeline_def (PipelineDefinition) – The pipeline within which to execute the solid.
solid_name (str) – The name of the solid, or the aliased solid, to execute.
inputs (Optional[Dict[str, Any]]) – A dict of input names to input values, used to pass input values to the solid directly. You may also use the run_config to configure any inputs that are configurable.
run_config (Optional[dict]) – The configuration that parameterized this execution, as a dict.
mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.
preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.
tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.
instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.
Returns: The result of executing the solid.
dagster.execute_solids_within_pipeline(pipeline_def, solid_names, inputs=None, run_config=None, mode=None, preset=None, tags=None, instance=None)
Execute a set of solids within an existing pipeline.
Intended to support tests. Input values may be passed directly.
pipeline_def (PipelineDefinition) – The pipeline within which to execute the solids.
solid_names (FrozenSet[str]) – A set of the solid names, or the aliased solids, to execute.
inputs (Optional[Dict[str, Dict[str, Any]]]) – A dict keyed on solid names, whose values are dicts of input names to input values, used to pass input values to the solids directly. You may also use the run_config to configure any inputs that are configurable.
run_config (Optional[dict]) – The configuration that parameterized this execution, as a dict.
mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.
preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.
tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.
instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.
Returns: The results of executing the solids, keyed by solid name.
Return type: Dict[str, Union[CompositeSolidExecutionResult, SolidExecutionResult]]
dagster.SolidExecutionResult(solid, step_events_by_kind, reconstruct_context, pipeline_def, output_capture=None)
Execution result for a leaf solid in a pipeline.
Users should not instantiate this class.
compute_input_event_dict
All events of type STEP_INPUT, keyed by input name.
Type: Dict[str, DagsterEvent]
compute_output_events_dict
All events of type STEP_OUTPUT, keyed by output name.
Type: Dict[str, List[DagsterEvent]]
compute_step_events
All events generated by execution of the solid compute function.
Type: List[DagsterEvent]
compute_step_failure_event
The STEP_FAILURE event. Throws if the step did not fail.
expectation_events_during_compute
All events of type STEP_EXPECTATION_RESULT.
Type: List[DagsterEvent]
expectation_results_during_compute
All expectation results yielded by the solid.
Type: List[ExpectationResult]
failure_data
Any data corresponding to this step’s failure, if it failed.
Type: Union[None, StepFailureData]
get_output_event_for_compute(output_name='result')
The STEP_OUTPUT event for the given output name.
Throws if not present.
output_name (Optional[str]) – The name of the output. (default: ‘result’)
Returns: The corresponding event.
get_output_events_for_compute(output_name='result')
The STEP_OUTPUT events for the given output name.
Throws if not present.
output_name (Optional[str]) – The name of the output. (default: ‘result’)
Returns: The corresponding events.
Return type: List[DagsterEvent]
input_events_during_compute
All events of type STEP_INPUT.
Type: List[DagsterEvent]
materialization_events_during_compute
All events of type ASSET_MATERIALIZATION.
Type: List[DagsterEvent]
materializations_during_compute
All materializations yielded by the solid.
Type: List[Materialization]
output_events_during_compute
All events of type STEP_OUTPUT.
Type: List[DagsterEvent]
output_value(output_name='result')
Get a computed output value.
Note that calling this method will reconstruct the pipeline context (including, e.g., resources) to retrieve materialized output values.
output_values
The computed output values.
Returns None if execution did not succeed. Otherwise, each value is:
- the output value in the normal case
- a dictionary from mapping key to corresponding value in the mapped case
Note that accessing this property will reconstruct the pipeline context (including, e.g., resources) to retrieve materialized output values.
retry_attempts
Number of times this step retried.
dagster.CompositeSolidExecutionResult(solid, event_list, step_events_by_kind, reconstruct_context, pipeline_def, handle=None, output_capture=None)
Execution result for a composite solid in a pipeline.
Users should not instantiate this class directly.
output_for_solid(handle_str, output_name='result')
Get the output of a solid by its solid handle string and output name.
result_for_handle(handle)
Get the result of a solid by its solid handle.
This allows indexing into top-level solids to retrieve the results of children of composite solids.
handle (Union[str,NodeHandle]) – The handle for the solid.
Returns: The result of the given solid.
result_for_solid(name)
Get the result of a top level solid.
name (str) – The name of the top-level solid or aliased solid for which to retrieve the result.
Returns: The result of the solid execution within the pipeline.
solid_result_list
The results for each top level solid.
Type: List[Union[CompositeSolidExecutionResult, SolidExecutionResult]]
step_event_list
The full list of events generated by steps in the execution. Excludes events generated by the pipeline lifecycle, e.g., PIPELINE_START.
Type: List[DagsterEvent]
dagster.SolidExecutionContext(step_execution_context)
The context object that can be made available as the first argument to a solid’s compute function.
The context object provides system information such as resources, config, and logging to a solid’s compute function. Users should not instantiate this object directly.
Example:
@solid
def hello_world(context: SolidExecutionContext):
    context.log.info("Hello, world!")
add_output_metadata(metadata, output_name=None, mapping_key=None)
Add metadata to one of the outputs of an op.
This can only be used once per output in the body of an op. Using this method with the same output_name more than once within an op will result in an error.
Examples:
from dagster import Out, op
from typing import Tuple

@op
def add_metadata(context):
    context.add_output_metadata({"foo": "bar"})
    return 5  # Since the default output is called "result", metadata will be attached to the output "result".

@op(out={"a": Out(), "b": Out()})
def add_metadata_two_outputs(context) -> Tuple[str, int]:
    context.add_output_metadata({"foo": "bar"}, output_name="b")
    context.add_output_metadata({"baz": "bat"}, output_name="a")
    return ("dog", 5)
consume_events()
Pops and yields all user-generated events that have been recorded from this context.
If consume_events has not yet been called, this will yield all logged events since the beginning of the op’s computation. If consume_events has been called, it will yield all events since the last time consume_events was called. Designed for internal use. Users should never need to invoke this method.
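The pop-and-yield behavior described for consume_events can be pictured with a toy buffer. `EventBuffer` is a hypothetical class written for this illustration and makes no claim about how the real context stores events.

```python
from typing import Any, Iterator, List


class EventBuffer:
    """Toy model of a context that records events and drains them on demand."""

    def __init__(self) -> None:
        self._events: List[Any] = []

    def log_event(self, event: Any) -> None:
        self._events.append(event)

    def consume_events(self) -> Iterator[Any]:
        # Take everything recorded so far and reset the buffer, so a later
        # call only sees events logged after this one was iterated.
        events, self._events = self._events, []
        yield from events


buffer = EventBuffer()
buffer.log_event("a")
buffer.log_event("b")
first = list(buffer.consume_events())   # everything since the start
buffer.log_event("c")
second = list(buffer.consume_events())  # only events since the last call
third = list(buffer.consume_events())   # nothing new was logged
```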
get_mapping_key()
Which mapping_key this execution is for if downstream of a DynamicOutput, otherwise None.
get_tag(key)
Get a logging tag.
key (str) – The tag to get.
Returns: The value of the tag, if present.
Return type: Optional[str]
has_partition_key
Whether the current run is a partitioned run.
instance
The current Dagster instance.
job_def
The currently executing job.
log
The log manager available in the execution context.
log_event(event)
Log an AssetMaterialization, AssetObservation, or ExpectationResult from within the body of an op.
Events logged with this method will appear in the list of DagsterEvents, as well as the event log.
event (Union[AssetMaterialization, Materialization, AssetObservation, ExpectationResult]) – The event to log.
Examples:
from dagster import op, AssetMaterialization

@op
def log_materialization(context):
    context.log_event(AssetMaterialization("foo"))
mode_def
The mode of the current execution.
op_def
The current op definition.
output_asset_partition_key(output_name='result')
Returns the asset partition key for the given output. Defaults to “result”, which is the name of the default output.
output_asset_partitions_time_window(output_name='result')
The time window for the partitions of the output asset.
Raises an error if either of the following is true:
- The output asset has no partitioning.
- The output asset is not partitioned with a TimeWindowPartitionsDefinition.
partition_key
The partition key for the current run.
Raises an error if the current run is not a partitioned run.
partition_time_window
The partition time window for the current run.
Raises an error if the current run is not a partitioned run, or if the job’s partition definition is not a TimeWindowPartitionsDefinition.
pdb
Gives access to pdb debugging from within the op.
Example:
@op
def debug(context):
    context.pdb.set_trace()
pipeline_def
The currently executing pipeline.
pipeline_run
The current pipeline run.
resources
The currently available resources.
Type: Resources
retry_number
Which retry attempt is currently executing, i.e. 0 for the initial attempt, 1 for the first retry, etc.
run
The current run.
Type: DagsterRun
solid_config
The parsed config specific to this solid.
solid_def
The current solid definition.
step_launcher
The current step launcher, if any.
Type: Optional[StepLauncher]
dagster.build_solid_context(resources=None, solid_config=None, resources_config=None, instance=None, config=None, partition_key=None, mapping_key=None)
Builds solid execution context from provided parameters.
build_solid_context can be used as either a function or context manager. If there is a provided resource that is a context manager, then build_solid_context must be used as a context manager. This function can be used to provide the context argument when directly invoking a solid.
resources (Optional[Dict[str, Any]]) – The resources to provide to the context. These can be either values or resource definitions.
solid_config (Optional[Any]) – The solid config to provide to the context. The value provided here will be available as context.solid_config.
resources_config (Optional[Dict[str, Any]]) – Configuration for any resource definitions provided to the resources arg. The configuration under a specific key should match the resource under a specific key in the resources dictionary.
instance (Optional[DagsterInstance]) – The dagster instance configured for the context. Defaults to DagsterInstance.ephemeral().
Examples
context = build_solid_context()
solid_to_invoke(context)

with build_solid_context(resources={"foo": context_manager_resource}) as context:
    solid_to_invoke(context)
@dagster.composite_solid(name=None, input_defs=None, output_defs=None, description=None, config_schema=None, config_fn=None)
Create a composite solid with the specified parameters from the decorated composition function.
Using this decorator allows you to build up the dependency graph of the composite by writing a function that invokes solids and passes the output to other solids. This is similar to the use of the @pipeline decorator, with the additional ability to remap inputs, outputs, and config across the composite boundary.
name (Optional[str]) – Name for the new composite solid. Must be unique within any PipelineDefinition using the solid.
description (Optional[str]) – Human-readable description of the new composite solid.
input_defs (Optional[List[InputDefinition]]) – Information about the inputs that this composite solid maps. Information provided here will be combined with what can be inferred from the function signature, with these explicit InputDefinitions taking precedence.
Uses of inputs in the body of the decorated composition function will determine the InputMappings passed to the underlying CompositeSolidDefinition.
output_defs (Optional[List[OutputDefinition]]) – Information about the outputs this composite solid maps. Information provided here will be combined with what can be inferred from the return type signature if there is only one OutputDefinition.
Uses of these outputs in the body of the decorated composition function, as well as the return value of the decorated function, will be used to infer the appropriate set of OutputMappings for the underlying CompositeSolidDefinition.
To map multiple outputs, return a dictionary from the composition function.
config_schema (Optional[ConfigSchema]) – If the config_fn argument is provided, this argument can be provided to set the schema for outer config that is passed to the config_fn. If config_fn is provided, but this argument is not provided, any config will be accepted.
config_fn (Callable[[dict], dict]) – By specifying a config mapping function, you can override the configuration for the child solids contained within this composite solid. config_fn maps the config provided to the composite solid to the config that will be provided to the child solids.
If this argument is provided, the config_schema argument can also be provided to limit what config values can be passed to the composite solid.
Examples
from dagster import composite_solid, lambda_solid

@lambda_solid
def add_one(num: int) -> int:
    return num + 1

@composite_solid
def add_two(num: int) -> int:
    adder_1 = add_one.alias('adder_1')
    adder_2 = add_one.alias('adder_2')
    return adder_2(adder_1(num))
dagster.CompositeSolidDefinition(name, solid_defs, input_mappings=None, output_mappings=None, config_mapping=None, dependencies=None, description=None, tags=None, positional_inputs=None)
The core unit of composition and abstraction, composite solids allow you to define a solid from a graph of solids.
In the same way you would refactor a block of code into a function to deduplicate, organize, or manage complexity, you can refactor solids in a pipeline into a composite solid.
name (str) – The name of this composite solid. Must be unique within any PipelineDefinition using the solid.
solid_defs (List[Union[SolidDefinition, CompositeSolidDefinition]]) – The set of solid definitions used in this composite solid. Composites may be arbitrarily nested.
input_mappings (Optional[List[InputMapping]]) – Define the inputs to the composite solid, and how they map to the inputs of its constituent solids.
output_mappings (Optional[List[OutputMapping]]) – Define the outputs of the composite solid, and how they map from the outputs of its constituent solids.
config_mapping (Optional[ConfigMapping]) – By specifying a config mapping, you can override the configuration for the child solids contained within this composite solid. Config mappings require both a configuration field to be specified, which is exposed as the configuration for the composite solid, and a configuration mapping function, which is called to map the configuration of the composite solid into the configuration that is applied to any child solids.
dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares where each solid gets its inputs. The keys at the top level dict are either string names of solids or NodeInvocations. The values are dicts that map input names to DependencyDefinitions.
description (Optional[str]) – Human readable description of this composite solid.
tags (Optional[Dict[str, Any]]) – Arbitrary metadata for the solid. Frameworks may expect and require certain metadata to be attached to a solid. Users should generally not set metadata directly. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
positional_inputs (Optional[List[str]]) – The positional order of the inputs if it differs from the order of the input mappings.
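The shape of the dependencies argument (solid name, then input name, then upstream source) is enough to derive an execution order. The sketch below is only an illustration of that structure: a plain string stands in for DependencyDefinition, the solid names are made up, and `execution_order` is a hypothetical helper rather than anything Dagster exposes.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Solid name -> {input name -> upstream solid name}. A plain string stands in
# for DependencyDefinition here (an assumption made to keep the sketch small).
dependencies: Dict[str, Dict[str, str]] = {
    "adder_1": {},
    "adder_2": {"num": "adder_1"},
    "adder_3": {"num": "adder_2"},
}


def execution_order(deps: Dict[str, Dict[str, str]]) -> List[str]:
    """Return the solids in an order where every upstream solid comes first."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {solid: set(inputs.values()) for solid, inputs in deps.items()}
    return list(TopologicalSorter(graph).static_order())


order = execution_order(dependencies)
```

A solid with an empty inner dict, like `adder_1` here, has no upstream dependencies and would receive its inputs from an input mapping or from config instead.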
Examples
from dagster import CompositeSolidDefinition, DependencyDefinition, InputDefinition, Int, NodeInvocation, OutputDefinition, lambda_solid

@lambda_solid
def add_one(num: int) -> int:
    return num + 1

add_two = CompositeSolidDefinition(
    'add_two',
    solid_defs=[add_one],
    dependencies={
        NodeInvocation('add_one', 'adder_1'): {},
        NodeInvocation('add_one', 'adder_2'): {'num': DependencyDefinition('adder_1')},
    },
    input_mappings=[InputDefinition('num', Int).mapping_to('adder_1', 'num')],
    output_mappings=[OutputDefinition(Int).mapping_from('adder_2')],
)
configured(config_or_config_fn, name, config_schema=None, description=None)
Wraps this object in an object of the same type that provides configuration to the inner object.
config_or_config_fn (Union[Any, Callable[[Any], Any]]) – Either (1) Run configuration that fully satisfies this object’s config schema or (2) A function that accepts run configuration and returns run configuration that fully satisfies this object’s config schema. In the latter case, config_schema must be specified. When passing a function, it’s easiest to use configured().
name (str) – Name of the new definition. This is a required argument, as this definition type has a name uniqueness constraint.
config_schema (ConfigSchema) – If config_or_config_fn is a function, the config schema that its input must satisfy.
description (Optional[str]) – Description of the new definition. If not specified, inherits the description of the definition being configured.
Returns (ConfigurableDefinition): A configured version of this object.
dagster.InputMapping(definition, maps_to)
Defines an input mapping for a composite solid.
definition (InputDefinition) – Defines the input to the composite solid.
solid_name (str) – The name of the child solid onto which to map the input.
input_name (str) – The name of the input to the child solid onto which to map the input.
dagster.OutputMapping(definition, maps_from)
Defines an output mapping for a composite solid.
definition (OutputDefinition) – Defines the output of the composite solid.
solid_name (str) – The name of the child solid from which to map the output.
output_name (str) – The name of the child solid’s output from which to map the output.
dagster.
ConfigMapping
(config_fn, config_schema=None, receive_processed_config_values=None)[source]Defines a config mapping for a graph (or job).
By specifying a config mapping function, you can override the configuration for the child ops and graphs contained within a graph.
Config mappings require the configuration schema to be specified as config_schema
, which will
be exposed as the configuration schema for the graph, as well as a configuration mapping
function, config_fn
, which maps the config provided to the graph to the config
that will be provided to the child nodes.
config_fn (Callable[[dict], dict]) – The function that will be called to map the graph config to a config appropriate for the child nodes.
config_schema (ConfigSchema) – The schema of the graph config.
receive_processed_config_values (Optional[bool]) – If true, config values provided to the config_fn will be converted to their dagster types before being passed in. For example, if this value is true, enum config passed to config_fn will be actual enums, while if false, then enum config passed to config_fn will be strings.
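As a rough illustration of the config_fn contract described above, the sketch below is a plain dict-to-dict function. The child op names (adder, multiplier), the factor key, and the nesting under "config" are hypothetical and chosen only to show the shape; the exact nesting Dagster expects may differ depending on where the mapping is attached.

```python
from typing import Any, Dict


def config_fn(graph_config: Dict[str, Any]) -> Dict[str, Any]:
    """Map one graph-level value onto two hypothetical child ops."""
    factor = graph_config["factor"]
    return {
        # Each key is the (assumed) name of a child node in the graph.
        "adder": {"config": {"amount": factor}},
        "multiplier": {"config": {"times": factor * 2}},
    }


child_config = config_fn({"factor": 3})
print(child_config)
```

A function of this shape would be passed as config_fn, alongside a config_schema describing the outer config (here, a single factor field).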
The objects that can be yielded by the body of solids’ compute functions to communicate with the Dagster framework.
(Note that Failure
and RetryRequested
are intended to be raised from solids rather than yielded.)
dagster.
Output
(value, output_name='result', metadata_entries=None, metadata=None)[source]Event corresponding to one of an op’s outputs.
Op compute functions must explicitly yield events of this type when they have more than
one output, or when they also yield events of other types, or when defining an op using the
OpDefinition
API directly.
Outputs are values produced by ops that will be consumed by downstream ops in a job.
They are type-checked at op boundaries when their corresponding Out
or the downstream In
is typed.
value (Any) – The value returned by the compute function.
output_name (Optional[str]) – Name of the corresponding out. (default: “result”)
metadata_entries (Optional[Union[MetadataEntry, PartitionMetadataEntry]]) – (Experimental) A set of metadata entries to attach to events related to this Output.
metadata (Optional[Dict[str, Union[str, float, int, Dict, MetadataValue]]]) – Arbitrary metadata about the output. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
dagster.
AssetMaterialization
(asset_key, description=None, metadata_entries=None, partition=None, tags=None, metadata=None)[source]Event indicating that an op has materialized an asset.
Op compute functions may yield events of this type whenever they wish to indicate to the Dagster framework (and the end user) that they have produced a materialized value as a side effect of computation. Unlike outputs, asset materializations can not be passed to other ops, and their persistence is controlled by op logic, rather than by the Dagster framework.
Op authors should use these events to organize metadata about the side effects of their computations, enabling tooling like the Assets dashboard in Dagit.
asset_key (Union[str, List[str], AssetKey]) – A key to identify the materialized asset across job runs
description (Optional[str]) – A longer human-readable description of the materialized value.
metadata_entries (Optional[List[Union[MetadataEntry, PartitionMetadataEntry]]]) – Arbitrary metadata about the materialized value.
partition (Optional[str]) – The name of the partition that was materialized.
tags (Optional[Dict[str, str]]) – (Experimental) Tag metadata for a given asset materialization. Used for search and organization of the asset entry in the asset catalog in Dagit.
metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary metadata about the asset. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
dagster.
ExpectationResult
(success, label=None, description=None, metadata_entries=None, metadata=None)[source]Event corresponding to a data quality test.
Op compute functions may yield events of this type whenever they wish to indicate to the Dagster framework (and the end user) that a data quality test has produced a (positive or negative) result.
success (bool) – Whether the expectation passed or not.
label (Optional[str]) – Short display name for expectation. Defaults to “result”.
description (Optional[str]) – A longer human-readable description of the expectation.
metadata_entries (Optional[List[MetadataEntry]]) – Arbitrary metadata about the expectation.
metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary metadata about the expectation. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
dagster.
TypeCheck
(success, description=None, metadata_entries=None, metadata=None)[source]Event corresponding to the result of a type check.
Events of this type should be returned by user-defined type checks when they need to encapsulate
additional metadata about a type check’s success or failure. (i.e., when using
as_dagster_type()
, @usable_as_dagster_type
, or the underlying
PythonObjectDagsterType()
API.)
Solid compute functions should generally avoid yielding events of this type to avoid confusion.
success (bool) – True
if the type check succeeded, False
otherwise.
description (Optional[str]) – A human-readable description of the type check.
metadata_entries (Optional[List[MetadataEntry]]) – Arbitrary metadata about the type check.
metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary metadata about the type check. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
dagster.
Failure
(description=None, metadata_entries=None, metadata=None)[source]Event indicating op failure.
Raise events of this type from within op compute functions or custom type checks in order to indicate an unrecoverable failure in user code to the Dagster machinery and return structured metadata about the failure.
description (Optional[str]) – A human-readable description of the failure.
metadata_entries (Optional[List[MetadataEntry]]) – Arbitrary metadata about the failure.
metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary metadata about the failure. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
dagster.
RetryRequested
(max_retries=1, seconds_to_wait=None)[source]An exception to raise from an op to indicate that it should be retried.
Example
@op
def flakes():
    try:
        flakey_operation()
    except Exception as e:
        raise RetryRequested(max_retries=3) from e
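To make the retry semantics concrete, here is a standard-library sketch of how a runner could honor max_retries and seconds_to_wait. The RetryRequested class below is a local stand-in with the same two parameters, and run_with_retries is a hypothetical helper; neither is Dagster's actual implementation.

```python
import time
from typing import Callable, Optional


class RetryRequested(Exception):
    """Local stand-in mirroring the two documented parameters."""

    def __init__(self, max_retries: int = 1, seconds_to_wait: Optional[float] = None):
        super().__init__()
        self.max_retries = max_retries
        self.seconds_to_wait = seconds_to_wait


def run_with_retries(fn: Callable[[], object]) -> object:
    """Re-run fn while it raises RetryRequested and the retry budget remains."""
    retries_used = 0
    while True:
        try:
            return fn()
        except RetryRequested as request:
            if retries_used >= request.max_retries:
                raise  # budget exhausted: surface the failure
            retries_used += 1
            if request.seconds_to_wait:
                time.sleep(request.seconds_to_wait)


calls = {"count": 0}


def flakes() -> str:
    # Fails on the first two calls, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RetryRequested(max_retries=3)
    return "ok"


result = run_with_retries(flakes)
```

In this sketch the function succeeds on its third call, well within the budget of three retries.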
Dagster uses metadata to attach arbitrary user-specified information to structured events.
dagster.
MetadataValue
[source]Utility class to wrap metadata values passed into Dagster events so that they can be displayed in Dagit and other tooling.
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "my_text_label": "hello",
            "dashboard_url": MetadataValue.url("http://mycoolsite.com/my_dashboard"),
            "num_rows": 0,
        },
    )
asset
(asset_key)[source]Static constructor for a metadata value referencing a Dagster asset, by key.
For example:
@op
def validate_table(context, df):
    yield AssetMaterialization(
        asset_key=AssetKey("my_table"),
        metadata={
            "Related asset": MetadataValue.asset(AssetKey('my_other_table')),
        },
    )
asset_key (AssetKey) – The asset key referencing the asset.
bool
(value)[source]Static constructor for a metadata value wrapping a bool as
BoolMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "num rows > 1000": MetadataValue.bool(len(df) > 1000),
        },
    )
value (bool) – The bool value for a metadata entry.
dagster_run
(run_id)[source]Static constructor for a metadata value wrapping a reference to a Dagster run.
run_id (str) – The ID of the run.
float
(value)[source]Static constructor for a metadata value wrapping a float as
FloatMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "size (bytes)": MetadataValue.float(calculate_bytes(df)),
        }
    )
value (float) – The float value for a metadata entry.
int
(value)[source]Static constructor for a metadata value wrapping an int as
IntMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "number of rows": MetadataValue.int(len(df)),
        },
    )
value (int) – The int value for a metadata entry.
json
(data)[source]Static constructor for a metadata value wrapping JSON data as
JsonMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context):
    yield ExpectationResult(
        success=not missing_things,
        label="is_present",
        metadata={
            "about my dataset": MetadataValue.json({"missing_columns": missing_things})
        },
    )
data (Dict[str, Any]) – The JSON data for a metadata entry.
md
(data)[source]Static constructor for a metadata value wrapping markdown data as
MarkdownMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context, md_str):
    yield AssetMaterialization(
        asset_key="info",
        metadata={
            'Details': MetadataValue.md(md_str)
        },
    )
data (str) – The markdown for a metadata entry.
path
(path)[source]Static constructor for a metadata value wrapping a path as
PathMetadataValue
. For example:
@op
def emit_metadata(context):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "filepath": MetadataValue.path("path/to/file"),
        }
    )
path (str) – The path for a metadata entry.
python_artifact
(python_artifact)[source]Static constructor for a metadata value wrapping a python artifact as
PythonArtifactMetadataValue
. Can be used as the value type for the
metadata parameter for supported events. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "class": MetadataValue.python_artifact(MyClass),
            "function": MetadataValue.python_artifact(my_function),
        }
    )
python_artifact (Callable) – The python class or function for a metadata entry.
table
(records, schema=None)[source]Static constructor for a metadata value wrapping arbitrary tabular data as
TableMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context):
    yield ExpectationResult(
        success=not has_errors,
        label="is_valid",
        metadata={
            "errors": MetadataValue.table(
                records=[
                    TableRecord(code="invalid-data-type", row=2, col="name"),
                ],
                schema=TableSchema(
                    columns=[
                        TableColumn(name="code", type="string"),
                        TableColumn(name="row", type="int"),
                        TableColumn(name="col", type="string"),
                    ]
                )
            ),
        },
    )
records (List[TableRecord]) – The data as a list of records (i.e. rows).
schema (Optional[TableSchema]) – A schema for the table.
table_schema
(schema)[source]Static constructor for a metadata value wrapping a table schema as
TableSchemaMetadataValue
. Can be used as the value type
for the metadata parameter for supported events. For example:
schema = TableSchema(
    columns = [
        TableColumn(name="id", type="int"),
        TableColumn(name="status", type="bool"),
    ]
)
DagsterType(
    type_check_fn=some_validation_fn,
    name='MyTable',
    metadata={
        'my_table_schema': MetadataValue.table_schema(schema),
    }
)
schema (TableSchema) – The table schema for a metadata entry.
text
(text)[source]Static constructor for a metadata value wrapping text as
TextMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata={
            "my_text_label": MetadataValue.text("hello")
        },
    )
text (str) – The text string for a metadata entry.
url
(url)[source]Static constructor for a metadata value wrapping a URL as
UrlMetadataValue
. Can be used as the value type for the metadata
parameter for supported events. For example:
@op
def emit_metadata(context):
    yield AssetMaterialization(
        asset_key="my_dashboard",
        metadata={
            "dashboard_url": MetadataValue.url("http://mycoolsite.com/my_dashboard"),
        }
    )
url (str) – The URL for a metadata entry.
dagster.
MetadataEntry
(label, description=None, entry_data=None, value=None)[source]The standard structure for describing metadata for Dagster events.
Lists of objects of this type can be passed as arguments to Dagster events and will be displayed in Dagit and other tooling.
Should be yielded from within an IO manager to append metadata for a given input/output event. For other event types, passing a dict with MetadataValue values to the metadata argument is preferred.
label (str) – Short display label for this metadata entry.
description (Optional[str]) – A human-readable description of this metadata entry.
value (MetadataValue) – Typed metadata entry data. The different types allow for customized display in tools like dagit.
asset
(asset_key, label, description=None)[source]Static constructor for a metadata entry referencing a Dagster asset, by key.
For example:
@op
def validate_table(context, df):
    yield AssetMaterialization(
        asset_key=AssetKey("my_table"),
        metadata_entries=[
            MetadataEntry.asset(AssetKey('my_other_table'), "Related asset"),
        ],
    )
float
(value, label, description=None)[source]Static constructor for a metadata entry containing float as
FloatMetadataValue
. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata_entries=[MetadataEntry.float(calculate_bytes(df), "size (bytes)")],
    )
fspath
(path, label=None, description=None)[source]Static constructor for a metadata entry containing a filesystem path as
PathMetadataValue
. For example:
@op
def emit_metadata(context):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata_entries=[MetadataEntry.fspath("path/to/file")],
    )
int
(value, label, description=None)[source]Static constructor for a metadata entry containing int as
IntMetadataValue
. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata_entries=[MetadataEntry.int(len(df), "number of rows")],
    )
json
(data, label, description=None)[source]Static constructor for a metadata entry containing JSON data as
JsonMetadataValue
. For example:
@op
def emit_metadata(context):
    yield ExpectationResult(
        success=not missing_things,
        label="is_present",
        metadata_entries=[
            MetadataEntry.json(
                label="metadata", data={"missing_columns": missing_things},
            )
        ],
    )
md
(md_str, label, description=None)[source]Static constructor for a metadata entry containing markdown data as
MarkdownMetadataValue
. For example:
@op
def emit_metadata(context, md_str):
    yield AssetMaterialization(
        asset_key="info",
        # label is a required argument per the signature above
        metadata_entries=[MetadataEntry.md(md_str=md_str, label="Details")],
    )
path
(path, label, description=None)[source]Static constructor for a metadata entry containing a path as
PathMetadataValue
. For example:
@op
def emit_metadata(context):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata_entries=[MetadataEntry.path("path/to/file", label="filepath")],
    )
table
(records, label, description=None, schema=None)[source]Static constructor for a metadata entry containing tabular data as
TableMetadataValue
. For example:
@op
def emit_metadata(context):
    yield ExpectationResult(
        success=not has_errors,
        label="is_valid",
        metadata_entries=[
            MetadataEntry.table(
                label="errors",
                records=[
                    TableRecord(code="invalid-data-type", row=2, col="name"),
                ],
                schema=TableSchema(
                    columns=[
                        TableColumn(name="code", type="string"),
                        TableColumn(name="row", type="int"),
                        TableColumn(name="col", type="string"),
                    ]
                )
            ),
        ],
    )
records (List[TableRecord]) – The data as a list of records (i.e. rows).
label (str) – Short display label for this metadata entry.
description (Optional[str]) – A human-readable description of this metadata entry.
schema (Optional[TableSchema]) – A schema for the table. If none is provided, one will be automatically generated by examining the first record. The schema will include as columns all field names present in the first record, with a type of “string”, “int”, “bool” or “float” inferred from the first record’s values. If a value does not directly match one of the above types, it will be treated as a string.
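The inference rule described for schema can be sketched with the standard library alone. Records here are plain dicts standing in for TableRecord, and infer_schema and infer_column_type are hypothetical helper names; this follows the rule as stated above and is not Dagster's own code.

```python
from typing import Any, Dict, List


def infer_column_type(value: Any) -> str:
    """Return "bool", "int", "float", or fall back to "string"."""
    # bool must be tested before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "bool"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    return "string"


def infer_schema(records: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Build a column list by examining only the first record."""
    if not records:
        return []
    first = records[0]
    return [
        {"name": name, "type": infer_column_type(value)}
        for name, value in first.items()
    ]


schema = infer_schema(
    [
        {"code": "invalid-data-type", "row": 2, "ok": False, "score": 0.5, "when": None},
        {"code": 17, "row": "ignored", "ok": 1, "score": 1, "when": 3},
    ]
)
```

Because only the first record is examined, the mismatched values in the second record do not affect the inferred types, and the None value falls back to "string".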
table_schema
(schema, label, description=None)[source]Static constructor for a metadata entry containing a table schema as
TableSchemaMetadataValue
. For example:
schema = TableSchema(
    columns = [
        TableColumn(name="id", type="int"),
        TableColumn(name="status", type="bool"),
    ]
)
DagsterType(
    type_check_fn=some_validation_fn,
    name='MyTable',
    metadata_entries=[
        MetadataEntry.table_schema(
            schema,
            label='schema',
        )
    ]
)
schema (TableSchema) – The table schema for a metadata entry.
label (str) – Short display label for this metadata entry.
description (Optional[str]) – A human-readable description of this metadata entry.
text
(text, label, description=None)[source]Static constructor for a metadata entry containing text as
TextMetadataValue
. For example:
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key="my_dataset",
        metadata_entries=[
            MetadataEntry.text("Text-based metadata for this event", "text_metadata")
        ],
    )
url
(url, label, description=None)[source]Static constructor for a metadata entry containing a URL as
UrlMetadataValue
. For example:
@op
def emit_metadata(context):
    yield AssetMaterialization(
        asset_key="my_dashboard",
        metadata_entries=[
            MetadataEntry.url(
                "http://mycoolsite.com/my_dashboard", label="dashboard_url"
            ),
        ],
    )
value
Alias of entry_data.
All metadata types inherit from MetadataValue. The following types are defined:
dagster.
DagsterAssetMetadataValue
(asset_key)[source]Representation of a dagster asset.
asset_key (AssetKey) – The dagster asset key
dagster.
DagsterPipelineRunMetadataValue
(run_id)[source]Representation of a dagster pipeline run.
run_id (str) – The pipeline run id
dagster.
FloatMetadataValue
(value)[source]Container class for float metadata entry data.
value (Optional[float]) – The float value.
dagster.
IntMetadataValue
(value)[source]Container class for int metadata entry data.
value (Optional[int]) – The int value.
dagster.
JsonMetadataValue
(data)[source]Container class for JSON metadata entry data.
data (Dict[str, Any]) – The JSON data.
dagster.
MarkdownMetadataValue
(md_str)[source]Container class for markdown metadata entry data.
md_str (Optional[str]) – The markdown as a string.
dagster.
PathMetadataValue
(path)[source]Container class for path metadata entry data.
path (Optional[str]) – The path as a string or conforming to os.PathLike.
dagster.
PythonArtifactMetadataValue
(module, name)[source]Container class for python artifact metadata entry data.
dagster.
TableMetadataValue
(records, schema)[source]Container class for table metadata entry data.
records (List[TableRecord]) – The data as a list of records (i.e. rows).
schema (Optional[TableSchema]) – A schema for the table.
dagster.
TableSchemaMetadataValue
(schema)[source]Representation of a schema for arbitrary tabular data.
schema (TableSchema) – The dictionary containing the schema representation.
Dagster uses AssetKey
to build an index on Materialization
events.
Assets materialized with an AssetKey
are highlighted in dagit on the Assets
dashboard.
dagster.
AssetKey
(path)[source]Object representing the structure of an asset key. Takes in a sanitized string, list of strings, or tuple of strings.
Example usage:
from dagster import AssetKey, AssetMaterialization, op
@op
def emit_metadata(context, df):
    yield AssetMaterialization(
        asset_key=AssetKey('flat_asset_key'),
        metadata={"text_metadata": "Text-based metadata for this event"},
    )
@op
def structured_asset_key(context, df):
    yield AssetMaterialization(
        asset_key=AssetKey(['parent', 'child', 'grandchild']),
        metadata={"text_metadata": "Text-based metadata for this event"},
    )
@op
def structured_asset_key_2(context, df):
    yield AssetMaterialization(
        asset_key=AssetKey(('parent', 'child', 'grandchild')),
        metadata={"text_metadata": "Text-based metadata for this event"},
    )
path (Sequence[str]) – String, list of strings, or tuple of strings. A list of strings represents the hierarchical structure of the asset_key.
to_string
(legacy=False)[source]E.g. ‘[“first_component”, “second_component”]’
to_user_string
()[source]E.g. “first_component>second_component”
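The two string forms shown above can be reproduced with a short standard-library sketch. It assumes that to_string is a JSON serialization of the path components, that to_user_string joins them with ">", and that a bare string is treated as a one-component path; these assumptions are inferred from the examples in this section rather than taken from Dagster's source, and the function names are stand-ins.

```python
import json
from typing import List, Sequence, Union


def normalize_path(path: Union[str, Sequence[str]]) -> List[str]:
    """Accept a string, list, or tuple and return a list of components."""
    if isinstance(path, str):
        return [path]
    return list(path)


def to_string(path: Union[str, Sequence[str]]) -> str:
    """JSON-style form, e.g. '["first_component", "second_component"]'."""
    return json.dumps(normalize_path(path))


def to_user_string(path: Union[str, Sequence[str]]) -> str:
    """Human-readable form, e.g. 'first_component>second_component'."""
    return ">".join(normalize_path(path))


key = ("first_component", "second_component")
print(to_string(key))
print(to_user_string(key))
```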