Backpressure
Overview
Backpressure is a resilience pattern used to prevent a system from being overwhelmed by temporarily stopping the flow of work towards a component that is known to be failing or overloaded. It's a form of flow control that helps prevent cascading failures.
In the Athomic Layer, the Backpressure mechanism is primarily used by background processing services, like the OutboxPublisher, to avoid repeatedly attempting to process a task or resource that is consistently causing errors.
Key Features
- Temporary Throttling: Pauses work on a specific resource (e.g., a message `aggregate_key`) for a configurable duration after a failure.
- Distributed State: Uses a distributed Key-Value store (like Redis) to share the backpressure state across all instances of a service.
- Automatic Recovery: Since the backpressure flag is set with a Time-To-Live (TTL), the system automatically attempts to process the resource again after the embargo period expires.
How It Works
The system is orchestrated by the BackpressureManager.
1. Failure Detection: A service (like the `OutboxPublisher`) detects a failure associated with a specific resource. For example, it fails to acquire a lease for `order-123`, or it fails to publish a message for `order-123`.
2. Applying Backpressure: The service calls `backpressure_manager.apply_backpressure(resource_id="order-123", ...)`. This creates a key (e.g., `athomic:backpressure:order-123`) in the configured KV store with a specific TTL (e.g., 30 seconds). The existence of this key acts as a "throttling" flag.
3. Checking Before Processing: In its next processing cycle, before attempting to work on any resources, the service first calls `backpressure_manager.filter_throttled(["order-123", "order-456", ...])`. This method checks the KV store for any backpressure flags.
4. Skipping Work: The manager returns a filtered list containing only the resources that are not currently throttled. The service then proceeds to work only on this "safe" list, skipping `order-123` for this cycle.
5. Automatic Expiration: After 30 seconds, the key `athomic:backpressure:order-123` automatically expires in Redis. In the next cycle, the `filter_throttled` method will no longer filter out `order-123`, and the service will attempt to process it again.
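The cycle described in these steps can be simulated without a real KV store. The following is a minimal sketch, not the Athomic implementation: `InMemoryTTLStore`, its `set_flag` and `exists` methods, and the module-level helper functions are hypothetical stand-ins, and a hand-controlled clock replaces real time so that the expiry is visible immediately.

```python
import time
from typing import Callable, Dict, List

PREFIX = "athomic:backpressure"  # matches the example key prefix used above


class InMemoryTTLStore:
    """Hypothetical stand-in for a KV store with per-key expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expires_at: Dict[str, float] = {}

    def set_flag(self, key: str, ttl_seconds: int) -> None:
        # Remember the moment at which the flag stops being valid.
        self._expires_at[key] = self._clock() + ttl_seconds

    def exists(self, key: str) -> bool:
        expires_at = self._expires_at.get(key)
        if expires_at is None:
            return False
        if self._clock() >= expires_at:
            # Drop the flag lazily once its TTL has elapsed.
            del self._expires_at[key]
            return False
        return True


def apply_backpressure(store: InMemoryTTLStore, resource_id: str, ttl_seconds: int = 30) -> None:
    store.set_flag(f"{PREFIX}:{resource_id}", ttl_seconds)


def filter_throttled(store: InMemoryTTLStore, resource_ids: List[str]) -> List[str]:
    # Keep only the IDs that have no live backpressure flag.
    return [rid for rid in resource_ids if not store.exists(f"{PREFIX}:{rid}")]


# Simulated clock: a one-element list that the lambda reads on every call.
now = [0.0]
store = InMemoryTTLStore(clock=lambda: now[0])

apply_backpressure(store, "order-123", ttl_seconds=30)
first_cycle = filter_throttled(store, ["order-123", "order-456"])

now[0] = 31.0  # jump past the 30-second embargo
second_cycle = filter_throttled(store, ["order-123", "order-456"])
```

In the first cycle only `order-456` is returned. Once the simulated clock passes the TTL, `order-123` is returned again without any explicit "release" call, which is the automatic recovery behavior described above.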
Usage Example
While primarily used internally by Athomic services, you could use the BackpressureManager in a custom background worker.
```python
from nala.athomic.resilience.backpressure import BackpressureFactory

# Get the singleton manager instance
backpressure_manager = BackpressureFactory.create()


async def process_batch_of_items(all_item_ids: list[str]):
    # First, filter out any items that are currently throttled
    safe_to_process_ids = await backpressure_manager.filter_throttled(all_item_ids)

    for item_id in safe_to_process_ids:
        try:
            # do_work_on is a placeholder for your own processing coroutine
            await do_work_on(item_id)
        except Exception as e:
            # If work fails, apply backpressure to this item for 60 seconds
            print(f"Work failed for {item_id}: {e}. Applying backpressure.")
            await backpressure_manager.apply_backpressure(
                resource_id=item_id,
                reason="processing_failed",
                ttl_seconds=60,
            )
```
Configuration
The backpressure system is configured under the [resilience.backpressure] section in your settings.toml. It requires a KV store connection to manage its state.
```toml
[default.resilience.backpressure]
enabled = true

# The default duration in seconds to apply backpressure if not specified.
default_ttl_seconds = 30

# The prefix for all backpressure keys stored in the KV store.
key_prefix = "athomic:backpressure"

# The name of the KVStore connection (from [database.kvstore]) to use.
kv_store_connection_name = "default_redis"
```
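To make the effect of these settings concrete, here is a small sketch of how `key_prefix` and `default_ttl_seconds` might feed into a flag's key and TTL. `BackpressureSettings`, `key_for`, and `resolve_ttl` are hypothetical names, not part of the Athomic API. The `:` separator is inferred from the example key `athomic:backpressure:order-123`, and the field defaults simply mirror the example configuration rather than any documented library defaults.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BackpressureSettings:
    """Hypothetical mirror of the [resilience.backpressure] settings."""

    enabled: bool = True
    default_ttl_seconds: int = 30
    key_prefix: str = "athomic:backpressure"
    kv_store_connection_name: str = "default_redis"

    def key_for(self, resource_id: str) -> str:
        # Assumed layout: "<key_prefix>:<resource_id>".
        return f"{self.key_prefix}:{resource_id}"

    def resolve_ttl(self, ttl_seconds: Optional[int] = None) -> int:
        # Fall back to the configured default when no TTL is given.
        return self.default_ttl_seconds if ttl_seconds is None else ttl_seconds


settings = BackpressureSettings()
key = settings.key_for("order-123")
default_ttl = settings.resolve_ttl()
explicit_ttl = settings.resolve_ttl(60)
```

Under these assumptions, a call that omits `ttl_seconds` would produce a 30-second flag, while the usage example's `ttl_seconds=60` would override it.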
API Reference
nala.athomic.resilience.backpressure.manager.BackpressureManager
Manages the state of throttled resources in a distributed Key-Value (KV) store.
This class implements the core logic of the Backpressure pattern by using time-to-live (TTL) keys in a shared store to mark resources that are temporarily overloaded or failing. This prevents downstream services from being overwhelmed.
Attributes:
| Name | Type | Description |
|---|---|---|
| storage | KVStoreProtocol | The KV store client used for state persistence. |
| key_prefix | str | The static prefix for all keys in the store. |
| default_ttl | int | The default expiration time in seconds for a backpressure flag. |
__init__(storage, key_prefix, default_ttl)
Initializes the BackpressureManager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage | KVStoreProtocol | The KV store implementation (e.g., Redis). | required |
| key_prefix | str | The key prefix used for all backpressure flags. | required |
| default_ttl | int | The default duration for a backpressure embargo in seconds. | required |
apply_backpressure(resource_id, reason, ttl_seconds=None)
async
Marks a resource as throttled by setting a time-limited flag in the KV store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_id | str | The unique ID of the resource (e.g., aggregate key). | required |
| reason | str | The cause of the backpressure (e.g., 'publish_error', 'lease_conflict'). | required |
| ttl_seconds | Optional[int] | The duration of the embargo in seconds. If None, uses the default TTL. | None |
filter_throttled(resource_ids)
async
Filters a list of resource IDs concurrently, returning only those that are NOT throttled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_ids | List[str] | The input list of resource identifiers. | required |
Returns:
| Type | Description |
|---|---|
| List[str] | A new list containing only the resource IDs that are not currently throttled. |
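The description says the IDs are filtered concurrently. One plausible way to do that is to fan out one existence check per ID with `asyncio.gather`. The sketch below illustrates that technique only and is an assumption about the approach, not the manager's actual code. The `THROTTLED` set is a hypothetical stand-in for flags held in the KV store.

```python
import asyncio
from typing import List, Set

# Hypothetical stand-in for backpressure flags currently present in the KV store.
THROTTLED: Set[str] = {"order-123"}


async def is_throttled(resource_id: str) -> bool:
    # Yield to the event loop, as a real network lookup would.
    await asyncio.sleep(0)
    return resource_id in THROTTLED


async def filter_throttled(resource_ids: List[str]) -> List[str]:
    # Run all checks concurrently; gather returns results in input order.
    flags = await asyncio.gather(*(is_throttled(rid) for rid in resource_ids))
    return [rid for rid, throttled in zip(resource_ids, flags) if not throttled]


result = asyncio.run(filter_throttled(["order-123", "order-456", "order-789"]))
```

Because `asyncio.gather` preserves the order of its inputs, the filtered list keeps the caller's original ordering, which matters if the worker processes items in priority order.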
is_throttled(resource_id)
async
Checks if a resource is currently under a backpressure embargo.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_id | str | The unique ID of the resource. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the resource is throttled (key exists), False otherwise. |
nala.athomic.resilience.backpressure.factory.BackpressureFactory
Factory responsible for creating the singleton instance of BackpressureManager.
This factory resolves the necessary dependencies—the configuration settings and the configured Key-Value (KV) store client—to instantiate the manager. It performs a runtime check to ensure the backpressure mechanism is enabled.
create(connection_manager=None)
classmethod
Creates and returns a configured instance of BackpressureManager.
The process involves:
1. Checking if the feature is enabled in application settings.
2. Resolving the designated KV store client via the injected ConnectionManager.
3. Instantiating the manager with the KV store, key prefix, and default TTL.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the backpressure mechanism is disabled in the settings. |
Returns:
| Type | Description |
|---|---|
| BackpressureManager | A fully initialized instance ready for use. |
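Because `create()` raises `RuntimeError` when the feature is disabled, a custom worker may prefer to treat that case as "nothing is throttled" rather than fail at startup. The sketch below shows one way to do that. `create_disabled`, `get_manager_or_none`, and `filter_if_enabled` are hypothetical helpers, and the error message is illustrative. In real code the callable passed in would be `BackpressureFactory.create`.

```python
import asyncio
from typing import Callable, List, Optional


def create_disabled() -> object:
    """Hypothetical stand-in for the factory call when backpressure is disabled."""
    raise RuntimeError("Backpressure is disabled in settings.")


def get_manager_or_none(create: Callable[[], object]) -> Optional[object]:
    # A disabled feature becomes None instead of crashing the worker.
    try:
        return create()
    except RuntimeError:
        return None


async def filter_if_enabled(manager: Optional[object], resource_ids: List[str]) -> List[str]:
    if manager is None:
        # No manager available: every resource is eligible for processing.
        return list(resource_ids)
    return await manager.filter_throttled(resource_ids)


manager = get_manager_or_none(create_disabled)
ids = asyncio.run(filter_if_enabled(manager, ["order-123", "order-456"]))
```

Whether silently skipping backpressure is acceptable depends on the service. A worker that relies on throttling to protect a fragile dependency may be better off letting the `RuntimeError` propagate.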