Task Scheduler
Overview
The Scheduler module provides a persistent, distributed mechanism for scheduling tasks to be executed at a specific time in the future or after a certain delay. It is distinct from the main Background Tasks module, which is for immediate asynchronous execution.
The scheduler is essential for features like:
- Implementing the Delayed Messages strategy.
- Scheduling recurring batch jobs.
- Triggering future actions (e.g., ending a trial period).
How It Works
The system is composed of a persistent storage backend and a background worker.
1. Scheduling: When you call scheduler.schedule(), a GenericTask object (containing the handler path and payload) is created. This object is stored in a KV store (like Redis), and its execution time is added as a score to a sorted set.
2. Polling (Worker): The SchedulerWorkerService is a background service that periodically polls the sorted set for tasks whose execution time has passed.
3. Dispatching: The worker atomically fetches and removes a due task from the schedule and then enqueues it into the main application Task Broker for actual execution by a standard worker.
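This design decouples scheduling from execution: the scheduler only stores tasks and releases them once they are due, while the standard workers behind the Task Broker run them. Each side can therefore be scaled independently, and scheduled tasks remain in persistent storage until they are dispatched.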
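The three steps above can be sketched with an in-memory stand-in for the KV store. This is only an illustration of the flow, not the module's implementation: `InMemorySchedule`, `pop_due`, and `poll_once` are hypothetical names, and a plain dictionary of scores plays the role of the sorted set.

```python
import uuid
from typing import Any, Callable, Dict, List, Optional


class InMemorySchedule:
    """Hypothetical stand-in for the KV store: task details plus a 'sorted set' of scores."""

    def __init__(self) -> None:
        self.details: Dict[str, Dict[str, Any]] = {}  # task_id -> task body
        self.scores: Dict[str, float] = {}            # task_id -> execution timestamp

    def schedule(self, handler: str, payload: Dict[str, Any], run_at: float) -> str:
        # Step 1: store the task body and index it by its execution time.
        task_id = str(uuid.uuid4())
        self.details[task_id] = {"task_id": task_id, "handler": handler, "payload": payload}
        self.scores[task_id] = run_at
        return task_id

    def pop_due(self, now: float) -> Optional[Dict[str, Any]]:
        # Steps 2-3: find the earliest task whose time has passed and remove it.
        # A real backend has to make fetch-and-remove atomic; this sketch is
        # single-threaded, so no extra coordination is needed here.
        due = [(score, tid) for tid, score in self.scores.items() if score <= now]
        if not due:
            return None
        _, task_id = min(due)
        del self.scores[task_id]
        return self.details.pop(task_id)


def poll_once(
    schedule: InMemorySchedule,
    enqueue: Callable[[Dict[str, Any]], None],
    now: float,
) -> int:
    """Dispatch every task that is due at `now`; return how many were dispatched."""
    dispatched = 0
    while (task := schedule.pop_due(now)) is not None:
        enqueue(task)  # hand off to the broker; execution happens elsewhere
        dispatched += 1
    return dispatched


if __name__ == "__main__":
    schedule = InMemorySchedule()
    broker_queue: List[Dict[str, Any]] = []
    # "reports.send_digest" is a made-up handler path used only for this demo.
    schedule.schedule("reports.send_digest", {"user_id": 7}, run_at=100.0)
    print(poll_once(schedule, broker_queue.append, now=99.0))   # task not due yet
    print(poll_once(schedule, broker_queue.append, now=101.0))  # task is dispatched
```

Passing `now` explicitly keeps the sketch deterministic; a long-running worker would presumably read the clock on each polling cycle instead.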
API Reference
nala.athomic.control.scheduler.protocol.SchedulerProtocol
Bases: BaseServiceProtocol, Protocol
Defines the contract for a task scheduling system. Abstracts the logic of delaying or scheduling the execution of a task, whether in memory, via broker, or in persistent storage.
cancel(task_id)
async
Attempts to cancel a scheduled task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `str` | The ID of the task to be canceled. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if the cancellation was successful, False otherwise. |
schedule(task, *, delay, task_id=None)
async
Schedules a task to be executed after a certain delay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Any` | The task object to be scheduled (can be a Pydantic model, dict, etc.). | required |
| `delay` | `timedelta` | The waiting time from now. | required |
| `task_id` | `str \| None` | An optional ID for the task. If not provided, one will be generated. | `None` |
Returns:
| Type | Description |
|---|---|
| `str` | The ID of the scheduled task. |
schedule_at(task, *, execution_time, task_id=None)
async
Schedules a task to be executed at an exact moment in the future.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `Any` | The task object to be scheduled. | required |
| `execution_time` | `datetime` | The exact date and time (UTC) for execution. | required |
| `task_id` | `str \| None` | An optional ID for the task. | `None` |
Returns:
| Type | Description |
|---|---|
| `str` | The ID of the scheduled task. |
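To make the three documented signatures concrete, here is a minimal in-memory sketch that exposes the same method names and keyword-only arguments. `InMemoryScheduler` is a hypothetical class written for illustration, not an implementation shipped with the module; rejecting naive datetimes and the `billing.end_trial` handler path are assumptions of this example.

```python
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple


class InMemoryScheduler:
    """Hypothetical in-memory object with the three methods documented above."""

    def __init__(self) -> None:
        # task_id -> (execution time in UTC, task object)
        self._tasks: Dict[str, Tuple[datetime, Any]] = {}

    async def schedule_at(
        self, task: Any, *, execution_time: datetime, task_id: Optional[str] = None
    ) -> str:
        if execution_time.tzinfo is None:
            # Assumption: naive datetimes are rejected rather than guessed at.
            raise ValueError("execution_time must be timezone-aware (UTC)")
        task_id = task_id or str(uuid.uuid4())
        self._tasks[task_id] = (execution_time.astimezone(timezone.utc), task)
        return task_id

    async def schedule(
        self, task: Any, *, delay: timedelta, task_id: Optional[str] = None
    ) -> str:
        # A relative delay is an absolute time computed from "now".
        run_at = datetime.now(timezone.utc) + delay
        return await self.schedule_at(task, execution_time=run_at, task_id=task_id)

    async def cancel(self, task_id: str) -> bool:
        # True only if the task was still pending.
        return self._tasks.pop(task_id, None) is not None


async def demo() -> Tuple[str, bool, bool]:
    scheduler = InMemoryScheduler()
    # Made-up task body for the "ending a trial period" use case.
    task = {"handler": "billing.end_trial", "payload": {"user_id": 42}}
    task_id = await scheduler.schedule(task, delay=timedelta(days=14), task_id="trial-42")
    first = await scheduler.cancel(task_id)   # the task was still pending
    second = await scheduler.cancel(task_id)  # nothing left to cancel
    return task_id, first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Expressing `schedule` in terms of `schedule_at` keeps a single code path for storing tasks. Whether the real implementations do the same is not stated in this reference.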
nala.athomic.control.scheduler.schemas.GenericTask
Bases: BaseModel
Represents a generic, serializable task for the scheduler.
This model acts as a standard envelope for any task that needs to be
scheduled for delayed or future execution. It decouples the scheduler from
the task's implementation by referencing the handler as a string and
encapsulating all necessary data within the payload.
Attributes:
| Name | Type | Description |
|---|---|---|
| `task_id` | `UUID` | A unique identifier for this specific task instance. |
| `handler` | `str` | The string identifier of the callable handler that will execute the task's logic. |
| `payload` | `Dict[str, Any]` | A dictionary containing the data required by the handler to perform the task. |
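Referencing the handler as a string means something must turn that string back into a callable before the task can run. The sketch below shows one common way to do this with `importlib`. It uses a stdlib dataclass, `TaskEnvelope`, as a stand-in that mirrors the three documented attributes. The dotted `"module.function"` path format, the `resolve_handler` helper, and passing the payload as keyword arguments are all assumptions of this example, not documented behavior.

```python
import importlib
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class TaskEnvelope:
    """Stdlib stand-in mirroring the three documented GenericTask attributes."""

    handler: str
    payload: Dict[str, Any]
    task_id: uuid.UUID = field(default_factory=uuid.uuid4)


def resolve_handler(path: str) -> Callable[..., Any]:
    # Assumed path format: "package.module.function".
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"handler path {path!r} has no module part")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def run(task: TaskEnvelope) -> Any:
    # Assumed calling convention: the payload is unpacked as keyword arguments.
    return resolve_handler(task.handler)(**task.payload)


if __name__ == "__main__":
    # json.dumps is used only because it is importable everywhere.
    task = TaskEnvelope(
        handler="json.dumps",
        payload={"obj": {"b": 1, "a": 2}, "sort_keys": True},
    )
    print(run(task))
```

Because the envelope holds only a string and a dictionary, it can be serialized and stored without importing the handler's module at scheduling time.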
nala.athomic.control.scheduler.worker.SchedulerWorkerService
Bases: BaseService
A background service that polls a persistent schedule and dispatches due tasks.
This worker is the execution engine for the custom KV-based scheduler. It periodically polls a sorted set in a KV store (e.g., Redis) to find tasks whose scheduled execution time has passed. Once a task is claimed, it is dispatched to the primary application task broker (e.g., Taskiq) for actual execution. It uses an exponential backoff strategy to reduce polling frequency during idle periods.
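The idle backoff mentioned above can be illustrated with a small interval calculation. This is a sketch under assumed parameters: the function names, the base interval of 1 second, the factor of 2, and the 30 second cap are illustrative and do not come from the worker's actual settings.

```python
from typing import Iterable, List


def next_interval(
    current: float,
    found_work: bool,
    *,
    base: float = 1.0,
    factor: float = 2.0,
    maximum: float = 30.0,
) -> float:
    """Return how long to sleep before the next poll."""
    if found_work:
        return base  # tasks are flowing: reset and poll eagerly
    return min(current * factor, maximum)  # idle: back off, but cap the wait


def simulate(results: Iterable[bool], base: float = 1.0) -> List[float]:
    """Replay a sequence of poll outcomes and collect the resulting sleep times."""
    interval = base
    intervals: List[float] = []
    for found in results:
        interval = next_interval(interval, found, base=base)
        intervals.append(interval)
    return intervals


if __name__ == "__main__":
    # Three idle polls, one poll that found work, then another idle poll.
    print(simulate([False, False, False, True, False]))
    # prints [2.0, 4.0, 8.0, 1.0, 2.0]
```

Capping the interval bounds how late a newly scheduled task can be picked up after a long idle period, at the cost of a few extra polls.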
Attributes:
| Name | Type | Description |
|---|---|---|
| `settings` | | The worker-specific configuration settings. |
| `kv_store` | `Optional[KVStoreProtocol]` | The client for the KV store that holds the schedule. |
| `task_broker` | `Optional[TaskBrokerProtocol]` | The client for the main task broker where tasks are dispatched. |
| `tasks_key` | `str` | The key for the sorted set of scheduled tasks. |
| `details_prefix` | `str` | The key prefix for storing task details. |
__init__(settings)
Initializes the SchedulerWorkerService.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `settings` | `SchedulerSettings` | The main scheduler configuration, from which worker-specific settings are extracted. | required |