
Transactional Outbox Pattern

Overview

The Transactional Outbox pattern is a critical resilience mechanism that ensures reliable, at-least-once delivery of events in a distributed system. It solves the classic "dual-write" problem, where an application needs to both commit a change to its database and publish a corresponding event to a message broker.

Without the outbox pattern, if the database commit succeeds but the message broker publish fails, the system becomes inconsistent. The Athomic outbox implementation guarantees that an event is published if and only if the business transaction that created it is successfully committed.
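The dual-write failure mode is easy to reproduce with a toy sketch. The `Db` and `FlakyBroker` classes below are hypothetical stand-ins, not Athomic classes:

```python
# A toy illustration of the dual-write problem (hypothetical classes,
# not part of Athomic).
class Db:
    def __init__(self):
        self.rows = []

    def commit(self, row):
        self.rows.append(row)

class FlakyBroker:
    def publish(self, payload):
        # Simulates the broker being unreachable at the worst moment.
        raise ConnectionError("broker unreachable")

def create_document_naive(db, broker, doc):
    db.commit(doc)       # write 1: succeeds and is durable
    broker.publish(doc)  # write 2: fails; the event is silently lost

db, broker = Db(), FlakyBroker()
try:
    create_document_naive(db, broker, {"id": "doc-1"})
except ConnectionError:
    pass

# The database now holds a document for which no event was ever published:
# downstream consumers will never learn it exists.
assert db.rows == [{"id": "doc-1"}]
```

The outbox pattern closes this gap by making the event record part of write 1, so there is no separate write 2 to fail.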

How It Works

The pattern is implemented in two distinct parts: saving the event atomically and publishing it reliably.

1. Saving the Event (The "Write")

Instead of publishing an event directly to a message broker, the application saves the event record to a dedicated outbox_events collection within the same database and transaction as the business data.

The primary way to achieve this is by using the @publish_on_success decorator on your service methods. This decorator wraps your business logic and ensures that the outbox event is only saved to the database if your function completes without raising an exception.

Example Usage

# In your application's service layer
from nala.athomic.database.outbox import OutboxEventType, publish_on_success

class DocumentService:
    @publish_on_success(
        event_name="DocumentCreated",
        event_type=OutboxEventType.MESSAGING,
        destination_topic="document.events.v1",
        # The payload_mapper receives the result of the decorated function
        payload_mapper=lambda result, **kwargs: result.model_dump()
    )
    async def create_document(self, content: str, user_id: str, db_session) -> Document:
        """
        Creates a document and ensures a 'DocumentCreated' event is reliably published.

        The `db_session` is passed by the repository and used by the decorator
        to ensure the document and the outbox event are saved in the same transaction.
        """
        new_doc = Document(content=content, owner_id=user_id)
        await self.repository.save(new_doc, session=db_session)
        return new_doc

2. Publishing the Event (The "Poll")

A separate, continuously running background service called the OutboxPublisher polls the outbox_events collection for new, PENDING events.

When the publisher finds pending events, it:

1. Publishes them to the actual message broker (e.g., Kafka).
2. Upon successful publication, updates the event's status in the database to PUBLISHED.

This polling mechanism ensures that even if the application crashes immediately after the database transaction commits, the event will eventually be found and published when the service restarts.
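The publisher's cycle can be sketched in a few lines. The `InMemoryOutbox` and `FakeBroker` below are illustrative stand-ins for the real storage backend and broker client, not Athomic code:

```python
import asyncio
from dataclasses import dataclass

# Minimal stand-ins for the outbox storage and the message broker,
# used only to illustrate the publisher's fetch -> publish -> mark cycle.

@dataclass
class Event:
    event_id: int
    topic: str
    payload: dict
    status: str = "PENDING"

class InMemoryOutbox:
    def __init__(self):
        self.events: list[Event] = []

    async def get_pending_events(self, limit: int = 100) -> list[Event]:
        return [e for e in self.events if e.status == "PENDING"][:limit]

    async def mark_event_published(self, event_id: int) -> None:
        for e in self.events:
            if e.event_id == event_id:
                e.status = "PUBLISHED"

class FakeBroker:
    def __init__(self):
        self.published: list[tuple[str, dict]] = []

    async def publish(self, topic: str, payload: dict) -> None:
        self.published.append((topic, payload))

async def poll_once(outbox: InMemoryOutbox, broker: FakeBroker) -> int:
    """One publisher cycle: fetch PENDING events, publish, mark PUBLISHED."""
    events = await outbox.get_pending_events()
    for event in events:
        await broker.publish(event.topic, event.payload)
        await outbox.mark_event_published(event.event_id)
    return len(events)
```

Because the cycle only marks an event PUBLISHED after the broker accepts it, a crash mid-cycle can cause a re-publish on restart, which is why the pattern guarantees at-least-once rather than exactly-once delivery.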


Guaranteed Event Ordering (FIFO)

The outbox pattern in Athomic also supports guaranteed First-In, First-Out (FIFO) processing for events that belong to the same logical entity (or "aggregate"). This is essential for use cases where the order of events matters (e.g., OrderCreated, OrderUpdated, OrderShipped).

To enable ordering, provide an aggregate_key when creating the event. The MongoOutboxRepository will then atomically generate a sequence_id for each event within that aggregate, and the OutboxPublisher processes events for the same aggregate key in strict sequential order.
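The sequencing contract can be illustrated with an in-memory counter. In Athomic the sequence_id is generated atomically in the database by the MongoOutboxRepository's sequence generator; the class below only demonstrates the behavior it must provide:

```python
from collections import defaultdict

# Illustrative sketch of per-aggregate sequence numbering: sequence_ids are
# monotonically increasing within one aggregate_key and independent across
# keys. (The real generator does this atomically in MongoDB.)
class InMemorySequenceGenerator:
    def __init__(self):
        self._counters = defaultdict(int)

    def next_for(self, aggregate_key: str) -> int:
        self._counters[aggregate_key] += 1
        return self._counters[aggregate_key]

gen = InMemorySequenceGenerator()
order_seq = [gen.next_for("order-42") for _ in range(3)]  # 1, 2, 3
other_seq = gen.next_for("order-99")                      # a new key starts at 1
```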


Configuration

The OutboxPublisher is configured under the [integration.outbox_publisher] section in your settings.toml. Here you can enable the service, define the polling interval, and configure its storage backend.

[default.integration.outbox_publisher]
enabled = true
poll_interval_seconds = 5.0
fetch_limit = 100 # Max number of unordered events to fetch per cycle

  [default.integration.outbox_publisher.storage]
  backend = "mongo"
  max_attempts = 3
    [default.integration.outbox_publisher.storage.mongo]
    collection_name = "outbox_events"
    db_connection_name = "default_mongo"

API Reference

nala.athomic.integration.outbox.decorators.publish_on_success(event_name, event_type, destination_topic, payload_mapper, key_mapper=None, include_context=True, storage=None)

Decorator implementing the Transactional Outbox pattern. It ensures that an event is saved to the persistent Outbox storage (and thus will be published later) only if the decorated function completes without raising an exception.

The storage operation is intended to be executed within the same database transaction as the main function's work.

Parameters:

- event_name (str, required): A descriptive name for the domain event (e.g., 'DocumentCreated').
- event_type (OutboxEventType, required): The type of the event (e.g., Command, Event, Query).
- destination_topic (Optional[str], required): The messaging topic where the event will eventually be published.
- payload_mapper (Callable[..., Dict[str, Any]], required): A callable that accepts the decorated function's result, args, and kwargs, and returns the event payload (Dict[str, Any]).
- key_mapper (Optional[Callable[..., Optional[str]]], default None): An optional callable to generate a message key (str) for partitioning purposes.
- include_context (Optional[bool], default True): If True, automatically embeds the current ExecutionContext into the event payload under the '_nala_context' key.
- storage (Optional[OutboxStorageProtocol], default None): An optional pre-configured OutboxStorageProtocol instance. If None, the OutboxStorageFactory is used to create one.

Returns:

- Callable[P, Coroutine[Any, R, Any]]: The decorated asynchronous wrapper function.
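The decorator's control flow reduces to "run the coroutine; save the event only on normal return." The sketch below is not the Athomic implementation (transaction plumbing, context embedding, key mapping, and error handling are omitted), and `FakeStorage` is a hypothetical stand-in:

```python
import asyncio
import functools

def publish_on_success_sketch(event_name, destination_topic, payload_mapper, storage):
    """Save an outbox event only if the wrapped coroutine returns normally."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)  # an exception here skips the save
            await storage.save_event({
                "event_name": event_name,
                "destination_topic": destination_topic,
                "payload": payload_mapper(result, **kwargs),
                "status": "PENDING",
            })
            return result
        return wrapper
    return decorator

class FakeStorage:
    def __init__(self):
        self.saved = []

    async def save_event(self, event):
        self.saved.append(event)

storage = FakeStorage()

@publish_on_success_sketch("DocumentCreated", "document.events.v1",
                           lambda result, **kwargs: {"id": result}, storage)
async def create_document(doc_id: str) -> str:
    return doc_id

asyncio.run(create_document("doc-1"))
```

If `create_document` raised instead of returning, `wrapper` would propagate the exception before reaching `save_event`, so no event would be recorded.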

nala.athomic.database.outbox.protocol.OutboxStorageProtocol

Bases: Protocol

Protocol defining the storage operations for the Outbox pattern.

delete_processed_event(event_id) async

(Optional but recommended for cleanup) Deletes an event that has been successfully published or marked as permanently failed (e.g., after reaching max attempts).

Parameters:

- event_id (UUID, required): The unique ID (UUID) of the event to delete.

Returns:

- bool: True if the event was found and deleted, False otherwise.

get_distinct_pending_aggregate_keys() async

Retrieves a list of unique aggregate_keys that have events in the PENDING state. This is used by the publisher to shard and process aggregates independently.

Returns:

- List[str]: A list of unique aggregate key strings.

get_hot_aggregates(top_n=10) async

Retrieves the top N aggregate keys with the highest number of pending events.

Parameters:

- top_n (int, default 10): The number of hot aggregates to retrieve.

Returns:

- List[Dict[str, Any]]: A list of dictionaries, each containing 'aggregate_key' and 'count'.

get_pending_events(limit=100) async

Retrieves a batch of events currently in the PENDING state, ordered by creation time (oldest first).

Parameters:

- limit (int, default 100): The maximum number of events to retrieve.

Returns:

- List[OutboxEventData]: A list of pending OutboxEventData objects.

get_pending_events_for_aggregate(aggregate_key) async

Retrieves all PENDING events for a specific aggregate_key, ordered by sequence_id.

Parameters:

- aggregate_key (str, required): The aggregate key to fetch events for.

Returns:

- List[OutboxEventData]: A list of pending OutboxEventData objects for the given aggregate, correctly ordered.

mark_event_attempted(event_id) async

Marks an event as attempted by incrementing its attempt counter and updating the last attempt timestamp.

Parameters:

- event_id (UUID, required): The unique ID (UUID) of the event to mark.

Returns:

- bool: True if the event was found and marked, False otherwise.

mark_event_failed(event_id, error_message) async

Marks an event as failed, increments the attempt counter, and stores the error message.

Parameters:

- event_id (UUID, required): The unique ID (UUID) of the event.
- error_message (str, required): A description of the error encountered during publishing.

Returns:

- Optional[int]: The new attempt count if the event was found and marked, None otherwise.

mark_event_published(event_id) async

Marks a specific event as successfully published by updating its status.

Parameters:

- event_id (UUID, required): The unique ID (UUID) of the event to mark.

Returns:

- bool: True if the event was found and marked, False otherwise.

save_event(event, db_session=None) async

Saves an event to the outbox storage within the provided database session/transaction. Supports agnostic usage (messaging, webhooks, tasks...).

Parameters:

- event (OutboxEventData, required): An OutboxEventData object containing the event details.
- db_session (optional, default None): The database session/transaction in which to save the event.

Returns:

- OutboxEventData: The saved OutboxEventData object.

nala.athomic.database.outbox.domain_models.OutboxEventData

Bases: BaseModel

A pure domain model for an outbox event.

This Pydantic model represents a single event to be processed reliably. It is designed to be independent of any specific database or ORM, containing all necessary data for the Outbox Publisher to process it. It supports both unordered events and ordered (FIFO) events via the aggregate_key and sequence_id fields.

Attributes:

- event_id (UUID): The unique identifier for this specific event instance.
- event_name (str): A human-readable name identifying the event (e.g., "user.created").
- event_type (Optional[OutboxEventType]): The category of the event, determining how it's handled.
- payload (Dict[str, Any]): The actual data or body of the event.
- status (OutboxEventStatus): The current processing status of the event.
- destination_topic (Optional[str]): The target topic or queue for 'messaging' type events.
- message_key (Optional[str]): The partitioning key for 'messaging' type events.
- aggregate_key (Optional[str]): The identifier used to group events to ensure ordered processing. All events with the same aggregate_key are processed in strict FIFO order.
- sequence_id (Optional[int]): A monotonically increasing number for events within the same aggregate_key to enforce ordering.
- attempts (int): The number of times processing has been attempted for this event.
- last_attempt_at (Optional[datetime]): The timestamp of the last processing attempt.
- last_error (Optional[str]): The error message from the last failed attempt.
- created_at (datetime): The UTC timestamp of when the event was created.

nala.athomic.database.outbox.mongo.mongo_outbox_repository.MongoOutboxRepository

Bases: OutboxStorageProtocol

A MongoDB-backed repository for storing and managing outbox events.

This class provides a concrete implementation of the OutboxStorageProtocol using MongoDB as the persistence layer. It leverages beanie for object-document mapping and uses atomic operations and aggregation pipelines for efficient and reliable event handling.

Attributes:

- storage_settings: Configuration specific to the outbox storage.
- db_provider: The database client provider for MongoDB connections.
- logger: A pre-configured logger instance.
- service_name: The identifier for this service in observability systems.
- tracer: An OpenTelemetry tracer instance.
- sequence_generator: A component for generating sequential IDs for ordered events.

__init__(storage_settings=None, db_provider=None, sequence_generator=None)

Initializes the MongoOutboxRepository.

Parameters:

- storage_settings (Optional[OutboxStorageSettings], default None): Configuration for the outbox storage layer.
- db_provider (Optional[DatabaseClientProtocol], default None): A required database client provider instance.
- sequence_generator (Optional[SequenceGeneratorProtocol], default None): An optional custom sequence generator.

delete_processed_event(event_id) async

Deletes a processed event from the collection for cleanup purposes.

Parameters:

- event_id (UUID, required): The unique ID of the event to delete.

Returns:

- bool: True if the event was found and deleted, False otherwise.

get_distinct_pending_aggregate_keys() async

Finds all unique aggregate keys that have pending events.

Uses MongoDB's distinct command to efficiently get a list of all aggregate_key values that have events in the 'PENDING' state. This is used by the publisher to shard the processing workload.

Returns:

- List[str]: A list of unique aggregate key strings.

get_hot_aggregates(top_n=10) async

Finds the top N aggregate keys with the most pending events.

This method executes a MongoDB aggregation pipeline to group events by aggregate_key, count them, sort by the count, and return the top results. This is primarily used for monitoring to identify potential bottlenecks.

Parameters:

- top_n (int, default 10): The number of hot aggregates to retrieve.

Returns:

- List[Dict[str, Any]]: A list of dictionaries, each containing 'aggregate_key' and 'count'.

get_pending_events(limit=100) async

Retrieves a batch of pending, unordered events.

This method queries MongoDB for events that are in the 'PENDING' state and do not have an aggregate_key, sorted by their creation time to ensure older events are processed first.

Parameters:

- limit (int, default 100): The maximum number of events to retrieve.

Returns:

- List[OutboxEventData]: A list of pending OutboxEventData objects.

get_pending_events_for_aggregate(aggregate_key) async

Retrieves all pending events for a specific aggregate, sorted by sequence.

Parameters:

- aggregate_key (str, required): The aggregate key to fetch events for.

Returns:

- List[OutboxEventData]: A list of pending OutboxEventData objects for the given aggregate.

mark_event_attempted(event_id) async

Increments an event's attempt counter and updates its last attempt timestamp.

Parameters:

- event_id (UUID, required): The unique ID of the event to update.

Returns:

- bool: True if the event was found and updated, False otherwise.

mark_event_failed(event_id, exception) async

Marks an event as failed, storing the error and updating its status.

This method updates the event with the error message. It checks if the exception is non-retriable or if the maximum attempt count has been reached to decide whether to move the event to the final 'FAILED' state or keep it as 'PENDING' for a future retry.

Parameters:

- event_id (UUID, required): The unique ID of the event.
- exception (Exception, required): The exception that caused the failure.

Returns:

- Optional[int]: The new attempt count if the event was found and marked, None otherwise.

mark_event_published(event_id) async

Marks a specific event's status as PUBLISHED.

Parameters:

- event_id (UUID, required): The unique ID of the event to mark as published.

Returns:

- bool: True if the event was found and updated, False otherwise.

save_event(event, db_session=None) async

Saves an event to the outbox collection within a transaction.

If the event includes an aggregate_key, this method first atomically generates a new sequence_id before inserting the event document into the database.

Parameters:

- event (OutboxEventData, required): The domain model of the event to save.
- db_session (Optional[ClientSession], default None): An optional MongoDB client session for transactional operations.

Returns:

- OutboxEventData: The saved event, now potentially populated with a sequence_id.