Dagster lets developers express what they expect their op inputs and outputs to look like through Dagster Types.
The Dagster type system is gradual and optional: jobs can run without explicitly specified types, and specifying types in some places doesn't require that types be specified everywhere.
Dagster type-checking happens at op execution time: each type defines a type_check_fn that knows how to check whether values match what it expects.
Let's look back at our simple download_csv op.
import csv

import requests
from dagster import get_dagster_logger, op


@op
def download_csv():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    get_dagster_logger().info(f"Read {len(lines)} lines")
    return [row for row in csv.DictReader(lines)]
The object returned by Python's built-in csv.DictReader is a list of dictionaries (collections.OrderedDict on Python 3.7 and earlier, plain dict on Python 3.8+), each of which represents one row of the dataset:
[
    OrderedDict([
        ('name', '100% Bran'), ('mfr', 'N'), ('type', 'C'), ('calories', '70'), ('protein', '4'),
        ('fat', '1'), ('sodium', '130'), ('carbo', '5'), ('sugars', '6'), ('potass', '280'),
        ('vitamins', '25'), ('shelf', '3'), ('weight', '1'), ('cups', '0.33'),
        ('rating', '68.402973')
    ]),
    OrderedDict([
        ('name', '100% Natural Bran'), ('mfr', 'Q'), ('type', 'C'), ('calories', '120'),
        ('protein', '3'), ('fat', '5'), ('sodium', '15'), ('fiber', '2'), ('carbo', '8'),
        ('sugars', '8'), ('potass', '135'), ('vitamins', '0'), ('shelf', '3'), ('weight', '1'),
        ('cups', '1'), ('rating', '33.983679')
    ]),
    ...
]
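The same structure can be reproduced with a small in-memory CSV instead of fetching over the network; the sample values here are hypothetical, not taken from the cereal dataset:

```python
import csv

# A tiny in-memory stand-in for the downloaded CSV (hypothetical sample values).
sample = "name,mfr,calories\n100% Bran,N,70\n100% Natural Bran,Q,120\n"

# csv.DictReader maps each data row onto the header fields.
rows = [row for row in csv.DictReader(sample.splitlines())]
print(rows[0]["name"])  # 100% Bran
print(len(rows))        # 2
```

Note that every value comes back as a string; numeric fields like calories are not converted automatically.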
This is a simple representation of a "data frame", or a table of data. We'd like to be able to use Dagster's type system to type the output of download_csv
, so that we can do type checking when we construct the job, ensuring that any op consuming the output of download_csv
expects to receive data in this format.
To do this, we'll construct a DagsterType that verifies an object is a list of dictionaries.
from dagster import DagsterType


def is_list_of_dicts(_, value):
    return isinstance(value, list) and all(
        isinstance(element, dict) for element in value
    )


SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=is_list_of_dicts,
    description="A naive representation of a data frame, e.g., as returned by csv.DictReader.",
)
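Since the type check function is ordinary Python, we can sanity-check it directly before wiring it into a job. The first argument is the type-check context, which this function ignores, so None suffices in this sketch:

```python
def is_list_of_dicts(_, value):
    return isinstance(value, list) and all(
        isinstance(element, dict) for element in value
    )

# A list of dicts passes; a list of non-dicts or a non-list fails.
print(is_list_of_dicts(None, [{"name": "100% Bran"}]))  # True
print(is_list_of_dicts(None, ["not_a_dict"]))           # False
print(is_list_of_dicts(None, "not a list"))             # False
```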
Now we can annotate the rest of our job with our new type:
from dagster import In, Out


@op(out=Out(SimpleDataFrame))
def download_csv():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    get_dagster_logger().info(f"Read {len(lines)} lines")
    return [row for row in csv.DictReader(lines)]


@op(ins={"cereals": In(SimpleDataFrame)})
def sort_by_calories(cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    get_dagster_logger().info(f'Most caloric cereal: {sorted_cereals[-1]["name"]}')
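As a quick sanity check of the sort key on hypothetical rows: csv.DictReader yields string values, so the op above compares calories lexicographically; converting to int gives a numeric ordering:

```python
# Hypothetical rows in the same shape as the cereal data.
cereals = [
    {"name": "A", "calories": "70"},
    {"name": "B", "calories": "120"},
    {"name": "C", "calories": "90"},
]

# Sorting the raw strings orders "120" before "70" (lexicographic);
# converting to int sorts numerically, so the most caloric row comes last.
sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal["calories"]))
print(sorted_cereals[-1]["name"])  # B
```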
The type metadata now appears in Dagit, and the system will ensure that the input and output of this op indeed match the criteria for SimpleDataFrame. As usual, run:
dagit -f custom_types.py
You can see that the output of download_csv (which has the default name result) is marked as being of type SimpleDataFrame.
Now, if our op logic fails to return the right type, we'll see a type check failure, which will fail the job. Let's replace our download_csv op with the following bad logic:
@op(out=Out(SimpleDataFrame))
def bad_download_csv():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    get_dagster_logger().info(f"Read {len(lines)} lines")
    return ["not_a_dict"]
When we run the job with this op, we'll see an error in the terminal like:
2021-10-18 13:15:37 - dagster - ERROR - custom_type_job - 66d26360-84bc-41a3-8848-fba271354673 - 16200 - bad_download_csv - STEP_FAILURE - Execution of step "bad_download_csv" failed.
dagster.core.errors.DagsterTypeCheckDidNotPass: Type check failed for step output "result" - expected type "SimpleDataFrame".
We will also see the error message in Dagit:
Custom types can also yield metadata about the type check. For example, in the case of our data frame, we might want to record the number of rows and columns in the dataset when a type check succeeds, and provide more information about why a type check failed. User-defined type check functions can optionally return a TypeCheck object that contains metadata about the success or failure of the type check. Let's see how to use this to emit some summary statistics about our DataFrame type:
from dagster import TypeCheck


def less_simple_data_frame_type_check(_, value):
    if not isinstance(value, list):
        return TypeCheck(
            success=False,
            description=f"LessSimpleDataFrame should be a list of dicts, got {type(value)}",
        )

    # Guard against an empty list before indexing into the first row.
    fields = list(value[0].keys()) if value else []
    for idx, row in enumerate(value, start=1):
        if not isinstance(row, dict):
            return TypeCheck(
                success=False,
                description=(
                    f"LessSimpleDataFrame should be a list of dicts, got {type(row)} for row {idx}"
                ),
            )
        row_fields = list(row.keys())
        if fields != row_fields:
            return TypeCheck(
                success=False,
                description=(
                    f"Rows in LessSimpleDataFrame should have the same fields, got {row_fields} "
                    f"for row {idx}, expected {fields}"
                ),
            )

    return TypeCheck(
        success=True,
        description="LessSimpleDataFrame summary statistics",
        metadata={
            "n_rows": len(value),
            "n_cols": len(value[0].keys()) if len(value) > 0 else 0,
            "column_names": str(list(value[0].keys()) if len(value) > 0 else []),
        },
    )
A TypeCheck must include a success argument describing whether the check passed or failed, and may include a description and/or a list of EventMetadataEntry objects. You should use the static constructors on EventMetadataEntry to construct these objects, which are flexible enough to support arbitrary metadata in JSON or Markdown format.
Dagit knows how to display and archive structured metadata of this kind for future review: