Background Tasks
Overview
The Tasks module provides a high-level abstraction for enqueuing and executing background tasks asynchronously. It decouples the function call from its execution, allowing you to offload long-running or non-critical operations to a separate pool of workers.
Key Features
- Simple Decorator API: Convert any `async` function into a background task with the `@task` decorator.
- Multiple Backends: Supports `Taskiq` and `RQ` (Redis Queue) as distributed backends, as well as a local in-memory backend for testing.
- Automatic Context Propagation: The `ExecutionContext` (`trace_id`, `tenant_id`, etc.) from the caller is automatically captured and restored on the worker, ensuring seamless tracing and multi-tenancy.
- Resilient Fallback Chain: Configure a chain of providers (e.g., try `Taskiq` first, fall back to `RQ` if it's down) for high availability.
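
The fallback chain in the last bullet can be pictured with a small sketch. This is not the module's implementation: `FallbackBroker`, `FlakyBroker`, and `MemoryBroker` are hypothetical names invented for illustration, and only the `enqueue_task(task_name, *args, **kwargs)` shape is borrowed from the `TaskBrokerProtocol` described in the API Reference.

```python
import asyncio
from typing import Any, Optional, Sequence


class FlakyBroker:
    """Hypothetical stand-in for a distributed backend that is unreachable."""

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        raise ConnectionError("primary broker is down")


class MemoryBroker:
    """Hypothetical stand-in for a healthy backend; keeps tasks in a list."""

    def __init__(self) -> None:
        self.queued = []

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        self.queued.append((task_name, args, kwargs))
        return f"mem-{len(self.queued)}"


class FallbackBroker:
    """Tries each provider in order and returns the first successful result."""

    def __init__(self, providers: Sequence[Any]) -> None:
        self.providers = list(providers)

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        last_error: Optional[Exception] = None
        for provider in self.providers:
            try:
                return await provider.enqueue_task(task_name, *args, **kwargs)
            except Exception as exc:  # a real chain would likely catch narrower errors
                last_error = exc
        raise RuntimeError("all task providers failed") from last_error


def demo_fallback():
    """Enqueue through a chain whose first provider always fails."""
    secondary = MemoryBroker()
    chain = FallbackBroker([FlakyBroker(), secondary])
    task_id = asyncio.run(chain.enqueue_task("send_welcome_email", user_id="42"))
    return task_id, len(secondary.queued)
```

Calling `demo_fallback()` routes the task to the second provider because the first one raises. How the real module decides that a provider is "down" is not specified here.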
How to Define and Enqueue a Task
```python
# In your_app/tasks.py
from nala.athomic.integration.tasks import task


@task(queue="emails")
async def send_welcome_email(user_id: str, email: str):
    # This function will be executed by a worker process.
    # The context from the original API request will be available here.
    # `email_service` is assumed to be defined elsewhere in your application.
    await email_service.send(to=email, ...)


# In your API endpoint or service
from your_app.tasks import send_welcome_email


async def create_user(user_data: dict):
    # `user` is assumed to have been created from `user_data` before this point.
    # Instead of calling the function directly...
    # send_welcome_email(user.id, user.email)
    # ...you enqueue it for background execution using .delay()
    await send_welcome_email.delay(user_id=user.id, email=user.email)
```
API Reference
nala.athomic.integration.tasks.decorator.task(queue=None)
Decorator used to register a standard asynchronous function as a background task.
The decorated function is wrapped in a `TaskWrapper`, giving it the `.delay()` method for asynchronous execution scheduling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queue` | `Optional[str]` | Optional name of the queue where the task should be placed. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| `Callable` | `Callable[[Callable], TaskWrapper]` | The decorator function that returns a `TaskWrapper` instance. |
nala.athomic.integration.tasks.protocol.TaskBrokerProtocol
Bases: Protocol
Defines the standard, agnostic interface (contract) for integrating with background task queue systems.
This protocol serves as an abstraction layer (DIP) over concrete backends like RQ, Taskiq, or internal local executors, decoupling application logic from the task execution infrastructure.
async enqueue_task(task_name, *args, **kwargs)
Enqueues a task for eventual asynchronous execution by a dedicated worker process.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_name` | `str` | A string identifier for the task function (e.g., 'send_welcome_email'). The format depends on the specific task broker backend (e.g., function path, registered name). | required |
| `*args` | `Any` | Positional arguments to be passed to the task function. These arguments must be safely serializable by the backend broker. | `()` |
| `**kwargs` | `Any` | Keyword arguments to be passed to the task function. Must also be safely serializable. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Optional[str]` | An optional string representing the unique ID assigned to the enqueued task by the backend (if available). Returns `None` if the backend does not provide an ID or enqueues silently. |
Raises:
| Type | Description |
|---|---|
| `TaskEnqueueError` | If the system fails to communicate with the broker or if a critical serialization failure prevents the task from being successfully placed in the queue. |
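
To make the contract concrete, the following sketch restates the protocol and pairs it with a toy local broker. It is illustrative only: `LocalBroker` is hypothetical, the `TaskEnqueueError` class defined here is a stand-in for the module's own exception, and JSON is an assumed serialization format chosen to demonstrate the "safely serializable" requirement.

```python
import asyncio
import json
import uuid
from typing import Any, Optional, Protocol, runtime_checkable


class TaskEnqueueError(Exception):
    """Stand-in for the module's exception of the same name."""


@runtime_checkable
class TaskBrokerProtocol(Protocol):
    """Restatement of the documented interface for this sketch."""

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]: ...


class LocalBroker:
    """Hypothetical in-process broker that stores serialized payloads."""

    def __init__(self) -> None:
        self.queue = []

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        try:
            # Assumption: arguments must be JSON-serializable in this toy backend.
            payload = json.dumps({"task": task_name, "args": args, "kwargs": kwargs})
        except (TypeError, ValueError) as exc:
            raise TaskEnqueueError(f"cannot serialize arguments for {task_name!r}") from exc
        task_id = uuid.uuid4().hex
        self.queue.append((task_id, payload))
        return task_id


def enqueue_ok():
    """Enqueue a task with serializable arguments."""
    broker = LocalBroker()
    task_id = asyncio.run(broker.enqueue_task("send_welcome_email", "42", email="a@example.com"))
    return task_id, broker


def enqueue_unserializable() -> bool:
    """Return True if a non-serializable argument raises TaskEnqueueError."""
    broker = LocalBroker()
    try:
        asyncio.run(broker.enqueue_task("send_welcome_email", object()))
    except TaskEnqueueError:
        return True
    return False
```

Because the protocol is structural, `LocalBroker` satisfies it without inheriting from it. Any class exposing a compatible `enqueue_task` coroutine would do the same.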
nala.athomic.integration.tasks.worker.run_task_with_context(func)
A decorator wrapper applied to task functions on the worker side.
This wrapper performs four critical functions:
1. Extracting the serialized execution context from the task arguments.
2. Restoring the context (tracing IDs, tenant info, etc.) for the worker's thread or async loop.
3. Applying tracing and metrics before and after task execution.
4. Handling exceptions and logging status.