Control Flow
Anything inside a workflow's run() method is parsed into Waymark IR at
registration time, then compiled into the program the runtime executes.
The patterns below are what the compiler recognizes today. They cover almost everything you'd actually want
to write - and where they don't, the compiler tells you up front, not at
recovery time.
Every pattern in this guide runs end-to-end against Postgres today.
Sequential chains
The simplest case: one action's result feeds the next.
@workflow
class SequentialChainWorkflow(Workflow):
async def run(self, text: str) -> ChainResult:
step1 = await step_uppercase(text)
step2 = await step_reverse(step1)
step3 = await step_add_stars(step2)
return await build_chain_result(text, step1, step2, step3)
Each await compiles to an action dispatch. The runtime executes them in
order, threading each result into the next action's inputs.
Parallel fan-out
asyncio.gather is the parallel primitive. The compiler turns it into a
fan-out / fan-in pair:
@workflow
class ParallelMathWorkflow(Workflow):
async def run(self, number: int) -> ComputationResult:
factorial_value, fib_value = await asyncio.gather(
compute_factorial(number),
compute_fibonacci(number),
return_exceptions=True,
)
return await summarize_math(
input_number=number,
factorial_value=factorial_value,
fibonacci_value=fib_value,
)
return_exceptions=True is required - the compiler rejects a gather
without it. Failures propagate as values, not as raised exceptions, so a
downstream action can inspect partial results instead of losing the whole
fan-out to one bad item.
You can also spread a list comprehension into gather for variable-width
fan-outs:
results = await asyncio.gather(
*[send_email(to=user.email, subject="Welcome") for user in active_users],
return_exceptions=True,
)
Empty spreads are handled correctly - a gather over an empty list
produces an empty result without dispatching any action invocations.
Conditionals
Plain if / elif / else works. Each branch compiles separately; the
runtime evaluates the condition at execution time and runs only the branch
that's taken.
@workflow
class ConditionalBranchWorkflow(Workflow):
async def run(self, value: int) -> BranchResult:
if value >= 75:
return await evaluate_high(value)
elif value >= 25:
return await evaluate_medium(value)
else:
return await evaluate_low(value)
if without else also works - control flow falls through to whatever
comes next. This is the common "guard with default" shape:
@workflow
class GuardFallbackWorkflow(Workflow):
async def run(self, user: str) -> GuardFallbackResult:
notes = await fetch_recent_notes(user)
summary = "no notes found"
if notes:
summary = await summarize_notes(notes)
return await build_guard_fallback_result(user, len(notes), summary)
Loops
for loops process items one at a time and accumulate results. The
accumulator pattern works as expected:
@workflow
class LoopProcessingWorkflow(Workflow):
async def run(self, items: list[str]) -> LoopResult:
processed = []
for item in items:
result = await process_item(item)
processed.append(result)
return await build_loop_result(items, processed)
while loops compile too, with break and continue working the way
you'd expect:
@workflow
class WhileLoopWorkflow(Workflow):
async def run(self, target: int) -> WhileLoopResult:
current = 0
iterations = 0
while current < target:
current = await increment_counter(current)
iterations = iterations + 1
return await build_while_result(
target=target, final=current, iterations=iterations,
)
Try / except
try/except is the recovery primitive. The compiler lowers the except
arm into a recovery path the runtime takes when the action raises a
matching exception.
@workflow
class ErrorHandlingWorkflow(Workflow):
async def run(self, should_fail: bool) -> ErrorResult:
recovered = False
message = ""
try:
result = await self.run_action(
risky_action(should_fail),
retry=RetryPolicy(attempts=1), # don't retry, let it fail
)
message = await success_action(result)
except IntentionalError:
message = await recovery_action("IntentionalError was caught")
recovered = True
return await build_error_result(True, recovered, message)
Two notes:
self.run_action(...)is the form to use when you want a per-callretry=RetryPolicy(...)ortimeout=. A bareawait some_action(...)uses the defaults. Note thatRetryPolicy()withattemptsomitted defaults to a budget of 100 retries; passattempts=1when you want a failure to surface immediately.- The compiler matches exceptions by class name, so the exception type needs to be importable at workflow-registration time on the worker.
try/except also works inside loops - useful for "process each item, skip
the failures":
@workflow
class LoopExceptionWorkflow(Workflow):
async def run(self, items: list[str]) -> LoopExceptionResult:
processed: list[str] = []
error_count = 0
for item in items:
try:
result = await self.run_action(
process_item_may_fail(item),
retry=RetryPolicy(attempts=1),
)
processed.append(result)
except ItemProcessingError:
error_count = error_count + 1
return await build_loop_exception_result(items, processed, error_count)
Durable sleep
await asyncio.sleep(seconds) is recognized specially. It's a durable
sleep - the workflow record is evicted from the runner's in-memory state
and rehydrated when the sleep deadline arrives, so a multi-hour pause
costs almost nothing while it's pending.
@workflow
class DurableSleepWorkflow(Workflow):
async def run(self, seconds: int) -> SleepResult:
started = await get_timestamp()
await asyncio.sleep(seconds) # survives worker restarts
resumed = await get_timestamp()
return await format_sleep_result(started, resumed, seconds)
This is also fine inside loops - useful for polling or for slow drip behaviors:
for i in range(iterations):
await asyncio.sleep(sleep_seconds)
await perform_loop_action(iteration=i + 1)
Early return
Returning from inside a branch - including from inside a loop - works:
@workflow
class LoopReturnWorkflow(Workflow):
async def run(self, items: list[int], needle: int) -> LoopReturnResult:
checked = 0
for value in items:
checked += 1
if await matches_needle(value=value, needle=needle):
return await build_loop_return_result(
items=items, needle=needle,
found=True, value=value, checked=checked,
)
return await build_loop_return_result(
items=items, needle=needle,
found=False, value=None, checked=checked,
)
What does not compile (yet)
The compiler refuses anything it can't lower to IR, and it tells you at registration with the offending line and a suggested fix. The full list of rejected patterns:
- f-strings. Anywhere in the workflow body. Build formatted strings inside an action.
- Nondeterministic calls.
datetime.now(),uuid4(),random()and friends. Move them into an action - that's what actions are for. - Calling helpers that aren't
@action-decorated. Anything called inside a workflow body needs to be either a recognized control-flow primitive or an action. - Constructing pydantic models in
return. Move the construction into a dedicated "build result" action, then return its result. raise. Raise inside an action; the workflow handles it withtry/except.with,lambda,match,yield, walrus (:=),assert,del,global/nonlocal. None of these have IR lowerings yet.- Most references to module-level globals. Enum members and exception classes resolve fine; arbitrary globals don't. Pass values in through workflow inputs or fetch them inside an action.
- Comprehensions outside two positions. List and dict comprehensions
compile when assigned directly to a variable or spread into
gather; set comprehensions don't compile at all.
Everything in this list fails at your desk, at registration time. Nothing here surfaces as a production surprise.