# Transactional Outbox Pattern

## Overview
The Transactional Outbox pattern is a critical resilience mechanism that ensures reliable, at-least-once delivery of events in a distributed system. It solves the classic "dual-write" problem, where an application needs to both commit a change to its database and publish a corresponding event to a message broker.
Without the outbox pattern, if the database commit succeeds but the message broker publish fails, the system becomes inconsistent. The Athomic outbox implementation guarantees that an event is published if and only if the business transaction that created it is successfully committed.
## How It Works
The pattern is implemented in two distinct parts: saving the event atomically and publishing it reliably.
### 1. Saving the Event (The "Write")
Instead of publishing an event directly to a message broker, the application saves the event record to a dedicated outbox_events collection within the same database and transaction as the business data.
The primary way to achieve this is by using the @publish_on_success decorator on your service methods. This decorator wraps your business logic and ensures that the outbox event is only saved to the database if your function completes without raising an exception.
#### Example Usage

```python
# In your application's service layer
from nala.athomic.database.outbox import OutboxEventType, publish_on_success

class DocumentService:
    @publish_on_success(
        event_name="DocumentCreated",
        event_type=OutboxEventType.MESSAGING,
        destination_topic="document.events.v1",
        # The payload_mapper receives the result of the decorated function
        payload_mapper=lambda result, **kwargs: result.model_dump(),
    )
    async def create_document(self, content: str, user_id: str, db_session) -> Document:
        """
        Creates a document and ensures a 'DocumentCreated' event is reliably published.

        The `db_session` is passed by the repository and used by the decorator
        to ensure the document and the outbox event are saved in the same transaction.
        """
        new_doc = Document(content=content, owner_id=user_id)
        await self.repository.save(new_doc, session=db_session)
        return new_doc
```
### 2. Publishing the Event (The "Poll")
A separate, continuously running background service called the OutboxPublisher polls the outbox_events collection for new, PENDING events.
When the publisher finds pending events, it:
1. Publishes them to the actual message broker (e.g., Kafka).
2. Upon successful publication, it updates the event's status in the database to PUBLISHED.
This polling mechanism ensures that even if the application crashes immediately after the database transaction commits, the event will eventually be found and published when the service restarts.
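The poll-publish-mark cycle can be sketched as follows. This is a simplified, in-memory illustration, not the real `OutboxPublisher`: plain Python objects stand in for the storage backend and the message broker.

```python
import asyncio

# Stand-ins for the real storage and broker (assumptions for illustration).
pending_events = [
    {"event_id": 1, "topic": "document.events.v1", "payload": {"id": "a"}, "status": "PENDING"},
    {"event_id": 2, "topic": "document.events.v1", "payload": {"id": "b"}, "status": "PENDING"},
]
published = []  # what the "broker" has received

async def publish_to_broker(event):
    # In production this would be a Kafka (or similar) produce call.
    published.append(event["payload"])

async def poll_cycle(events):
    """One cycle: fetch PENDING events, publish each, then mark it PUBLISHED."""
    for event in [e for e in events if e["status"] == "PENDING"]:
        await publish_to_broker(event)
        # The status flips only after a successful publish, so a crash between
        # the two steps re-delivers the event on restart (at-least-once).
        event["status"] = "PUBLISHED"

asyncio.run(poll_cycle(pending_events))
```

Note the order of operations: because the status update happens after publication, a crash in between produces a duplicate delivery rather than a lost event, which is exactly the at-least-once guarantee described above.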
## Guaranteed Event Ordering (FIFO)
The outbox pattern in Athomic also supports guaranteed First-In, First-Out (FIFO) processing for events that belong to the same logical entity (or "aggregate"). This is essential for use cases where the order of events matters (e.g., OrderCreated, OrderUpdated, OrderShipped).
To enable ordering, you simply need to provide an aggregate_key when creating the event. The MongoOutboxRepository will then atomically generate a sequence_id for each event within that aggregate. The OutboxPublisher is designed to process events for the same aggregate key in strict sequential order.
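The sequencing logic can be illustrated with a minimal sketch. An in-memory counter stands in for the atomic sequence generation that `MongoOutboxRepository` performs in the database; the dict-based "events" are illustrative, not the real `OutboxEventData` model.

```python
from collections import defaultdict

# Per-aggregate counters; the real repository generates these atomically in MongoDB.
_sequences: dict = defaultdict(int)

def next_sequence_id(aggregate_key: str) -> int:
    """Monotonically increasing sequence per aggregate (illustrative, not atomic)."""
    _sequences[aggregate_key] += 1
    return _sequences[aggregate_key]

def save_event(event_name: str, aggregate_key: str, outbox: list) -> dict:
    """Append an event record, assigning it the next sequence_id for its aggregate."""
    event = {
        "event_name": event_name,
        "aggregate_key": aggregate_key,
        "sequence_id": next_sequence_id(aggregate_key),
        "status": "PENDING",
    }
    outbox.append(event)
    return event

outbox: list = []
for name in ("OrderCreated", "OrderUpdated", "OrderShipped"):
    save_event(name, aggregate_key="order-42", outbox=outbox)

# The publisher processes one aggregate's events in ascending sequence_id order.
ordered = sorted(
    (e for e in outbox if e["aggregate_key"] == "order-42"),
    key=lambda e: e["sequence_id"],
)
```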
## Configuration
The OutboxPublisher is configured under the [integration.outbox_publisher] section in your settings.toml. Here you can enable the service, define the polling interval, and configure its storage backend.
```toml
[default.integration.outbox_publisher]
enabled = true
poll_interval_seconds = 5.0
fetch_limit = 100  # Max number of unordered events to fetch per cycle

[default.integration.outbox_publisher.storage]
backend = "mongo"
max_attempts = 3

[default.integration.outbox_publisher.storage.mongo]
collection_name = "outbox_events"
db_connection_name = "default_mongo"
```
## API Reference
### `nala.athomic.integration.outbox.decorators.publish_on_success`

`publish_on_success(event_name, event_type, destination_topic, payload_mapper, key_mapper=None, include_context=True, storage=None)`
Decorator implementing the Transactional Outbox pattern. It ensures that an event is saved to the persistent Outbox storage (and thus will be published later) only if the decorated function completes without raising an exception.
The storage operation is intended to be executed within the same database transaction as the main function's work.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_name` | `str` | A descriptive name for the domain event (e.g., `'DocumentCreated'`). | *required* |
| `event_type` | `OutboxEventType` | The type of the event (e.g., Command, Event, Query). | *required* |
| `destination_topic` | `Optional[str]` | The messaging topic where the event will eventually be published. | *required* |
| `payload_mapper` | `Callable[..., Dict[str, Any]]` | A callable that accepts the decorated function's result, args, and kwargs, and returns the event payload (`Dict[str, Any]`). | *required* |
| `key_mapper` | `Optional[Callable[..., Optional[str]]]` | An optional callable to generate a message key (`str`) for partitioning purposes. | `None` |
| `include_context` | `Optional[bool]` | If `True`, automatically embeds the current `ExecutionContext` into the event payload under the `'_nala_context'` key. | `True` |
| `storage` | `Optional[OutboxStorageProtocol]` | An optional pre-configured `OutboxStorageProtocol` instance. If `None`, the `OutboxStorageFactory` is used to create one. | `None` |

Returns:

| Type | Description |
|---|---|
| `Callable[P, Coroutine[Any, R, Any]]` | The decorated asynchronous wrapper function. |
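The decorator's control flow can be approximated with a simplified stand-in. This sketch omits what the real `publish_on_success` also does (resolving storage via the factory, attaching the execution context, and reusing the caller's transaction); the list-backed "storage" is an assumption for illustration.

```python
import asyncio
import functools

saved_events = []  # stands in for the outbox storage

def publish_on_success_sketch(event_name, destination_topic, payload_mapper):
    """Simplified sketch: save an outbox event only if the wrapped call succeeds."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)  # any exception skips the save
            saved_events.append({
                "event_name": event_name,
                "destination_topic": destination_topic,
                "payload": payload_mapper(result, **kwargs),
                "status": "PENDING",
            })
            return result
        return wrapper
    return decorator

@publish_on_success_sketch(
    event_name="DocumentCreated",
    destination_topic="document.events.v1",
    payload_mapper=lambda result, **kwargs: {"doc_id": result},
)
async def create_document(content: str) -> str:
    return "doc-123"  # pretend this is the persisted document's id

asyncio.run(create_document("hello"))
```

The key property is that the event is recorded only on the success path: if `func` raises, the `append` never runs, mirroring how the real decorator skips the outbox write when the business logic fails.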
### `nala.athomic.database.outbox.protocol.OutboxStorageProtocol`

Bases: `Protocol`

Protocol defining the storage operations for the Outbox pattern.
#### `delete_processed_event(event_id)` *(async)*

(Optional but recommended for cleanup) Deletes an event that has been successfully published or marked as permanently failed (e.g., after reaching max attempts).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID (UUID) of the event to delete. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the event was found and deleted, `False` otherwise. |
#### `get_distinct_pending_aggregate_keys()` *(async)*

Retrieves a list of unique `aggregate_key` values that have events in the PENDING state. This is used by the publisher to shard and process aggregates independently.

Returns:

| Type | Description |
|---|---|
| `List[str]` | A list of unique aggregate key strings. |
#### `get_hot_aggregates(top_n=10)` *(async)*

Retrieves the top N aggregate keys with the highest number of pending events.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `top_n` | `int` | The number of hot aggregates to retrieve. | `10` |

Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | A list of dictionaries, each containing `'aggregate_key'` and `'count'`. |
#### `get_pending_events(limit=100)` *(async)*

Retrieves a batch of events currently in the PENDING state, ordered by creation time (oldest first).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `limit` | `int` | The maximum number of events to retrieve. | `100` |

Returns:

| Type | Description |
|---|---|
| `List[OutboxEventData]` | A list of pending `OutboxEventData` objects. |
#### `get_pending_events_for_aggregate(aggregate_key)` *(async)*

Retrieves all PENDING events for a specific `aggregate_key`, ordered by `sequence_id`.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `aggregate_key` | `str` | The aggregate key to fetch events for. | *required* |

Returns:

| Type | Description |
|---|---|
| `List[OutboxEventData]` | A list of pending `OutboxEventData` objects for the given aggregate, correctly ordered. |
#### `mark_event_attempted(event_id)` *(async)*

Marks an event as attempted by incrementing its attempt counter and updating the last attempt timestamp.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID (UUID) of the event to mark. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the event was found and marked, `False` otherwise. |
#### `mark_event_failed(event_id, error_message)` *(async)*

Marks an event as failed, increments the attempt counter, and stores the error message.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID (UUID) of the event. | *required* |
| `error_message` | `str` | A description of the error encountered during publishing. | *required* |

Returns:

| Type | Description |
|---|---|
| `Optional[int]` | The new attempt count if the event was found and marked, `None` otherwise. |
#### `mark_event_published(event_id)` *(async)*

Marks a specific event as successfully published by updating its status.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID (UUID) of the event to mark. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the event was found and marked, `False` otherwise. |
#### `save_event(event, db_session=None)` *(async)*

Saves an event to the outbox storage within the provided database session/transaction. Supports agnostic usage (messaging, webhooks, tasks...).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `OutboxEventData` | An `OutboxEventData` object containing the event details. | *required* |

Returns:

| Type | Description |
|---|---|
| `OutboxEventData` | The saved `OutboxEventData` object. |
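For unit tests, a minimal in-memory implementation of the core protocol methods might look like the sketch below. It is deliberately partial: only three of the protocol's methods are shown, and a plain dict stands in for `OutboxEventData`.

```python
import asyncio
import uuid
from datetime import datetime, timezone

class InMemoryOutboxStorage:
    """Partial, illustrative in-memory stand-in for OutboxStorageProtocol."""

    def __init__(self):
        self._events: dict = {}

    async def save_event(self, event: dict, db_session=None) -> dict:
        # In-memory "insert"; the real implementation writes within a transaction.
        self._events[event["event_id"]] = event
        return event

    async def get_pending_events(self, limit: int = 100) -> list:
        # Oldest-first, matching the protocol's documented ordering.
        pending = [e for e in self._events.values() if e["status"] == "PENDING"]
        return sorted(pending, key=lambda e: e["created_at"])[:limit]

    async def mark_event_published(self, event_id) -> bool:
        event = self._events.get(event_id)
        if event is None:
            return False
        event["status"] = "PUBLISHED"
        return True

async def demo():
    storage = InMemoryOutboxStorage()
    event_id = uuid.uuid4()
    await storage.save_event({
        "event_id": event_id,
        "event_name": "DocumentCreated",
        "status": "PENDING",
        "created_at": datetime.now(timezone.utc),
    })
    assert len(await storage.get_pending_events()) == 1
    assert await storage.mark_event_published(event_id) is True
    return await storage.get_pending_events()  # now empty

remaining = asyncio.run(demo())
```

Because the protocol is structural (`typing.Protocol`), a fake like this can be passed anywhere an `OutboxStorageProtocol` is expected without subclassing anything from the library.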
### `nala.athomic.database.outbox.domain_models.OutboxEventData`

Bases: `BaseModel`

A pure domain model for an outbox event.

This Pydantic model represents a single event to be processed reliably. It is designed to be independent of any specific database or ORM, containing all necessary data for the Outbox Publisher to process it. It supports both unordered events and ordered (FIFO) events via the `aggregate_key` and `sequence_id` fields.

Attributes:

| Name | Type | Description |
|---|---|---|
| `event_id` | `UUID` | The unique identifier for this specific event instance. |
| `event_name` | `str` | A human-readable name identifying the event (e.g., `"user.created"`). |
| `event_type` | `Optional[OutboxEventType]` | The category of the event, determining how it's handled. |
| `payload` | `Dict[str, Any]` | The actual data or body of the event. |
| `status` | `OutboxEventStatus` | The current processing status of the event. |
| `destination_topic` | `Optional[str]` | The target topic or queue for 'messaging' type events. |
| `message_key` | `Optional[str]` | The partitioning key for 'messaging' type events. |
| `aggregate_key` | `Optional[str]` | The identifier used to group events to ensure ordered processing. All events with the same `aggregate_key` are processed in order. |
| `sequence_id` | `Optional[int]` | A monotonically increasing number for events within the same aggregate. |
| `attempts` | `int` | The number of times processing has been attempted for this event. |
| `last_attempt_at` | `Optional[datetime]` | The timestamp of the last processing attempt. |
| `last_error` | `Optional[str]` | The error message from the last failed attempt. |
| `created_at` | `datetime` | The UTC timestamp of when the event was created. |
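Constructing an event can be sketched as follows. The real model is Pydantic; this sketch uses a stdlib dataclass with the field names from the table above, and the default values are assumptions for illustration.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional

@dataclass
class OutboxEventSketch:
    """Dataclass approximation of OutboxEventData (the real model is Pydantic);
    the defaults below are assumptions for illustration."""
    event_name: str
    payload: Dict[str, Any]
    event_id: uuid.UUID = field(default_factory=uuid.uuid4)
    status: str = "PENDING"
    destination_topic: Optional[str] = None
    message_key: Optional[str] = None
    aggregate_key: Optional[str] = None
    sequence_id: Optional[int] = None  # assigned by storage when aggregate_key is set
    attempts: int = 0
    last_attempt_at: Optional[datetime] = None
    last_error: Optional[str] = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

event = OutboxEventSketch(
    event_name="user.created",
    payload={"user_id": "u-1"},
    destination_topic="user.events.v1",
    aggregate_key="user:u-1",  # opt in to FIFO ordering for this user
)
```

Setting `aggregate_key` is what opts an event into FIFO processing; leaving it `None` keeps the event in the unordered batch path.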
### `nala.athomic.database.outbox.mongo.mongo_outbox_repository.MongoOutboxRepository`

Bases: `OutboxStorageProtocol`

A MongoDB-backed repository for storing and managing outbox events.

This class provides a concrete implementation of the `OutboxStorageProtocol` using MongoDB as the persistence layer. It leverages Beanie for object-document mapping and uses atomic operations and aggregation pipelines for efficient and reliable event handling.

Attributes:

| Name | Description |
|---|---|
| `storage_settings` | Configuration specific to the outbox storage. |
| `db_provider` | The database client provider for MongoDB connections. |
| `logger` | A pre-configured logger instance. |
| `service_name` | The identifier for this service in observability systems. |
| `tracer` | An OpenTelemetry tracer instance. |
| `sequence_generator` | A component for generating sequential IDs for ordered events. |
#### `__init__(storage_settings=None, db_provider=None, sequence_generator=None)`

Initializes the MongoOutboxRepository.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `storage_settings` | `Optional[OutboxStorageSettings]` | Configuration for the outbox storage layer. | `None` |
| `db_provider` | `Optional[DatabaseClientProtocol]` | A required database client provider instance. | `None` |
| `sequence_generator` | `Optional[SequenceGeneratorProtocol]` | An optional custom sequence generator. | `None` |
#### `delete_processed_event(event_id)` *(async)*

Deletes a processed event from the collection for cleanup purposes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID of the event to delete. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the event was found and deleted, `False` otherwise. |
#### `get_distinct_pending_aggregate_keys()` *(async)*

Finds all unique aggregate keys that have pending events.

Uses MongoDB's `distinct` command to efficiently get a list of all `aggregate_key` values that have events in the 'PENDING' state. This is used by the publisher to shard the processing workload.

Returns:

| Type | Description |
|---|---|
| `List[str]` | A list of unique aggregate key strings. |
#### `get_hot_aggregates(top_n=10)` *(async)*

Finds the top N aggregate keys with the most pending events.

This method executes a MongoDB aggregation pipeline to group events by `aggregate_key`, count them, sort by the count, and return the top results. This is primarily used for monitoring to identify potential bottlenecks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `top_n` | `int` | The number of hot aggregates to retrieve. | `10` |

Returns:

| Type | Description |
|---|---|
| `List[Dict[str, Any]]` | A list of dictionaries, each containing `'aggregate_key'` and `'count'`. |
#### `get_pending_events(limit=100)` *(async)*

Retrieves a batch of pending, unordered events.

This method queries MongoDB for events that are in the 'PENDING' state and do not have an `aggregate_key`, sorted by their creation time to ensure older events are processed first.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `limit` | `int` | The maximum number of events to retrieve. | `100` |

Returns:

| Type | Description |
|---|---|
| `List[OutboxEventData]` | A list of pending `OutboxEventData` objects. |
#### `get_pending_events_for_aggregate(aggregate_key)` *(async)*

Retrieves all pending events for a specific aggregate, sorted by sequence.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `aggregate_key` | `str` | The aggregate key to fetch events for. | *required* |

Returns:

| Type | Description |
|---|---|
| `List[OutboxEventData]` | A list of pending `OutboxEventData` objects for the given aggregate, correctly ordered. |
#### `mark_event_attempted(event_id)` *(async)*

Increments an event's attempt counter and updates its last attempt timestamp.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID of the event to update. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the event was found and updated, `False` otherwise. |
#### `mark_event_failed(event_id, exception)` *(async)*

Marks an event as failed, storing the error and updating its status.

This method updates the event with the error message. It checks whether the exception is non-retriable or the maximum attempt count has been reached to decide whether to move the event to the final 'FAILED' state or keep it as 'PENDING' for a future retry.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID of the event. | *required* |
| `exception` | `Exception` | The exception that caused the failure. | *required* |

Returns:

| Type | Description |
|---|---|
| `Optional[int]` | The new attempt count if the event was found and marked, `None` otherwise. |
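The retry decision described above can be sketched as a small pure function. Names such as `NonRetriableError` are illustrative assumptions; the real repository inspects its own exception hierarchy and the configured `max_attempts` setting.

```python
class NonRetriableError(Exception):
    """Illustrative marker for errors that should never be retried."""

def next_status(exception: Exception, attempts: int, max_attempts: int = 3) -> str:
    """Decide whether a failed event is retried or permanently failed."""
    if isinstance(exception, NonRetriableError):
        return "FAILED"   # no point retrying a permanent error
    if attempts >= max_attempts:
        return "FAILED"   # retry budget exhausted
    return "PENDING"      # left pending so a later poll cycle retries it

# A transient broker error with retries remaining stays PENDING:
first_failure = next_status(TimeoutError("broker down"), attempts=1)
# The same error after max_attempts becomes FAILED:
exhausted = next_status(TimeoutError("broker down"), attempts=3)
# A non-retriable error fails immediately, regardless of attempts:
permanent = next_status(NonRetriableError("bad payload"), attempts=1)
```

Keeping the event in 'PENDING' rather than a separate 'RETRY' state is what lets the ordinary poll cycle pick it up again without any extra machinery.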
#### `mark_event_published(event_id)` *(async)*

Marks a specific event's status as PUBLISHED.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `UUID` | The unique ID of the event to mark as published. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the event was found and updated, `False` otherwise. |
#### `save_event(event, db_session=None)` *(async)*

Saves an event to the outbox collection within a transaction.

If the event includes an `aggregate_key`, this method first atomically generates a new `sequence_id` before inserting the event document into the database.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `OutboxEventData` | The domain model of the event to save. | *required* |
| `db_session` | `Optional[ClientSession]` | An optional MongoDB client session for transactional operations. | `None` |

Returns:

| Type | Description |
|---|---|
| `OutboxEventData` | The saved event, now potentially populated with a newly generated `sequence_id`. |