Consuming Messages (Consumer)
Overview
The Message Consumer is the component responsible for subscribing to topics on a message broker, receiving messages, and delegating them to your application's business logic for processing.
In the Athomic Layer, defining a consumer is an extremely simple and declarative process thanks to the @subscribe_to decorator. This decorator handles all the complex boilerplate of connecting to the broker, managing the consumer lifecycle, and integrating with resilience and observability systems, allowing you to focus purely on your handler logic.
Defining a Consumer
To create a message consumer, you simply decorate an async function with @subscribe_to. This function becomes your handler, which will be executed for each message received from the specified topic.
Basic (Single Message) Consumer
By default, your handler will receive one message at a time. The handler should accept the deserialized message payload as its first argument.
from nala.athomic.integration.messaging import subscribe_to
from your_app.schemas import UserCreatedEvent

@subscribe_to(
    topic="user.events.v1",
    group_id="user-event-processor-group",
    target_type=UserCreatedEvent  # Pydantic model for deserialization and validation
)
async def handle_user_created_event(message: UserCreatedEvent):
    """
    This handler will be called for each message on the 'user.events.v1' topic.
    The incoming JSON payload will be automatically deserialized and validated
    into a `UserCreatedEvent` instance before this function is called.
    """
    print(f"Processing new user: {message.user_id}")
    await user_service.send_welcome_email(message.email)
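For reference, the example above assumes a Pydantic model roughly like the sketch below. Only user_id and email are taken from the handler; any other fields are up to your schema.

from pydantic import BaseModel

class UserCreatedEvent(BaseModel):
    # Fields referenced by the handler above; add whatever else your event carries.
    user_id: str
    email: str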
Batch Consumer
For high-throughput scenarios, you can process messages in batches to improve efficiency. To enable batch mode, simply set the batch_size argument in the decorator. Your handler function will then receive a list of deserialized messages.
@subscribe_to(
    topic="tracking.events.v1",
    group_id="tracking-batch-processor",
    target_type=TrackingEvent,
    batch_size=100,
    fetch_max_linger_ms=5000
)
async def handle_tracking_events_in_batch(messages: list[TrackingEvent]):
    """
    This handler will receive a list of up to 100 TrackingEvent objects.
    """
    await tracking_repo.save_many([event.data for event in messages])
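If your storage layer cannot absorb a full batch in one write, you can chunk inside the handler. This is a minimal sketch using the same hypothetical TrackingEvent and tracking_repo as above; the chunk size is arbitrary.

@subscribe_to(
    topic="tracking.events.v1",
    group_id="tracking-batch-processor",
    target_type=TrackingEvent,
    batch_size=100
)
async def handle_tracking_events_chunked(messages: list[TrackingEvent]):
    # Persist the batch in chunks of 25 to keep individual writes small.
    chunk_size = 25
    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]
        await tracking_repo.save_many([event.data for event in chunk])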
Advanced: Ordered Consumption (FIFO) with @ordered_consumer
This is an innovative feature of the Athomic Layer. The @ordered_consumer decorator guarantees that messages belonging to the same logical entity (e.g., all events for order-123) are processed in strict First-In, First-Out (FIFO) order, even in a distributed environment with multiple consumer instances.
How It Works
This strategy requires two fields in your message payload:
- An aggregate_key_field: The identifier for the entity (e.g., "order_id").
- A sequence_id_field: A monotonically increasing number for that entity.
The OrderedOrchestrationStrategy uses two KV stores behind the scenes:
1. A State Store to track the last successfully processed sequence_id for each aggregate_key.
2. A Waiting Room (a Redis Sorted Set) to temporarily buffer messages that arrive out of order.
When a message arrives, the strategy checks its sequence_id against the last processed ID for its aggregate_key. If it's the next message in the sequence, it's processed. If it's a future message, it's placed in the waiting room. After processing a message, the strategy checks the waiting room to process any subsequent messages that have now come into order.
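In pseudocode, the flow looks roughly like the sketch below. This is an illustration of the logic described above, not the actual OrderedOrchestrationStrategy; the store and waiting-room method names are hypothetical.

async def orchestrate(message, state_store, waiting_room, handler):
    key = message.aggregate_key            # e.g. "order-123"
    seq = message.sequence_id
    last = await state_store.get(key) or 0

    if seq <= last:
        return                             # duplicate or stale: already processed
    if seq > last + 1:
        # Future message: buffer it in the waiting room, scored by its sequence_id
        await waiting_room.add(key, message, score=seq)
        return

    # seq == last + 1: this is the next message in the sequence
    await handler(message)
    await state_store.set(key, seq)

    # Drain any buffered messages that have now come into order
    while (next_msg := await waiting_room.pop_if_score(key, seq + 1)) is not None:
        await handler(next_msg)
        seq += 1
        await state_store.set(key, seq)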
Usage Example
from nala.athomic.integration.messaging.decorators import ordered_consumer
from your_app.schemas import OrderEvent

@ordered_consumer(
    topic="order.events.v1",
    group_id="ordered-order-processor",
    target_type=OrderEvent,
    # Specify the fields in your message payload for ordering
    aggregate_key_field="order_id",
    sequence_id_field="event_sequence"
)
async def handle_ordered_order_event(message: OrderEvent):
    """
    This handler will process events for the same order_id in strict
    sequence based on the event_sequence number.
    """
    print(f"Processing event #{message.event_sequence} for order {message.order_id}")
    await order_service.update_status(message.order_id, message.new_status)
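The example assumes an OrderEvent model that exposes the two ordering fields plus the status used by the handler; a minimal sketch (field types are illustrative):

from pydantic import BaseModel

class OrderEvent(BaseModel):
    order_id: str        # aggregate_key_field: groups all events for one order
    event_sequence: int  # sequence_id_field: monotonically increasing per order
    new_status: str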
Disabling Lineage for Specific Consumers
In some infrastructure cases (like the Lineage Collector itself), you may want to disable automatic lineage tracking to prevent recursion loops (a lineage consumer generating lineage events about consuming lineage events). You can do this via the enable_lineage parameter.
@subscribe_to(
    topic="platform.lineage.events",
    group_id="nala-lineage-collector",
    target_type=LineageEvent,
    enable_lineage=False  # Prevents generating lineage events for this handler
)
async def handle_lineage_event(event: LineageEvent):
    ...
API Reference
nala.athomic.integration.messaging.decorators.subscribe_to(topic, *, target_type=None, group_id=None, priority=MessagePriority.NORMAL, concurrency=None, batch_size=None, fetch_timeout_ms=None, fetch_max_linger_ms=None, fetch_max_bytes=None, connection_name=None, idempotency_settings=None, idempotency_key_resolver=None, enable_lineage=True)
This function collects the configuration metadata and registers it with the
global decorated_consumer_registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | Union[str, List[str]] | The topic name (str) or a list of topic names. | required |
| target_type | Optional[Type[Any]] | Optional Pydantic model or class used to deserialize. | None |
| group_id | Optional[str] | Optional consumer group identifier. | None |
| priority | MessagePriority | The priority queue this handler will consume from. | NORMAL |
| concurrency | Optional[int] | The maximum number of messages processed concurrently. | None |
| batch_size | Optional[int] | If > 0, enables "true batch" processing mode. | None |
| fetch_timeout_ms | Optional[int] | Optional override for batch fetch timeout. | None |
| fetch_max_linger_ms | Optional[int] | Optional override for batch linger time. | None |
| fetch_max_bytes | Optional[int] | Optional override for batch size in bytes. | None |
| connection_name | Optional[str] | The specific messaging connection. | None |
| idempotency_settings | Optional[IdempotencySettings] | Optional configuration for idempotency. | None |
| idempotency_key_resolver | Optional[Callable[..., str]] | Optional function to resolve idempotency key. | None |
| enable_lineage | bool | If False, disables automatic lineage tracking for this consumer. Use this for internal infrastructure consumers (e.g., Lineage Collector). | True |

Returns:

| Type | Description |
|---|---|
| Callable | The decorator function. |
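A fuller invocation combining several of the parameters above might look like the sketch below. PaymentEvent, payment_service, and the connection name are hypothetical, and the assumption that the idempotency key resolver is called with the deserialized message is ours (the documented type is only Callable[..., str]).

@subscribe_to(
    topic=["payment.events.v1", "payment.events.v2"],  # a single topic or a list
    group_id="payment-processor",
    target_type=PaymentEvent,
    concurrency=10,                                    # at most 10 messages in flight
    connection_name="payments-broker",                 # named messaging connection
    idempotency_key_resolver=lambda message: message.payment_id
)
async def handle_payment_event(message: PaymentEvent):
    await payment_service.apply(message)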
nala.athomic.integration.messaging.decorators.ordered_consumer.ordered_consumer(topic, *, group_id, aggregate_key_field, sequence_id_field, target_type=None, priority=MessagePriority.NORMAL, concurrency=None, fetch_size=None, idempotency_settings=None, idempotency_key_resolver=None)
Registers an ordered consumer with optional idempotency.
Enforces sequential processing. Idempotency is applied per-message if enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | Union[str, List[str]] | The topic name(s). | required |
| group_id | str | Consumer group ID. | required |
| aggregate_key_field | str | Field for ordering (partition key). | required |
| sequence_id_field | str | Field for sequencing. | required |
| target_type | Optional[Type[Any]] | Optional Pydantic model for deserialization. | None |
| priority | MessagePriority | Message priority level. | NORMAL |
| concurrency | Optional[int] | Limit on concurrent executions. | None |
| fetch_size | Optional[int] | Number of messages to pre-fetch from broker (IO optimization). | None |
| idempotency_settings | Optional[IdempotencySettings] | Resilience config (overrides global defaults). | None |
| idempotency_key_resolver | Optional[Callable[..., str]] | Strategy to extract ID. | None |
nala.athomic.integration.messaging.consumers.protocol.ConsumerProtocol
Bases: Protocol
Protocol for consuming messages from a messaging system.
Focuses on a subscription pattern where received messages are processed by an asynchronous callback function.
consume() async
Starts the continuous message consumption process.
Subscribes to topics/queues and passes received messages to the
provided callback function for processing. This function typically blocks
execution (or runs in a background task) until close() is called
or an unrecoverable error occurs.
Raises:
| Type | Description |
|---|---|
| ConsumeError | If an unrecoverable error occurs during consumption (e.g., persistent authentication failure, error in the consume loop). |
| MessagingConnectionError | If there are initial connection problems with the broker when trying to start consumption. |
| ValueError | If essential arguments (like …). |
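To make the protocol concrete, a class satisfying this subscribe-and-callback pattern could be shaped like the toy sketch below; only consume() and close() are inferred from the description above, and the queue/callback wiring is purely illustrative.

import asyncio
from typing import Awaitable, Callable

class InMemoryConsumer:
    """Toy consumer: pulls payloads from an asyncio.Queue and hands them to a callback."""

    def __init__(self, queue: asyncio.Queue, callback: Callable[[bytes], Awaitable[None]]):
        self._queue = queue
        self._callback = callback
        self._running = False

    async def consume(self) -> None:
        # Blocks (cooperatively) until close() is called or an unrecoverable error occurs.
        self._running = True
        while self._running:
            try:
                raw = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue  # periodically re-check the running flag
            await self._callback(raw)

    async def close(self) -> None:
        self._running = False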
nala.athomic.integration.messaging.consumers.base.BaseConsumer
Bases: BaseService, ABC, ConsumerProtocol
Abstract base class for message consumers.
This class serves as the core lifecycle manager for any message consumer implementation (e.g., Kafka, Redis, SQS). It enforces the required dependencies and delegates all message-specific logic to injected protocols, thereby adhering strictly to the Dependency Inversion Principle (DIP).
Responsibilities include:
- Managing the connection lifecycle to the message broker (via BaseService).
- Holding the configuration and essential dependencies.
- Delegating the message fetching loop to a ConsumptionStrategy.
- Delegating message processing to a Processor.
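A provider implementation subclasses BaseConsumer and fills in the broker-specific members documented below. The sketch assumes a hypothetical broker client and that ConsumerHandlerConfig exposes a group_id attribute; neither is confirmed by this page.

from typing import AsyncGenerator, List

from nala.athomic.integration.messaging.consumers.base import BaseConsumer

class MyBrokerConsumer(BaseConsumer):
    """Illustrative provider: only the abstract members are shown."""

    def __init__(self, settings, handler_config, message_processor, consumption_strategy, client):
        super().__init__(settings, handler_config, message_processor, consumption_strategy)
        self._client = client                     # hypothetical broker client
        self._group_id = handler_config.group_id  # assumption: config carries the group id

    @property
    def group_id(self) -> str:
        return self._group_id

    async def get_message_iterator(self) -> AsyncGenerator:
        # Yield raw messages one by one from the hypothetical client.
        async for raw in self._client.stream():
            yield raw

    async def get_batch(self, max_records: int, timeout_ms: int) -> List:
        # Fetch up to max_records in one call, waiting at most timeout_ms.
        return await self._client.fetch(max_records=max_records, timeout_ms=timeout_ms)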
group_id abstractmethod property
The consumer group identifier for this consumer instance.
Returns:
| Type | Description |
|---|---|
| str | The unique string identifying the consumer group. |
__init__(settings, handler_config, message_processor, consumption_strategy)
Initializes the base consumer with all necessary dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | MessagingSettings | The general messaging configuration settings. | required |
| handler_config | ConsumerHandlerConfig | The configuration metadata for the consumer handler, typically extracted from a decorator. | required |
| message_processor | ProcessorProtocol | The strategy responsible for the entire message processing pipeline (deserialization, execution, DLQ). | required |
| consumption_strategy | ConsumptionStrategyProtocol | The strategy defining how messages are fetched from the broker (e.g., individual or batch consumption). | required |
Raises:
| Type | Description |
|---|---|
| InvalidConsumerConfigurationError | If … |
get_batch(max_records, timeout_ms) abstractmethod async
Fetches a batch of raw messages from the broker in a single operation.
This method must be implemented by concrete consumer providers to enable batch processing strategies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_records | int | The maximum number of messages to attempt to fetch. | required |
| timeout_ms | int | The maximum time (in milliseconds) to wait for records to arrive. | required |
Returns:
| Type | Description |
|---|---|
| List[RawMessage] | A list of raw messages fetched from the broker. |
get_message_iterator() abstractmethod async
Returns an async generator that yields raw messages one by one from the broker.
This method must be implemented by concrete consumer providers (e.g., Kafka, SQS) to handle the broker-specific low-level message fetching.
Yields:
| Name | Type | Description |
|---|---|---|
| RawMessage | AsyncGenerator[RawMessage, None] | The next raw message fetched from the broker. |