Scheduled Workflows

Waymark workflows can run on a recurring cadence - a cron expression or a fixed interval - without any extra infrastructure. The scheduler is part of the runloop that you already run via waymark-start-workers. Schedules live in Postgres alongside workflow IR and queue rows, so a schedule survives restarts of every component.

What a schedule targets

Schedules are keyed by (workflow_name, schedule_name):

  • workflow_name is the workflow's short name. By default this is derived from the class; if you want a stable name across renames, set the class attribute name = "...".
  • schedule_name is yours to pick. It's how you let a single workflow run on more than one cadence - e.g., hourly-us-east and hourly-us-west for the same DataSyncWorkflow.

At fire time, the scheduler resolves the workflow by name and uses the most recently registered version in the workflow_versions table. That means redeploying with a changed run() body picks up the new DAG on the next fire - older schedules don't pin you to old code.
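Conceptually, resolving "the most recently registered version" is a max over the rows for that workflow name. A minimal in-memory sketch of that lookup (the table layout and column names here are illustrative assumptions, not Waymark's actual schema):

```python
# Hypothetical in-memory stand-in for the workflow_versions table.
workflow_versions = [
    {"workflow_name": "data_sync", "version": 1, "dag": "old DAG"},
    {"workflow_name": "data_sync", "version": 2, "dag": "new DAG"},
    {"workflow_name": "reports", "version": 1, "dag": "reports DAG"},
]

def resolve_latest(name: str) -> dict:
    """Pick the most recently registered version for a workflow name."""
    rows = [r for r in workflow_versions if r["workflow_name"] == name]
    if not rows:
        raise LookupError(f"no registered versions for {name!r}")
    return max(rows, key=lambda r: r["version"])

print(resolve_latest("data_sync")["dag"])  # the redeployed DAG wins
```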

Cron schedules

from waymark import Workflow, workflow, schedule_workflow


@workflow
class DataSyncWorkflow(Workflow):
    name = "data_sync"

    async def run(self, region: str) -> None:
        ...


await schedule_workflow(
    DataSyncWorkflow,
    schedule_name="hourly-us-east",
    schedule="0 * * * *",
    inputs={"region": "us-east"},
)

Standard 5-field cron syntax is accepted (Waymark normalizes it to 6 fields internally). Common patterns:

Cron            Cadence
0 * * * *       Every hour, on the hour
*/15 * * * *    Every 15 minutes
0 0 * * *       Daily at midnight UTC
0 0 * * 1       Every Monday at midnight
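The 5-to-6-field normalization isn't specified here, but a common convention is to prepend a seconds field of 0 so the expression fires at second zero of each matched minute. A sketch under that assumption (not necessarily Waymark's exact behavior):

```python
def normalize_cron(expr: str) -> str:
    """Expand a standard 5-field cron expression to 6 fields by
    prepending a seconds field. This convention is an assumption,
    not Waymark's documented implementation."""
    fields = expr.split()
    if len(fields) == 5:
        return "0 " + expr  # fire at second 0 of the matched minute
    if len(fields) == 6:
        return expr  # already second-resolution
    raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}")

print(normalize_cron("0 * * * *"))  # → "0 0 * * * *"
```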

Interval schedules

If you'd rather express "every N seconds", pass a timedelta:

from datetime import timedelta

await schedule_workflow(
    DataSyncWorkflow,
    schedule_name="every-5-min",
    schedule=timedelta(minutes=5),
    inputs={"region": "us-west"},
)

The first run fires at now + interval. Each subsequent run fires at last_run_at + interval, so a successful run that itself takes longer than the interval pushes out the next fire by exactly the run's duration (rather than queueing a backlog).

You can add jitter_seconds=N to get a random [0, N] second delay applied to each fire - useful when many hosts schedule the same workflow and you want to spread the queue load.
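The fire-time arithmetic described above can be sketched as a single function (a simplified model; the function and argument names are illustrative, not Waymark's API):

```python
import random
from datetime import datetime, timedelta

def next_fire(last_run_at, interval, jitter_seconds=0, now=None):
    """Compute the next fire for an interval schedule: now + interval
    for the first run, last_run_at + interval after that, plus an
    optional random [0, jitter_seconds] delay."""
    if now is None:
        now = datetime.now()
    base = (last_run_at if last_run_at is not None else now) + interval
    if jitter_seconds:
        base += timedelta(seconds=random.uniform(0, jitter_seconds))
    return base

t0 = datetime(2024, 1, 1, 12, 0, 0)
print(next_fire(None, timedelta(minutes=5), now=t0))  # first run: 12:05
print(next_fire(t0, timedelta(minutes=5), now=t0))    # subsequent: last run + 5 min
```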

Pause, resume, delete

from waymark import pause_schedule, resume_schedule, delete_schedule

await pause_schedule(DataSyncWorkflow, schedule_name="hourly-us-east")
await resume_schedule(DataSyncWorkflow, schedule_name="hourly-us-east")
await delete_schedule(DataSyncWorkflow, schedule_name="hourly-us-east")

Pausing keeps the schedule row but stops firing. Resuming recomputes the next fire from "now" (you don't replay missed fires). Deleting drops the row entirely.

If you call schedule_workflow(...) with a (workflow_name, schedule_name) that already exists, it updates the schedule in place, recomputes next_run_at, and sets the status back to active - handy for managing schedules from a deployment script that's safe to re-run.
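A toy model of that create-or-update contract (the real implementation is a Postgres upsert; this dict-backed version only illustrates the semantics, and the field names are assumptions):

```python
from datetime import datetime, timedelta

schedules = {}  # keyed by (workflow_name, schedule_name)

def upsert_schedule(workflow_name, schedule_name, schedule, inputs):
    """Create-or-update: an existing row is replaced in place,
    next_run_at is recomputed, and status flips back to active."""
    key = (workflow_name, schedule_name)
    schedules[key] = {
        "schedule": schedule,
        "inputs": inputs,
        "next_run_at": (datetime.now() + schedule)
            if isinstance(schedule, timedelta) else None,
        "status": "active",  # even if the row was previously paused
    }

# Safe to run twice from a deploy script: the second call overwrites the first.
upsert_schedule("data_sync", "every-5-min", timedelta(minutes=5), {"region": "us-west"})
upsert_schedule("data_sync", "every-5-min", timedelta(minutes=10), {"region": "us-west"})
print(len(schedules))  # still one row for this (workflow, schedule) pair
```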

List schedules

from waymark import list_schedules

all_schedules = await list_schedules()
active_only = await list_schedules(status_filter="active")
paused_only = await list_schedules(status_filter="paused")

The result includes scheduling fields (cron expression, interval, jitter), state fields (next_run_at, last_run_at, last_instance_id), and behavior flags (priority, allow_duplicate).

Overlap suppression

With the default allow_duplicate=False, a schedule won't queue a new run while a previous instance of the same schedule is still queued or running. The check runs in Postgres against last_instance_id, so two scheduler replicas can race to fire the same schedule and Postgres serializes them deterministically.

If you genuinely want concurrent runs (e.g., scrape multiple sources independently), set allow_duplicate=True when calling schedule_workflow(...).
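The suppression check can be modeled as: before enqueueing, look up the status of the last instance this schedule produced and skip if it hasn't finished. A self-contained sketch (statuses, field names, and the in-memory tables are illustrative, not Waymark's schema):

```python
# Hypothetical stand-in for instance state: instance_id -> status.
instances = {"inst-1": "running"}

def should_fire(schedule_row, instances):
    """Return True if this fire should enqueue a run. With
    allow_duplicate=False, skip while the last instance is still
    queued or running."""
    if schedule_row.get("allow_duplicate"):
        return True
    last = schedule_row.get("last_instance_id")
    if last is None:
        return True  # schedule has never fired
    return instances.get(last) not in ("queued", "running")

row = {"allow_duplicate": False, "last_instance_id": "inst-1"}
print(should_fire(row, instances))  # False: inst-1 is still running
```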