dagster — Modern Data Orchestration

Build, schedule, and observe data pipelines as software-defined assets with Dagster. Covers assets, jobs, schedules, sensors, resources, partitions, and the Dagster UI.

What it is

Dagster is a data orchestration platform centred on software-defined assets: persistent data objects (tables, files, models, DataFrames) whose production logic you declare in Python alongside the asset itself. Instead of thinking in terms of “tasks that run,” you declare “what data should exist and how to produce it.” Dagster derives the execution graph from asset dependencies, supports partitioning, and ships an observability UI. Python-first teams commonly adopt it as an alternative to Airflow.
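To make "derives the execution graph from asset dependencies" concrete, here is a toy sketch in plain Python. It is not how Dagster is implemented, and the helper `run_graph` is hypothetical. It only illustrates the convention that a parameter name refers to the upstream object of the same name.

```python
import inspect
from graphlib import TopologicalSorter


def raw_data():
    return [{"name": "Alice", "score": 92}, {"name": "Bob", "score": 78}]


def high_scorers(raw_data):
    return [r for r in raw_data if r["score"] > 80]


def names(high_scorers):
    return [r["name"] for r in high_scorers]


def run_graph(functions):
    """Run functions in dependency order (hypothetical helper, not a Dagster API)."""
    by_name = {fn.__name__: fn for fn in functions}
    # Each parameter name is treated as the name of an upstream function.
    deps = {name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()}
    order = list(TopologicalSorter(deps).static_order())
    results = {}
    for name in order:
        kwargs = {dep: results[dep] for dep in deps[name]}
        results[name] = by_name[name](**kwargs)
    return order, results


# The functions are passed in arbitrary order; the graph decides what runs first.
order, results = run_graph([names, high_scorers, raw_data])
print(order)
print(results["names"])
```

Dagster applies the same naming convention to functions decorated with @asset, and adds storage, scheduling, and observability on top.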

Install

pip install dagster dagster-webserver

Output: pip prints its download and install progress, then exits 0 on success.

Quick example

from dagster import asset, Definitions, materialize

@asset
def raw_data() -> list[dict]:
    """Simulated CSV load."""
    return [
        {"name": "Alice", "score": 92},
        {"name": "Bob",   "score": 78},
        {"name": "Carol", "score": 85},
    ]

@asset
def high_scorers(raw_data: list[dict]) -> list[dict]:
    """Filter to scores above 80."""
    return [r for r in raw_data if r["score"] > 80]

defs = Definitions(assets=[raw_data, high_scorers])

# Materialise both assets locally
result = materialize([raw_data, high_scorers])
print(result.output_for_node("high_scorers"))

Output:

[{'name': 'Alice', 'score': 92}, {'name': 'Carol', 'score': 85}]

When / why to use it

  • Data pipelines where lineage, reproducibility, and observability matter; the asset graph makes dependencies explicit.
  • ETL / ELT pipelines over databases, files, or APIs.
  • ML feature pipelines where intermediate artefacts need versioning and re-use.
  • Teams migrating from Airflow who want stronger typing and a better local dev story.
  • Partitioned pipelines: daily, weekly, or by-category processing with automatic backfill.

Common pitfalls

[!WARNING] Asset return values must be serialisable: Dagster hands each output to an I/O manager, and the default I/O manager pickles it to the local filesystem. For large DataFrames, configure a Parquet- or Arrow-based I/O manager instead.

[!WARNING] @op vs @asset: @op is the lower-level “task” primitive (roughly Dagster’s counterpart to an Airflow task). @asset is the higher-level “declare what exists” primitive. Prefer @asset for new pipelines; reserve @op for complex fan-out/fan-in graphs that don’t map to named data artefacts.

[!WARNING] Definitions must be importable at module level — the dagster dev server and scheduler import your code to discover assets. Avoid side effects in module-level code; keep heavy initialisation inside functions or resources.

[!TIP] dagster dev launches the local UI at http://localhost:3000. Use it during development to visualise the asset graph, trigger materialisations, and inspect logs; this is usually easier than reading logs from stdout.

[!TIP] Wrap an asset's return value in Output and pass a metadata dictionary (row count, schema, file size) to attach metadata to the materialisation. The UI displays this metadata alongside the materialisation event.
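The serialisation warning above can be checked before a value ever reaches an I/O manager. The sketch below assumes plain pickle, as used by the default I/O manager; `is_picklable` is a hypothetical helper, not part of Dagster.

```python
import pickle


def is_picklable(value) -> bool:
    """Return True if pickle can serialise the value (hypothetical helper)."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


rows = [{"name": "Alice", "score": 92}]
lazy_rows = (r for r in rows)  # a generator cannot be pickled

print(is_picklable(rows))
print(is_picklable(lazy_rows))
```

Returning a fully built list or DataFrame rather than a generator, open file handle, or live connection avoids this class of failure.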

Software-defined assets

Assets are the core Dagster primitive. Each @asset function declares one data object and optionally depends on others by naming them as parameters.

import pandas as pd
from dagster import asset, AssetIn, Output, MetadataValue

@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
    """Load orders from CSV."""
    df = pd.read_csv("orders.csv")
    return df

@asset(group_name="processed", ins={"raw_orders": AssetIn()})
def cleaned_orders(raw_orders: pd.DataFrame) -> Output[pd.DataFrame]:
    """Remove invalid rows and normalise column names."""
    # Normalise names first so the dropna subset also matches columns such as "Order ID"
    df = raw_orders.rename(columns=lambda c: c.lower().replace(" ", "_"))
    df = df.dropna(subset=["order_id", "amount"])
    return Output(
        df,
        metadata={
            "row_count": MetadataValue.int(len(df)),
            "columns": MetadataValue.text(str(list(df.columns))),
        },
    )

@asset(group_name="processed")
def revenue_by_region(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate revenue per region."""
    return cleaned_orders.groupby("region")["amount"].sum().reset_index()

Resources: injectable clients and connections

Resources are reusable configured objects (database connections, API clients, file system handles) that are injected into asset and op functions. Define them once and share across all assets.

import pandas as pd
from dagster import asset, Definitions, EnvVar
# DuckDBResource lives in a separate package: pip install dagster-duckdb
from dagster_duckdb import DuckDBResource

@asset
def sales_summary(duckdb: DuckDBResource) -> pd.DataFrame:
    """Query the DuckDB warehouse."""
    with duckdb.get_connection() as conn:
        return conn.execute(
            "SELECT region, SUM(amount) AS total FROM sales GROUP BY region"
        ).df()

defs = Definitions(
    assets=[sales_summary],
    resources={
        "duckdb": DuckDBResource(database=EnvVar("DUCKDB_PATH")),
    },
)

Jobs: targeted execution graphs

A job selects a subset of assets (or ops) and their configuration for a single run. Use jobs to run different asset subsets on different schedules or with different configs.

from dagster import In, job, op

@op
def fetch_data() -> list[dict]:
    return [{"id": 1, "value": 42}, {"id": 2, "value": 7}]

@op(ins={"data": In()})
def filter_data(data: list[dict]) -> list[dict]:
    return [d for d in data if d["value"] > 10]

@op(ins={"data": In()})
def save_data(data: list[dict]) -> None:
    print(f"Saving {len(data)} records")

@job
def my_pipeline():
    save_data(filter_data(fetch_data()))

For asset-based jobs:

from dagster import define_asset_job, AssetSelection

# Run all assets in the "processed" group
processed_job = define_asset_job("processed_job", AssetSelection.groups("processed"))

Schedules

A schedule triggers a job on a cron expression.

from dagster import ScheduleDefinition, define_asset_job, AssetSelection, Definitions

daily_job = define_asset_job("daily_pipeline", AssetSelection.all())

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",   # 6 AM UTC daily
    execution_timezone="UTC",
)

# raw_orders, cleaned_orders, and revenue_by_region are the assets defined earlier
defs = Definitions(
    assets=[raw_orders, cleaned_orders, revenue_by_region],
    jobs=[daily_job],
    schedules=[daily_schedule],
)

Sensors: event-driven triggers

A sensor polls an external condition (new file, queue message, API event) and triggers a run when the condition is met.

import os
from dagster import sensor, RunRequest, SensorEvaluationContext, define_asset_job, AssetSelection, Definitions

ingest_job = define_asset_job("ingest_job", AssetSelection.all())

@sensor(job=ingest_job, minimum_interval_seconds=30)
def new_file_sensor(context: SensorEvaluationContext):
    """Trigger a run when a new CSV appears in the inbox directory."""
    known_files = set(context.cursor.split(",")) if context.cursor else set()
    inbox_files = {f for f in os.listdir("./inbox") if f.endswith(".csv")}
    new_files = inbox_files - known_files

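    for filename in sorted(new_files):
        # run_key de-duplicates requests, so each file triggers at most one run.
        # The filename travels as a run tag; passing it through run_config would
        # need an asset with a matching config schema under the "ops" key.
        yield RunRequest(run_key=filename, tags={"filename": filename})

    context.update_cursor(",".join(sorted(inbox_files)))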
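The cursor bookkeeping in the sensor can be pulled out into a pure function, which makes it easy to unit test without Dagster or a real directory. This is a sketch; `diff_inbox` is a hypothetical helper that mirrors the comma-separated cursor format used above.

```python
def diff_inbox(cursor, inbox_files):
    """Return (new_files, new_cursor) for a comma-separated cursor string."""
    known = set(cursor.split(",")) if cursor else set()
    current = set(inbox_files)
    new_files = sorted(current - known)
    # As in the sensor, the cursor records the files currently in the inbox.
    new_cursor = ",".join(sorted(current))
    return new_files, new_cursor


# First evaluation: no cursor yet, so every file is new.
first_new, cursor = diff_inbox(None, ["b.csv", "a.csv"])
print(first_new, cursor)

# Second evaluation: only the file added since the last tick is reported.
second_new, cursor = diff_inbox(cursor, ["a.csv", "b.csv", "c.csv"])
print(second_new, cursor)
```

One limitation of this format is that a filename containing a comma would corrupt the cursor; a JSON-encoded list is a safer choice in that case.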

Partitions: time and category slicing

Partitions let you process data in named slices (dates, categories) and backfill or re-run individual partitions independently.

from dagster import DailyPartitionsDefinition, asset, Definitions

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context) -> list[dict]:
    """Load events for the given partition date."""
    partition_date = context.partition_key   # "2026-01-15"
    # Load data for this specific date
    return [{"date": partition_date, "events": 1000}]

@asset(partitions_def=daily_partitions, group_name="processed")
def daily_summary(daily_events: list[dict]) -> dict:
    return {"total": sum(e["events"] for e in daily_events)}

Materialise a single partition:

dagster asset materialize --select daily_events --partition 2026-01-15

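Output: the run's event logs, followed by exit code 0 on success. The command also needs to locate your definitions, for example through the -m or -f option, unless the project configuration already points to them.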

Config: parameterising runs

from dagster import asset, Config, Definitions

class FilterConfig(Config):
    min_score: int = 80
    region: str = "East"

@asset
def filtered_sales(config: FilterConfig) -> list[dict]:
    """Return sales filtered by config parameters."""
    data = [
        {"region": "East", "score": 92},
        {"region": "West", "score": 78},
        {"region": "East", "score": 85},
    ]
    return [
        d for d in data
        if d["score"] >= config.min_score and d["region"] == config.region
    ]

defs = Definitions(assets=[filtered_sales])

Launch with config:

from dagster import materialize

result = materialize(
    [filtered_sales],
    run_config={"ops": {"filtered_sales": {"config": {"min_score": 85, "region": "East"}}}},
)

Testing assets

import pandas as pd
from dagster import materialize
from my_pipeline import raw_orders, cleaned_orders

def test_cleaned_orders():
    sample = pd.DataFrame({
        "order_id": [1, None, 3],
        "amount":   [100.0, 200.0, None],
        "Region":   ["East", "West", "East"],
    })
    # cleaned_orders declares no context parameter, so pass only its input
    result = cleaned_orders(raw_orders=sample)
    # Rows with a missing order_id or amount are dropped, leaving one row
    assert len(result.value) == 1
    assert "region" in result.value.columns   # "Region" normalised to lowercase

def test_pipeline_end_to_end():
    # raw_orders reads orders.csv, so the file must exist in the working directory
    result = materialize([raw_orders, cleaned_orders])
    assert result.success
    df = result.output_for_node("cleaned_orders")
    assert len(df) > 0

Launching the UI

dagster dev            # starts UI at http://localhost:3000

Output: the server logs its startup and the UI address, then keeps running until you stop it with Ctrl+C.

The UI shows the full asset graph, per-asset materialisation history, run logs, schedules, sensors, and partition status.

Quick reference

Task | Code
--- | ---
Define asset | @asset def my_asset(...) -> T:
Asset dependency | parameter name matches upstream asset name
Asset with metadata | return Output(value, metadata={"rows": MetadataValue.int(n)})
Group assets | @asset(group_name="raw")
Resource | @asset def fn(db: MyResource) + Definitions(resources={"db": MyResource(...)})
Define job | define_asset_job("name", AssetSelection.groups("g"))
Schedule | ScheduleDefinition(job=job, cron_schedule="0 6 * * *")
Sensor | @sensor(job=job) def s(context): yield RunRequest(run_key=...)
Partition | @asset(partitions_def=DailyPartitionsDefinition(...))
Config | class C(Config): field: type + @asset def fn(config: C)
Materialise locally | materialize([asset1, asset2])
Run UI | dagster dev
Run job | dagster job execute -j job_name