Scheduling
The scheduling API creates, pauses, resumes, deletes, and lists recurring workflow runs. See the Scheduled Workflows guide for the conceptual walk-through with examples.
Schedule a workflow
schedule_workflow
Register a schedule for a workflow.
This function registers both the workflow DAG and the schedule in a single call. When the schedule fires, the registered workflow version will be executed.
Args:
- workflow_cls: The Workflow class to schedule.
- schedule_name: Unique name for this schedule. Allows multiple schedules per workflow with different inputs. Must be unique within a workflow.
- schedule: Either a cron expression string (e.g., "0 * * * *" for hourly) or a timedelta for interval-based scheduling.
- jitter: Optional jitter window to add to each scheduled run.
- inputs: Optional keyword arguments to pass to each scheduled run.
- priority: Optional priority for queue ordering. Higher values are processed first. Default is 0.
- allow_duplicate: If False (default), the scheduler skips creating a new instance when one is already running for this schedule. If True, always creates a new instance.
Returns: The schedule ID.
Examples:
# Run every hour at minute 0
await schedule_workflow(
    MyWorkflow,
    schedule_name="hourly-run",
    schedule="0 * * * *",
)

# Run every 5 minutes
await schedule_workflow(
    MyWorkflow,
    schedule_name="frequent-check",
    schedule=timedelta(minutes=5),
)

# Multiple schedules with different inputs
await schedule_workflow(
    MyWorkflow,
    schedule_name="small-batch",
    schedule="0 0 * * *",
    inputs={"batch_size": 100},
)
await schedule_workflow(
    MyWorkflow,
    schedule_name="large-batch",
    schedule="0 12 * * *",
    inputs={"batch_size": 1000},
)

# High priority schedule
await schedule_workflow(
    CriticalWorkflow,
    schedule_name="critical-job",
    schedule="*/5 * * * *",
    priority=100,
)
Raises:
- ValueError: If the cron expression is invalid, the interval is non-positive, or schedule_name is empty.
- ScheduleAlreadyExistsError: If a schedule with the same name already exists.
- RuntimeError: If the gRPC call fails.
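The ValueError conditions listed above can be pictured as a local pre-flight check. The sketch below is an illustration only: validate_schedule_args is a hypothetical helper, not part of the scheduling API, and its cron check merely counts five whitespace-separated fields, which is an assumption about the accepted cron dialect.

```python
from datetime import timedelta
from typing import Union


def validate_schedule_args(schedule_name: str, schedule: Union[str, timedelta]) -> None:
    """Hypothetical pre-flight check mirroring the documented ValueError cases."""
    if not schedule_name:
        raise ValueError("schedule_name must be a non-empty string")
    if isinstance(schedule, timedelta):
        # Interval schedules must move forward in time.
        if schedule <= timedelta(0):
            raise ValueError("interval must be positive")
    elif isinstance(schedule, str):
        # Assumption: five-field cron (minute hour day month weekday).
        # This only counts fields; it does not validate their values.
        if len(schedule.split()) != 5:
            raise ValueError(f"invalid cron expression: {schedule!r}")
    else:
        raise TypeError("schedule must be a cron string or a timedelta")
```

Running a check like this before calling schedule_workflow surfaces obvious mistakes without a round trip; the real function remains the authority on what it accepts.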
Parameters
- workflow_cls (Type[Workflow])
- schedule_name (str)
- schedule (Union[str, timedelta])
- jitter (Optional[timedelta]). Default: None
- inputs (Optional[Dict[str, Any]]). Default: None
- priority (Optional[int]). Default: None
- allow_duplicate (bool). Default: False
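To make the jitter and allow_duplicate parameters concrete, here is a minimal sketch of how a scheduler could apply them to an interval schedule. Both functions are hypothetical and not taken from the library. The uniform random offset in particular is an assumption, since the reference only says that a jitter window is added to each run.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Optional


def next_fire_time(last_fire: datetime, interval: timedelta,
                   jitter: Optional[timedelta] = None) -> datetime:
    """Next run of an interval schedule, delayed by a random offset in [0, jitter]."""
    offset = timedelta(0)
    if jitter is not None:
        # Assumption: the offset is drawn uniformly from the jitter window.
        offset = timedelta(seconds=random.uniform(0, jitter.total_seconds()))
    return last_fire + interval + offset


def should_create_instance(instance_running: bool, allow_duplicate: bool = False) -> bool:
    """Skip the run when one is already in flight, unless duplicates are allowed."""
    return allow_duplicate or not instance_running


base = datetime(2024, 1, 1, tzinfo=timezone.utc)
planned = next_fire_time(base, timedelta(minutes=5), jitter=timedelta(seconds=30))
```

Here planned falls between 00:05:00 and 00:05:30 UTC. Spreading runs this way keeps many schedules with the same cadence from firing at the same instant.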
Pause / resume / delete
pause_schedule
Pause a workflow's schedule.
The schedule will not fire until resumed. Existing running instances are not affected.
Args:
- workflow_cls: The Workflow class whose schedule to pause.
- schedule_name: The name of the schedule to pause.
Returns: True if a schedule was found and paused, False otherwise.
Raises:
- ValueError: If schedule_name is empty.
- RuntimeError: If the gRPC call fails.
Parameters
- workflow_cls (Type[Workflow])
- schedule_name (str)
resume_schedule
Resume a paused workflow schedule.
Args:
- workflow_cls: The Workflow class whose schedule to resume.
- schedule_name: The name of the schedule to resume.
Returns: True if a schedule was found and resumed, False otherwise.
Raises:
- ValueError: If schedule_name is empty.
- RuntimeError: If the gRPC call fails.
Parameters
- workflow_cls (Type[Workflow])
- schedule_name (str)
delete_schedule
Delete a workflow's schedule.
The schedule is soft-deleted and can be recreated by calling schedule_workflow again.
Args:
- workflow_cls: The Workflow class whose schedule to delete.
- schedule_name: The name of the schedule to delete.
Returns: True if a schedule was found and deleted, False otherwise.
Raises:
- ValueError: If schedule_name is empty.
- RuntimeError: If the gRPC call fails.
Parameters
- workflow_cls (Type[Workflow])
- schedule_name (str)
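The pause, resume, and delete semantics described in this section can be modelled with a small in-memory registry. This is a sketch for intuition only. ScheduleRegistry and the local ScheduleAlreadyExistsError stand-in are hypothetical, and treating a repeated pause of an already-paused schedule as "found" is an assumption that the reference does not confirm.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]  # (workflow name, schedule name)


class ScheduleAlreadyExistsError(Exception):
    """Local stand-in for the library's exception of the same name."""


class ScheduleRegistry:
    """Hypothetical in-memory model of schedule state; not the real storage."""

    def __init__(self) -> None:
        self._status: Dict[Key, str] = {}

    def _live(self, key: Key) -> bool:
        # Unknown and soft-deleted schedules are both treated as "not found".
        return self._status.get(key, "deleted") != "deleted"

    def register(self, workflow: str, name: str) -> None:
        key = (workflow, name)
        if self._live(key):
            raise ScheduleAlreadyExistsError(name)
        self._status[key] = "active"  # also revives a soft-deleted schedule

    def _set(self, workflow: str, name: str, status: str) -> bool:
        key = (workflow, name)
        if not self._live(key):
            return False
        self._status[key] = status
        return True

    def pause(self, workflow: str, name: str) -> bool:
        return self._set(workflow, name, "paused")

    def resume(self, workflow: str, name: str) -> bool:
        return self._set(workflow, name, "active")

    def delete(self, workflow: str, name: str) -> bool:
        # Soft delete: the row stays but is marked deleted.
        return self._set(workflow, name, "deleted")
```

In this model each operation returns False for a schedule that was never registered or has been deleted, and registering again after a delete brings the schedule back as active.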
List schedules
list_schedules
List all registered workflow schedules.
Args: status_filter: Optional filter by status ("active" or "paused"). If None, returns all non-deleted schedules.
Returns: A list of ScheduleInfo objects containing schedule details.
Examples:
# List all schedules
schedules = await list_schedules()
for s in schedules:
    print(f"{s.workflow_name}: {s.status}")

# List only active schedules
active = await list_schedules(status_filter="active")

# List only paused schedules
paused = await list_schedules(status_filter="paused")
Raises: RuntimeError: If the gRPC call fails.
Parameters
- status_filter (Optional[ScheduleStatus]). Default: None
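The status_filter behaviour can be sketched as a plain filter over stored rows. ScheduleRow and filter_schedules are hypothetical names used only for this illustration, and rejecting unknown filter values with ValueError is an assumption; the reference does not say how an unexpected value is handled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ScheduleRow:
    """Hypothetical stored row; the real ScheduleInfo carries more fields."""
    workflow_name: str
    schedule_name: str
    status: str  # "active", "paused", or "deleted"


def filter_schedules(rows: List[ScheduleRow],
                     status_filter: Optional[str] = None) -> List[ScheduleRow]:
    """Return non-deleted rows, optionally narrowed to a single status."""
    if status_filter is None:
        return [r for r in rows if r.status != "deleted"]
    if status_filter not in ("active", "paused"):
        # Assumption: only the two documented values are accepted.
        raise ValueError(f"unknown status filter: {status_filter!r}")
    return [r for r in rows if r.status == status_filter]


rows = [
    ScheduleRow("MyWorkflow", "small-batch", "active"),
    ScheduleRow("MyWorkflow", "large-batch", "paused"),
    ScheduleRow("CriticalWorkflow", "critical-job", "deleted"),
]
```

With these rows, filter_schedules(rows) returns the first two entries, and passing "active" or "paused" returns one entry each. Deleted schedules never appear.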
Schedule info
The result type returned by list_schedules.
Information about a registered schedule.
Class Constructor
- id (str)
- workflow_name (str)
- schedule_name (str)
- schedule_type (ScheduleType)
- cron_expression (Optional[str])
- interval_seconds (Optional[int])
- jitter_seconds (Optional[int])
- status (ScheduleStatus)
- next_run_at (Optional[datetime])
- last_run_at (Optional[datetime])
- last_instance_id (Optional[str])
- created_at (datetime)
- updated_at (datetime)
- allow_duplicate (bool)
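As a reading aid, the fields above map naturally onto a dataclass. The sketch below is not the library's definition. ScheduleInfoSketch is a hypothetical name, the two Literal aliases are assumptions about what ScheduleType and ScheduleStatus contain, and cadence is a helper added purely for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional

# Assumptions: the real ScheduleType and ScheduleStatus may be defined differently.
ScheduleType = Literal["cron", "interval"]
ScheduleStatus = Literal["active", "paused"]


@dataclass
class ScheduleInfoSketch:
    id: str
    workflow_name: str
    schedule_name: str
    schedule_type: ScheduleType
    cron_expression: Optional[str]   # expected for cron schedules
    interval_seconds: Optional[int]  # expected for interval schedules
    jitter_seconds: Optional[int]
    status: ScheduleStatus
    next_run_at: Optional[datetime]
    last_run_at: Optional[datetime]
    last_instance_id: Optional[str]
    created_at: datetime
    updated_at: datetime
    allow_duplicate: bool

    def cadence(self) -> str:
        """Illustrative helper: a human-readable description of the schedule."""
        if self.schedule_type == "cron":
            return f"cron {self.cron_expression}"
        return f"every {self.interval_seconds}s"


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
info = ScheduleInfoSketch(
    id="sched-1", workflow_name="MyWorkflow", schedule_name="frequent-check",
    schedule_type="interval", cron_expression=None, interval_seconds=300,
    jitter_seconds=None, status="active", next_run_at=None, last_run_at=None,
    last_instance_id=None, created_at=now, updated_at=now, allow_duplicate=False,
)
```

The Optional fields suggest that a schedule which has never fired carries None for last_run_at and last_instance_id, and that only one of cron_expression and interval_seconds is populated, depending on schedule_type.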