Control Flow

Anything inside a workflow's run() method is parsed into Waymark IR at registration time, then compiled to a DAG. The patterns below are what the parser recognizes today. They cover almost everything you'd actually want to write - and where they don't, the compiler tells you up front at registration, not partway through a run.

The samples in this guide all come from the example app, which exercises every pattern end-to-end against Postgres.

Sequential chains

The simplest case: one action's result feeds the next.

@workflow
class SequentialChainWorkflow(Workflow):
    async def run(self, text: str) -> ChainResult:
        step1 = await step_uppercase(text)
        step2 = await step_reverse(step1)
        step3 = await step_add_stars(step2)
        return await build_chain_result(text, step1, step2, step3)

Each await becomes an action node. The DAG has a straight-line edge from each action to the next, with the prior result threaded into the next action's inbox.

Parallel fan-out

asyncio.gather is the parallel primitive. The compiler turns it into a fan-out / fan-in pair:

@workflow
class ParallelMathWorkflow(Workflow):
    async def run(self, number: int) -> ComputationResult:
        factorial_value, fib_value = await asyncio.gather(
            compute_factorial(number),
            compute_fibonacci(number),
            return_exceptions=True,
        )
        return await summarize_math(
            input_number=number,
            factorial_value=factorial_value,
            fibonacci_value=fib_value,
        )

return_exceptions=True is supported and means failures propagate as values, not as raised exceptions - useful when you want to inspect partial results in a downstream action.
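This mirrors plain asyncio semantics, which you can check standalone (ordinary asyncio here, no Waymark involved) - a failed task comes back as an exception object in the result list rather than being raised:

```python
import asyncio

async def ok() -> int:
    return 42

async def boom() -> int:
    raise ValueError("boom")

async def main() -> list:
    # With return_exceptions=True, failures are returned as exception
    # *values* in the result list instead of propagating as raises.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
# results[0] is 42; results[1] is a ValueError instance
```

A downstream action can then branch on isinstance(result, Exception) to handle partial failure.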

You can also spread a list comprehension into gather for variable-width fan-outs:

results = await asyncio.gather(
    *[send_email(to=user.email, subject="Welcome") for user in active_users],
    return_exceptions=True,
)

Empty spreads are handled correctly - a gather over an empty list produces an empty result without dispatching any action invocations.
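This matches the plain-asyncio behavior the compiler preserves: spreading an empty comprehension into gather resolves immediately to an empty list, dispatching nothing.

```python
import asyncio

async def main() -> list:
    # No awaitables are spread in, so gather resolves at once
    # to an empty list.
    return await asyncio.gather(*[])

result = asyncio.run(main())
# result is []
```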

Conditionals

Plain if / elif / else works. Each branch becomes a subgraph of the DAG; the runner picks the branch at execution time and routes the data flow accordingly.

@workflow
class ConditionalBranchWorkflow(Workflow):
    async def run(self, value: int) -> BranchResult:
        if value >= 75:
            return await evaluate_high(value)
        elif value >= 25:
            return await evaluate_medium(value)
        else:
            return await evaluate_low(value)

if without else also works - control flow falls through to whatever comes next. This is the common "guard with default" shape:

@workflow
class GuardFallbackWorkflow(Workflow):
    async def run(self, user: str) -> GuardFallbackResult:
        notes = await fetch_recent_notes(user)
        summary = "no notes found"
        if notes:
            summary = await summarize_notes(notes)
        return await build_guard_fallback_result(user, len(notes), summary)

Loops

for loops compile into a DAG sub-pattern that processes items one at a time and accumulates results. The accumulator pattern works as expected:

@workflow
class LoopProcessingWorkflow(Workflow):
    async def run(self, items: list[str]) -> LoopResult:
        processed = []
        for item in items:
            result = await process_item(item)
            processed.append(result)
        return await build_loop_result(items, processed)

while-style loops are written as for _ in range(limit): over an explicit counter - the compiled shape is the same:

@workflow
class WhileLoopWorkflow(Workflow):
    async def run(self, limit: int) -> WhileLoopResult:
        current = 0
        iterations = 0
        for _ in range(limit):
            current = await increment_counter(current)
            iterations = iterations + 1
        return await build_while_result(
            limit=limit, final=current, iterations=iterations,
        )

Try / except

try/except is the recovery primitive. The compiler turns the except arm into a sibling subgraph that's reached when the action raises a matching exception.

@workflow
class ErrorHandlingWorkflow(Workflow):
    async def run(self, should_fail: bool) -> ErrorResult:
        recovered = False
        message = ""
        try:
            result = await self.run_action(
                risky_action(should_fail),
                retry=RetryPolicy(attempts=1),  # don't retry, let it fail
            )
            message = await success_action(result)
        except IntentionalError:
            message = await recovery_action("IntentionalError was caught")
            recovered = True
        return await build_error_result(True, recovered, message)

Two notes:

  • self.run_action(...) is the form to use when you want a per-call RetryPolicy, BackoffPolicy, or timeout. A bare await some_action(...) uses the defaults.
  • The compiler matches exceptions by class name, so the exception type needs to be importable at workflow-registration time on the worker.
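One plausible shape for name-based matching can be sketched in plain Python - this is an illustrative stand-in, not Waymark's actual implementation, and assumes subclasses match a handler the way they do in native Python try/except:

```python
def matches_handler(exc: BaseException, handler_class_name: str) -> bool:
    # Walk the exception type's MRO and compare class names, so a
    # subclass of IntentionalError still matches an
    # `except IntentionalError` arm.
    return any(cls.__name__ == handler_class_name for cls in type(exc).__mro__)

class IntentionalError(Exception): ...
class SpecificError(IntentionalError): ...

# A subclass instance matches its parent's name; an unrelated type does not.
assert matches_handler(SpecificError("x"), "IntentionalError")
assert not matches_handler(ValueError("x"), "IntentionalError")
```

The practical consequence is the one stated above: the worker must be able to import the exception class at registration time so its name is known to the compiler.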

try/except also works inside loops - useful for "process each item, skip the failures":

@workflow
class LoopExceptionWorkflow(Workflow):
    async def run(self, items: list[str]) -> LoopExceptionResult:
        processed: list[str] = []
        error_count = 0
        for item in items:
            try:
                result = await self.run_action(
                    process_item_may_fail(item),
                    retry=RetryPolicy(attempts=1),
                )
                processed.append(result)
            except ItemProcessingError:
                error_count = error_count + 1
        return await build_loop_exception_result(items, processed, error_count)

Durable sleep

await asyncio.sleep(seconds) is recognized specially. It's a durable sleep - the workflow record is evicted from the runner's in-memory state and rehydrated when the sleep deadline arrives, so a multi-hour pause costs almost nothing while it's pending.

@workflow
class DurableSleepWorkflow(Workflow):
    async def run(self, seconds: int) -> SleepResult:
        started = await get_timestamp()
        await asyncio.sleep(seconds)  # survives worker restarts
        resumed = await get_timestamp()
        return await format_sleep_result(started, resumed, seconds)

This is also fine inside loops - useful for polling or for slow drip behaviors:

for i in range(iterations):
    await asyncio.sleep(sleep_seconds)
    await perform_loop_action(iteration=i + 1)

Early return

Returning from inside a branch - including from inside a loop - works:

@workflow
class LoopReturnWorkflow(Workflow):
    async def run(self, items: list[int], needle: int) -> LoopReturnResult:
        checked = 0
        for value in items:
            checked += 1
            if await matches_needle(value=value, needle=needle):
                return await build_loop_return_result(
                    items=items, needle=needle,
                    found=True, value=value, checked=checked,
                )
        return await build_loop_return_result(
            items=items, needle=needle,
            found=False, value=None, checked=checked,
        )

What does not compile (yet)

The compiler is honest about what it can and can't see. Common gotchas:

  • f-strings against action results. Build the formatted value inside a follow-up action.
  • Constructing pydantic models in return. Move the construction into a dedicated "build result" action, then return its result.
  • Calling helpers that aren't @action-decorated. Anything called inside a workflow body needs to be either a recognized control-flow primitive or an action.
  • References to module-level globals from inside the workflow body. Pass them in through workflow inputs or fetch them inside an action.
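The "build result" refactor for the first two gotchas looks like this. A minimal sketch: the stub decorators and the Greeting model are stand-ins so the example runs without the framework installed - in real code, @action, @workflow, and Workflow come from Waymark, and the action names here are hypothetical.

```python
import asyncio
from dataclasses import dataclass

# Stand-ins so this sketch runs standalone; real code imports these
# from the framework instead.
def action(fn): return fn
def workflow(cls): return cls
class Workflow: ...

@dataclass
class Greeting:
    message: str

@action
async def fetch_name(user_id: str) -> str:
    return user_id.title()

# The fix: both the f-string formatting and the model construction
# live inside a dedicated action, not in the workflow body.
@action
async def build_greeting(name: str) -> Greeting:
    return Greeting(message=f"hello, {name}")

@workflow
class GreetingWorkflow(Workflow):
    async def run(self, user_id: str) -> Greeting:
        name = await fetch_name(user_id)
        # An f-string over `name` here, or `return Greeting(...)`
        # directly, would be rejected at registration time.
        return await build_greeting(name)

result = asyncio.run(GreetingWorkflow().run("ada"))
```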

When registration fails, the diagnostic names the offending node and suggests a fix. If you'd like to see the compiled DAG visually, every workflow has a .visualize() method that emits a Graphviz rendering - handy for verifying the shape matches your mental model.