prefect — Workflow Orchestration

Build, schedule, and observe Python workflows with Prefect. Covers flows, tasks, retries, schedules, deployments, caching, concurrency, and Prefect Cloud.

What it is#

Prefect is a Python workflow orchestration framework: you decorate ordinary Python functions with @flow and @task to get scheduling, retries, caching, logging, observability, and a UI with little boilerplate. Unlike Dagster’s asset-centric model, Prefect is function-centric: you write imperative Python that calls tasks, and Prefect tracks the execution graph at runtime. That makes it a good fit for Python developers who want to add orchestration to existing scripts with minimal refactoring.

Install#

pip install prefect

Output: pip prints its usual download and install progress and exits 0 on success.

Quick example#

from prefect import flow, task

@task
def fetch_data(url: str) -> list[dict]:
    import httpx
    return httpx.get(url).json()

@task
def process(records: list[dict]) -> list[dict]:
    return [r for r in records if r.get("active")]

@flow(name="My Pipeline")
def pipeline(url: str) -> list[dict]:
    raw = fetch_data(url)
    return process(raw)

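if __name__ == "__main__":
    # The endpoint must return a JSON array of objects. These sample users
    # carry no "active" field, so the filter keeps nothing.
    result = pipeline("https://jsonplaceholder.typicode.com/users")
    print(result)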

Output:

15:42:01.234 | INFO    | prefect.engine - Created flow run 'nimble-falcon' for flow 'My Pipeline'
15:42:01.401 | INFO    | Task run 'fetch_data' - Finished in state Completed()
15:42:01.412 | INFO    | Task run 'process' - Finished in state Completed()
15:42:01.420 | INFO    | Flow run 'nimble-falcon' - Finished in state Completed()
[]

When / why to use it#

  • Adding retries, logging, and scheduling to existing Python scripts with two decorators.
  • ETL pipelines that need caching (skip re-running tasks whose inputs haven’t changed).
  • Data workflows that run on a cron or are triggered by an event.
  • Teams that want a UI (local or Prefect Cloud) to monitor runs without setting up Airflow.
  • Concurrent fan-out: submit dozens of tasks in parallel, block on them with wait(), and read each result with .result().

Common pitfalls#

[!WARNING] Mutating state across tasks — tasks should be pure functions. Passing mutable objects between tasks and modifying them in place breaks Prefect’s state tracking and caching. Return new objects instead.
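The difference is easy to see with plain functions. The sketch below is illustrative only: the function names are hypothetical, it runs without Prefect, and in a real flow each function would carry the @task decorator.

```python
from copy import deepcopy

def add_total_in_place(order: dict) -> dict:
    # Anti-pattern: mutates the caller's object, so data that an upstream
    # step already returned changes after the fact.
    order["total"] = sum(order["prices"])
    return order

def add_total(order: dict) -> dict:
    # Preferred: return a new dict and leave the input untouched.
    # Note this is a shallow copy; nested values are still shared.
    return {**order, "total": sum(order["prices"])}

original = {"id": 1, "prices": [5, 7]}
snapshot = deepcopy(original)

enriched = add_total(original)
input_unchanged = original == snapshot   # add_total left its input alone

mutated = add_total_in_place(original)
input_changed = original != snapshot     # the shared object was altered
```

Only the second style keeps each step's input and output distinct, which is what input-based caching relies on.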

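[!WARNING] @task functions called outside a @flow: behaviour depends on the Prefect version. In Prefect 2.x, calling a task outside a flow raises an error; in Prefect 3.x it runs as a standalone task run with no parent flow run to group it under. Call tasks from within a flow unless that is what you want, and use my_task.fn(...) to run the undecorated function with no orchestration at all.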

[!WARNING] Large return values: when result persistence is enabled (caching relies on it), Prefect serialises task results to its result storage (local or remote). Returning a 2 GB DataFrame from a task causes slow serialisation and a risk of running out of memory. Write the data to a file or external storage and return a path/URI instead.
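A minimal sketch of the path-passing pattern, using only the standard library. The function names and the extract.json file name are hypothetical; in a real flow both functions would be tasks and the directory would be shared storage rather than a temporary folder.

```python
import json
import tempfile
from pathlib import Path

def extract_to_file(rows: list[dict], out_dir: Path) -> str:
    # Write the bulky payload to storage and hand back only its location.
    path = out_dir / "extract.json"
    path.write_text(json.dumps(rows))
    return str(path)

def count_rows(path: str) -> int:
    # The downstream step re-opens the data from the path it was given.
    return len(json.loads(Path(path).read_text()))

with tempfile.TemporaryDirectory() as tmp:
    location = extract_to_file([{"n": i} for i in range(1000)], Path(tmp))
    row_count = count_rows(location)
```

The value passed between steps is now a short string, so serialising it costs almost nothing regardless of how large the extract grows.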

[!TIP] task.submit() submits a task asynchronously and returns a PrefectFuture. Collect results with .result(). This is the idiomatic way to parallelise tasks in a flow.

[!TIP] Use flow.serve() to run a long-lived deployment locally without Prefect Cloud, a work pool, or a separate worker: pipeline.serve(name="local-deploy", cron="0 6 * * *").

Retries and timeouts#

Prefect handles retries and timeouts at both the task and flow level with decorator parameters.

from prefect import flow, task

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=30,
    log_prints=True,
)
def unreliable_api_call(endpoint: str) -> dict:
    import random
    if random.random() < 0.5:
        raise ConnectionError("Simulated network failure")
    print(f"Called {endpoint} successfully")
    return {"status": "ok"}

@flow(
    retries=1,
    timeout_seconds=120,
    log_prints=True,
)
def robust_pipeline():
    result = unreliable_api_call("https://api.example.com/data")
    print(f"Result: {result}")

robust_pipeline()

Output (with retry):

INFO | Task run 'unreliable_api_call' - Retrying in 10.0 seconds...
INFO | Task run 'unreliable_api_call' - Called https://api.example.com/data successfully
INFO | Task run 'unreliable_api_call' - Finished in state Completed()
INFO | Result: {'status': 'ok'}
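The retries parameter counts retries, not attempts: retries=3 allows the first call plus up to three more. The plain-Python sketch below imitates that behaviour to make the counting explicit. It is not Prefect's engine, and call_with_retries and flaky are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_retries(fn: Callable[[], T], retries: int, retry_delay_seconds: float) -> T:
    if retries < 0:
        raise ValueError("retries must be >= 0")
    # One initial attempt plus up to `retries` further attempts.
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # attempts exhausted: surface the last error
            time.sleep(retry_delay_seconds)
    raise AssertionError("unreachable")

attempts: list[int] = []

def flaky() -> str:
    # Fails twice, then succeeds on the third attempt.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("Simulated network failure")
    return "ok"

outcome = call_with_retries(flaky, retries=3, retry_delay_seconds=0.01)
```

Here the call succeeds on its third attempt, so one retry is left unused. Had every attempt failed, the last exception would propagate, which in Prefect corresponds to the task run ending in a Failed state.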

Caching — skip re-running unchanged tasks#

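The cache_key_fn parameter tells Prefect when two task calls are equivalent so that the cached result can be reused. task_input_hash builds the key from the task's identity, the code of its function, and all input arguments, so editing the function body also invalidates earlier entries. cache_expiration bounds how long a cached result stays valid.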

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import time

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
)
def expensive_query(sql: str) -> list[dict]:
    print(f"Running query: {sql}")
    time.sleep(2)  # simulate slow DB call
    return [{"result": 42}]

@flow
def pipeline():
    # First call — runs the query
    r1 = expensive_query("SELECT * FROM big_table")
    # Second call with same input — returns cached result instantly
    r2 = expensive_query("SELECT * FROM big_table")
    print(r1, r2)

pipeline()

Output:

INFO | Running query: SELECT * FROM big_table   ← runs once
INFO | [{'result': 42}] [{'result': 42}]
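The idea behind an input-hash cache key can be sketched without Prefect. This is a toy in-memory stand-in under stated assumptions, not Prefect's implementation: the input_hash helper and the _cache dict are hypothetical, there is no expiration, and the key covers only the function name and arguments.

```python
import hashlib
import json

_cache: dict[str, list[dict]] = {}
calls: list[str] = []

def input_hash(fn_name: str, args: dict) -> str:
    # Same function name + same arguments -> same digest.
    payload = json.dumps({"fn": fn_name, "args": args}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def expensive_query(sql: str) -> list[dict]:
    key = input_hash("expensive_query", {"sql": sql})
    if key in _cache:
        return _cache[key]      # cache hit: skip the work
    calls.append(sql)           # record that the body really ran
    result = [{"result": 42}]
    _cache[key] = result
    return result

r1 = expensive_query("SELECT * FROM big_table")
r2 = expensive_query("SELECT * FROM big_table")    # served from the cache
r3 = expensive_query("SELECT * FROM other_table")  # different input, runs again
```

The body runs once per distinct input, which matches the single "Running query" line in the output above.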

Parallel task submission#

task.submit() returns a PrefectFuture immediately without blocking. Collect all futures at the end with wait() or iterate to call .result().

from prefect import flow, task
from prefect.futures import wait
import time

@task(log_prints=True)
def process_item(item: int) -> int:
    time.sleep(0.5)
    return item ** 2

@flow
def parallel_pipeline(items: list[int]) -> list[int]:
    futures = [process_item.submit(item) for item in items]
    wait(futures)
    return [f.result() for f in futures]

result = parallel_pipeline(list(range(10)))
print(result)

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
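The submit / wait / result shape mirrors the standard library's concurrent.futures API. The sketch below shows the same pattern with a thread pool only, as an analogy for readers who know that module; it makes no claim about how Prefect's task runner is implemented, and parallel_squares is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def process_item(item: int) -> int:
    time.sleep(0.05)  # stand-in for real work
    return item ** 2

def parallel_squares(items: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns a future immediately, without blocking
        futures = [pool.submit(process_item, item) for item in items]
        wait(futures)  # block until every future has finished
        # reading results in submission order keeps the output ordered
        return [f.result() for f in futures]

squares = parallel_squares(list(range(10)))
```

Because results are read from the futures list in the order the work was submitted, the output order matches the input order even when items finish out of order.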

Subflows — flows calling flows#

Flows can call other flows. The sub-flow runs as a nested tracked unit with its own run ID and logs.

from prefect import flow, task

@task
def validate(data: list) -> bool:
    return len(data) > 0

@flow(name="transform")
def transform_flow(data: list) -> list:
    if validate(data):
        return [x * 2 for x in data]
    return []

@flow(name="main")
def main_flow():
    raw = [1, 2, 3, 4, 5]
    result = transform_flow(raw)   # sub-flow call
    print(f"Transformed: {result}")

main_flow()

Output:

INFO | Created flow run 'crimson-hawk' for flow 'main'
INFO | Created subflow run for flow 'transform'
INFO | Transformed: [2, 4, 6, 8, 10]

Schedules#

Attach a cron or interval schedule to a flow at serve() or deploy() time.

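from prefect import flow
from datetime import timedelta  # needed for the interval variant

@flow(log_prints=True)
def daily_report():
    print("Generating daily report...")

# Local serve: runs the flow on schedule in the foreground process
if __name__ == "__main__":
    daily_report.serve(
        name="daily-report",
        cron="0 8 * * *",  # 08:00 daily; a bare cron string is evaluated in UTC
    )
    # For another time zone, pass a schedule object instead of cron=..., e.g.
    # CronSchedule(cron="0 8 * * *", timezone="America/New_York") from
    # prefect.client.schemas.schedules (the accepting keyword varies by version).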

For interval scheduling:

daily_report.serve(
    name="every-6h",
    interval=timedelta(hours=6),
)
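For readers unfamiliar with cron syntax, the five space-separated fields can be labelled with a few lines of Python. This is a reading aid only: split_cron is a hypothetical helper, it does not validate field values, and Prefect parses cron strings itself.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def split_cron(expr: str) -> dict[str, str]:
    # Label each of the five standard cron fields by position.
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))

daily_8am = split_cron("0 8 * * *")
```

Read this way, "0 8 * * *" means minute 0 of hour 8 on every day of every month, whatever the weekday.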

Deployments — production scheduling#

A deployment registers a flow with the Prefect server so it can be scheduled, triggered via API, or run by a worker.

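# deploy.py
from pathlib import Path
from prefect import flow

@flow
def my_pipeline(env: str = "prod"):
    print(f"Running in {env}")

if __name__ == "__main__":
    # A process work pool has no image to carry the code, so the deployment
    # needs a source location; this points the worker at this file on disk.
    flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="deploy.py:my_pipeline",
    ).deploy(
        name="prod-pipeline",
        work_pool_name="local-process-pool",
        cron="0 6 * * *",
        parameters={"env": "prod"},
    )

Then, from a shell:

# Create a local work pool and register the deployment
prefect work-pool create --type process local-process-pool
python deploy.py
# Start a worker (long-running; use a separate terminal)
prefect worker start --pool local-process-pool

Output: python deploy.py prints a confirmation for the registered deployment and exits; prefect worker start keeps running and polls the pool until you stop it.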

Logging and state hooks#

from prefect import flow, task

@task(log_prints=True)
def my_task():
    print("task running")
    return 42

def on_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed: {state.message}")

def on_completion(flow, flow_run, state):
    print(f"Flow {flow.name} completed successfully")

@flow(
    on_failure=[on_failure],
    on_completion=[on_completion],
    log_prints=True,
)
def instrumented_flow():
    result = my_task()
    print(f"Result: {result}")

instrumented_flow()

Concurrency limits#

Use the concurrency context manager with a named global concurrency limit to cap how many tasks hold a slot at the same time, preventing resource exhaustion.

from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def api_call(endpoint: str) -> dict:
    with concurrency("external-api", occupy=1):
        import httpx
        return httpx.get(endpoint).json()

@flow
def batch_requests(endpoints: list[str]) -> list[dict]:
    futures = [api_call.submit(ep) for ep in endpoints]
    return [f.result() for f in futures]

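Create the global concurrency limit once. Note that prefect concurrency-limit create defines tag-based task limits, which the concurrency() context manager does not use:

prefect gcl create external-api --limit 5

Output: a short confirmation that the limit was created.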
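The effect of a slot limit can be sketched in a single process with a semaphore. This is an analogy only: a Prefect limit is coordinated through the server and applies across processes, whereas the sketch below uses threads in one process, and LIMIT, slots, and stats are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 2
slots = threading.BoundedSemaphore(LIMIT)
lock = threading.Lock()
stats = {"active": 0, "max_active": 0}

def api_call(endpoint: str) -> str:
    with slots:  # blocks while LIMIT calls are already in flight
        with lock:
            stats["active"] += 1
            stats["max_active"] = max(stats["max_active"], stats["active"])
        time.sleep(0.02)  # stand-in for the HTTP request
        with lock:
            stats["active"] -= 1
    return endpoint.upper()

# Eight workers compete, but at most LIMIT of them hold a slot at once.
with ThreadPoolExecutor(max_workers=8) as pool:
    responses = list(pool.map(api_call, [f"ep{i}" for i in range(8)]))
```

All eight calls complete, yet the recorded peak never exceeds the limit, which is the guarantee a concurrency limit is meant to give a rate-sensitive API.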

Starting the UI#

prefect server start      # local server + UI at http://localhost:4200

Output: the server prints a startup banner with the UI and API addresses and keeps running until you stop it (Ctrl+C).

Quick reference#

| Task | Code |
|---|---|
| Define flow | @flow def my_flow(): ... |
| Define task | @task def my_task(): ... |
| Call task (blocking) | result = my_task(arg) |
| Submit task (async) | future = my_task.submit(arg) |
| Wait for futures | wait(futures) |
| Get future result | future.result() |
| Retries | @task(retries=3, retry_delay_seconds=10) |
| Timeout | @task(timeout_seconds=30) |
| Cache | @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1)) |
| Log prints | @task(log_prints=True) |
| Cron schedule | flow.serve(name="x", cron="0 6 * * *") |
| Interval schedule | flow.serve(name="x", interval=timedelta(hours=6)) |
| Deploy to worker | flow.deploy(name="x", work_pool_name="pool", cron="...") |
| Concurrency limit | with concurrency("key", occupy=1): |
| Start UI | prefect server start |