Control Flow

Anything inside a workflow's run() method is parsed into Waymark IR at registration time, then compiled into the program the runtime executes. The patterns below are what the compiler recognizes today. They cover almost everything you'd actually want to write - and where they don't, the compiler tells you up front, not at recovery time.

Every pattern in this guide runs end-to-end against Postgres today.

Sequential chains

The simplest case: one action's result feeds the next.

@workflow
class SequentialChainWorkflow(Workflow):
    async def run(self, text: str) -> ChainResult:
        step1 = await step_uppercase(text)
        step2 = await step_reverse(step1)
        step3 = await step_add_stars(step2)
        return await build_chain_result(text, step1, step2, step3)

Each await compiles to an action dispatch. The runtime executes them in order, threading each result into the next action's inputs.

Parallel fan-out

asyncio.gather is the parallel primitive. The compiler turns it into a fan-out / fan-in pair:

@workflow
class ParallelMathWorkflow(Workflow):
    async def run(self, number: int) -> ComputationResult:
        factorial_value, fib_value = await asyncio.gather(
            compute_factorial(number),
            compute_fibonacci(number),
            return_exceptions=True,
        )
        return await summarize_math(
            input_number=number,
            factorial_value=factorial_value,
            fibonacci_value=fib_value,
        )

return_exceptions=True is required - the compiler rejects a gather without it. Failures propagate as values, not as raised exceptions, so a downstream action can inspect partial results instead of losing the whole fan-out to one bad item.
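The "failures as values" shape is the same one plain asyncio produces with return_exceptions=True. A minimal sketch outside Waymark shows what a downstream consumer receives. The ok_task and failing_task coroutines are hypothetical stand-ins for actions and do not come from this guide; that the compiled fan-out behaves identically is an assumption based on the description above.

```python
import asyncio


async def ok_task(n: int) -> int:
    # Hypothetical stand-in for an action that succeeds.
    return n * 2


async def failing_task(n: int) -> int:
    # Hypothetical stand-in for an action that fails.
    raise ValueError("bad input")


async def fan_out(n: int) -> list:
    # With return_exceptions=True, a failure arrives as a value in the
    # result list, in the same position as the call that produced it.
    return await asyncio.gather(
        ok_task(n),
        failing_task(n),
        return_exceptions=True,
    )


results = asyncio.run(fan_out(21))
succeeded = [r for r in results if not isinstance(r, BaseException)]
failed = [r for r in results if isinstance(r, BaseException)]
```

In a workflow, a check like this would most likely belong inside the downstream action (summarize_math in the example above), since the workflow body itself accepts only a narrow set of constructs.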

You can also spread a list comprehension into gather for variable-width fan-outs:

results = await asyncio.gather(
    *[send_email(to=user.email, subject="Welcome") for user in active_users],
    return_exceptions=True,
)

Empty spreads are handled correctly - a gather over an empty list produces an empty result without dispatching any action invocations.
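Plain asyncio has the same empty-spread behavior, which makes it easy to check locally. In the sketch below, send_email is a hypothetical stand-in that records each call, so the absence of dispatches is observable; it is not the real action, and the Waymark runtime is not involved.

```python
import asyncio

calls = []  # records every invocation so we can see that none happened


async def send_email(to: str, subject: str) -> str:
    # Hypothetical stand-in for the send_email action used above.
    calls.append(to)
    return "sent:" + to


async def welcome_all(emails: list[str]) -> list:
    # Spread a list comprehension into gather; the width follows the input.
    return await asyncio.gather(
        *[send_email(to=email, subject="Welcome") for email in emails],
        return_exceptions=True,
    )


empty_result = asyncio.run(welcome_all([]))
calls_after_empty = list(calls)  # snapshot before the non-empty run
two_results = asyncio.run(welcome_all(["a@example.com", "b@example.com"]))
```

Results come back in argument order, so two_results lines up with the input list even when the underlying calls finish out of order.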

Conditionals

Plain if / elif / else works. Each branch compiles separately; the runtime evaluates the condition at execution time and runs only the branch that's taken.

@workflow
class ConditionalBranchWorkflow(Workflow):
    async def run(self, value: int) -> BranchResult:
        if value >= 75:
            return await evaluate_high(value)
        elif value >= 25:
            return await evaluate_medium(value)
        else:
            return await evaluate_low(value)

if without else also works - control flow falls through to whatever comes next. This is the common "guard with default" shape:

@workflow
class GuardFallbackWorkflow(Workflow):
    async def run(self, user: str) -> GuardFallbackResult:
        notes = await fetch_recent_notes(user)
        summary = "no notes found"
        if notes:
            summary = await summarize_notes(notes)
        return await build_guard_fallback_result(user, len(notes), summary)

Loops

for loops process items one at a time, in order. The usual accumulator pattern, appending each result to a list, works as expected:

@workflow
class LoopProcessingWorkflow(Workflow):
    async def run(self, items: list[str]) -> LoopResult:
        processed = []
        for item in items:
            result = await process_item(item)
            processed.append(result)
        return await build_loop_result(items, processed)

while loops compile too, and break and continue work the way you'd expect. The example below terminates on the loop condition alone:

@workflow
class WhileLoopWorkflow(Workflow):
    async def run(self, target: int) -> WhileLoopResult:
        current = 0
        iterations = 0
        while current < target:
            current = await increment_counter(current)
            iterations = iterations + 1
        return await build_while_result(
            target=target, final=current, iterations=iterations,
        )
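The workflow above does not exercise break or continue. The sketch below shows a loop body that uses both, written as a plain coroutine so it runs without Waymark. check_status is a hypothetical stand-in for an action, and poll_until_done is an illustrative name; inside a real workflow the same loop would sit in run() and the result would be assembled by a "build result" action rather than returned as a tuple.

```python
import asyncio


async def check_status(attempt: int) -> str:
    # Hypothetical stand-in for an action; reports "done" on the third poll.
    if attempt >= 3:
        return "done"
    return "pending"


async def poll_until_done(max_attempts: int) -> tuple[str, int]:
    attempts = 0
    status = "pending"
    while attempts < max_attempts:
        attempts = attempts + 1
        status = await check_status(attempt=attempts)
        if status == "pending":
            continue  # skip the rest of the body and re-check the condition
        break  # any non-pending status ends the loop early
    return status, attempts


finished = asyncio.run(poll_until_done(max_attempts=10))
gave_up = asyncio.run(poll_until_done(max_attempts=2))
```

The second call shows the other exit: the loop condition fails before any break is reached, so the last observed status is still "pending".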

Try / except

try/except is the recovery primitive. The compiler lowers the except arm into a recovery path that the runtime takes when an action in the try body raises a matching exception.

@workflow
class ErrorHandlingWorkflow(Workflow):
    async def run(self, should_fail: bool) -> ErrorResult:
        recovered = False
        message = ""
        try:
            result = await self.run_action(
                risky_action(should_fail),
                retry=RetryPolicy(attempts=1),  # don't retry, let it fail
            )
            message = await success_action(result)
        except IntentionalError:
            message = await recovery_action("IntentionalError was caught")
            recovered = True
        return await build_error_result(True, recovered, message)

Two notes:

  • self.run_action(...) is the form to use when you want a per-call retry=RetryPolicy(...) or timeout=. A bare await some_action(...) uses the defaults. Note that RetryPolicy() with attempts omitted defaults to a budget of 100 retries; pass attempts=1 when you want a failure to surface immediately.
  • The compiler matches exceptions by class name, so the exception type needs to be importable at workflow-registration time on the worker.

try/except also works inside loops - useful for "process each item, skip the failures":

@workflow
class LoopExceptionWorkflow(Workflow):
    async def run(self, items: list[str]) -> LoopExceptionResult:
        processed: list[str] = []
        error_count = 0
        for item in items:
            try:
                result = await self.run_action(
                    process_item_may_fail(item),
                    retry=RetryPolicy(attempts=1),
                )
                processed.append(result)
            except ItemProcessingError:
                error_count = error_count + 1
        return await build_loop_exception_result(items, processed, error_count)

Durable sleep

await asyncio.sleep(seconds) is recognized specially. It's a durable sleep - the workflow record is evicted from the runner's in-memory state and rehydrated when the sleep deadline arrives, so a multi-hour pause costs almost nothing while it's pending.

@workflow
class DurableSleepWorkflow(Workflow):
    async def run(self, seconds: int) -> SleepResult:
        started = await get_timestamp()
        await asyncio.sleep(seconds)  # survives worker restarts
        resumed = await get_timestamp()
        return await format_sleep_result(started, resumed, seconds)

This is also fine inside loops - useful for polling or for slow drip behaviors:

for i in range(iterations):
    await asyncio.sleep(sleep_seconds)
    await perform_loop_action(iteration=i + 1)
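One way to picture why a pending sleep is cheap: the runner only needs to persist an absolute wake-up deadline, then look for deadlines that have passed. The toy model below illustrates that idea only. It is not Waymark's implementation, it keeps everything in memory, and ParkedSleep and SleepStore are hypothetical names.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class ParkedSleep:
    wake_at: float                           # absolute deadline, in seconds
    workflow_id: str = field(compare=False)  # hypothetical identifier


class SleepStore:
    """Toy stand-in for durable storage of sleeping workflows."""

    def __init__(self) -> None:
        self._heap: list[ParkedSleep] = []

    def park(self, workflow_id: str, now: float, seconds: float) -> None:
        # Record the deadline instead of holding a live timer per workflow.
        heapq.heappush(self._heap, ParkedSleep(now + seconds, workflow_id))

    def due(self, now: float) -> list[str]:
        # Pop every workflow whose deadline has passed, earliest first.
        ready = []
        while self._heap and self._heap[0].wake_at <= now:
            ready.append(heapq.heappop(self._heap).workflow_id)
        return ready


store = SleepStore()
store.park("wf-a", now=0.0, seconds=3600.0)
store.park("wf-b", now=0.0, seconds=60.0)
woken_early = store.due(now=30.0)
woken_at_one_minute = store.due(now=60.0)
woken_later = store.due(now=7200.0)
```

Because the stored value is a deadline rather than a running timer, a process that restarts and re-reads the store would still wake each workflow at the right time.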

Early return

Returning from inside a branch - including from inside a loop - works:

@workflow
class LoopReturnWorkflow(Workflow):
    async def run(self, items: list[int], needle: int) -> LoopReturnResult:
        checked = 0
        for value in items:
            checked += 1
            if await matches_needle(value=value, needle=needle):
                return await build_loop_return_result(
                    items=items, needle=needle,
                    found=True, value=value, checked=checked,
                )
        return await build_loop_return_result(
            items=items, needle=needle,
            found=False, value=None, checked=checked,
        )

What does not compile (yet)

The compiler refuses anything it can't lower to IR, and it tells you at registration with the offending line and a suggested fix. The full list of rejected patterns:

  • f-strings. Anywhere in the workflow body. Build formatted strings inside an action.
  • Nondeterministic calls. datetime.now(), uuid4(), random() and friends. Move them into an action - that's what actions are for.
  • Calling helpers that aren't @action-decorated. Anything called inside a workflow body needs to be either a recognized control-flow primitive or an action.
  • Constructing pydantic models in return. Move the construction into a dedicated "build result" action, then return its result.
  • raise. Raise inside an action; the workflow handles it with try/except.
  • with, lambda, match, yield, walrus (:=), assert, del, global/nonlocal. None of these have IR lowerings yet.
  • Most references to module-level globals. Enum members and exception classes resolve fine; arbitrary globals don't. Pass values in through workflow inputs or fetch them inside an action.
  • Comprehensions outside two positions. List and dict comprehensions compile when assigned directly to a variable or spread into gather; set comprehensions don't compile at all.

Everything in this list fails at your desk, at registration time. Nothing here surfaces as a production surprise.
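As an illustration of the "move it into an action" advice in the list above, here is a sketch of three small actions that absorb an f-string and two nondeterministic calls. The action decorator defined here is a placeholder so the block runs on its own; in a real project it would be Waymark's @action, whose import path this guide does not show. The function names are hypothetical.

```python
import asyncio
import uuid
from datetime import datetime, timezone


def action(fn):
    # Placeholder only: stands in for Waymark's real @action decorator.
    return fn


@action
async def format_greeting(name: str, note_count: int) -> str:
    # f-strings are fine here: this is an action body, not a workflow body.
    return f"Hello {name}, you have {note_count} notes"


@action
async def new_request_id() -> str:
    # Nondeterministic calls live in actions.
    return uuid.uuid4().hex


@action
async def current_timestamp() -> str:
    # Timezone-aware UTC timestamp in ISO 8601 form.
    return datetime.now(timezone.utc).isoformat()


greeting = asyncio.run(format_greeting("Ada", 3))
request_id = asyncio.run(new_request_id())
timestamp = asyncio.run(current_timestamp())
```

The workflow body then reduces to plain awaits on these actions, which is the sequential-chain shape shown at the top of this guide.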