Task Scheduler

Overview

The Scheduler module provides a persistent, distributed mechanism for scheduling tasks to be executed at a specific time in the future or after a certain delay. It is distinct from the main Background Tasks module, which is for immediate asynchronous execution.

The scheduler is essential for features like:

- Implementing the Delayed Messages strategy.
- Scheduling recurring batch jobs.
- Triggering future actions (e.g., ending a trial period).

How It Works

The system is composed of a persistent storage backend and a background worker.

1. Scheduling: When you call scheduler.schedule(), a GenericTask object (containing the handler path and payload) is created. This object is stored in a KV store (like Redis), and its execution time is added as a score to a sorted set.
2. Polling (Worker): The SchedulerWorkerService is a background service that periodically polls the sorted set for tasks whose execution time has passed.
3. Dispatching: The worker atomically fetches and removes a due task from the schedule and then enqueues it into the main application Task Broker for actual execution by a standard worker.

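This design decouples scheduling from execution. The scheduler worker only moves due tasks onto the Task Broker, so execution capacity scales with the pool of standard workers, and pending tasks live in the persistent store rather than in process memory.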
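The schedule, poll, and dispatch steps described above can be sketched with a small in-memory model. This is an illustration only, not the module's implementation: `InMemorySchedule`, `pop_due`, and `dispatch_due` are hypothetical names, a `heapq` stands in for the sorted set, and a plain list stands in for the Task Broker.

```python
import heapq
import uuid
from dataclasses import dataclass, field


@dataclass
class InMemorySchedule:
    """Hypothetical stand-in for the KV store: a details map plus a time-ordered index."""

    details: dict = field(default_factory=dict)    # task_id -> task envelope
    due_index: list = field(default_factory=list)  # heap of (execute_at, task_id)

    def schedule(self, handler: str, payload: dict, execute_at: float) -> str:
        # Step 1: store the envelope and index it by execution time (the "score").
        task_id = str(uuid.uuid4())
        self.details[task_id] = {"handler": handler, "payload": payload}
        heapq.heappush(self.due_index, (execute_at, task_id))
        return task_id

    def pop_due(self, now: float):
        # Steps 2 and 3: fetch and remove one due task in a single operation.
        # A shared backend would need this to be atomic across competing workers.
        if self.due_index and self.due_index[0][0] <= now:
            _, task_id = heapq.heappop(self.due_index)
            return task_id, self.details.pop(task_id)
        return None


def dispatch_due(schedule: InMemorySchedule, broker_queue: list, now: float) -> int:
    """Move every due task onto the broker queue and return how many were moved."""
    moved = 0
    while (claimed := schedule.pop_due(now)) is not None:
        broker_queue.append(claimed)
        moved += 1
    return moved
```

Because a task is removed from the index in the same step that claims it, a second call to `dispatch_due` with the same `now` finds nothing left to move.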

API Reference

nala.athomic.control.scheduler.protocol.SchedulerProtocol

Bases: BaseServiceProtocol, Protocol

Defines the contract for a task scheduling system. Abstracts the logic of delaying or scheduling the execution of a task, whether in memory, via broker, or in persistent storage.

cancel(task_id) async

Attempts to cancel a scheduled task.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_id | str | The ID of the task to be canceled. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the cancellation was successful, False otherwise. |

schedule(task, *, delay, task_id=None) async

Schedules a task to be executed after a certain delay.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | Any | The task object to be scheduled (can be a Pydantic model, dict, etc.). | required |
| delay | timedelta | The waiting time from now. | required |
| task_id | str \| None | An optional ID for the task. If not provided, one will be generated. | None |

Returns:

| Type | Description |
| --- | --- |
| str | The ID of the scheduled task. |

schedule_at(task, *, execution_time, task_id=None) async

Schedules a task to be executed at an exact moment in the future.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task | Any | The task object to be scheduled. | required |
| execution_time | datetime | The exact date and time (UTC) for execution. | required |
| task_id | str \| None | An optional ID for the task. | None |

Returns:

| Type | Description |
| --- | --- |
| str | The ID of the scheduled task. |
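To make the three protocol methods concrete, here is a minimal in-memory sketch that follows the documented signatures. `InMemoryScheduler` is a hypothetical class written for illustration. It only records tasks and never executes them, and it does not subclass the real `SchedulerProtocol` or `BaseServiceProtocol`.

```python
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple


class InMemoryScheduler:
    """Hypothetical scheduler that records pending tasks; nothing is executed."""

    def __init__(self) -> None:
        self._scheduled: Dict[str, Tuple[datetime, Any]] = {}

    async def schedule(
        self, task: Any, *, delay: timedelta, task_id: Optional[str] = None
    ) -> str:
        # A relative delay is converted into an absolute UTC execution time.
        execution_time = datetime.now(timezone.utc) + delay
        return await self.schedule_at(
            task, execution_time=execution_time, task_id=task_id
        )

    async def schedule_at(
        self, task: Any, *, execution_time: datetime, task_id: Optional[str] = None
    ) -> str:
        # Generate an ID when the caller does not supply one.
        task_id = task_id or str(uuid.uuid4())
        self._scheduled[task_id] = (execution_time, task)
        return task_id

    async def cancel(self, task_id: str) -> bool:
        # True only if the task was still pending when cancel was called.
        return self._scheduled.pop(task_id, None) is not None


async def main() -> None:
    scheduler = InMemoryScheduler()
    task_id = await scheduler.schedule(
        {"handler": "example.handler", "payload": {}},  # placeholder task
        delay=timedelta(minutes=5),
    )
    print(await scheduler.cancel(task_id))


if __name__ == "__main__":
    asyncio.run(main())
```

Keeping the returned task ID is what makes a later `cancel()` call possible, which is why both scheduling methods return it even when the caller supplied their own.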

nala.athomic.control.scheduler.schemas.GenericTask

Bases: BaseModel

Represents a generic, serializable task for the scheduler.

This model acts as a standard envelope for any task that needs to be scheduled for delayed or future execution. It decouples the scheduler from the task's implementation by referencing the handler as a string and encapsulating all necessary data within the payload.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| task_id | UUID | A unique identifier for this specific task instance. |
| handler | str | The string identifier of the callable handler that will execute the task's logic. |
| payload | Dict[str, Any] | A dictionary containing the data required by the handler to perform the task. |
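The envelope idea can be illustrated with a standard-library stand-in. In this sketch `TaskEnvelope` is a hypothetical dataclass mirroring the three documented attributes (the real `GenericTask` is a Pydantic model), and `resolve_handler` assumes the handler string is a dotted import path such as `package.module.func`. The source does not specify the actual path format.

```python
import importlib
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class TaskEnvelope:
    """Hypothetical stand-in for GenericTask with the three documented attributes."""

    handler: str
    payload: Dict[str, Any]
    task_id: uuid.UUID = field(default_factory=uuid.uuid4)

    def to_json(self) -> str:
        # UUIDs are not JSON-serializable, so the ID is stored as a string.
        return json.dumps(
            {
                "task_id": str(self.task_id),
                "handler": self.handler,
                "payload": self.payload,
            }
        )

    @classmethod
    def from_json(cls, raw: str) -> "TaskEnvelope":
        data = json.loads(raw)
        return cls(
            handler=data["handler"],
            payload=data["payload"],
            task_id=uuid.UUID(data["task_id"]),
        )


def resolve_handler(path: str) -> Callable[..., Any]:
    """Import 'package.module.func' and return func (assumed path format)."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)
```

Since the envelope carries only a string and a dictionary, it can be stored and later restored by a process that never imported the handler until the moment of dispatch.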

nala.athomic.control.scheduler.worker.SchedulerWorkerService

Bases: BaseService

A background service that polls a persistent schedule and dispatches due tasks.

This worker is the execution engine for the custom KV-based scheduler. It periodically polls a sorted set in a KV store (e.g., Redis) to find tasks whose scheduled execution time has passed. Once a task is claimed, it is dispatched to the primary application task broker (e.g., Taskiq) for actual execution. It uses an exponential backoff strategy to reduce polling frequency during idle periods.
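The backoff behavior described above might look like the following sketch. The names `next_interval` and `poll_loop`, and the `base`, `factor`, and `ceiling` parameters with their default values, are assumptions made for illustration. The actual settings fields of SchedulerWorkerService are not documented here.

```python
import asyncio
from typing import Awaitable, Callable


def next_interval(
    current: float,
    found_work: bool,
    *,
    base: float = 1.0,
    factor: float = 2.0,
    ceiling: float = 30.0,
) -> float:
    """Reset to base after a productive poll; otherwise grow up to the ceiling."""
    if found_work:
        return base
    return min(current * factor, ceiling)


async def poll_loop(
    poll_once: Callable[[], Awaitable[int]],
    stop: asyncio.Event,
    *,
    base: float = 1.0,
    factor: float = 2.0,
    ceiling: float = 30.0,
) -> None:
    """Call poll_once repeatedly, waiting longer between polls while idle."""
    interval = base
    while not stop.is_set():
        dispatched = await poll_once()  # number of tasks moved to the broker
        interval = next_interval(
            interval, dispatched > 0, base=base, factor=factor, ceiling=ceiling
        )
        try:
            # Wait for the interval, but wake immediately if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
```

Resetting to the base interval as soon as a poll finds work keeps dispatch latency low during busy periods, while the ceiling bounds how late an idle worker can notice a newly due task.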

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| settings | | The worker-specific configuration settings. |
| kv_store | Optional[KVStoreProtocol] | The client for the KV store that holds the schedule. |
| task_broker | Optional[TaskBrokerProtocol] | The client for the main task broker where tasks are dispatched. |
| tasks_key | str | The key for the sorted set of scheduled tasks. |
| details_prefix | str | The key prefix for storing task details. |

__init__(settings)

Initializes the SchedulerWorkerService.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| settings | SchedulerSettings | The main scheduler configuration, from which worker-specific settings are extracted. | required |