Workflows & Actions
Waymark has exactly two primitives for user code. Once you have a feel for the line between them, the rest of the system mostly falls out.
Two primitives
Actions are the distributed work. Network calls, database queries,
file I/O, anything that can fail and that you'd want to retry independently.
An action is a single async def decorated with @action. Each invocation
gets a row in Postgres, gets claimed by a worker, runs to completion or
failure, and the result is persisted before the workflow advances.
Workflows are the durable control flow. Loops, conditionals, parallel
branches, try/except. They orchestrate actions but don't do heavy lifting
themselves. A workflow is a class that subclasses Workflow and is
decorated with @workflow. The class body has one method that matters,
run(), and that body is what Waymark parses into a DAG.
A real example, slightly trimmed from the example app:
import asyncio

from waymark import Workflow, action, workflow

@action
async def compute_factorial(n: int) -> int:
    ...

@action
async def compute_fibonacci(n: int) -> int:
    ...

@action
async def summarize(*, n: int, factorial_value: int, fibonacci_value: int) -> str:
    ...

@workflow
class ParallelMathWorkflow(Workflow):
    async def run(self, number: int) -> str:
        # Run both computations in parallel, then combine the results.
        factorial_value, fibonacci_value = await asyncio.gather(
            compute_factorial(number),
            compute_fibonacci(number),
            return_exceptions=True,
        )
        return await summarize(
            n=number,
            factorial_value=factorial_value,
            fibonacci_value=fibonacci_value,
        )
The @workflow body is the durable thing. The three @action calls are
the distributed work.
Actions
Actions are written like any other async Python function:
@action
async def fetch_user(user_id: str) -> User:
    return await db.fetch_one(User, user_id)
A few things to know:
- Calling an action inside a workflow returns a deferred handle. It's
only when you await it that the action gets queued.
- An action that raises an exception is, by default, marked failed and
not retried. You opt into retries by wrapping the call in
self.run_action(...) and providing a RetryPolicy. See Retries & Timeouts.
- Actions can take any pydantic-serializable arguments and return any
pydantic-serializable result. The arguments and result get round-tripped
through Postgres, so they need to survive serialization.
- Actions can use anything Python lets them: random(), datetime.now(),
HTTP clients, file handles. Non-determinism lives in actions, not in
workflows.
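The first point has a rough analogy in plain Python, which may help build intuition. The sketch below uses only asyncio, not Waymark: calling an async def returns a coroutine object without running the body, and the body only executes once it is awaited. Waymark's deferred handle is a different object with a queue behind it; fetch_user_stub and the calls list are hypothetical names used for illustration.

```python
import asyncio

calls: list[str] = []  # records when the function body actually runs


async def fetch_user_stub(user_id: str) -> dict:
    # Hypothetical stand-in for an action body; no Waymark involved.
    calls.append(user_id)
    return {"id": user_id}


async def main() -> dict:
    handle = fetch_user_stub("u1")  # creates a coroutine; the body has not run
    assert calls == []
    result = await handle  # the body runs here
    assert calls == ["u1"]
    return result


result = asyncio.run(main())
```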
Workflows
A workflow has one job: orchestrate actions through control flow. The
constraints on the body of run() come from what the AST compiler can
turn into DAG nodes - loops, conditionals, parallel gather, try/except,
durable sleep, returns. Anything outside that vocabulary will be flagged
at registration time.
The workflow body never re-runs after the first compilation. That means it shouldn't have side effects or non-determinism - but unlike replay-based engines, you don't enforce that with a runtime determinism check. You enforce it by pushing side effects out into actions.
Workflows can also call themselves recursively, can be scheduled on a cron,
and can pause for arbitrarily long periods with await asyncio.sleep(...)
(a durable sleep that survives worker restarts).
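As a sketch of what pushing side effects out into actions can look like, the example below keeps the clock read and the string formatting in actions and leaves the workflow body as pure wiring. The action, workflow, and Workflow names are defined locally as no-op stand-ins so the snippet runs without Waymark installed; in a real project they would be imported from waymark, and the run would go through Waymark rather than a direct call. current_timestamp, build_report, and ReportWorkflow are hypothetical names.

```python
import asyncio
from datetime import datetime, timezone


# No-op stand-ins (assumption) so this runs without Waymark installed.
# In a real project: from waymark import Workflow, action, workflow
def action(fn):
    return fn


def workflow(cls):
    return cls


class Workflow:
    pass


@action
async def current_timestamp() -> str:
    # Non-deterministic: reads the clock, so it belongs in an action.
    return datetime.now(timezone.utc).isoformat()


@action
async def build_report(*, user_id: str, generated_at: str) -> str:
    # String building also happens inside an action.
    return f"report for {user_id} generated at {generated_at}"


@workflow
class ReportWorkflow(Workflow):
    async def run(self, user_id: str) -> str:
        # The body only orchestrates actions; it never touches the clock.
        generated_at = await current_timestamp()
        return await build_report(user_id=user_id, generated_at=generated_at)


# With the stand-ins above this is ordinary asyncio, not a durable run.
report = asyncio.run(ReportWorkflow().run("u1"))
```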
Dependency injection
Actions can declare dependencies the same way Mountaineer and FastAPI controllers do:
from typing import Annotated

from waymark import Depends, action

@action
async def send_email(
    to: str,
    subject: str,
    emailer: Annotated[EmailClient, Depends(get_email_client)],
) -> None:
    await emailer.send(to=to, subject=subject)
The Depends helper is re-exported from mountaineer-di; the older
Depend(...) spelling is kept as an alias. Dependencies are resolved per
action invocation by the worker, so any per-request lifecycle (database
sessions, scoped clients) is handled the same way you'd handle it in a web
handler.
Workflow bodies don't take injected dependencies: they don't run on a worker; they are processed once, at compile time. If a workflow needs a database handle, it asks for it via an action.
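One way to picture per-invocation resolution is the toy resolver below. It is not how mountaineer-di or the Waymark worker is implemented; the Depends class, the invoke helper, and EmailClient here are hypothetical stand-ins written with the standard library only. The point it illustrates is that each invocation gets its own freshly resolved dependency.

```python
import asyncio
import inspect
from typing import Annotated, Any, Callable, get_type_hints


class Depends:
    # Toy marker (assumption); not the mountaineer-di class.
    def __init__(self, provider: Callable[[], Any]) -> None:
        self.provider = provider


class EmailClient:
    # Hypothetical client that records what it was asked to send.
    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    async def send(self, *, to: str, subject: str) -> None:
        self.sent.append((to, subject))


created: list[EmailClient] = []  # every client the provider has built


def get_email_client() -> EmailClient:
    client = EmailClient()
    created.append(client)
    return client


async def send_email(
    to: str,
    subject: str,
    emailer: Annotated[EmailClient, Depends(get_email_client)],
) -> None:
    await emailer.send(to=to, subject=subject)


async def invoke(fn: Callable[..., Any], **kwargs: Any) -> Any:
    # Fill in every Annotated[..., Depends(...)] parameter the caller omitted.
    hints = get_type_hints(fn, include_extras=True)
    for name in inspect.signature(fn).parameters:
        if name in kwargs:
            continue
        for meta in getattr(hints.get(name), "__metadata__", ()):
            if isinstance(meta, Depends):
                kwargs[name] = meta.provider()
    return await fn(**kwargs)


async def main() -> None:
    await invoke(send_email, to="a@example.com", subject="hi")
    await invoke(send_email, to="b@example.com", subject="hello")


asyncio.run(main())
```

Each call to invoke builds a separate EmailClient, which is the same shape as a per-request database session in a web handler.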
How the line gets enforced
Waymark's compiler walks the AST of run() and refuses anything it
doesn't recognize. Common things you might reach for that don't work
inside a workflow body:
- f-strings constructed from action results (build them inside an action instead).
- Direct construction of pydantic models from action results - return a built result from a final "build" action instead.
- Calling functions that aren't decorated with @action.
The error you get back at registration tells you what wasn't recognized and what to do instead. The Control Flow guide walks through the patterns the compiler supports today, with examples for each.
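To give a feel for what walking the AST and refusing unrecognized constructs can mean, here is a minimal sketch built on Python's standard ast module. It is an illustration under stated assumptions, not Waymark's compiler: check_body, KNOWN_ACTIONS, and the message wording are hypothetical, and it only looks for two of the cases listed above (f-strings and calls to names that are not registered actions).

```python
import ast
import textwrap

# A workflow-style body to inspect; fetch_user is treated as a known action,
# helper is not.
SOURCE = textwrap.dedent(
    '''
    async def run(self, user_id: str) -> str:
        user = await fetch_user(user_id)
        greeting = f"hello {user}"
        return await helper(greeting)
    '''
)

KNOWN_ACTIONS = {"fetch_user"}  # hypothetical registry of @action names


def check_body(source: str, known_actions: set[str]) -> list[str]:
    """Return human-readable problems found in the source, ordered by line."""
    found: list[tuple[int, str]] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.JoinedStr):
            # ast.JoinedStr is the node type for an f-string.
            found.append((node.lineno, "f-string; build it inside an action"))
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id not in known_actions:
                found.append(
                    (node.lineno, f"call to {node.func.id}() is not an @action")
                )
    # ast.walk makes no ordering promise, so sort by line number.
    return [f"line {lineno}: {message}" for lineno, message in sorted(found)]


problems = check_body(SOURCE, KNOWN_ACTIONS)
for problem in problems:
    print(problem)
```

A real compiler would use an allowlist of supported node types rather than a blocklist of two, so that anything unrecognized is rejected by default.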