Python Authoring

Waymark's conceptual model is the same in every language we target. Workflows are durable control flow, and actions are distributed work. This page covers the Python-specific bits: how arguments and return values get serialized, how dependency injection works, how the worker discovers your handlers.

If you haven't read it yet, the Quickstart is the fastest way to get a working setup. The Concepts section covers the behavior of workflows and actions in language-neutral terms.

Inputs and outputs

Actions and workflows might be run on different servers. So any values that you pass outside of a boundary need to be serialized in a way that's safe to pass through Postgres.

Waymark's Python serializer handles most of the objects you'll use day to day: pydantic models, dataclasses, primitives, lists, tuples, sets, dicts, enums, UUID, datetime, decimal, bytes, and Path. One constraint to know: dict keys must be strings: a dict keyed by ints or tuples fails at serialization time because it's not supported in JSON.

For anything richer than a couple of scalars, declare a pydantic model:

from datetime import datetime, timezone

from pydantic import BaseModel, Field
from waymark import action

from myapp.models import User

class FetchUsersRequest(BaseModel):
    user_ids: list[str] = Field(min_length=1)
    include_deactivated: bool = False

class FetchUsersResult(BaseModel):
    users: list[User]
    fetched_at: datetime

@action
async def fetch_users(request: FetchUsersRequest) -> FetchUsersResult:
    users = await db.user.fetch_many(
        ids=request.user_ids,
        include_deactivated=request.include_deactivated,
    )
    return FetchUsersResult(users=users, fetched_at=datetime.now(timezone.utc))

This is the same shape FastAPI uses for request/response models. Pydantic helps both during development because it typehints the nested object, but also helps during production because Waymark will run its validators against your data to ensure that you're passing things as you intend them to be.

For tiny actions, plain types are fine:

@action
async def compute_factorial(n: int) -> int:
    ...

Type hints and validation

Action signatures are type-hinted Python. Typehinters like ty, mypy, and pyright understand them as you'd expect. At runtime, the worker coerces incoming arguments toward the declared types before the handler runs: pydantic models are validated with model_validate (a malformed payload raises during the action attempt, not in your handler body), while plain types pass through as-is.

Workflow run() inputs are checked against the run() signature when you queue the workflow, so a missing or misnamed argument fails immediately at the call site.

Dependency injection

Actions can declare common dependencies the same way Mountaineer and FastAPI controllers do - via Annotated[T, Depends(provider)] parameters:

from typing import Annotated
from waymark import Depends, action

@action
async def send_email(
    to: str,
    subject: str,
    emailer: Annotated[EmailClient, Depends(get_email_client)],
) -> EmailResult:
    return await emailer.send(to=to, subject=subject)

Depends is smart enough to use functions that use nested dependencies, and allows you to recycle dependency injectors that you've already written for your FastAPI app. This includes database integrations, processors, caches layers, etc.

A dependency is resolved per action invocation by the worker that claims the row. That means the lifecycle of any per-request resource (database session, scoped HTTP client) is handled the same way you'd handle it inside a web handler. If your provider is itself async, that works - the resolver awaits it.

Workflow bodies do not take injected dependencies. They never run on a worker - the body is parsed once at registration to produce IR and never executed. If your workflow needs data that relies on a dependency, fetch it inside an action and pass the result through.

Async / await

Both actions and workflow run() methods are async. Waymark only dispatches async handlers: synchronous functions aren't supported as actions. They encourage implicit parallelism via threading (which itself is not truly parallel because of the GIL) and we would rather be explicit about when our functions yield.

If you have legacy sync code, wrap it inside an async action that calls asyncio.to_thread(...) to push the blocking work off the event loop. You control this logic to make it explicit in your architecture:

import asyncio

@action
async def render_pdf(report: Report) -> bytes:
    return await asyncio.to_thread(_render_pdf_sync, report)

Inside a workflow body, await on an action call is what tells the compiler to lower it to a dispatch instruction. Synchronous actions aren't recognized. We do support adding helper functions within the workflow class definition that let you factor a long run() into named steps.

@workflow
class OnboardUserWorkflow(Workflow):
    async def run(self, user_id: str) -> OnboardResult:
        profile = await self._provision(user_id)
        await self._notify(profile)
        return await build_onboard_result(profile)

    async def _provision(self, user_id: str) -> Profile:
        account = await create_account(user_id)
        return await create_profile(account)

    async def _notify(self, profile: Profile) -> None:
        await send_welcome_email(profile.email)
        await enqueue_drip_campaign(profile.id)

In addition to the conditional (if), loop (for and while) logic that makes up most logical flow, we also support asyncio primitives for dealing with asynchronous systems. asyncio.gather(...) and asyncio.sleep(...) are the two primary calls recognized as control-flow primitives by the compiler. See Control Flow for the full list of supported patterns.

Module preloading on workers

Workers need to import the modules where your @action and @workflow decorators run, so they have handlers registered before they start pulling rows off the queue. Tell them where to look with WAYMARK_USER_MODULE. We will attempt to intelligently find these classes at startup if this parameter isn't specified - but it helps to speed up the latency of processing the first action after startup.

export WAYMARK_USER_MODULE=myapp.workflows
uv run waymark-start-workers

For a multi-module project, list them all:

export WAYMARK_USER_MODULE=myapp.workflows.billing,myapp.workflows.email

Where workflows differ from actions

A few Python-specific things worth knowing about workflow run() bodies that don't apply to actions:

  • No f-strings. Build formatted values inside an action.
  • No constructor calls for return values. return MyResult(...) at the end of a workflow doesn't compile. Either move it to a variable or into an action.
  • Most module-level globals are off limits. Enum members and exception classes resolve fine; other module-level names are flagged at registration. Pass values in as workflow inputs or fetch them inside an action.
  • No injected dependencies. Workflow run() doesn't get Depends(...) resolution. The body is parsed once at registration and never executed.

The compiler diagnostic on registration tells you the offending node and offers a fix. The Control Flow guide walks through the patterns that are supported, with examples for each.