Background Tasks

Overview

The Tasks module provides a high-level abstraction for enqueuing and executing background tasks asynchronously. It decouples the function call from its execution, allowing you to offload long-running or non-critical operations to a separate pool of workers.

Key Features

  • Simple Decorator API: Convert any async function into a background task with the @task decorator.
  • Multiple Backends: Supports Taskiq and RQ (Redis Queue) as distributed backends, as well as a local in-memory backend for testing.
  • Automatic Context Propagation: The ExecutionContext (trace_id, tenant_id, etc.) from the caller is automatically captured at enqueue time and restored on the worker, so traces and tenant scoping carry over to background work.
  • Resilient Fallback Chain: Configure a chain of providers (e.g., try Taskiq first, fall back to RQ if it's down) for high availability.
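
The fallback chain can be pictured with a small sketch. Everything in it is hypothetical: `FallbackBroker`, `FlakyBroker`, `InMemoryBroker`, and the local `TaskEnqueueError` are stand-ins written for illustration, not the module's actual classes or configuration. Only the idea of trying providers in order comes from the feature list.

```python
import asyncio
from typing import Any, Optional, Sequence


class TaskEnqueueError(Exception):
    """Local stand-in for the library's enqueue error (assumed shape)."""


class FlakyBroker:
    """Hypothetical provider that is always unavailable."""

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        raise TaskEnqueueError("broker unreachable")


class InMemoryBroker:
    """Hypothetical provider that records tasks in a list."""

    def __init__(self) -> None:
        self.queued: list = []

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        self.queued.append((task_name, args, kwargs))
        return f"local-{len(self.queued)}"


class FallbackBroker:
    """Tries each provider in order and returns the first successful result."""

    def __init__(self, providers: Sequence[Any]) -> None:
        self.providers = list(providers)

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        last_error: Optional[Exception] = None
        for provider in self.providers:
            try:
                return await provider.enqueue_task(task_name, *args, **kwargs)
            except TaskEnqueueError as exc:
                last_error = exc  # remember the failure and try the next provider
        raise TaskEnqueueError("all providers failed") from last_error


async def demo() -> Optional[str]:
    # The first provider fails, so the task lands in the in-memory one.
    chain = FallbackBroker([FlakyBroker(), InMemoryBroker()])
    return await chain.enqueue_task("send_welcome_email", user_id="42")
```

Running `asyncio.run(demo())` exercises the chain: the first provider raises, and the second accepts the task. If every provider fails, the sketch raises a single error chained to the last failure, so the caller has one exception type to handle.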

How to Define and Enqueue a Task

# In your_app/tasks.py
from nala.athomic.integration.tasks import task

@task(queue="emails")
async def send_welcome_email(user_id: str, email: str):
    # This function will be executed by a worker process.
    # The context from the original API request will be available here.
    await email_service.send(to=email, ...)

# In your API endpoint or service
from your_app.tasks import send_welcome_email

async def create_user(user_data: dict):
    # `user` stands for the record created from user_data (creation code omitted).

    # Instead of awaiting the function directly...
    # await send_welcome_email(user.id, user.email)

    # ...you enqueue it for background execution using .delay()
    await send_welcome_email.delay(user_id=user.id, email=user.email)

API Reference

nala.athomic.integration.tasks.decorator.task(queue=None)

Decorator used to register a standard asynchronous function as a background task.

The decorated function is wrapped in a TaskWrapper, which adds a .delay() method for scheduling the function to run asynchronously on a worker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| queue | Optional[str] | Optional name of the queue where the task should be placed. | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Callable | Callable[[Callable], TaskWrapper] | The decorator function that returns a TaskWrapper instance. |
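
To make the decorator-factory shape concrete, here is a minimal sketch of how a `task(queue=...)` decorator could return a wrapper exposing `.delay()`. It is not the library's implementation: this `TaskWrapper`, the module-level `PENDING` list, and the returned ID format are all assumptions made for illustration. A real wrapper would hand the call to a broker instead of a list.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# Hypothetical in-process "queue" of (queue name, task name, kwargs) tuples.
PENDING: list = []


class TaskWrapper:
    """Illustrative wrapper; the real TaskWrapper may differ."""

    def __init__(self, func: Callable[..., Awaitable[Any]], queue: Optional[str]) -> None:
        self.func = func
        self.queue = queue
        self.name = func.__name__

    async def delay(self, **kwargs: Any) -> str:
        # Record the call instead of running it; a broker would do this for real.
        PENDING.append((self.queue, self.name, kwargs))
        return f"{self.name}-{len(PENDING)}"


def task(queue: Optional[str] = None) -> Callable[[Callable[..., Awaitable[Any]]], TaskWrapper]:
    """Decorator factory: task(queue=...) returns the actual decorator."""

    def decorator(func: Callable[..., Awaitable[Any]]) -> TaskWrapper:
        return TaskWrapper(func, queue)

    return decorator


@task(queue="emails")
async def send_welcome_email(user_id: str, email: str) -> str:
    return f"sent to {email}"


async def demo() -> str:
    # Enqueues the call; the function body does not run here.
    return await send_welcome_email.delay(user_id="42", email="a@example.com")
```

After `asyncio.run(demo())`, `PENDING` holds one entry for the `emails` queue and the function body has not executed, which is the decoupling the decorator is meant to provide.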

nala.athomic.integration.tasks.protocol.TaskBrokerProtocol

Bases: Protocol

Defines the standard, agnostic interface (contract) for integrating with background task queue systems.

This protocol serves as an abstraction layer (DIP) over concrete backends like RQ, Taskiq, or internal local executors, decoupling application logic from the task execution infrastructure.

enqueue_task(task_name, *args, **kwargs) async

Enqueues a task for eventual asynchronous execution by a dedicated worker process.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_name | str | A string identifier for the task function (e.g., 'send_welcome_email'). The format depends on the specific task broker backend (e.g., function path, registered name). | required |
| *args | Any | Positional arguments to be passed to the task function. These arguments must be safely serializable by the backend broker. | () |
| **kwargs | Any | Keyword arguments to be passed to the task function. Must also be safely serializable. | {} |

Returns:

| Type | Description |
| --- | --- |
| Optional[str] | An optional string representing the unique ID assigned to the enqueued task by the backend (if available). Returns None if the backend does not provide an ID or enqueues silently. |

Raises:

| Type | Description |
| --- | --- |
| TaskEnqueueError | If the system fails to communicate with the broker or if a critical serialization failure prevents the task from being successfully placed in the queue. |
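
As an illustration of the contract, the sketch below restates the `enqueue_task` signature as a `typing.Protocol` and pairs it with a toy backend. `LocalListBroker`, the locally defined `TaskEnqueueError`, and the use of JSON to check serializability are assumptions for the example. The real backends may serialize differently and may define the error class elsewhere.

```python
import asyncio
import json
import uuid
from typing import Any, Optional, Protocol


class TaskEnqueueError(Exception):
    """Local stand-in for the error named in the reference."""


class TaskBrokerProtocol(Protocol):
    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]: ...


class LocalListBroker:
    """Hypothetical in-memory backend that satisfies the protocol structurally."""

    def __init__(self) -> None:
        self.messages: dict = {}

    async def enqueue_task(self, task_name: str, *args: Any, **kwargs: Any) -> Optional[str]:
        try:
            # JSON is used here only to demonstrate the serializability requirement.
            payload = json.dumps({"task": task_name, "args": args, "kwargs": kwargs})
        except TypeError as exc:
            raise TaskEnqueueError(f"cannot serialize arguments for {task_name!r}") from exc
        task_id = uuid.uuid4().hex
        self.messages[task_id] = payload
        return task_id


async def enqueue_welcome(broker: TaskBrokerProtocol) -> Optional[str]:
    # Application code depends only on the protocol, not on a concrete backend.
    return await broker.enqueue_task("send_welcome_email", user_id="42", email="a@example.com")
```

Because `enqueue_welcome` is typed against the protocol, any object with a matching `enqueue_task` coroutine can be passed in without inheriting from a base class.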

nala.athomic.integration.tasks.worker.run_task_with_context(func)

A decorator wrapper applied to task functions on the worker side.

This wrapper performs the following critical functions:

  1. Extracting the serialized execution context from task arguments.
  2. Restoring the context (tracing IDs, tenant info, etc.) for the worker's thread/async loop.
  3. Applying tracing and metrics before and after task execution.
  4. Handling exceptions and logging status.
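
The four responsibilities can be sketched with the standard library's `contextvars`. This is a simplified illustration, not the library's code: the `execution_context` variable, the reserved `_execution_context` keyword argument, and the use of plain logging in place of tracing and metrics are all assumptions.

```python
import asyncio
import contextvars
import functools
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("tasks.worker")

# Hypothetical context storage; the real ExecutionContext may be structured differently.
execution_context = contextvars.ContextVar("execution_context", default=None)

CONTEXT_KEY = "_execution_context"  # assumed name of the reserved keyword argument


def run_task_with_context(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # 1. Extract the serialized context from the task arguments.
        ctx = kwargs.pop(CONTEXT_KEY, None) or {}
        # 2. Restore it for the duration of this task.
        token = execution_context.set(ctx)
        # 3. Stand-in for tracing and metrics: log before and after execution.
        logger.info("task %s started trace_id=%s", func.__name__, ctx.get("trace_id"))
        try:
            result = await func(*args, **kwargs)
            logger.info("task %s succeeded", func.__name__)
            return result
        except Exception:
            # 4. Log the failure, then re-raise so the backend can handle it.
            logger.exception("task %s failed", func.__name__)
            raise
        finally:
            # Always put the previous context back, even on failure.
            execution_context.reset(token)

    return wrapper


@run_task_with_context
async def current_tenant() -> str:
    # The task body reads the restored context instead of receiving it as a parameter.
    return execution_context.get()["tenant_id"]
```

Calling `asyncio.run(current_tenant(_execution_context={"trace_id": "t-1", "tenant_id": "acme"}))` shows the restored context being visible inside the task body. The context argument is popped before the call, so the task function never sees it as a parameter.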