Sagas for Distributed Transactions
Overview
The Saga pattern is a resilience mechanism for maintaining data consistency across multiple services that take part in one distributed transaction. Traditional ACID transactions are generally impractical across service boundaries in a microservices architecture, so a saga models the business transaction as a sequence of local transactions, where each step triggers the next once it succeeds.
If any step in the sequence fails, the saga executes a series of compensating actions in reverse order to undo the work that was already completed, thus maintaining overall data consistency.
Athomic provides a complete framework for defining, executing, and recovering sagas, supporting two distinct execution models: Orchestration and Choreography.
Core Concepts
- Saga Definition: A static blueprint of the entire business transaction, created using the SagaBuilder.
- Step: A single, atomic operation within the saga. Each step consists of an Action and a Compensation.
- Action: A forward-moving operation that performs a task (e.g., reserve_inventory).
- Compensation: An operation that semantically undoes an Action (e.g., release_inventory).
- Saga State: The dynamic, persisted state of a single running saga instance. It tracks the current step, the business payload, and the history of executed steps.
Execution Models
1. Orchestration (Command-Based)
In this model, a central OrchestrationSagaExecutor manages the entire flow. It directly calls the action handlers for each step in sequence. If any action fails, the orchestrator is responsible for calling the corresponding compensation handlers in reverse order.
- Pros: Simple to understand, centralized logic, explicit control flow.
- Cons: Can lead to a "god object" orchestrator if the saga is very complex.
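The orchestration flow described above can be sketched in a few lines of plain Python. This is an illustration only and does not use Athomic's OrchestrationSagaExecutor: the `Step` dataclass, the `run_saga` function, and the "COMPENSATED" status string are hypothetical names, and handlers are plain callables rather than resolved task identifiers.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Payload = Dict[str, Any]


@dataclass(frozen=True)
class Step:
    """Hypothetical stand-in for a saga step: an action plus its compensation."""
    name: str
    action: Callable[[Payload], None]
    compensation: Callable[[Payload], None]


def run_saga(steps: List[Step], payload: Payload) -> str:
    """Run each action in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        try:
            step.action(payload)
            completed.append(step)
        except Exception as exc:
            payload["error"] = f"{step.name}: {exc}"
            # Undo only the steps that actually finished, newest first.
            for done in reversed(completed):
                done.compensation(payload)
            return "COMPENSATED"  # assumed status name, for illustration
    return "COMPLETED"


log: List[str] = []


def charge_card(payload: Payload) -> None:
    raise RuntimeError("card declined")


steps = [
    Step("reserve_inventory",
         lambda p: log.append("reserve_stock"),
         lambda p: log.append("release_stock")),
    Step("process_payment",
         charge_card,
         lambda p: log.append("refund_charge")),
]

payload: Payload = {"order_id": 1}
status = run_saga(steps, payload)
```

In this sketch the failing step itself is not compensated, only the steps that completed before it. Whether the real executor also invokes the compensation of the failed step is not stated here, so treat that as an open design choice.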
2. Choreography (Event-Based)
In this model, there is no central orchestrator. The ChoreographySagaExecutor simply starts the saga by publishing the first action as a command/event on the message bus.
Each step is then handled by an independent service (a "participant") that subscribes to its relevant event. After completing its local transaction, the participant uses the injected SagaContext to publish the command/event for the next step. Compensation also flows via events.
- Pros: Highly decoupled, services have no direct knowledge of each other.
- Cons: Can be harder to debug and visualize the end-to-end flow.
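To make the event-driven flow more concrete, here is a minimal sketch that uses an in-memory queue in place of a real message bus. The `InMemoryBus` class and the handler functions are hypothetical. In Athomic the next topic would be derived from the saga definition via the SagaContext, whereas here each participant names the next topic directly to keep the example short.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Tuple

Message = Dict[str, Any]
Handler = Callable[[Message], None]


class InMemoryBus:
    """Hypothetical stand-in for a message bus: one handler per topic."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self._queue: Deque[Tuple[str, Message]] = deque()

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic] = handler

    def publish(self, topic: str, message: Message) -> None:
        self._queue.append((topic, message))

    def drain(self) -> None:
        """Deliver queued messages until none remain."""
        while self._queue:
            topic, message = self._queue.popleft()
            self._handlers[topic](message)


bus = InMemoryBus()
trace: List[str] = []


def reserve_stock(message: Message) -> None:
    trace.append("reserve_stock")
    # The participant itself announces the next step; no orchestrator is involved.
    bus.publish("tasks.payment.charge_card", {**message, "reserved": True})


def charge_card(message: Message) -> None:
    trace.append("charge_card")
    if message["amount"] > 100:
        # Simulated failure: compensation also flows as an event.
        bus.publish("tasks.inventory.release_stock", message)


def release_stock(message: Message) -> None:
    trace.append("release_stock")


bus.subscribe("tasks.inventory.reserve_stock", reserve_stock)
bus.subscribe("tasks.payment.charge_card", charge_card)
bus.subscribe("tasks.inventory.release_stock", release_stock)

# Starting the saga amounts to publishing the first action.
bus.publish("tasks.inventory.reserve_stock", {"order_id": 1, "amount": 250})
bus.drain()
```

The end-to-end order of events only becomes visible in `trace`, which hints at why choreographed flows are harder to debug than orchestrated ones.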
1. How to Define a Saga
Regardless of the execution model, you define a saga using the fluent SagaBuilder.
# In your_app/sagas.py
from nala.athomic.resilience.sagas import SagaBuilder, saga_definition_registry
# Define the blueprint for a "create_order" saga.
# The outer parentheses let the chained calls span multiple lines.
create_order_saga = (
    SagaBuilder(name="create_order_saga")
    .add_step(
        name="reserve_inventory",
        action="tasks.inventory.reserve_stock",
        compensation="tasks.inventory.release_stock",
    )
    .add_step(
        name="process_payment",
        action="tasks.payment.charge_card",
        compensation="tasks.payment.refund_charge",
    )
    .add_step(
        name="update_order_status",
        action="tasks.orders.set_order_approved",
        compensation="tasks.orders.set_order_failed",
    )
    .build()
)
# Register the definition so the SagaManager can find it
saga_definition_registry.register(create_order_saga)
The action and compensation strings are identifiers that the executor will resolve into callable functions or tasks.
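How the executor resolves these strings is not shown in this document. One plausible approach, sketched below purely as an assumption, is to treat an identifier as a dotted import path and look up the function with importlib. The `resolve_handler` helper is hypothetical and not part of Athomic; in the choreography model the same strings may instead be used as message topics.

```python
import importlib
from typing import Any, Callable


def resolve_handler(identifier: str) -> Callable[..., Any]:
    """Resolve a dotted 'package.module.function' string to a callable."""
    module_path, _, attr_name = identifier.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected a dotted path, got {identifier!r}")
    module = importlib.import_module(module_path)
    handler = getattr(module, attr_name)
    if not callable(handler):
        raise TypeError(f"{identifier!r} does not refer to a callable")
    return handler


# Demonstrated with a standard-library function, because the task modules
# named in the saga definition are not available in this snippet.
join = resolve_handler("os.path.join")
```

Resolving lazily, at execution time rather than at definition time, would keep the saga definition importable even in services that do not ship every task module.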
2. How to Execute a Saga
You start a new saga instance using the SagaManager.
from nala.athomic.resilience.sagas import SagaManager
async def handle_create_order_request(order_details: dict):
    # Note: the API reference lists `executor` as a required constructor
    # argument; pass your configured executor if your version requires it.
    saga_manager = SagaManager()
    initial_payload = {
        "order_id": order_details["id"],
        "customer_id": order_details["customer_id"],
        "amount": order_details["total_price"],
    }
    # Awaiting suspends this coroutine until the saga completes or fails
    final_state = await saga_manager.execute(
        saga_name="create_order_saga",
        initial_payload=initial_payload,
    )
    return final_state
3. Implementing Participants (for Choreography)
If you are using the Choreography model, each action and compensation handler is a message consumer decorated with @saga_participant. The decorator injects a SagaContext object, which provides methods to safely advance or compensate the saga.
# In your inventory service's consumers
from nala.athomic.resilience.sagas import saga_participant, SagaContext

# subscribe_to (your messaging layer's subscription decorator) and
# inventory_service are assumed to be defined elsewhere in the application.
@subscribe_to("tasks.inventory.reserve_stock")
@saga_participant()
async def reserve_stock_handler(context: SagaContext):
    try:
        inventory = await inventory_service.reserve(context.payload)
        # Action succeeded: update the payload and complete the step.
        # This will automatically publish the next event ("tasks.payment.charge_card").
        await context.complete_step(updated_payload={"inventory_info": inventory})
    except Exception as e:
        # Action failed: initiate the compensation flow.
        # This will publish the first compensation event ("tasks.inventory.release_stock").
        await context.fail_and_compensate(error_details=str(e))
Configuration
The saga system is configured in the resilience.sagas section of your settings.toml. In the example below, that section is nested under the default table.
[default.resilience.sagas]
enabled = true
# Choose the execution model: "orchestration" or "choreography"
executor_type = "orchestration"
# The name of the KVStore connection used for persisting saga state.
kv_store_connection_name = "default_redis"
# Configuration for the "Reaper" service, which recovers stalled sagas.
[default.resilience.sagas.reaper]
enabled = true
poll_interval_seconds = 300 # Check every 5 minutes
stalled_threshold_seconds = 900 # A saga is stalled if not updated for 15 minutes
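The stalled check implied by stalled_threshold_seconds can be illustrated with a small pure function. This is a sketch under assumptions, not the Reaper's actual implementation: `find_stalled` is a hypothetical name, the input is a plain mapping of saga IDs to their last update time, and the strict "older than" comparison is a guess at the boundary behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List

STALLED_THRESHOLD_SECONDS = 900  # mirrors stalled_threshold_seconds above


def find_stalled(
    last_updated: Dict[str, datetime],
    now: datetime,
    threshold_seconds: int = STALLED_THRESHOLD_SECONDS,
) -> List[str]:
    """Return IDs of sagas whose last update is older than the threshold."""
    cutoff = now - timedelta(seconds=threshold_seconds)
    return sorted(
        saga_id
        for saga_id, updated_at in last_updated.items()
        if updated_at < cutoff
    )


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_updated = {
    "saga-a": now - timedelta(minutes=20),  # older than 15 minutes: stalled
    "saga-b": now - timedelta(minutes=5),   # recently updated: healthy
}
stalled = find_stalled(last_updated, now)
```

A real recovery service would presumably also restrict the check to sagas that are still in progress, so that completed sagas are never reported as stalled.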
API Reference
nala.athomic.resilience.sagas.builder.SagaBuilder
A fluent builder for creating immutable SagaDefinition objects.
This class provides a declarative, chainable API for defining the sequence of steps, actions, and compensations that constitute a distributed transaction.
__init__(name)
Initializes the builder for a new saga definition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The unique name for the saga, used for identification and logging. | required |
add_step(name, action, compensation)
Adds a new step to the saga sequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | A human-readable name for the step. | required |
| action | str | An identifier for the function/task that performs the action. | required |
| compensation | str | An identifier for the function/task that compensates the action. | required |
Returns:
| Type | Description |
|---|---|
| SagaBuilder | The builder instance to allow for method chaining. |
build()
Constructs and returns the final, immutable SagaDefinition object.
Once built, the definition should not be altered.
Returns:
| Type | Description |
|---|---|
| SagaDefinition | A configured and validated SagaDefinition instance. |
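The builder contract described above (chainable add_step calls, then an immutable result from build) can be sketched with frozen dataclasses. All names here (`Builder`, `StepDef`, `Definition`) are hypothetical stand-ins, and the "at least one step" check is an assumed example of validation, not a documented rule of SagaBuilder.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class StepDef:
    name: str
    action: str
    compensation: str


@dataclass(frozen=True)
class Definition:
    name: str
    steps: Tuple[StepDef, ...]


class Builder:
    """Hypothetical fluent builder; not Athomic's SagaBuilder."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._steps: List[StepDef] = []

    def add_step(self, name: str, action: str, compensation: str) -> "Builder":
        self._steps.append(StepDef(name, action, compensation))
        return self  # returning self is what enables method chaining

    def build(self) -> Definition:
        if not self._steps:
            # Assumed validation rule, for illustration only.
            raise ValueError("a saga needs at least one step")
        # Copy into a tuple so later builder changes cannot affect the result.
        return Definition(self._name, tuple(self._steps))


definition = (
    Builder("create_order_saga")
    .add_step("reserve_inventory", "tasks.inventory.reserve_stock",
              "tasks.inventory.release_stock")
    .add_step("process_payment", "tasks.payment.charge_card",
              "tasks.payment.refund_charge")
    .build()
)
```

Because both dataclasses are frozen and the steps are stored in a tuple, attempts to reassign a field on the built definition raise an error instead of silently changing a shared blueprint.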
nala.athomic.resilience.sagas.manager.SagaManager
The main entry point and public interface for the Saga distributed transaction framework.
This manager is responsible for initiating a saga run by resolving the static
SagaDefinition and delegating the entire execution lifecycle (forward actions
and compensation) to the configured SagaExecutorProtocol.
Attributes:
| Name | Type | Description |
|---|---|---|
| executor | SagaExecutorProtocol | The saga executor responsible for running the process. |
| registry | | The repository holding registered saga definitions. |
__init__(executor)
Initializes the SagaManager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| executor | SagaExecutorProtocol | A pre-configured saga executor instance responsible for the entire saga lifecycle. | required |
execute(saga_name, initial_payload)
async
Initiates a new instance of a registered saga.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| saga_name | str | The unique name of the saga definition to execute. | required |
| initial_payload | Dict[str, Any] | The business data required to start the transaction. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| SagaState | SagaState | The final state object of the saga upon completion or failure. |
Raises:
| Type | Description |
|---|---|
| SagaError | If the saga definition cannot be found in the registry. |
nala.athomic.resilience.sagas.participant.saga_participant()
Decorator for a message consumer function to designate it as a participant in a choreographed saga.
This decorator handles the necessary boilerplate for saga participation:
1. Extracts the saga_id from the incoming message.
2. Loads the current SagaState from the repository.
3. Validates that the saga is in a runnable state (RUNNING or COMPENSATING).
4. Injects a rich SagaContext object into the decorated function, which
allows the handler to safely advance or compensate the saga.
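The four steps above can be sketched as a small decorator. This is a simplified illustration, not the real saga_participant: the in-memory `STATE_STORE`, the reduced `Context` class, and the `participant` decorator are hypothetical, and silently skipping a non-runnable saga is an assumption about behavior that this document does not specify. The real decorator is also applied with parentheses, as `@saga_participant()`.

```python
import asyncio
import functools
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict

# Hypothetical in-memory stand-in for the saga state repository.
STATE_STORE: Dict[str, Dict[str, Any]] = {
    "saga-1": {"status": "RUNNING", "payload": {"order_id": 42}},
    "saga-2": {"status": "COMPLETED", "payload": {"order_id": 43}},
}
RUNNABLE = {"RUNNING", "COMPENSATING"}


@dataclass
class Context:
    """Hypothetical, much-reduced stand-in for SagaContext."""
    saga_id: str
    payload: Dict[str, Any]


def participant(handler: Callable[[Context], Awaitable[Any]]):
    @functools.wraps(handler)
    async def wrapper(message: Dict[str, Any]) -> Any:
        saga_id = message["saga_id"]           # 1. extract the saga ID
        state = STATE_STORE[saga_id]           # 2. load the current state
        if state["status"] not in RUNNABLE:    # 3. validate the status
            return None  # assumed behavior: ignore messages for finished sagas
        context = Context(saga_id, state["payload"])
        return await handler(context)          # 4. inject the context
    return wrapper


@participant
async def reserve_stock_handler(context: Context) -> int:
    return context.payload["order_id"]


handled = asyncio.run(reserve_stock_handler({"saga_id": "saga-1"}))
skipped = asyncio.run(reserve_stock_handler({"saga_id": "saga-2"}))
```

Keeping the lookup and validation in the decorator means each handler body only deals with its own local transaction.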
nala.athomic.resilience.sagas.context.SagaContext
An active context object injected into saga participant handlers.
It holds the current, mutable state of the saga and provides high-level methods to safely transition the saga's state (advance or fail/compensate), abstracting away the underlying repository and messaging infrastructure.
payload
property
Returns the mutable business payload associated with the saga.
saga_id
property
Returns the unique ID of the current saga instance.
__init__(state, definition, repository, producer)
Initializes the SagaContext.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | SagaState | The current, mutable state of the running saga instance. | required |
| definition | SagaDefinition | The static blueprint defining the steps of the saga. | required |
| repository | SagaStateRepositoryProtocol | The persistence layer for state updates. | required |
| producer | ProducerProtocol | The messaging component used to publish events (commands). | required |
complete_step(updated_payload=None)
async
Marks the current step as succeeded and triggers the next action in the saga chain.
If this is the final step, the saga status is set to COMPLETED. Otherwise,
the next action is published as an event via the producer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| updated_payload | Optional[Dict[str, Any]] | Optional dictionary to update or merge into the saga's persistent payload. | None |
fail_and_compensate(error_details='')
async
Marks the current step as failed, initiates the compensation flow, and publishes the first compensation event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_details | str | A message detailing the reason for the failure. | '' |
nala.athomic.resilience.sagas.models.SagaState
Bases: BaseModel
Represents the dynamic, persisted state of a single, running saga instance.
This is the core model managed by the SagaStateRepository, tracking the
saga's progress, status, and accumulated payload.
Attributes:
| Name | Type | Description |
|---|---|---|
| saga_id | UUID | The unique instance ID of the running saga. |
| saga_name | str | The name of the saga definition used to start this instance. |
| status | SagaStatus | The current state of the saga (RUNNING, COMPLETED, etc.). |
| current_step | int | The index of the last or currently executing step in the definition. |
| payload | Dict[str, Any] | The mutable business data for the transaction, updated by steps. |
| context | Dict[str, Any] | The propagated |
| step_history | List[SagaStepHistory] | An ordered log of execution attempts for each step. |
| created_at | datetime | The UTC time when the saga instance was created. |
| updated_at | datetime | The UTC time of the last state update (used by the reaper service). |