dagster: Modern Data Orchestration
What it is
Dagster is a data orchestration platform centred on software-defined assets. These are persistent data objects (database tables, files, DataFrames, ML models) that you declare in Python together with the function that produces them. Instead of thinking in terms of “tasks that run,” you declare “what data should exist and how to produce it.” Dagster derives the execution graph from the dependencies between assets, supports partitioning, and ships a web UI for observability. It is a common choice for Python-first teams looking for an alternative to Airflow.
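You can illustrate how an execution graph is derived from dependencies without Dagster at all. The sketch below is plain Python, not Dagster's internals. It assumes, as Dagster does for assets, that a parameter named after another function means "depends on it". It then uses the standard library's `graphlib` to run functions upstream first. `build_graph` and `materialize_all` are hypothetical helpers.

```python
import inspect
from graphlib import TopologicalSorter


def raw_data() -> list[dict]:
    return [
        {"name": "Alice", "score": 92},
        {"name": "Bob", "score": 78},
        {"name": "Carol", "score": 85},
    ]


def high_scorers(raw_data: list[dict]) -> list[dict]:
    return [r for r in raw_data if r["score"] > 80]


def build_graph(funcs) -> dict[str, set[str]]:
    """Map each function name to the names of the functions it takes as parameters."""
    names = {f.__name__ for f in funcs}
    return {
        f.__name__: {p for p in inspect.signature(f).parameters if p in names}
        for f in funcs
    }


def materialize_all(funcs) -> dict:
    """Call every function once, upstream first, passing results in by parameter name."""
    by_name = {f.__name__: f for f in funcs}
    graph = build_graph(funcs)
    results = {}
    # static_order() yields each node only after all of its predecessors
    for name in TopologicalSorter(graph).static_order():
        results[name] = by_name[name](**{dep: results[dep] for dep in graph[name]})
    return results


# List order does not matter; the graph decides execution order
print(materialize_all([high_scorers, raw_data])["high_scorers"])
# [{'name': 'Alice', 'score': 92}, {'name': 'Carol', 'score': 85}]
```

Dagster layers storage, partitions, retries and the UI on top. The underlying principle is that you declare dependencies by name and the order is computed, not hand-written.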
Install
```bash
pip install dagster dagster-webserver
```
Output: pip's installation log; exits 0 on success.
Quick example
```python
from dagster import asset, Definitions, materialize


@asset
def raw_data() -> list[dict]:
    """Simulated CSV load."""
    return [
        {"name": "Alice", "score": 92},
        {"name": "Bob", "score": 78},
        {"name": "Carol", "score": 85},
    ]


@asset
def high_scorers(raw_data: list[dict]) -> list[dict]:
    """Filter to scores above 80."""
    return [r for r in raw_data if r["score"] > 80]


defs = Definitions(assets=[raw_data, high_scorers])

# Materialise both assets locally
result = materialize([raw_data, high_scorers])
print(result.output_for_node("high_scorers"))
```
Output:
[{'name': 'Alice', 'score': 92}, {'name': 'Carol', 'score': 85}]
When / why to use it
- Data pipelines where lineage, reproducibility, and observability matter: the asset graph makes dependencies explicit.
- ETL / ELT pipelines over databases, files, or APIs.
- ML feature pipelines where intermediate artefacts need versioning and re-use.
- Teams migrating from Airflow who want stronger typing and a better local development experience.
- Partitioned pipelines: daily, weekly, or per-category processing, with backfills over any range of partitions.
Common pitfalls
[!WARNING] Asset return values must be serialisable. Dagster hands every asset output to an I/O manager for storage, and the default I/O manager pickles values to the local filesystem. For large DataFrames, configure a Parquet- or Arrow-based I/O manager instead.
[!WARNING] `@op` vs `@asset`: `@op` is the lower-level “task” primitive, roughly analogous to an Airflow task. `@asset` is the higher-level “declare what exists” primitive. Prefer `@asset` for new pipelines. Use `@op` for complex fan-out/fan-in graphs whose steps don't map to named data artefacts.
[!WARNING] `Definitions` must be importable at module level. Both `dagster dev` and the daemon that runs schedules and sensors import your code to discover assets. Avoid side effects in module-level code; keep heavy initialisation inside functions or resources.
[!TIP] `dagster dev` launches the local UI at http://localhost:3000. Use it during development to visualise the asset graph, trigger materialisations, and inspect structured run logs. This is far easier than reading raw logs from stdout.
[!TIP] To attach metadata (row count, schema, file size) to an asset output, return an `Output` with a `metadata` dict, or call `context.add_output_metadata`. The UI displays this metadata alongside the materialisation event, giving you observability for free.
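The stdlib sketch below shows what a pickle-based filesystem store does, which is why the first warning above matters. `PickleStore` is a hypothetical class. Its `handle_output` / `load_input` method names echo Dagster's `IOManager` interface, but the signatures are simplified: Dagster passes a context object, not an asset name. Anything `pickle` cannot serialise, such as a lock or a generator, fails at the storage step.

```python
import pickle
import tempfile
import threading
from pathlib import Path


class PickleStore:
    """Hypothetical minimal store: one pickle file per asset name in base_dir."""

    def __init__(self, base_dir: Path) -> None:
        self.base_dir = base_dir

    def handle_output(self, asset_name: str, value) -> None:
        # Serialise first, so an unpicklable value raises before any file is written
        data = pickle.dumps(value)
        (self.base_dir / f"{asset_name}.pkl").write_bytes(data)

    def load_input(self, asset_name: str):
        return pickle.loads((self.base_dir / f"{asset_name}.pkl").read_bytes())


def is_picklable(value) -> bool:
    """True if pickle can serialise value, which a pickling I/O manager requires."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    store = PickleStore(Path(tmp))
    store.handle_output("high_scorers", [{"name": "Alice", "score": 92}])
    print(store.load_input("high_scorers"))  # [{'name': 'Alice', 'score': 92}]

print(is_picklable([1, 2, 3]))            # True
print(is_picklable(threading.Lock()))     # False
print(is_picklable(x for x in range(3)))  # False
```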
Software-defined assets
Assets are the core Dagster primitive. Each `@asset` function declares one data object and can depend on other assets by naming them as parameters.
```python
import pandas as pd
from dagster import asset, AssetIn, Output, MetadataValue


@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
    """Load orders from CSV."""
    return pd.read_csv("orders.csv")


# `ins=` is optional here because the parameter name already declares the dependency;
# AssetIn is mainly useful for mapping a parameter to a differently named asset key.
@asset(group_name="processed", ins={"raw_orders": AssetIn()})
def cleaned_orders(raw_orders: pd.DataFrame) -> Output[pd.DataFrame]:
    """Normalise column names, then remove rows missing an order ID or amount."""
    df = raw_orders.copy()
    # Normalise first so the dropna subset matches regardless of the CSV's casing
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    df = df.dropna(subset=["order_id", "amount"])
    return Output(
        df,
        metadata={
            "row_count": MetadataValue.int(len(df)),
            "columns": MetadataValue.text(str(list(df.columns))),
        },
    )


@asset(group_name="processed")
def revenue_by_region(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate revenue per region."""
    return cleaned_orders.groupby("region")["amount"].sum().reset_index()
```
Resources: injectable clients and connections
Resources are reusable, configured objects (database connections, API clients, file system handles) that Dagster injects into asset and op functions. Define them once and share them across all assets.
```python
import pandas as pd
from dagster import asset, Definitions, EnvVar
from dagster_duckdb import DuckDBResource  # pip install dagster-duckdb


@asset
def sales_summary(duckdb: DuckDBResource) -> pd.DataFrame:
    """Query the DuckDB warehouse."""
    with duckdb.get_connection() as conn:
        return conn.execute(
            "SELECT region, SUM(amount) AS total FROM sales GROUP BY region"
        ).df()


defs = Definitions(
    assets=[sales_summary],
    resources={
        # The key must match the asset's parameter name (`duckdb`)
        "duckdb": DuckDBResource(database=EnvVar("DUCKDB_PATH")),
    },
)
```
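Conceptually, resource injection is dependency injection by parameter name. The plain-Python sketch below illustrates the idea under simplifying assumptions and is not Dagster's mechanism. `FakeWarehouse` stands in for a client like `DuckDBResource`. The hypothetical `run_with_resources` fills each parameter from a dict keyed by name and fails loudly when a key is missing.

```python
import inspect


class FakeWarehouse:
    """Hypothetical stand-in for a configured client such as DuckDBResource."""

    def __init__(self, rows: list[tuple[str, int]]) -> None:
        self.rows = rows

    def totals_by_region(self) -> dict[str, int]:
        totals: dict[str, int] = {}
        for region, amount in self.rows:
            totals[region] = totals.get(region, 0) + amount
        return totals


def sales_summary(duckdb: FakeWarehouse) -> dict[str, int]:
    # The function only sees "something called duckdb"; it never builds it
    return duckdb.totals_by_region()


def run_with_resources(fn, resources: dict):
    """Call fn, filling each parameter from `resources` by matching its name."""
    params = list(inspect.signature(fn).parameters)
    missing = [p for p in params if p not in resources]
    if missing:
        raise KeyError(f"no resource bound for parameter(s): {missing}")
    return fn(**{p: resources[p] for p in params})


resources = {"duckdb": FakeWarehouse([("East", 100), ("West", 50), ("East", 25)])}
print(run_with_resources(sales_summary, resources))  # {'East': 125, 'West': 50}
```

This design makes testing straightforward: swap a fake client into `resources` and you can exercise asset logic without touching a real database.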
Jobs: targeted execution graphs
A job defines what a single run executes: either a graph of ops or a selection of assets, plus their configuration. Use jobs to run different asset subsets on different schedules or with different configs.
```python
from dagster import job, op, In


@op
def fetch_data() -> list[dict]:
    return [{"id": 1, "value": 42}, {"id": 2, "value": 7}]


@op(ins={"data": In()})
def filter_data(data: list[dict]) -> list[dict]:
    return [d for d in data if d["value"] > 10]


@op(ins={"data": In()})
def save_data(data: list[dict]) -> None:
    print(f"Saving {len(data)} records")


@job
def my_pipeline():
    # Nesting the calls wires outputs to inputs: fetch -> filter -> save
    save_data(filter_data(fetch_data()))


if __name__ == "__main__":
    # Run the whole job in the current process
    my_pipeline.execute_in_process()
```
For asset-based jobs:
```python
from dagster import define_asset_job, AssetSelection

# Run all assets in the "processed" group
processed_job = define_asset_job("processed_job", AssetSelection.groups("processed"))
```
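A job built this way runs only the selected assets. Assets upstream of the selection (here `raw_orders`) are not re-executed. Their stored values are read through the I/O manager, so they must already have been materialised. The stdlib sketch below models that split. It uses a hypothetical `select_group` helper over a hand-written dependency map that mirrors the assets from the software-defined assets section.

```python
# Hand-written map mirroring the assets defined earlier: name -> (group, upstream names)
ASSETS: dict[str, tuple[str, set[str]]] = {
    "raw_orders": ("raw", set()),
    "cleaned_orders": ("processed", {"raw_orders"}),
    "revenue_by_region": ("processed", {"cleaned_orders"}),
}


def select_group(assets, group: str) -> tuple[set[str], set[str]]:
    """Return (assets the job runs, upstream assets it must load from storage)."""
    selected = {name for name, (g, _) in assets.items() if g == group}
    upstream: set[str] = set()
    for name in selected:
        upstream |= assets[name][1]
    # Dependencies inside the selection are computed during the run, not loaded
    return selected, upstream - selected


to_run, to_load = select_group(ASSETS, "processed")
print(sorted(to_run), sorted(to_load))
# ['cleaned_orders', 'revenue_by_region'] ['raw_orders']
```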
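Schedules
A schedule launches a job at the times given by a cron expression.
```python
from dagster import ScheduleDefinition, define_asset_job, AssetSelection, Definitions

daily_job = define_asset_job("daily_pipeline", AssetSelection.all())

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",  # 06:00 every day
    execution_timezone="UTC",   # timezone in which the cron expression is evaluated
)

# raw_orders, cleaned_orders and revenue_by_region are the assets defined earlier
defs = Definitions(
    assets=[raw_orders, cleaned_orders, revenue_by_region],
    jobs=[daily_job],
    schedules=[daily_schedule],
)
```
Schedules fire only while the Dagster daemon is running; `dagster dev` starts one locally.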
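The sketch below computes when `0 6 * * *` fires next. It handles only the narrow cron shape `M H * * *` (fixed minute and hour, every day). It is a hypothetical stdlib helper, not how Dagster evaluates cron strings. It deliberately works in UTC, because adding one day to a local time near a DST change would be wrong; that is one reason to set `execution_timezone` explicitly.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(after: datetime, hour: int = 6, minute: int = 0) -> datetime:
    """Next firing of a cron 'minute hour * * *' strictly after `after` (same tz)."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        # Today's slot has passed (or is exactly now): move to tomorrow.
        # Plain day arithmetic is only safe in a zone without DST, such as UTC.
        candidate += timedelta(days=1)
    return candidate


utc = timezone.utc
print(next_daily_run(datetime(2026, 1, 15, 5, 30, tzinfo=utc)))  # 2026-01-15 06:00:00+00:00
print(next_daily_run(datetime(2026, 1, 15, 6, 0, tzinfo=utc)))   # 2026-01-16 06:00:00+00:00
```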
Sensors: event-driven triggers
A sensor polls an external condition (new file, queue message, API event) and triggers a run when the condition is met.
```python
import os
from dagster import sensor, RunRequest, SensorEvaluationContext, define_asset_job, AssetSelection, Definitions

ingest_job = define_asset_job("ingest_job", AssetSelection.all())


@sensor(job=ingest_job, minimum_interval_seconds=30)
def new_file_sensor(context: SensorEvaluationContext):
    """Trigger a run when a new CSV appears in the inbox directory."""
    known_files = set(context.cursor.split(",")) if context.cursor else set()
    inbox_files = {f for f in os.listdir("./inbox") if f.endswith(".csv")}
    for filename in sorted(inbox_files - known_files):
        # run_key dedupes runs; these assets take no run config, so pass the name as a tag
        yield RunRequest(run_key=filename, tags={"filename": filename})
    context.update_cursor(",".join(sorted(inbox_files)))


# Register the sensor; the assets come from the earlier sections
defs = Definitions(assets=[raw_orders, cleaned_orders, revenue_by_region], jobs=[ingest_job], sensors=[new_file_sensor])
```
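The sensor's bookkeeping is easier to unit-test when the diffing lives in a pure function. In the sketch below, `diff_inbox` (hypothetical) takes the stored cursor and the current directory listing. It returns the files to request runs for plus the next cursor, and sorting keeps the cursor deterministic. The comma-joined encoding assumes filenames never contain commas; a JSON list would be more robust.

```python
from typing import Optional


def diff_inbox(cursor: Optional[str], inbox_files: set[str]) -> tuple[list[str], str]:
    """Return (files not seen in the cursor, cursor to store for the next tick)."""
    known = set(cursor.split(",")) if cursor else set()
    new_files = sorted(inbox_files - known)
    return new_files, ",".join(sorted(inbox_files))


new, cursor = diff_inbox(None, {"b.csv", "a.csv"})
print(new, cursor)  # ['a.csv', 'b.csv'] a.csv,b.csv
new, cursor = diff_inbox(cursor, {"a.csv", "b.csv", "c.csv"})
print(new, cursor)  # ['c.csv'] a.csv,b.csv,c.csv
```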
Partitions: time and category slicing
Partitions let you process data in named slices (dates, categories) and backfill or re-run individual partitions independently.
```python
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")


@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context: AssetExecutionContext) -> list[dict]:
    """Load events for the given partition date."""
    partition_date = context.partition_key  # e.g. "2026-01-15"
    # Load data for this specific date (placeholder rows here)
    return [{"date": partition_date, "events": 1000}]


# Same partitions_def, so each partition reads the matching upstream partition
@asset(partitions_def=daily_partitions, group_name="processed")
def daily_summary(daily_events: list[dict]) -> dict:
    return {"total": sum(e["events"] for e in daily_events)}
```
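Materialise a single partition. The `-f` flag points the CLI at the file that defines the assets; `my_pipeline.py` is a placeholder.
```bash
dagster asset materialize -f my_pipeline.py --select daily_events --partition 2026-01-15
```
Output: run event logs; exits 0 on success.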
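Daily partition keys default to ISO dates (`%Y-%m-%d`), which is why `context.partition_key` above looks like `"2026-01-15"`. Conceptually, a backfill repeats the same materialisation once per key. The stdlib sketch below uses hypothetical `daily_partition_keys` and `backfill` helpers. It enumerates keys over a half-open date range and maps a per-partition function over them. It does not model how Dagster schedules or batches backfill runs.

```python
from datetime import date, timedelta


def daily_partition_keys(start: date, end: date) -> list[str]:
    """ISO-date keys for every day in the half-open range [start, end)."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]


def backfill(keys: list[str], materialize_one) -> dict:
    """Run materialize_one once per partition key, collecting results by key."""
    return {key: materialize_one(key) for key in keys}


keys = daily_partition_keys(date(2026, 1, 1), date(2026, 1, 4))
print(keys)  # ['2026-01-01', '2026-01-02', '2026-01-03']

# Stand-in for the daily_events body above: one placeholder row per day
results = backfill(keys, lambda key: [{"date": key, "events": 1000}])
print(sum(rows[0]["events"] for rows in results.values()))  # 3000
```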
Config: parameterising runs
Subclass `Config` to declare typed run parameters with defaults. An asset receives them through a `config` parameter annotated with that class.
```python
from dagster import asset, Config, Definitions


class FilterConfig(Config):
    min_score: int = 80
    region: str = "East"


@asset
def filtered_sales(config: FilterConfig) -> list[dict]:
    """Return sales filtered by config parameters."""
    data = [
        {"region": "East", "score": 92},
        {"region": "West", "score": 78},
        {"region": "East", "score": 85},
    ]
    return [
        d for d in data
        if d["score"] >= config.min_score and d["region"] == config.region
    ]


defs = Definitions(assets=[filtered_sales])
```
Launch with config:
```python
from dagster import materialize

# Asset config sits under "ops", keyed by the asset name
result = materialize(
    [filtered_sales],
    run_config={"ops": {"filtered_sales": {"config": {"min_score": 85, "region": "East"}}}},
)
print(result.output_for_node("filtered_sales"))
# [{'region': 'East', 'score': 92}, {'region': 'East', 'score': 85}]
```
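The `run_config` nesting (`ops` → asset name → `config`) is easy to get wrong. The sketch below models it with a plain dataclass in place of `dagster.Config`. The hypothetical `config_for` helper extracts one asset's config, keeps defaults for omitted fields, and rejects unknown keys. This is similar in spirit to Dagster's config validation, which additionally checks types.

```python
from dataclasses import dataclass, fields


@dataclass
class FilterConfig:
    # Plain-dataclass stand-in for the dagster.Config subclass above
    min_score: int = 80
    region: str = "East"


def config_for(run_config: dict, asset_name: str, cls):
    """Build cls from run_config["ops"][asset_name]["config"], keeping defaults."""
    raw = run_config.get("ops", {}).get(asset_name, {}).get("config", {})
    unknown = set(raw) - {f.name for f in fields(cls)}
    if unknown:
        raise ValueError(f"unknown config keys for {asset_name}: {sorted(unknown)}")
    return cls(**raw)


run_config = {"ops": {"filtered_sales": {"config": {"min_score": 85}}}}
print(config_for(run_config, "filtered_sales", FilterConfig))
# FilterConfig(min_score=85, region='East')
```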
Testing assets
```python
import pandas as pd
from dagster import materialize

# Assumes the assets above live in my_pipeline.py
from my_pipeline import raw_orders, cleaned_orders


def test_cleaned_orders():
    sample = pd.DataFrame({
        "Order_ID": [1, None, 3],
        "Amount": [100.0, 200.0, None],
        "Region": ["East", "West", "East"],
    })
    # Assets can be invoked directly; cleaned_orders takes no context, so pass only its input
    result = cleaned_orders(raw_orders=sample)
    # Rows with a missing order_id or amount are dropped
    assert len(result.value) == 1
    assert "order_id" in result.value.columns  # normalised to lowercase


def test_pipeline_end_to_end():
    # raw_orders reads orders.csv, so this test needs that file in the working directory
    result = materialize([raw_orders, cleaned_orders])
    assert result.success
    df = result.output_for_node("cleaned_orders")
    assert len(df) > 0
```
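A complementary pattern, assuming you are free to restructure the pipeline module, is to keep the transformation in a plain function with no Dagster imports and have the asset delegate to it. You can then test that function with ordinary data. `filter_high_scorers` is a hypothetical name; its logic is the `high_scorers` filter from the quick example.

```python
def filter_high_scorers(rows: list[dict], threshold: int = 80) -> list[dict]:
    """Pure transformation with no Dagster imports, testable with plain data."""
    return [r for r in rows if r["score"] > threshold]


# The asset in the pipeline module would only delegate, e.g.:
#     @asset
#     def high_scorers(raw_data: list[dict]) -> list[dict]:
#         return filter_high_scorers(raw_data)


def test_filter_high_scorers():
    rows = [{"name": "Alice", "score": 92}, {"name": "Bob", "score": 80}]
    # 80 is not strictly above the default threshold, so Bob is dropped
    assert filter_high_scorers(rows) == [{"name": "Alice", "score": 92}]
    assert filter_high_scorers(rows, threshold=79) == rows
    assert filter_high_scorers([]) == []


test_filter_high_scorers()
```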
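Launching the UI
```bash
dagster dev  # starts the UI at http://localhost:3000
```
Output: startup logs. The process keeps running, serving the UI and the schedule/sensor daemon, until you stop it with Ctrl+C.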
The UI shows the full asset graph, per-asset materialisation history, run logs, schedules, sensors, and partition status.
Quick reference
| Task | Code |
|---|---|
| Define asset | @asset def my_asset(...) -> T: |
| Asset dependency | parameter name matches upstream asset name |
| Asset with metadata | return Output(value, metadata={"rows": MetadataValue.int(n)}) |
| Group assets | @asset(group_name="raw") |
| Resource | @asset def fn(db: MyResource) + Definitions(resources={"db": MyResource(...)}) |
| Define job | define_asset_job("name", AssetSelection.groups("g")) |
| Schedule | ScheduleDefinition(job=job, cron_schedule="0 6 * * *") |
| Sensor | @sensor(job=job) def s(context): yield RunRequest(run_key=...) |
| Partition | @asset(partitions_def=DailyPartitionsDefinition(...)) |
| Config | class C(Config): field: type → @asset def fn(config: C) |
| Materialise locally | materialize([asset1, asset2]) |
| Run UI | dagster dev |
| Run job | dagster job execute -j job_name |