Control Flow
Anything inside a workflow's run() method is parsed into Waymark IR at
registration time, then compiled to a DAG. The patterns below are what the
parser recognizes today. They cover almost everything you'd actually want
to write - and where they don't, the compiler tells you up front, not at
recovery time.
The samples in this guide all come from the example app, which exercises every pattern end-to-end against Postgres.
Sequential chains
The simplest case: one action's result feeds the next.
@workflow
class SequentialChainWorkflow(Workflow):
    async def run(self, text: str) -> ChainResult:
        step1 = await step_uppercase(text)
        step2 = await step_reverse(step1)
        step3 = await step_add_stars(step2)
        return await build_chain_result(text, step1, step2, step3)
Each await becomes an action node. The DAG has a straight-line edge from
each action to the next, with the prior result threaded into the next
action's inbox.
Parallel fan-out
asyncio.gather is the parallel primitive. The compiler turns it into a
fan-out / fan-in pair:
@workflow
class ParallelMathWorkflow(Workflow):
    async def run(self, number: int) -> ComputationResult:
        factorial_value, fib_value = await asyncio.gather(
            compute_factorial(number),
            compute_fibonacci(number),
            return_exceptions=True,
        )
        return await summarize_math(
            input_number=number,
            factorial_value=factorial_value,
            fibonacci_value=fib_value,
        )
return_exceptions=True is supported and means failures propagate as
values, not as raised exceptions - useful when you want to inspect partial
results in a downstream action.
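To make "failures propagate as values" concrete, here is a plain-asyncio sketch that runs without Waymark. The coroutines ok and boom and the helper split_results are hypothetical stand-ins, not part of the example app; the gather behavior shown is standard asyncio, and the sketch assumes the compiled workflow hands a downstream action the same mix of values and exception objects.

```python
import asyncio


async def ok(value: int) -> int:
    # Hypothetical stand-in for an action that succeeds.
    return value * 2


async def boom(value: int) -> int:
    # Hypothetical stand-in for an action that fails.
    raise ValueError(f"cannot process {value}")


def split_results(results: list) -> tuple[list, list]:
    # Separate successful values from captured exceptions,
    # the kind of inspection a downstream action might do.
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return values, errors


async def main() -> tuple[list, list]:
    # With return_exceptions=True the ValueError is returned in place,
    # in argument order, instead of being raised out of gather.
    results = await asyncio.gather(ok(1), boom(2), ok(3), return_exceptions=True)
    return split_results(results)


values, errors = asyncio.run(main())
```

Results keep the order of the arguments passed to gather, so a position in the result list always maps back to the call that produced it.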
You can also spread a list comprehension into gather for variable-width
fan-outs:
results = await asyncio.gather(
    *[send_email(to=user.email, subject="Welcome") for user in active_users],
    return_exceptions=True,
)
Empty spreads are handled correctly - a gather over an empty list
produces an empty result without dispatching any action invocations.
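The empty-spread case matches what plain asyncio does, which the following sketch checks outside of Waymark. The send_email body and the calls counter are hypothetical, added only to observe how many invocations happen; the assumption is that the compiled fan-out behaves the same way as the standard library here.

```python
import asyncio

calls = 0  # counts invocations so the empty case can be observed


async def send_email(to: str) -> str:
    # Hypothetical stand-in for the send_email action.
    global calls
    calls += 1
    return f"sent to {to}"


async def fan_out(recipients: list[str]) -> list:
    # Spreading a comprehension gives a fan-out as wide as the input list.
    return await asyncio.gather(
        *[send_email(to=r) for r in recipients],
        return_exceptions=True,
    )


empty = asyncio.run(fan_out([]))
calls_after_empty = calls
two = asyncio.run(fan_out(["a@example.com", "b@example.com"]))
```

After the first call, empty is an empty list and no send_email invocation has happened; the second call produces one result per recipient.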
Conditionals
Plain if / elif / else works. Each branch becomes a subgraph of the
DAG; the runner picks the branch at execution time and routes the data
flow accordingly.
@workflow
class ConditionalBranchWorkflow(Workflow):
    async def run(self, value: int) -> BranchResult:
        if value >= 75:
            return await evaluate_high(value)
        elif value >= 25:
            return await evaluate_medium(value)
        else:
            return await evaluate_low(value)
if without else also works - control flow falls through to whatever
comes next. This is the common "guard with default" shape:
@workflow
class GuardFallbackWorkflow(Workflow):
    async def run(self, user: str) -> GuardFallbackResult:
        notes = await fetch_recent_notes(user)
        summary = "no notes found"
        if notes:
            summary = await summarize_notes(notes)
        return await build_guard_fallback_result(user, len(notes), summary)
Loops
for loops compile into a DAG sub-pattern that processes items one at a
time and accumulates results. The accumulator pattern works as expected:
@workflow
class LoopProcessingWorkflow(Workflow):
    async def run(self, items: list[str]) -> LoopResult:
        processed = []
        for item in items:
            result = await process_item(item)
            processed.append(result)
        return await build_loop_result(items, processed)
while-style loops are written as for _ in range(limit): against a
counter - same compiled shape:
@workflow
class WhileLoopWorkflow(Workflow):
    async def run(self, limit: int) -> WhileLoopResult:
        current = 0
        iterations = 0
        for _ in range(limit):
            current = await increment_counter(current)
            iterations = iterations + 1
        return await build_while_result(
            limit=limit, final=current, iterations=iterations,
        )
Try / except
try/except is the recovery primitive. The compiler turns the except
arm into a sibling sub-graph that's reached when the action raises a
matching exception.
@workflow
class ErrorHandlingWorkflow(Workflow):
    async def run(self, should_fail: bool) -> ErrorResult:
        recovered = False
        message = ""
        try:
            result = await self.run_action(
                risky_action(should_fail),
                retry=RetryPolicy(attempts=1),  # don't retry, let it fail
            )
            message = await success_action(result)
        except IntentionalError:
            message = await recovery_action("IntentionalError was caught")
            recovered = True
        return await build_error_result(True, recovered, message)
Two notes:
- self.run_action(...) is the form to use when you want a per-call RetryPolicy, BackoffPolicy, or timeout. A bare await some_action(...) uses the defaults.
- The compiler matches exceptions by class name, so the exception type needs to be importable at workflow-registration time on the worker.
try/except also works inside loops - useful for "process each item, skip
the failures":
@workflow
class LoopExceptionWorkflow(Workflow):
    async def run(self, items: list[str]) -> LoopExceptionResult:
        processed: list[str] = []
        error_count = 0
        for item in items:
            try:
                result = await self.run_action(
                    process_item_may_fail(item),
                    retry=RetryPolicy(attempts=1),
                )
                processed.append(result)
            except ItemProcessingError:
                error_count = error_count + 1
        return await build_loop_exception_result(items, processed, error_count)
Durable sleep
await asyncio.sleep(seconds) is recognized specially. It's a durable
sleep - the workflow record is evicted from the runner's in-memory state
and rehydrated when the sleep deadline arrives, so a multi-hour pause
costs almost nothing while it's pending.
@workflow
class DurableSleepWorkflow(Workflow):
    async def run(self, seconds: int) -> SleepResult:
        started = await get_timestamp()
        await asyncio.sleep(seconds)  # survives worker restarts
        resumed = await get_timestamp()
        return await format_sleep_result(started, resumed, seconds)
This is also fine inside loops - useful for polling or for slow drip behaviors:
for i in range(iterations):
    await asyncio.sleep(sleep_seconds)
    await perform_loop_action(iteration=i + 1)
Early return
Returning from inside a branch - including from inside a loop - works:
@workflow
class LoopReturnWorkflow(Workflow):
    async def run(self, items: list[int], needle: int) -> LoopReturnResult:
        checked = 0
        for value in items:
            checked += 1
            if await matches_needle(value=value, needle=needle):
                return await build_loop_return_result(
                    items=items, needle=needle,
                    found=True, value=value, checked=checked,
                )
        return await build_loop_return_result(
            items=items, needle=needle,
            found=False, value=None, checked=checked,
        )
What does not compile (yet)
The compiler is honest about what it can and can't see. Common gotchas:
- f-strings against action results. Build the formatted value inside a follow-up action.
- Constructing pydantic models in return. Move the construction into a dedicated "build result" action, then return its result.
- Calling helpers that aren't @action-decorated. Anything called inside a workflow body needs to be either a recognized control-flow primitive or an action.
- References to module-level globals from inside the workflow body. Pass them in through workflow inputs or fetch them inside an action.
The diagnostic on registration tells you the offending node and offers a
fix. If you'd like to see the compiled DAG visually, every workflow has a
.visualize() method that emits a Graphviz rendering - handy for
verifying the shape matches your mental model.