Python Authoring
Waymark's conceptual model is the same in every language we target: workflows are durable control flow, and actions are distributed work. This page covers the Python-specific bits: how arguments and return values are serialized, how dependency injection works, and how the worker discovers your handlers.
If you haven't read it yet, the Quickstart is the fastest way to get a working setup. The Concepts section covers the behavior of workflows and actions in language-neutral terms.
Inputs and outputs
Actions and workflows may run on different servers, so any value that crosses that boundary must be serialized in a form that is safe to pass through Postgres.
Waymark's Python serializer handles most of the objects you'll use day to day: pydantic models, dataclasses, primitives, lists, tuples, sets, dicts, enums, UUID, datetime, decimal, bytes, and Path. One constraint to know: dict keys must be strings. A dict keyed by ints or tuples fails at serialization time, because JSON objects only allow string keys.
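One way to work within the string-key constraint is to convert keys at the boundary. The sketch below is illustrative only: stringify_keys and restore_int_keys are hypothetical helpers, not part of Waymark, and the json round trip merely stands in for a serialization boundary.

```python
import json


def stringify_keys(mapping: dict[int, str]) -> dict[str, str]:
    """Convert int keys to strings so the dict is JSON-shaped."""
    return {str(key): value for key, value in mapping.items()}


def restore_int_keys(mapping: dict[str, str]) -> dict[int, str]:
    """Undo stringify_keys on the receiving side."""
    return {int(key): value for key, value in mapping.items()}


names_by_id = {101: "alice", 102: "bob"}

# Pass this version across the action boundary.
wire_safe = stringify_keys(names_by_id)

# A JSON round trip leaves string-keyed dicts unchanged.
round_tripped = json.loads(json.dumps(wire_safe))
```

Inside the receiving action, restore_int_keys(round_tripped) gives back the original int-keyed dict.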
For anything richer than a couple of scalars, declare a pydantic model:
from datetime import datetime, timezone

from pydantic import BaseModel, Field
from waymark import action

from myapp.models import User


class FetchUsersRequest(BaseModel):
    user_ids: list[str] = Field(min_length=1)
    include_deactivated: bool = False


class FetchUsersResult(BaseModel):
    users: list[User]
    fetched_at: datetime


@action
async def fetch_users(request: FetchUsersRequest) -> FetchUsersResult:
    users = await db.user.fetch_many(
        ids=request.user_ids,
        include_deactivated=request.include_deactivated,
    )
    return FetchUsersResult(users=users, fetched_at=datetime.now(timezone.utc))
This is the same shape FastAPI uses for request/response models. Pydantic helps twice. During development, it gives you type hints for the nested objects. In production, Waymark runs its validators against your data to confirm that you're passing what you intend.
For tiny actions, plain types are fine:
@action
async def compute_factorial(n: int) -> int:
    ...
Type hints and validation
Action signatures are type-hinted Python. Type checkers like ty, mypy, and pyright understand
them as you'd expect. At runtime, the worker coerces incoming arguments
toward the declared types before the handler runs: pydantic models are
validated with model_validate (a malformed payload raises during the
action attempt, not in your handler body), while plain types pass
through as-is.
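To see what validation with model_validate looks like in isolation, here is a minimal sketch using pydantic directly, with no Waymark involved. It reuses the FetchUsersRequest shape from earlier on this page; validate_payload is a hypothetical helper for illustration.

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class FetchUsersRequest(BaseModel):
    user_ids: list[str] = Field(min_length=1)
    include_deactivated: bool = False


def validate_payload(payload: dict) -> Optional[FetchUsersRequest]:
    """Return the validated model, or None if the payload is malformed."""
    try:
        return FetchUsersRequest.model_validate(payload)
    except ValidationError:
        return None


good = validate_payload({"user_ids": ["u1", "u2"]})
bad = validate_payload({"user_ids": []})  # violates min_length=1
```

The empty list is rejected before any handler logic would see it, which is the behavior the paragraph above describes for malformed payloads.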
Workflow run() inputs are checked against the run() signature when you
queue the workflow, so a missing or misnamed argument fails immediately
at the call site.
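A check of this kind can be expressed in plain Python with inspect.signature. The sketch below is an assumption about the general technique, not Waymark's implementation: ExampleWorkflow, check_run_inputs, and is_valid are hypothetical names, and bind() only checks argument names and arity, not types.

```python
import inspect


class ExampleWorkflow:
    """Hypothetical workflow-shaped class; not a real Waymark Workflow."""

    async def run(self, user_id: str, plan: str = "free") -> None:
        ...


def check_run_inputs(workflow_cls: type, **kwargs: object) -> None:
    """Raise TypeError if kwargs cannot be bound to workflow_cls.run."""
    signature = inspect.signature(workflow_cls.run)
    # None is a placeholder for self; bind() checks names and arity only.
    signature.bind(None, **kwargs)


def is_valid(**kwargs: object) -> bool:
    """Report whether kwargs match ExampleWorkflow.run's signature."""
    try:
        check_run_inputs(ExampleWorkflow, **kwargs)
        return True
    except TypeError:
        return False
```

Here is_valid(user_id="u1") passes, while a misspelled is_valid(userid="u1") or an empty is_valid() fails without running anything.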
Dependency injection
Actions can declare common dependencies the same way Mountaineer and FastAPI
controllers do - via Annotated[T, Depends(provider)] parameters:
from typing import Annotated

from waymark import Depends, action


@action
async def send_email(
    to: str,
    subject: str,
    emailer: Annotated[EmailClient, Depends(get_email_client)],
) -> EmailResult:
    return await emailer.send(to=to, subject=subject)
Depends resolves nested dependencies (providers that themselves declare dependencies), so you can
reuse the dependency injectors that you've already written for your FastAPI app. This includes
database integrations, processors, cache layers, etc.
A dependency is resolved per action invocation by the worker that claims the row. That means the lifecycle of any per-request resource (database session, scoped HTTP client) is handled the same way you'd handle it inside a web handler. If your provider is itself async, that works - the resolver awaits it.
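To make per-invocation resolution, nested providers, and async providers concrete, here is a toy resolver in plain Python. It is not Waymark's resolver: ToyDepends, resolve, and invoke are hypothetical stand-ins, and the real Depends may differ in details such as caching.

```python
import asyncio
import inspect
from typing import Annotated, Any, Callable, get_type_hints


class ToyDepends:
    """Hypothetical stand-in for a Depends marker; not Waymark's class."""

    def __init__(self, provider: Callable[..., Any]) -> None:
        self.provider = provider


async def resolve(func: Callable[..., Any]) -> dict[str, Any]:
    """Build kwargs for every Annotated[..., ToyDepends(...)] parameter."""
    hints = get_type_hints(func, include_extras=True)
    kwargs: dict[str, Any] = {}
    for name, hint in hints.items():
        if name == "return":
            continue
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, ToyDepends):
                # Resolve the provider's own dependencies first (nesting).
                nested = await resolve(meta.provider)
                value = meta.provider(**nested)
                # Async providers return an awaitable; await it.
                if inspect.isawaitable(value):
                    value = await value
                kwargs[name] = value
    return kwargs


calls: list[str] = []


def get_settings() -> dict[str, str]:
    calls.append("settings")
    return {"sender": "noreply@example.com"}


async def get_email_client(
    settings: Annotated[dict[str, str], ToyDepends(get_settings)],
) -> str:
    calls.append("client")
    return f"client for {settings['sender']}"


async def send_email(
    to: str,
    emailer: Annotated[str, ToyDepends(get_email_client)],
) -> str:
    return f"{emailer} -> {to}"


async def invoke(to: str) -> str:
    # Providers are resolved fresh on every invocation.
    injected = await resolve(send_email)
    return await send_email(to=to, **injected)


first = asyncio.run(invoke("a@example.com"))
second = asyncio.run(invoke("b@example.com"))
```

After the two invocations, calls records each provider twice, once per invocation, which mirrors the per-request lifecycle described above.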
Workflow bodies do not take injected dependencies. They never run on a worker - the body is parsed once at registration to produce IR and never executed. If your workflow needs data that relies on a dependency, fetch it inside an action and pass the result through.
Async / await
Both actions and workflow run() methods are async. Waymark only
dispatches async handlers: synchronous functions aren't supported as
actions. Synchronous handlers encourage implicit parallelism via threading (which itself is not truly
parallel because of the GIL), and we would rather be explicit about when our functions yield.
Async functions in Python are not "fair" in the conventional systems design sense. If you launch 100 asyncio tasks and each runs a function with no await points, the interpreter will happily run them one after another, each to completion. This is why Waymark adds parallelism at the interpreter level, binding work to each core. Even so, you should await as often as you reasonably can so that other tasks get a chance to run.
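The fairness point above can be observed in plain asyncio. This sketch uses only the standard library; greedy and cooperative are illustrative names. Tasks without an await run start to finish one at a time, while tasks that yield with asyncio.sleep(0) interleave.

```python
import asyncio

order: list[str] = []


async def greedy(name: str) -> None:
    # No await inside: once started, this runs to completion.
    for step in range(3):
        order.append(f"{name}:{step}")


async def cooperative(name: str) -> None:
    for step in range(3):
        order.append(f"{name}:{step}")
        # Yield control so other tasks can make progress.
        await asyncio.sleep(0)


async def main() -> tuple[list[str], list[str]]:
    await asyncio.gather(greedy("a"), greedy("b"))
    greedy_order = list(order)
    order.clear()
    await asyncio.gather(cooperative("a"), cooperative("b"))
    return greedy_order, list(order)


greedy_order, cooperative_order = asyncio.run(main())
# greedy_order:      a:0 a:1 a:2 b:0 b:1 b:2
# cooperative_order: a:0 b:0 a:1 b:1 a:2 b:2
```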
If you have legacy sync code, wrap it inside an async action
that calls asyncio.to_thread(...) to push the blocking work off the
event loop. You control this logic to make it explicit in your architecture:
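import asyncio

from waymark import action


@action
async def render_pdf(report: Report) -> bytes:
    # Run the blocking renderer in a worker thread so the event loop stays free.
    return await asyncio.to_thread(_render_pdf_sync, report)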
Inside a workflow body, await on an action call is what tells the
compiler to lower it to a dispatch instruction. Synchronous actions aren't recognized. We do support helper methods
within the workflow class definition, which let you factor a long run()
into named steps:
@workflow
class OnboardUserWorkflow(Workflow):
    async def run(self, user_id: str) -> OnboardResult:
        profile = await self._provision(user_id)
        await self._notify(profile)
        return await build_onboard_result(profile)

    async def _provision(self, user_id: str) -> Profile:
        account = await create_account(user_id)
        return await create_profile(account)

    async def _notify(self, profile: Profile) -> None:
        await send_welcome_email(profile.email)
        await enqueue_drip_campaign(profile.id)
Beyond the conditionals (if) and loops (for and while) that make up most
control flow, workflow bodies also support asyncio primitives for coordinating asynchronous work.
asyncio.gather(...) and asyncio.sleep(...) are the two primary calls the compiler recognizes as
control-flow primitives. See
Control Flow for the full list of supported
patterns.
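For reference, this is how the two calls behave in plain asyncio, outside any workflow. The sketch uses only the standard library, and fetch_score is a hypothetical coroutine standing in for an action call. It says nothing about how the Waymark compiler lowers these calls.

```python
import asyncio


async def fetch_score(user_id: str, delay: float) -> str:
    # asyncio.sleep suspends this coroutine without blocking the event loop.
    await asyncio.sleep(delay)
    return f"score:{user_id}"


async def main() -> list[str]:
    # The slowest call is listed first; gather still returns results
    # in the order the awaitables were passed in.
    return await asyncio.gather(
        fetch_score("a", 0.03),
        fetch_score("b", 0.01),
        fetch_score("c", 0.02),
    )


results = asyncio.run(main())
```

The three calls run concurrently, so the total wait is close to the longest delay, not the sum of all three.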
Module preloading on workers
Workers need to import the modules where your @action and @workflow
decorators run, so that handlers are registered before the workers start
pulling rows off the queue. Tell them where to look with
WAYMARK_USER_MODULE. If this variable isn't set, we attempt to find these classes
automatically at startup, but setting it reduces the latency
of processing the first action after startup.
export WAYMARK_USER_MODULE=myapp.workflows
uv run waymark-start-workers
For a multi-module project, list them all:
export WAYMARK_USER_MODULE=myapp.workflows.billing,myapp.workflows.email
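Conceptually, preloading amounts to importing each listed module so that its decorators run. The sketch below shows that idea with importlib. It is not the worker's actual code: preload_modules is a hypothetical helper, the whitespace stripping is an assumption, and standard-library modules stand in for myapp.workflows.* so the example runs anywhere.

```python
import importlib
import sys


def preload_modules(raw: str) -> list[str]:
    """Import every module named in a comma-separated list."""
    names = [name.strip() for name in raw.split(",") if name.strip()]
    for name in names:
        # Importing a module runs its top-level code, including decorators.
        importlib.import_module(name)
    return names


# Stand-in for the value of WAYMARK_USER_MODULE.
loaded = preload_modules("json,decimal")
```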
Where workflows differ from actions
A few Python-specific things worth knowing about workflow run()
bodies that don't apply to actions:
- No f-strings. Build formatted values inside an action.
- No constructor calls for return values. return MyResult(...) at the end of a workflow doesn't compile. Either move it to a variable or into an action.
- Most module-level globals are off limits. Enum members and exception classes resolve fine; other module-level names are flagged at registration. Pass values in as workflow inputs or fetch them inside an action.
- No injected dependencies. Workflow run() doesn't get Depends(...) resolution. The body is parsed once at registration and never executed.
The compiler diagnostic on registration tells you the offending node and offers a fix. The Control Flow guide walks through the patterns that are supported, with examples for each.