Ops#

Ops are the core unit of computation in Dagster. Multiple ops can be connected to create a Graph.

ops

Relevant APIs#

NameDescription
@opA decorator used to define ops. Returns an OpDefinition. The decorated function is called the "compute function".
InAn input to an op. Defined on the ins argument to the @op decorator.
OutAn output of an op. Defined on the out argument to the @op decorator.
OpExecutionContextAn object exposing Dagster system APIs for resource access, logging, and more. Can be injected into an op by specifying context as the first argument of the compute function.
OpDefinitionClass for ops. You will rarely want to instantiate this class directly. Instead, you should use the @op.

Overview#

Ops are Dagster's core unit of computation. Individual ops should perform relatively simple tasks. Collections of ops can then be assembled into Graphs to perform more complex tasks. Some examples of tasks appropriate for a single op:

  • Derive a dataset from other datasets.
  • Execute a database query.
  • Initiate a Spark job in a remote cluster.
  • Query an API and store the result in a data warehouse.
  • Send an email or Slack message.

The op as computational unit enables many useful features for data orchestration:

  • Flexible execution strategies: Data pipelines are frequently developed locally and later deployed to production. Ops represent sealed units of logic independent of pipeline execution strategy, making the transition from development to production painless. Collections of ops (i.e. Graphs) can be bound via Jobs to an appropriate Executor for single-process execution or distribution across a cluster.
  • Pluggable external systems: Production data pipelines almost always interface with external systems. The availability of these systems varies across environments. During development, it may be desirable to use a local substitute (e.g. SQLite) in place of a production system (e.g. cloud-hosted Postgres). Dagster provides Resources as an abstraction layer for this purpose. Ops can be written against abstract resources (e.g. database), with resource definitions later bound at the Job level. Op logic can thus remain uncoupled to any particular implementation of an external system.
  • Input and output management: Ops have defined inputs and outputs, analogous to the arguments and return value(s) of a Python function. An input or output can be annotated with a Dagster Type for arbitrarily complex runtime validation. Outputs can additionally be tagged with an IO Manager to manage storage of the associated data in between ops. This enables easy swapping of I/O strategy depending on the execution environment, as well as efficient caching of data intermediates.
  • Configuration: Operations in a data pipeline are often parameterized by both upstream data (e.g. a stream of database records) and "configuration" parameters independent of upstream data (e.g. a "chunk size" of incoming records to operate on). Ops can be given an associated Config Schema to define such configuration parameters.
  • Event streams: Ops emit a stream of Events during execution. Certain events are emitted by default (e.g. indicating the start of an op's execution), but op authors are additionally given access to an event API. This can be used to report data asset creation or modification (AssetMaterialization), the result of a data quality check (ExpectationResult), or other arbitrary information. Event streams can be visualized by Dagster's browser UI Dagit. This rich log of execution facilitates debugging, inspection, and real-time monitoring of running jobs.
  • Testability: The properties that enable flexible execution of ops also facilitate versatile testing. Ops can be tested in isolation or as part of a pipeline. Further, the Resource API allows external systems (e.g. databases) to be stubbed or substituted as needed.

Defining an op#

To define an op, use the @op decorator. The decorated function is called the compute_fn.

@op
def my_op():
    return "hello"

Inputs and Outputs#

Each op has a set of inputs and outputs, which define the data it consumes and produces. Inputs and outputs are used to define dependencies between ops and to pass data between ops.

Both definitions have a few important properties:

  • They are named.
  • They are optionally typed. These types are validated at runtime.
  • (Advanced) They can be linked to an IOManager, which defines how the output or input is stored and loaded. See the IOManager concept page for more info.

Inputs#

Inputs are passed as arguments to an op's compute_fn. The value of an input can be passed from the output of another op, or stubbed (hardcoded) using config.

The most common way to define inputs is just to add arguments to the decorated function:

@op
def my_input_op(abc, xyz):
    pass

An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways:

  • The upstream output that the input depends on has been successfully emitted and stored.
  • The input was stubbed through config.

You can use a Dagster Type to provide a function that validates an op's input every time the op runs. In this case, you use a dictionary of Ins corresponding to the decorated function arguments.

MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@op(ins={"abc": In(dagster_type=MyDagsterType)})
def my_typed_input_op(abc):
    pass

Outputs#

Outputs are yielded from an op's compute_fn. By default, all ops have a single output called "result".

When you have one output, you can return the output value directly.

@op
def my_output_op():
    return 5

To define multiple outputs, or to use a different output name than "result", you can provide a dictionary of Outs to the @op decorator.

When you have more than one output, you can return a tuple of values, one for each output.

@op(out={"first_output": Out(), "second_output": Out()})
def my_multi_output_op():
    return 5, 6

Like inputs, outputs can also have Dagster Types.

Op Context#

When writing an op, users can optionally provide a first parameter, context. When this parameter is supplied, Dagster will supply a context object to the body of the op. The context provides access to system information like op configuration, loggers, resources, and the current run id. See OpExecutionContext for the full list of properties accessible from the op context.

For example, to access the logger and log a info message:

@op(config_schema={"name": str})
def context_op(context):
    name = context.op_config["name"]
    context.log.info(f"My name is {name}")

Op Configuration#

All definitions in dagster expose a config_schema, making them configurable and parameterizable. The configuration system is explained in detail on Config Schema.

Op definitions can specify a config_schema for the op's configuration. The configuration is accessible through the op context at runtime. Therefore, op configuration can be used to specify op behavior at runtime, making ops more flexible and reusable.

For example, we can define an op where the API endpoint it queries is defined through its configuration:

@op(config_schema={"api_endpoint": str})
def my_configurable_op(context):
    api_endpoint = context.op_config["api_endpoint"]
    data = requests.get(f"{api_endpoint}/data").json()
    return data

Using an op#

Ops are used within a Job or Graph. You can also execute a single op, usually within a test context, by directly invoking it. More information can be found at Testing ops.

Patterns#

Op Factory#

You may find the need to create utilities that help generate ops. In most cases, you should parameterize op behavior by adding op configuration. You should reach for this pattern if you find yourself needing to vary the arguments to the @op decorator or OpDefinition themselves, since they cannot be modified based on op configuration.

To create an op factory, you define a function that returns an OpDefinition, either directly or by decorating a function with the op decorator.

def my_op_factory(
    name="default_name",
    ins=None,
    **kwargs,
):
    """
    Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.

    Returns:
        function: The new op.
    """

    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        # Op logic here
        pass

    return my_inner_op

See it in action#

For more examples of ops, check out the following in our Hacker News example: