
Sagas for Distributed Transactions

Overview

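The Saga pattern is an advanced resilience mechanism for maintaining data consistency across multiple services in a distributed transaction. Traditional ACID transactions that span several services are generally impractical in a microservices architecture, because each service typically owns its own data store. A saga instead models the business transaction as a sequence of local transactions, each committed by a single service, where the completion of one step triggers the next.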

If any step in the sequence fails, the saga executes a series of compensating actions in reverse order to undo the work that was already completed, thus maintaining overall data consistency.
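As a framework-independent illustration of that ordering (plain Python, not Athomic's API), the function below lists the compensations to run when a given step fails. It assumes one common convention: the failing step left nothing to undo, so only the steps that completed before it are compensated. The step and compensation names are illustrative.

```python
from typing import List, Tuple

# Each step is an (action, compensation) pair; the names are illustrative only.
Step = Tuple[str, str]


def compensation_plan(steps: List[Step], failed_index: int) -> List[str]:
    """Return the compensations to run, newest first, when steps[failed_index] fails.

    Assumption: the failed step made no durable change, so only the steps
    before it (the completed ones) are compensated.
    """
    if not 0 <= failed_index < len(steps):
        raise IndexError("failed_index out of range")
    completed = steps[:failed_index]
    return [compensation for _action, compensation in reversed(completed)]


steps = [
    ("reserve_inventory", "release_inventory"),
    ("process_payment", "refund_payment"),
    ("update_order_status", "mark_order_failed"),
]
print(compensation_plan(steps, failed_index=2))  # ['refund_payment', 'release_inventory']
```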

Athomic provides a complete framework for defining, executing, and recovering sagas, supporting two distinct execution models: Orchestration and Choreography.


Core Concepts

  • Saga Definition: A static blueprint of the entire business transaction, created using the SagaBuilder.
  • Step: A single, atomic operation within the saga. Each step consists of an Action and a Compensation.
  • Action: A forward-moving operation that performs a task (e.g., reserve_inventory).
  • Compensation: An operation that semantically undoes an Action (e.g., release_inventory).
  • Saga State: The dynamic, persisted state of a single running saga instance. It tracks the current step, the business payload, and the history of executed steps.

Execution Models

1. Orchestration (Command-Based)

In this model, a central OrchestrationSagaExecutor manages the entire flow. It directly calls the action handlers for each step in sequence. If any action fails, the orchestrator is responsible for calling the corresponding compensation handlers in reverse order.

  • Pros: Simple to understand, centralized logic, explicit control flow.
  • Cons: Can lead to a "god object" orchestrator if the saga is very complex.
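The core of an orchestrator can be sketched in a few lines. This is a minimal, in-memory illustration of the technique, not Athomic's OrchestrationSagaExecutor: the Step dataclass, run_orchestrated and the recorder helper are hypothetical, and a real executor would also persist SagaState after each step and deal with compensations that themselves fail.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List

Payload = Dict[str, Any]
Handler = Callable[[Payload], Awaitable[None]]


@dataclass(frozen=True)
class Step:
    name: str
    action: Handler
    compensation: Handler


async def run_orchestrated(steps: List[Step], payload: Payload) -> str:
    """Run each action in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        try:
            await step.action(payload)
        except Exception:
            # Undo the steps that already succeeded, newest first.
            for done in reversed(completed):
                await done.compensation(payload)
            return "COMPENSATED"
        completed.append(step)
    return "COMPLETED"


log: List[str] = []


def recorder(label: str, fail: bool = False) -> Handler:
    """Build a handler that logs its label, or raises to simulate a failure."""
    async def handler(payload: Payload) -> None:
        if fail:
            raise RuntimeError(f"{label} failed")
        log.append(label)
    return handler


steps = [
    Step("reserve_inventory", recorder("reserve"), recorder("release")),
    Step("process_payment", recorder("charge", fail=True), recorder("refund")),
]
status = asyncio.run(run_orchestrated(steps, {"order_id": 1}))
print(status, log)  # COMPENSATED ['reserve', 'release']
```

All control flow lives in one function, which is what makes orchestration easy to follow and also why it can grow into the "god object" mentioned above.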

2. Choreography (Event-Based)

In this model, there is no central orchestrator. The ChoreographySagaExecutor simply starts the saga by publishing the first action as a command/event on the message bus.

Each step is then handled by an independent service (a "participant") that subscribes to its relevant event. After completing its local transaction, the participant uses the injected SagaContext to publish the command/event for the next step. Compensation also flows via events.

  • Pros: Highly decoupled, services have no direct knowledge of each other.
  • Cons: Can be harder to debug and visualize the end-to-end flow.
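The same idea in choreographed form can be sketched with a toy in-memory bus. Everything here (InMemoryBus, the topic names, the simulated payment failure) is hypothetical and stands in for a real broker; the drain loop only delivers messages and knows nothing about the saga's steps, so each participant decides what to publish next, including the compensation event.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable, Deque, Dict, List, Tuple

Payload = Dict[str, Any]
Handler = Callable[[Payload], Awaitable[None]]


class InMemoryBus:
    """Toy stand-in for a message broker: each topic has at most one subscriber."""

    def __init__(self) -> None:
        self.pending: Deque[Tuple[str, Payload]] = deque()
        self.subscribers: Dict[str, Handler] = {}
        self.delivered: List[str] = []  # topics in delivery order, for inspection

    def subscribe(self, topic: str) -> Callable[[Handler], Handler]:
        def register(fn: Handler) -> Handler:
            self.subscribers[topic] = fn
            return fn
        return register

    async def publish(self, topic: str, payload: Payload) -> None:
        self.pending.append((topic, payload))

    async def drain(self) -> None:
        # Deliver messages until no participant publishes anything further.
        while self.pending:
            topic, payload = self.pending.popleft()
            self.delivered.append(topic)
            handler = self.subscribers.get(topic)
            if handler is not None:
                await handler(payload)


bus = InMemoryBus()


@bus.subscribe("inventory.reserve")
async def reserve(payload: Payload) -> None:
    payload["reserved"] = True
    await bus.publish("payment.charge", payload)  # hand off to the next participant


@bus.subscribe("payment.charge")
async def charge(payload: Payload) -> None:
    if payload["amount"] > 100:  # simulated payment failure
        await bus.publish("inventory.release", payload)  # compensation is also an event
    else:
        await bus.publish("order.approve", payload)


@bus.subscribe("inventory.release")
async def release(payload: Payload) -> None:
    payload["reserved"] = False


order: Payload = {"order_id": 1, "amount": 250}


async def main() -> None:
    await bus.publish("inventory.reserve", order)
    await bus.drain()


asyncio.run(main())
print(bus.delivered)  # ['inventory.reserve', 'payment.charge', 'inventory.release']
```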

1. How to Define a Saga

Regardless of the execution model, you define a saga using the fluent SagaBuilder.

# In your_app/sagas.py
from nala.athomic.resilience.sagas import SagaBuilder, saga_definition_registry

# Define the blueprint for the "create_order_saga" saga.
# The chain is wrapped in parentheses so it can span multiple lines.
create_order_saga = (
    SagaBuilder(name="create_order_saga")
    .add_step(
        name="reserve_inventory",
        action="tasks.inventory.reserve_stock",
        compensation="tasks.inventory.release_stock",
    )
    .add_step(
        name="process_payment",
        action="tasks.payment.charge_card",
        compensation="tasks.payment.refund_charge",
    )
    .add_step(
        name="update_order_status",
        action="tasks.orders.set_order_approved",
        compensation="tasks.orders.set_order_failed",
    )
    .build()
)

# Register the definition so the SagaManager can find it
saga_definition_registry.register(create_order_saga)

The action and compensation strings are identifiers that the executor will resolve into callable functions or tasks.
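How the executor performs that resolution is not described here. One plausible approach is a registry keyed by the identifier string, sketched below; the task decorator, the TASKS dict and resolve are hypothetical names, not part of Athomic. An alternative is importing a dotted path with importlib.import_module, but a registry keeps identifiers independent of module layout and fails with a clear error for unknown names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Payload = Dict[str, Any]
Task = Callable[[Payload], Awaitable[Payload]]

# Hypothetical registry: identifier string -> coroutine function.
TASKS: Dict[str, Task] = {}


def task(identifier: str) -> Callable[[Task], Task]:
    """Register a coroutine function under a saga action/compensation identifier."""
    def register(fn: Task) -> Task:
        if identifier in TASKS:
            raise ValueError(f"duplicate task identifier: {identifier}")
        TASKS[identifier] = fn
        return fn
    return register


def resolve(identifier: str) -> Task:
    """Look up the callable for an identifier, failing loudly if it is unknown."""
    try:
        return TASKS[identifier]
    except KeyError:
        raise LookupError(f"no task registered for {identifier!r}") from None


@task("tasks.inventory.reserve_stock")
async def reserve_stock(payload: Payload) -> Payload:
    return {**payload, "reserved": True}


handler = resolve("tasks.inventory.reserve_stock")
print(asyncio.run(handler({"order_id": 7})))  # {'order_id': 7, 'reserved': True}
```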

2. How to Execute a Saga

You start a new saga instance using the SagaManager.

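from nala.athomic.resilience.sagas import SagaManager

async def handle_create_order_request(order_details: dict, saga_executor):
    # SagaManager.__init__ takes a configured SagaExecutorProtocol (see the
    # API Reference); how you obtain one depends on your application's wiring.
    saga_manager = SagaManager(executor=saga_executor)

    initial_payload = {
        "order_id": order_details["id"],
        "customer_id": order_details["customer_id"],
        "amount": order_details["total_price"],
    }

    # With the orchestration executor, this awaits the whole saga and returns
    # its final state. With choreography, the executor only publishes the first
    # step, so the returned state may still be RUNNING.
    final_state = await saga_manager.execute(
        saga_name="create_order_saga",
        initial_payload=initial_payload,
    )

    return final_state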

3. Implementing Participants (for Choreography)

If you are using the Choreography model, each action and compensation handler is a message consumer decorated with @saga_participant. The decorator injects a SagaContext object, which provides methods to safely advance or compensate the saga.

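# In your inventory service's consumers
from nala.athomic.resilience.sagas import saga_participant, SagaContext

# `subscribe_to` stands for your messaging layer's consumer decorator and
# `inventory_service` for your domain service; neither is defined in this snippet.
@subscribe_to("tasks.inventory.reserve_stock")
@saga_participant()
async def reserve_stock_handler(context: SagaContext):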
    try:
        inventory = await inventory_service.reserve(context.payload)

        # Action succeeded: update the payload and complete the step.
        # This will automatically publish the next event ("tasks.payment.charge_card").
        await context.complete_step(updated_payload={"inventory_info": inventory})

    except Exception as e:
        # Action failed: initiate the compensation flow.
        # This will publish the first compensation event ("tasks.inventory.release_stock").
        await context.fail_and_compensate(error_details=str(e))
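Because the handler's logic only touches context.payload, complete_step and fail_and_compensate, it can be unit-tested against a hand-written fake, without a broker or state store. A minimal sketch follows; FakeSagaContext, FakeInventoryService and reserve_stock_logic are hypothetical test helpers, and reserve_stock_logic is an undecorated copy of the handler above that receives the service as an argument.

```python
import asyncio
from typing import Any, Dict, Optional


class FakeSagaContext:
    """Records calls instead of persisting state or publishing events."""

    def __init__(self, payload: Dict[str, Any]) -> None:
        self.payload = payload
        self.completed_with: Optional[Dict[str, Any]] = None
        self.failure: Optional[str] = None

    async def complete_step(self, updated_payload: Optional[Dict[str, Any]] = None) -> None:
        self.completed_with = updated_payload or {}

    async def fail_and_compensate(self, error_details: str = "") -> None:
        self.failure = error_details


class FakeInventoryService:
    def __init__(self, stock: int) -> None:
        self.stock = stock

    async def reserve(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        if self.stock <= 0:
            raise RuntimeError("out of stock")
        self.stock -= 1
        return {"order_id": payload["order_id"], "remaining": self.stock}


async def reserve_stock_logic(context, inventory_service) -> None:
    # Same control flow as reserve_stock_handler, minus the decorators.
    try:
        inventory = await inventory_service.reserve(context.payload)
        await context.complete_step(updated_payload={"inventory_info": inventory})
    except Exception as e:
        await context.fail_and_compensate(error_details=str(e))


ok = FakeSagaContext({"order_id": 1})
asyncio.run(reserve_stock_logic(ok, FakeInventoryService(stock=1)))
print(ok.completed_with)  # {'inventory_info': {'order_id': 1, 'remaining': 0}}

empty = FakeSagaContext({"order_id": 2})
asyncio.run(reserve_stock_logic(empty, FakeInventoryService(stock=0)))
print(empty.failure)  # out of stock
```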

Configuration

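The saga system is configured under the resilience.sagas section of your settings.toml. The example below sets it for the default environment, hence the [default.resilience.sagas] table name.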

[default.resilience.sagas]
enabled = true

# Choose the execution model: "orchestration" or "choreography"
executor_type = "orchestration"

# The name of the KVStore connection used for persisting saga state.
kv_store_connection_name = "default_redis"

  # Configuration for the "Reaper" service, which recovers stalled sagas.
  [default.resilience.sagas.reaper]
  enabled = true
  poll_interval_seconds = 300 # Check every 5 minutes
  stalled_threshold_seconds = 900 # A saga is stalled if not updated for 15 minutes

API Reference

nala.athomic.resilience.sagas.builder.SagaBuilder

A fluent builder for creating immutable SagaDefinition objects.

This class provides a declarative, chainable API for defining the sequence of steps, actions, and compensations that constitute a distributed transaction.

__init__(name)

Initializes the builder for a new saga definition.

Parameters:

  • name (str, required): The unique name for the saga, used for identification and logging.

add_step(name, action, compensation)

Adds a new step to the saga sequence.

Parameters:

  • name (str, required): A human-readable name for the step.
  • action (str, required): An identifier for the function/task that performs the action.
  • compensation (str, required): An identifier for the function/task that compensates the action.

Returns:

  • SagaBuilder: The builder instance, to allow method chaining.

build()

Constructs and returns the final, immutable SagaDefinition object.

Once built, the definition should not be altered.

Returns:

  • SagaDefinition: A configured and validated SagaDefinition instance.
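The exact checks performed by build() are not listed above. The sketch below shows one way a fluent builder can produce an immutable definition, using frozen dataclasses and a tuple of steps; MiniSagaBuilder, StepDef and SagaDef are hypothetical names, and the two validation rules (at least one step, unique step names) are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class StepDef:
    name: str
    action: str
    compensation: str


@dataclass(frozen=True)
class SagaDef:
    name: str
    steps: Tuple[StepDef, ...]  # a tuple, so the step sequence cannot be mutated


class MiniSagaBuilder:
    def __init__(self, name: str) -> None:
        self._name = name
        self._steps: List[StepDef] = []

    def add_step(self, name: str, action: str, compensation: str) -> "MiniSagaBuilder":
        self._steps.append(StepDef(name, action, compensation))
        return self  # returning self is what enables method chaining

    def build(self) -> SagaDef:
        if not self._steps:
            raise ValueError(f"saga {self._name!r} has no steps")
        names = [s.name for s in self._steps]
        if len(names) != len(set(names)):
            raise ValueError(f"saga {self._name!r} has duplicate step names")
        return SagaDef(self._name, tuple(self._steps))


saga = (
    MiniSagaBuilder("demo_saga")
    .add_step("reserve", "tasks.reserve", "tasks.release")
    .add_step("charge", "tasks.charge", "tasks.refund")
    .build()
)
print([s.name for s in saga.steps])  # ['reserve', 'charge']
```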

nala.athomic.resilience.sagas.manager.SagaManager

The main entry point and public interface for the Saga distributed transaction framework.

This manager is responsible for initiating a saga run by resolving the static SagaDefinition and delegating the entire execution lifecycle (forward actions and compensation) to the configured SagaExecutorProtocol.

Attributes:

  • executor (SagaExecutorProtocol): The saga executor responsible for running the process.
  • registry: The repository holding registered saga definitions.

__init__(executor)

Initializes the SagaManager.

Parameters:

  • executor (SagaExecutorProtocol, required): A pre-configured saga executor instance responsible for the entire saga lifecycle.

execute(saga_name, initial_payload) async

Initiates a new instance of a registered saga.

Parameters:

  • saga_name (str, required): The unique name of the saga definition to execute.
  • initial_payload (Dict[str, Any], required): The business data required to start the transaction.

Returns:

  • SagaState: The final state object of the saga upon completion or failure.

Raises:

  • SagaError: If the saga definition cannot be found in the registry.

nala.athomic.resilience.sagas.participant.saga_participant()

Decorator for a message consumer function to designate it as a participant in a choreographed saga.

This decorator handles the boilerplate required for saga participation:

  1. Extracts the saga_id from the incoming message.
  2. Loads the current SagaState from the repository.
  3. Validates that the saga is in a runnable state (RUNNING or COMPENSATING).
  4. Injects a rich SagaContext object into the decorated function, allowing the handler to safely advance or compensate the saga.
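Those four steps map naturally onto a plain Python decorator. The sketch below is a self-contained approximation, not the library's implementation: the REPO dict replaces the real SagaStateRepository, State and Ctx are simplified stand-ins for SagaState and SagaContext, and skipping non-runnable sagas by returning False is an assumed policy (the documented decorator is a factory, saga_participant(), whereas this one is applied without parentheses).

```python
import asyncio
import functools
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict

RUNNABLE = {"RUNNING", "COMPENSATING"}


@dataclass
class State:
    saga_id: str
    status: str
    payload: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Ctx:
    state: State

    @property
    def payload(self) -> Dict[str, Any]:
        return self.state.payload


# Hypothetical in-memory repository keyed by saga_id.
REPO: Dict[str, State] = {}


def participant(handler: Callable[[Ctx], Awaitable[None]]) -> Callable[[Dict[str, Any]], Awaitable[bool]]:
    @functools.wraps(handler)
    async def wrapper(message: Dict[str, Any]) -> bool:
        saga_id = message.get("saga_id")                   # 1. extract the saga id
        state = REPO.get(saga_id) if saga_id else None     # 2. load the current state
        if state is None or state.status not in RUNNABLE:  # 3. validate the status
            return False                                   # skip unknown or finished sagas
        await handler(Ctx(state))                          # 4. inject the context
        return True
    return wrapper


@participant
async def mark_seen(ctx: Ctx) -> None:
    ctx.payload["seen"] = True


REPO["s1"] = State("s1", "RUNNING")
REPO["s2"] = State("s2", "COMPLETED")
print(asyncio.run(mark_seen({"saga_id": "s1"})), REPO["s1"].payload)  # True {'seen': True}
print(asyncio.run(mark_seen({"saga_id": "s2"})))  # False
```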

nala.athomic.resilience.sagas.context.SagaContext

An active context object injected into saga participant handlers.

It holds the current, mutable state of the saga and provides high-level methods to safely transition the saga's state (advance or fail/compensate), abstracting away the underlying repository and messaging infrastructure.

payload property

Returns the mutable business payload associated with the saga.

saga_id property

Returns the unique ID of the current saga instance.

__init__(state, definition, repository, producer)

Initializes the SagaContext.

Parameters:

  • state (SagaState, required): The current, mutable state of the running saga instance.
  • definition (SagaDefinition, required): The static blueprint defining the steps of the saga.
  • repository (SagaStateRepositoryProtocol, required): The persistence layer for state updates.
  • producer (ProducerProtocol, required): The messaging component used to publish events (commands).

complete_step(updated_payload=None) async

Marks the current step as succeeded and triggers the next action in the saga chain.

If this is the final step, the saga status is set to COMPLETED. Otherwise, the next action is published as an event via the producer.

Parameters:

  • updated_payload (Optional[Dict[str, Any]], default None): Optional dictionary to update or merge into the saga's persistent payload.
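The branch that complete_step performs (finish the saga, or publish the next action) can be written as a pure function, which makes the decision easy to test in isolation. In this sketch, advance is a hypothetical helper, and merging updated_payload as a shallow dict merge where new keys win is an assumption, since the description only says "update or merge".

```python
from typing import Any, Dict, List, Optional, Tuple


def advance(
    actions: List[str],
    current_step: int,
    payload: Dict[str, Any],
    updated_payload: Optional[Dict[str, Any]] = None,
) -> Tuple[str, int, Optional[str], Dict[str, Any]]:
    """Return (status, step_index, action_to_publish, merged_payload)."""
    merged = {**payload, **(updated_payload or {})}  # shallow merge; new keys win
    next_step = current_step + 1
    if next_step >= len(actions):
        # Final step done: nothing left to publish.
        return "COMPLETED", current_step, None, merged
    return "RUNNING", next_step, actions[next_step], merged


actions = [
    "tasks.inventory.reserve_stock",
    "tasks.payment.charge_card",
    "tasks.orders.set_order_approved",
]
print(advance(actions, 0, {"order_id": 1}, {"inventory_info": "ok"}))
# ('RUNNING', 1, 'tasks.payment.charge_card', {'order_id': 1, 'inventory_info': 'ok'})
print(advance(actions, 2, {"order_id": 1})[:3])
# ('COMPLETED', 2, None)
```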

fail_and_compensate(error_details='') async

Marks the current step as failed, initiates the compensation flow, and publishes the first compensation event.

Parameters:

  • error_details (str, default ''): A message detailing the reason for the failure.

nala.athomic.resilience.sagas.models.SagaState

Bases: BaseModel

Represents the dynamic, persisted state of a single, running saga instance.

This is the core model managed by the SagaStateRepository, tracking the saga's progress, status, and accumulated payload.

Attributes:

  • saga_id (UUID): The unique instance ID of the running saga.
  • saga_name (str): The name of the saga definition used to start this instance.
  • status (SagaStatus): The current state of the saga (RUNNING, COMPLETED, etc.).
  • current_step (int): The index of the last or currently executing step in the definition.
  • payload (Dict[str, Any]): The mutable business data for the transaction, updated by steps.
  • context (Dict[str, Any]): The propagated ExecutionContext (e.g., trace_id, tenant_id) captured at initiation.
  • step_history (List[SagaStepHistory]): An ordered log of execution attempts for each step.
  • created_at (datetime): The UTC time when the saga instance was created.
  • updated_at (datetime): The UTC time of the last state update (used by the reaper service).
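The reaper's use of updated_at can be illustrated with a small stall check that mirrors the stalled_threshold_seconds setting. This is a sketch only: StateSnapshot and find_stalled are hypothetical, treating only RUNNING and COMPENSATING sagas as candidates is an assumption, and what the real reaper does with a stalled saga is not covered here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List

# Assumption: only sagas still in progress can be "stalled".
ACTIVE = {"RUNNING", "COMPENSATING"}


@dataclass
class StateSnapshot:
    saga_id: str
    status: str
    updated_at: datetime  # timezone-aware UTC, like SagaState.updated_at


def find_stalled(states: List[StateSnapshot], now: datetime, threshold_seconds: int) -> List[str]:
    """Return IDs of active sagas not updated within the threshold."""
    cutoff = now - timedelta(seconds=threshold_seconds)
    return [s.saga_id for s in states if s.status in ACTIVE and s.updated_at < cutoff]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
states = [
    StateSnapshot("a", "RUNNING", now - timedelta(minutes=20)),  # stalled
    StateSnapshot("b", "RUNNING", now - timedelta(minutes=5)),   # still fresh
    StateSnapshot("c", "COMPLETED", now - timedelta(hours=2)),   # finished, ignored
]
print(find_stalled(states, now, threshold_seconds=900))  # ['a']
```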