Backpressure

Overview

Backpressure is a resilience pattern that keeps a system from being overwhelmed: it temporarily stops the flow of work toward a component that is known to be failing or overloaded. It is a form of flow control that helps prevent cascading failures.

In the Athomic Layer, the Backpressure mechanism is primarily used by background processing services, like the OutboxPublisher, to avoid repeatedly attempting to process a task or resource that is consistently causing errors.

Key Features

  • Temporary Throttling: Pauses work on a specific resource (e.g., a message aggregate_key) for a configurable duration after a failure.
  • Distributed State: Uses a distributed Key-Value store (like Redis) to share the backpressure state across all instances of a service.
  • Automatic Recovery: Since the backpressure flag is set with a Time-To-Live (TTL), the system automatically attempts to process the resource again after the embargo period expires.

How It Works

The system is orchestrated by the BackpressureManager.

  1. Failure Detection: A service (like the OutboxPublisher) detects a failure associated with a specific resource. For example, it fails to acquire a lease for order-123, or it fails to publish a message for order-123.

  2. Applying Backpressure: The service calls backpressure_manager.apply_backpressure(resource_id="order-123", ...). This creates a key (e.g., athomic:backpressure:order-123) in the configured KV store with a specific TTL (e.g., 30 seconds). The existence of this key acts as a "throttling" flag.

  3. Checking Before Processing: In its next processing cycle, before attempting to work on any resources, the service first calls backpressure_manager.filter_throttled(["order-123", "order-456", ...]). This method checks the KV store for any backpressure flags.

  4. Skipping Work: The manager returns a filtered list containing only the resources that are not currently throttled. The service then proceeds to work only on this "safe" list, skipping order-123 for this cycle.

  5. Automatic Expiration: Once the TTL elapses (30 seconds here), the key athomic:backpressure:order-123 automatically expires in the KV store (Redis in this example). In the next cycle, the filter_throttled method no longer filters out order-123, and the service attempts to process it again.
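The five steps above can be sketched with a small in-memory stand-in. This is not the Athomic implementation: InMemoryBackpressure, its injectable clock, and the ":"-joined key layout are assumptions made for illustration, and a real deployment would rely on the shared KV store to expire keys rather than on a local dictionary.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple


class InMemoryBackpressure:
    """Hypothetical single-process stand-in for a KV-backed backpressure manager."""

    def __init__(
        self,
        key_prefix: str = "athomic:backpressure",
        default_ttl: int = 30,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._key_prefix = key_prefix
        self._default_ttl = default_ttl
        self._clock = clock
        # key -> (expiry timestamp, reason); a real KV store would expire keys itself.
        self._flags: Dict[str, Tuple[float, str]] = {}

    def _key(self, resource_id: str) -> str:
        # Assumed key layout: "<prefix>:<resource_id>".
        return f"{self._key_prefix}:{resource_id}"

    def apply_backpressure(
        self, resource_id: str, reason: str, ttl_seconds: Optional[int] = None
    ) -> None:
        # Step 2: set the flag, falling back to the default TTL when none is given.
        ttl = self._default_ttl if ttl_seconds is None else ttl_seconds
        self._flags[self._key(resource_id)] = (self._clock() + ttl, reason)

    def is_throttled(self, resource_id: str) -> bool:
        key = self._key(resource_id)
        flag = self._flags.get(key)
        if flag is None:
            return False
        expires_at, _reason = flag
        if self._clock() >= expires_at:
            # Step 5: emulate TTL expiry by dropping the stale flag.
            del self._flags[key]
            return False
        return True

    def filter_throttled(self, resource_ids: List[str]) -> List[str]:
        # Steps 3 and 4: keep only the resources without an active flag.
        return [rid for rid in resource_ids if not self.is_throttled(rid)]


# A fake clock makes the expiry observable without sleeping.
now = [0.0]
manager = InMemoryBackpressure(clock=lambda: now[0])
manager.apply_backpressure("order-123", reason="publish_error")
first_cycle = manager.filter_throttled(["order-123", "order-456"])
now[0] = 31.0  # move past the 30-second default TTL
second_cycle = manager.filter_throttled(["order-123", "order-456"])
```

In the first cycle only order-456 passes the filter; once the fake clock moves past the TTL, order-123 becomes eligible again without any explicit cleanup call.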


Usage Example

Although the BackpressureManager is primarily used internally by Athomic services, you can also use it in a custom background worker.

from nala.athomic.resilience.backpressure import BackpressureFactory

# Get the singleton manager instance
backpressure_manager = BackpressureFactory.create()

# do_work_on is a placeholder for your own async processing function.
async def process_batch_of_items(all_item_ids: list[str]) -> None:
    # First, filter out any items that are currently throttled
    safe_to_process_ids = await backpressure_manager.filter_throttled(all_item_ids)

    for item_id in safe_to_process_ids:
        try:
            await do_work_on(item_id)
        except Exception as e:
            # If work fails, apply backpressure to this item for 60 seconds
            print(f"Work failed for {item_id}: {e!r}. Applying backpressure.")
            await backpressure_manager.apply_backpressure(
                resource_id=item_id,
                reason="processing_failed",
                ttl_seconds=60,
            )

Configuration

The backpressure system is configured under the [resilience.backpressure] section of your settings.toml; the example below places it under the default environment. It requires a KV store connection to manage its state.

[default.resilience.backpressure]
enabled = true

# The default duration in seconds to apply backpressure if not specified.
default_ttl_seconds = 30

# The prefix for all backpressure keys stored in the KV store.
key_prefix = "athomic:backpressure"

# The name of the KVStore connection (from [database.kvstore]) to use.
kv_store_connection_name = "default_redis"

API Reference

nala.athomic.resilience.backpressure.manager.BackpressureManager

Manages the state of throttled resources in a distributed Key-Value (KV) store.

This class implements the core logic of the Backpressure pattern by using time-to-live (TTL) keys in a shared store to mark resources that are temporarily overloaded or failing. This prevents downstream services from being overwhelmed.

Attributes:

| Name | Type | Description |
|---|---|---|
| storage | KVStoreProtocol | The KV store client used for state persistence. |
| key_prefix | str | The static prefix for all keys in the store. |
| default_ttl | int | The default expiration time in seconds for a backpressure flag. |

__init__(storage, key_prefix, default_ttl)

Initializes the BackpressureManager.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| storage | KVStoreProtocol | The KV store implementation (e.g., Redis). | required |
| key_prefix | str | The key prefix used for all backpressure flags. | required |
| default_ttl | int | The default duration for a backpressure embargo in seconds. | required |

apply_backpressure(resource_id, reason, ttl_seconds=None) async

Marks a resource as throttled by setting a time-limited flag in the KV store.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resource_id | str | The unique ID of the resource (e.g., aggregate key). | required |
| reason | str | The cause of the backpressure (e.g., 'publish_error', 'lease_conflict'). | required |
| ttl_seconds | Optional[int] | The duration of the embargo. If None, uses the default TTL. | None |

filter_throttled(resource_ids) async

Filters a list of resource IDs concurrently, returning only those that are NOT throttled.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resource_ids | List[str] | The input list of resource identifiers. | required |

Returns:

| Type | Description |
|---|---|
| List[str] | A new list containing only the resource IDs that are currently operational. |
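One way to check many resources concurrently is to fan the per-resource lookups out with asyncio.gather and keep the IDs whose flag is absent. The sketch below is only an illustration of that technique, not the library's actual code: filter_throttled_concurrently and the fake_is_throttled lookup are hypothetical names, and the real method may batch its KV calls differently.

```python
import asyncio
from typing import Awaitable, Callable, List


async def filter_throttled_concurrently(
    resource_ids: List[str],
    is_throttled: Callable[[str], Awaitable[bool]],
) -> List[str]:
    # gather() runs the lookups concurrently and returns results in input order.
    flags = await asyncio.gather(*(is_throttled(rid) for rid in resource_ids))
    return [rid for rid, throttled in zip(resource_ids, flags) if not throttled]


async def _demo() -> List[str]:
    throttled = {"order-123"}

    async def fake_is_throttled(resource_id: str) -> bool:
        # Stand-in for a KV "key exists" check; yields control like real I/O would.
        await asyncio.sleep(0)
        return resource_id in throttled

    return await filter_throttled_concurrently(
        ["order-123", "order-456", "order-789"], fake_is_throttled
    )


result = asyncio.run(_demo())
```

Because gather preserves input order, the returned list keeps the caller's ordering, which matters if the worker processes resources by priority.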

is_throttled(resource_id) async

Checks if a resource is currently under a backpressure embargo.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resource_id | str | The unique ID of the resource. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if the resource is throttled (key exists), False otherwise. |

nala.athomic.resilience.backpressure.factory.BackpressureFactory

Factory responsible for creating the singleton instance of BackpressureManager.

This factory resolves the necessary dependencies—the configuration settings and the configured Key-Value (KV) store client—to instantiate the manager. It performs a runtime check to ensure the backpressure mechanism is enabled.

create(connection_manager=None) classmethod

Creates and returns a configured instance of BackpressureManager.

The process involves:

  1. Checking if the feature is enabled in application settings.
  2. Resolving the designated KV store client via the injected ConnectionManager.
  3. Instantiating the manager with the KV store, key prefix, and default TTL.

Raises:

| Type | Description |
|---|---|
| RuntimeError | If the backpressure mechanism is disabled in the settings. |

Returns:

| Type | Description |
|---|---|
| BackpressureManager | A fully initialized instance ready for use. |
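Because create raises RuntimeError when the feature is disabled, a custom worker may want to tolerate that case and simply run without throttling. The following is a sketch under that assumption: create_manager_or_none and DisabledFactory are hypothetical names used so the example runs on its own, and in real code you would pass BackpressureFactory instead of the stand-in.

```python
from typing import Any, Optional


def create_manager_or_none(factory: Any) -> Optional[Any]:
    """Return a manager, or None when backpressure is disabled in settings."""
    try:
        return factory.create()
    except RuntimeError as exc:
        print(f"Backpressure unavailable, continuing without it: {exc}")
        return None


class DisabledFactory:
    """Hypothetical stand-in that mimics a factory with the feature switched off."""

    @classmethod
    def create(cls) -> Any:
        raise RuntimeError("Backpressure is disabled in settings.")


manager = create_manager_or_none(DisabledFactory)
```

A worker using this helper can skip the filter_throttled call whenever the manager is None and process every resource in the batch.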