Messaging (Producers & Consumers)
Overview
The Messaging module provides the core abstractions and implementations for producing and consuming messages from external, distributed message brokers like Apache Kafka. It is designed for durable, reliable, and observable inter-service communication, forming the backbone of an event-driven microservices architecture.
Reminder: For intra-service communication (within a single service), use the lightweight Internal Event Bus.
Key Features
- Producer/Consumer Protocols: Built on clean `ProducerProtocol` and `ConsumerProtocol` interfaces, decoupling your business logic from the specific message broker technology.
- Decorator-Based Consumers: Define your message consumers declaratively and elegantly with a simple `@subscribe_to` decorator, keeping your code clean and focused.
- Extensible Payload Pipeline: Leverages the Payload Processing Pipeline for transparent serialization, encryption, and compression of messages.
- Built-in Resilience: Deep integration with resilience patterns like Retry/DLQ and the Transactional Outbox to ensure messages are not lost.
- Full Observability: Every message produced and consumed is automatically instrumented with distributed tracing, detailed Prometheus metrics, and data lineage events.
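To illustrate the decoupling that the protocol interfaces aim for, here is a minimal sketch. The method signature `publish(topic, message)`, the `InMemoryProducer` class, and the `place_order` function are assumptions made for this example; the real `ProducerProtocol` may define a different surface.

```python
import asyncio
from typing import Any, List, Protocol, Tuple


class ProducerProtocol(Protocol):
    """Hypothetical shape of the producer interface (assumed, not the real one)."""

    async def publish(self, topic: str, message: Any) -> None: ...


class InMemoryProducer:
    """Stand-in backend for illustration; it only records what was published."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    async def publish(self, topic: str, message: Any) -> None:
        self.sent.append((topic, message))


async def place_order(producer: ProducerProtocol, order_id: str) -> None:
    # Business logic depends only on the protocol, never on a broker client.
    await producer.publish("orders.created", {"order_id": order_id})


producer = InMemoryProducer()
asyncio.run(place_order(producer, "o-1"))
```

Because `place_order` only sees the protocol, a Kafka-backed producer could be swapped in without touching the business logic.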
How It Works
- Producers: You obtain a singleton `Producer` instance from the `ProducerFactory`. When you call `producer.publish()`, the message passes through the configured payload processing pipeline (e.g., serialize, encrypt) before being sent to the broker by a backend-specific provider like `KafkaProducer`.
- Consumers: At application startup, the `ConsumerManager` discovers all functions that have been decorated with `@subscribe_to`. For each one, it uses a `ConsumerFactory` to build a complete consumer service, assembling all necessary components like the message processor, DLQ handler, and the backend-specific client (e.g., `KafkaConsumer`). These services are then managed by the application's main lifecycle.
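The producer-side flow described above can be pictured as a chain of steps applied to the payload before it reaches the broker. The sketch below is an illustration only: the step functions, `run_pipeline`, and `decode` are hypothetical names, and it uses JSON plus zlib compression as stand-ins for whatever steps the real pipeline is configured with.

```python
import json
import zlib
from typing import Any, Callable, List

Step = Callable[[Any], Any]


def serialize(payload: Any) -> bytes:
    # Turn the Python object into bytes (JSON chosen only for illustration).
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def compress(data: bytes) -> bytes:
    # Shrink the serialized bytes before they go on the wire.
    return zlib.compress(data)


def run_pipeline(payload: Any, steps: List[Step]) -> Any:
    # Apply each step in order; the output of one step feeds the next.
    for step in steps:
        payload = step(payload)
    return payload


def decode(data: bytes) -> Any:
    # Consumer side: undo the steps in reverse order.
    return json.loads(zlib.decompress(data).decode("utf-8"))


wire_bytes = run_pipeline({"order_id": "o-1"}, [serialize, compress])
```

The consumer reverses the same steps, which is why the pipeline can stay transparent to handler code.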
Deeper Dive
Explore the specific components of the messaging system in more detail:
- Producer Guide: Learn how to publish messages and use advanced features like delayed publishing.
- Consumer Guide: Learn how to write consumer handlers using the `@subscribe_to` decorator.
- Retry & DLQ: Understand the automatic retry mechanism and the Dead Letter Queue for handling failed messages.
- Delayed Messages: Learn how to publish messages that should only be processed after a delay.
- Context Propagation: See how tracing and multi-tenancy context flows seamlessly across the message bus.
API Reference
nala.athomic.integration.messaging.decorators.subscribe_to(topic, *, target_type=None, group_id=None, priority=MessagePriority.NORMAL, concurrency=None, batch_size=None, fetch_timeout_ms=None, fetch_max_linger_ms=None, fetch_max_bytes=None, connection_name=None, idempotency_settings=None, idempotency_key_resolver=None, enable_lineage=True)
Decorator that marks a function as a message consumer. It collects the handler's configuration metadata and registers it with the global `decorated_consumer_registry`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `Union[str, List[str]]` | The topic name (str) or a list of topic names. | required |
| `target_type` | `Optional[Type[Any]]` | Optional Pydantic model or class used to deserialize. | `None` |
| `group_id` | `Optional[str]` | Optional consumer group identifier. | `None` |
| `priority` | `MessagePriority` | The priority queue this handler will consume from. | `NORMAL` |
| `concurrency` | `Optional[int]` | The maximum number of messages processed concurrently. | `None` |
| `batch_size` | `Optional[int]` | If > 0, enables "true batch" processing mode. | `None` |
| `fetch_timeout_ms` | `Optional[int]` | Optional override for batch fetch timeout. | `None` |
| `fetch_max_linger_ms` | `Optional[int]` | Optional override for batch linger time. | `None` |
| `fetch_max_bytes` | `Optional[int]` | Optional override for batch size in bytes. | `None` |
| `connection_name` | `Optional[str]` | The specific messaging connection. | `None` |
| `idempotency_settings` | `Optional[IdempotencySettings]` | Optional configuration for idempotency. | `None` |
| `idempotency_key_resolver` | `Optional[Callable[..., str]]` | Optional function to resolve idempotency key. | `None` |
| `enable_lineage` | `bool` | If False, disables automatic lineage tracking for this consumer. Use this for internal infrastructure consumers (e.g., Lineage Collector). | `True` |
Returns:
| Type | Description |
|---|---|
| `Callable` | The decorator function. |
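The register-at-decoration-time behaviour can be sketched with a self-contained stand-in. Everything suffixed with `_sketch` or `Sketch` below is hypothetical and only mirrors a few of the documented parameters (`topic`, `group_id`, `batch_size`); the handler signature is also an assumption, and the real decorator and registry may store metadata differently.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Union


@dataclass
class HandlerConfigSketch:
    """Hypothetical record of what a decorator like this might store."""

    handler: Callable[..., Any]
    topics: List[str]
    group_id: Optional[str] = None
    batch_size: Optional[int] = None


# Stand-in for a global registry that a manager could scan at startup.
registry_sketch: List[HandlerConfigSketch] = []


def subscribe_to_sketch(
    topic: Union[str, List[str]],
    *,
    group_id: Optional[str] = None,
    batch_size: Optional[int] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    # Normalize a single topic name into a list so both input forms look alike.
    topics = [topic] if isinstance(topic, str) else list(topic)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # Registration happens once, when the module defining the handler loads.
        registry_sketch.append(
            HandlerConfigSketch(func, topics, group_id, batch_size)
        )
        return func  # The handler itself is returned unchanged.

    return decorator


@subscribe_to_sketch("orders.created", group_id="billing")
async def handle_order_created(message: dict) -> None:
    ...
```

Returning the function unchanged keeps the handler directly callable in unit tests, while the registry entry carries everything needed to build a consumer for it later.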
nala.athomic.integration.messaging.producers.factory.ProducerFactory
A Factory class that creates concrete Producer instances.
This factory is responsible for the low-level assembly of a single producer
instance. It resolves all necessary dependencies, such as the payload
processor and publishing strategies, and injects them into the concrete
producer class retrieved from the messaging_producer_registry.
This factory is stateless and creates a new instance on every call.
clear()
classmethod
Clears any potential class-level caches (if any were used). This method is kept for compatibility but does nothing as the factory is now stateless.
create(settings)
staticmethod
Creates a new producer instance for a specific connection.
This is the primary public interface for obtaining a new producer. It handles
dependency resolution and instance creation based on the provided settings.
The ProducerManager (a BaseManager) uses this factory to build
all producer instances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `settings` | `MessagingSettings` | The fully resolved messaging settings for a specific connection. This object must contain the | required |
Returns:
| Type | Description |
|---|---|
| `BaseProducer` | A new, fully configured and initialized producer instance. |
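As a rough picture of what "stateless, new instance on every call" means, consider the sketch below. All names ending in `Sketch` or `_sketch` are hypothetical, the `backend` and `connection_name` fields are assumed for illustration, and the pass-through payload processor stands in for the real dependency resolution.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Type


@dataclass(frozen=True)
class SettingsSketch:
    """Hypothetical settings; the real MessagingSettings has its own fields."""

    backend: str
    connection_name: str


class BaseProducerSketch:
    def __init__(
        self, settings: SettingsSketch, payload_processor: Callable[[Any], Any]
    ) -> None:
        self.settings = settings
        self.payload_processor = payload_processor


class KafkaProducerSketch(BaseProducerSketch):
    """Stand-in for a backend-specific producer class."""


# Stand-in for a producer registry keyed by backend name.
producer_registry_sketch: Dict[str, Type[BaseProducerSketch]] = {
    "kafka": KafkaProducerSketch,
}


class ProducerFactorySketch:
    @staticmethod
    def create(settings: SettingsSketch) -> BaseProducerSketch:
        # Look up the concrete class, then inject its dependencies.
        producer_cls = producer_registry_sketch[settings.backend]
        # No caching: every call assembles a fresh instance.
        return producer_cls(settings, payload_processor=lambda payload: payload)


settings = SettingsSketch(backend="kafka", connection_name="default")
first = ProducerFactorySketch.create(settings)
second = ProducerFactorySketch.create(settings)
```

With a factory like this, any instance reuse has to live in the caller, which matches the description of a manager building and holding the producer instances.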
nala.athomic.integration.messaging.consumers.factory.ConsumerFactory
Builds a fully configured consumer instance by assembling components retrieved from registries based on configuration.
This factory acts as the main assembler for the messaging consumer subsystem. It adheres to the Open/Closed Principle (OCP) by retrieving concrete classes (Consumer, Strategies) from registries rather than using explicit conditional logic (e.g., if-elif-else on backend type).
create(settings, handler_config, connection_manager=None, serializer=None, dlq_handler=None)
staticmethod
Assembles and returns a fully configured concrete consumer instance.
The assembly process involves resolving the Consumer class, the DLQ Handler, the Serializer, the Consumption Strategy, and the Message Processor before injecting them into the final BaseConsumer implementation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `settings` | `MessagingSettings` | The global messaging configuration. | required |
| `handler_config` | `ConsumerHandlerConfig` | The configuration for the specific handler (topic, group, batch mode). | required |
| `connection_manager` | `Optional[ConnectionManager]` | Optional manager for databases/KV stores, required for ordering logic. | `None` |
| `serializer` | `Optional[SerializerProtocol]` | Optional pre-resolved serializer instance. | `None` |
| `dlq_handler` | `Optional[DLQHandler]` | Optional pre-resolved Dead Letter Queue handler. | `None` |
Returns:
| Type | Description |
|---|---|
| `BaseConsumer` | A concrete instance of |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If ordering is enabled but |
| `KeyError` | If the specified backend or strategy creator is not found in the registries. |
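The registry-over-conditionals approach, and the `KeyError` it produces for an unknown backend, can be sketched as follows. This is not the library's code: `ConsumerSketch`, `register_backend`, `create_consumer`, and the registry dictionary are hypothetical names chosen for illustration.

```python
from typing import Callable, Dict


class ConsumerSketch:
    """Stand-in for a concrete consumer."""

    def __init__(self, backend: str, topic: str) -> None:
        self.backend = backend
        self.topic = topic


# Stand-in registry that maps a backend name to a creator function.
consumer_registry_sketch: Dict[str, Callable[[str], ConsumerSketch]] = {}


def register_backend(name: str):
    def decorator(creator: Callable[[str], ConsumerSketch]):
        consumer_registry_sketch[name] = creator
        return creator

    return decorator


@register_backend("kafka")
def _create_kafka(topic: str) -> ConsumerSketch:
    return ConsumerSketch("kafka", topic)


def create_consumer(backend: str, topic: str) -> ConsumerSketch:
    # No if/elif chain on the backend type: the dict lookup selects the creator
    # and raises KeyError when the backend was never registered.
    creator = consumer_registry_sketch[backend]
    return creator(topic)


# A new backend is added by registering it; create_consumer stays untouched.
@register_backend("memory")
def _create_memory(topic: str) -> ConsumerSketch:
    return ConsumerSketch("memory", topic)


kafka_consumer = create_consumer("kafka", "orders.created")
memory_consumer = create_consumer("memory", "orders.created")
```

Supporting another broker then means adding one registration, which is the Open/Closed property the factory description refers to.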