Skip to content

Consuming Messages (Consumer)

Overview

The Message Consumer is the component responsible for subscribing to topics on a message broker, receiving messages, and delegating them to your application's business logic for processing.

In the Athomic Layer, defining a consumer is an extremely simple and declarative process thanks to the @subscribe_to decorator. This decorator handles all the complex boilerplate of connecting to the broker, managing the consumer lifecycle, and integrating with resilience and observability systems, allowing you to focus purely on your handler logic.


Defining a Consumer

To create a message consumer, you simply decorate an async function with @subscribe_to. This function becomes your handler, which will be executed for each message received from the specified topic.

Basic (Single Message) Consumer

By default, your handler will receive one message at a time. The handler should accept the deserialized message payload as its first argument.

from nala.athomic.integration.messaging import subscribe_to
from your_app.schemas import UserCreatedEvent

@subscribe_to(
    topic="user.events.v1",
    group_id="user-event-processor-group",
    target_type=UserCreatedEvent # Pydantic model for deserialization and validation
)
async def handle_user_created_event(message: UserCreatedEvent):
    """
    This handler will be called for each message on the 'user.events.v1' topic.
    The incoming JSON payload will be automatically deserialized and validated
    into a `UserCreatedEvent` instance before this function is called.
    """
    print(f"Processing new user: {message.user_id}")
    await user_service.send_welcome_email(message.email)

Batch Consumer

For high-throughput scenarios, you can process messages in batches to improve efficiency. To enable batch mode, simply set the batch_size argument in the decorator. Your handler function will then receive a list of deserialized messages.

@subscribe_to(
    topic="tracking.events.v1",
    group_id="tracking-batch-processor",
    target_type=TrackingEvent,
    batch_size=100,
    max_batch_linger_ms=5000
)
async def handle_tracking_events_in_batch(messages: list[TrackingEvent]):
    """
    This handler will receive a list of up to 100 TrackingEvent objects.
    """
    await tracking_repo.save_many([event.data for event in messages])

Advanced: Ordered Consumption (FIFO) with @ordered_consumer

This is an innovative feature of the Athomic Layer. The @ordered_consumer decorator guarantees that messages belonging to the same logical entity (e.g., all events for order-123) are processed in strict First-In, First-Out (FIFO) order, even in a distributed environment with multiple consumer instances.

How It Works

This strategy requires two fields in your message payload: - An aggregate_key_field: The identifier for the entity (e.g., "order_id"). - A sequence_id_field: A monotonically increasing number for that entity.

The OrderedOrchestrationStrategy uses two KV stores behind the scenes: 1. A State Store to track the last successfully processed sequence_id for each aggregate_key. 2. A Waiting Room (a Redis Sorted Set) to temporarily buffer messages that arrive out of order.

When a message arrives, the strategy checks its sequence_id against the last processed ID for its aggregate_key. If it's the next message in the sequence, it's processed. If it's a future message, it's placed in the waiting room. After processing a message, the strategy checks the waiting room to process any subsequent messages that have now come into order.

Usage Example

from nala.athomic.integration.messaging.decorators import ordered_consumer
from your_app.schemas import OrderEvent

@ordered_consumer(
    topic="order.events.v1",
    group_id="ordered-order-processor",
    target_type=OrderEvent,
    # Specify the fields in your message payload for ordering
    aggregate_key_field="order_id",
    sequence_id_field="event_sequence"
)
async def handle_ordered_order_event(message: OrderEvent):
    """
    This handler will process events for the same order_id in strict
    sequence based on the event_sequence number.
    """
    print(f"Processing event #{message.event_sequence} for order {message.order_id}")
    await order_service.update_status(message.order_id, message.new_status)

Disabling Lineage for Specific Consumers

In some infrastructure cases (like the Lineage Collector itself), you may want to disable automatic lineage tracking to prevent recursion loops (a lineage consumer generating lineage events about consuming lineage events). You can do this via the enable_lineage parameter.

@subscribe_to(
    topic="platform.lineage.events",
    group_id="nala-lineage-collector",
    target_type=LineageEvent,
    enable_lineage=False  # Prevents generating lineage events for this handler
)
async def handle_lineage_event(event: LineageEvent):
    ...

API Reference

nala.athomic.integration.messaging.decorators.subscribe_to(topic, *, target_type=None, group_id=None, priority=MessagePriority.NORMAL, concurrency=None, batch_size=None, fetch_timeout_ms=None, fetch_max_linger_ms=None, fetch_max_bytes=None, connection_name=None, idempotency_settings=None, idempotency_key_resolver=None, enable_lineage=True)

This function collects the configuration metadata and registers it with the global decorated_consumer_registry.

Parameters:

Name Type Description Default
topic Union[str, List[str]]

The topic name (str) or a list of topic names.

required
target_type Optional[Type[Any]]

Optional Pydantic model or class used to deserialize.

None
group_id Optional[str]

Optional consumer group identifier.

None
priority MessagePriority

The priority queue this handler will consume from.

NORMAL
concurrency Optional[int]

The maximum number of messages processed concurrently.

None
batch_size Optional[int]

If > 0, enables "true batch" processing mode.

None
fetch_timeout_ms Optional[int]

Optional override for batch fetch timeout.

None
fetch_max_linger_ms Optional[int]

Optional override for batch linger time.

None
fetch_max_bytes Optional[int]

Optional override for batch size in bytes.

None
connection_name Optional[str]

The specific messaging connection.

None
idempotency_settings Optional[IdempotencySettings]

Optional configuration for idempotency.

None
idempotency_key_resolver Optional[Callable[..., str]]

Optional function to resolve idempotency key.

None
enable_lineage bool

If False, disables automatic lineage tracking for this consumer. Use this for internal infrastructure consumers (e.g., Lineage Collector).

True

Returns:

Type Description
Callable

The decorator function.

nala.athomic.integration.messaging.decorators.ordered_consumer.ordered_consumer(topic, *, group_id, aggregate_key_field, sequence_id_field, target_type=None, priority=MessagePriority.NORMAL, concurrency=None, fetch_size=None, idempotency_settings=None, idempotency_key_resolver=None)

Registers an ordered consumer with optional idempotency.

Enforces sequential processing. Idempotency is applied per-message if enabled.

Parameters:

Name Type Description Default
topic Union[str, List[str]]

The topic name(s).

required
group_id str

Consumer group ID.

required
aggregate_key_field str

Field for ordering (partition key).

required
sequence_id_field str

Field for sequencing.

required
target_type Optional[Type[Any]]

Optional Pydantic model for deserialization.

None
priority MessagePriority

Message priority level.

NORMAL
concurrency Optional[int]

Limit on concurrent executions.

None
fetch_size Optional[int]

Number of messages to pre-fetch from broker (IO optimization).

None
idempotency_settings Optional[IdempotencySettings]

Resilience config (overrides global defaults).

None
idempotency_key_resolver Optional[Callable[..., str]]

Strategy to extract ID.

None

nala.athomic.integration.messaging.consumers.protocol.ConsumerProtocol

Bases: Protocol

Protocol for consuming messages from a messaging system.

Focuses on a subscription pattern where received messages are processed by an asynchronous callback function.

consume() async

Starts the continuous message consumption process.

Subscribes to topics/queues and passes received messages to the provided callback function for processing. This function typically blocks execution (or runs in a background task) until close() is called or an unrecoverable error occurs.

Raises:

Type Description
ConsumeError

If an unrecoverable error occurs during consumption (e.g., persistent authentication failure, error in the consume loop).

MessagingConnectionError

If there are initial connection problems with the broker when trying to start consumption.

ValueError

If essential arguments (like topics for Kafka) are not provided.

nala.athomic.integration.messaging.consumers.base.BaseConsumer

Bases: BaseService, ABC, ConsumerProtocol

Abstract base class for message consumers.

This class serves as the core lifecycle manager for any message consumer implementation (e.g., Kafka, Redis, SQS). It enforces the required dependencies and delegates all message-specific logic to injected protocols, thereby adhering strictly to the Dependency Inversion Principle (DIP).

Responsibilities include: - Managing the connection lifecycle to the message broker (via BaseService). - Holding the configuration and essential dependencies. - Delegating the message fetching loop to a ConsumptionStrategy. - Delegating message processing to a Processor.

group_id abstractmethod property

The consumer group identifier for this consumer instance.

Returns:

Type Description
str

The unique string identifying the consumer group.

__init__(settings, handler_config, message_processor, consumption_strategy)

Initializes the base consumer with all necessary dependencies.

Parameters:

Name Type Description Default
settings MessagingSettings

The general messaging configuration settings.

required
handler_config ConsumerHandlerConfig

The configuration metadata for the consumer handler, typically extracted from a decorator.

required
message_processor ProcessorProtocol

The strategy responsible for the entire message processing pipeline (deserialization, execution, DLQ).

required
consumption_strategy ConsumptionStrategyProtocol

The strategy defining how messages are fetched from the broker (e.g., individual or batch consumption).

required

Raises:

Type Description
InvalidConsumerConfigurationError

If handler_config is not provided.

get_batch(max_records, timeout_ms) abstractmethod async

Fetches a batch of raw messages from the broker in a single operation.

This method must be implemented by concrete consumer providers to enable batch processing strategies.

Parameters:

Name Type Description Default
max_records int

The maximum number of messages to attempt to fetch.

required
timeout_ms int

The maximum time (in milliseconds) to wait for records to arrive.

required

Returns:

Type Description
List[RawMessage]

A list of RawMessage objects, which may be empty.

get_message_iterator() abstractmethod async

Returns an async generator that yields raw messages one by one from the broker.

This method must be implemented by concrete consumer providers (e.g., Kafka, SQS) to handle the broker-specific low-level message fetching.

Yields:

Name Type Description
RawMessage AsyncGenerator[RawMessage, None]

The next raw message fetched from the broker.