
Messaging (Producers & Consumers)

Overview

The Messaging module provides the core abstractions and implementations for producing and consuming messages from external, distributed message brokers such as Apache Kafka. It is designed for durable, reliable, and observable inter-service communication, and it forms the backbone of an event-driven microservices architecture.

Reminder: For intra-service communication (within a single service), use the lightweight Internal Event Bus.

Key Features

  • Producer/Consumer Protocols: Built on clean ProducerProtocol and ConsumerProtocol interfaces, decoupling your business logic from the specific message broker technology.
  • Decorator-Based Consumers: Define message consumers declaratively with the @subscribe_to decorator, which keeps handler code focused on business logic.
  • Extensible Payload Pipeline: Leverages the Payload Processing Pipeline for transparent serialization, encryption, and compression of messages.
  • Built-in Resilience: Integrates with resilience patterns such as Retry/DLQ and the Transactional Outbox to help prevent message loss.
  • Full Observability: Every message produced and consumed is automatically instrumented with distributed tracing, detailed Prometheus metrics, and data lineage events.

How It Works

  • Producers: The ProducerFactory builds a producer for a specific messaging connection. The factory itself is stateless and returns a new instance on every call; the ProducerManager uses it to build the producer instances. When you call producer.publish(), the message passes through the configured payload processing pipeline (e.g., serialize, encrypt) before being sent to the broker by a backend-specific provider like KafkaProducer.

  • Consumers: At application startup, the ConsumerManager discovers all functions that have been decorated with @subscribe_to. For each one, it uses a ConsumerFactory to build a complete consumer service, assembling all necessary components like the message processor, DLQ handler, and the backend-specific client (e.g., KafkaConsumer). These services are then managed by the application's main lifecycle.
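
The producer flow described above can be sketched in a few lines. This is an illustrative, self-contained model and not the library's implementation: the ProducerProtocol shape, the InMemoryProducer class, the step functions, and the topic name are all assumptions made for the example. It shows a message passing through an ordered list of pipeline steps before a backend stand-in "sends" it, while the business function depends only on the protocol.

```python
import gzip
import json
from typing import Any, Callable, List, Protocol, Tuple


class ProducerProtocol(Protocol):
    """Hypothetical producer interface; the real ProducerProtocol may differ."""

    def publish(self, topic: str, message: Any) -> None: ...


# A pipeline step transforms the payload on its way to the broker.
Step = Callable[[Any], Any]


def serialize(payload: Any) -> bytes:
    # Deterministic JSON encoding of the message body.
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def compress(payload: bytes) -> bytes:
    return gzip.compress(payload)


class InMemoryProducer:
    """Stand-in for a backend-specific provider such as KafkaProducer."""

    def __init__(self, steps: List[Step]) -> None:
        self._steps = steps
        self.sent: List[Tuple[str, bytes]] = []

    def publish(self, topic: str, message: Any) -> None:
        payload = message
        for step in self._steps:  # apply the steps in their configured order
            payload = step(payload)
        self.sent.append((topic, payload))  # "send" to the fake broker


def place_order(producer: ProducerProtocol, order_id: int) -> None:
    # Business logic sees only the protocol, never the broker technology.
    producer.publish("orders.created", {"order_id": order_id})


producer = InMemoryProducer(steps=[serialize, compress])
place_order(producer, 42)
```

Because place_order accepts anything that satisfies the protocol, swapping the in-memory stand-in for a real backend would not require changing the business code.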


Deeper Dive

Explore the specific components of the messaging system in more detail:

  • Producer Guide: Learn how to publish messages and use advanced features like delayed publishing.
  • Consumer Guide: Learn how to write consumer handlers using the @subscribe_to decorator.
  • Retry & DLQ: Understand the automatic retry mechanism and the Dead Letter Queue for handling failed messages.
  • Delayed Messages: Learn how to publish messages that should only be processed after a delay.
  • Context Propagation: See how tracing and multi-tenancy context flows seamlessly across the message bus.

API Reference

nala.athomic.integration.messaging.decorators.subscribe_to(topic, *, target_type=None, group_id=None, priority=MessagePriority.NORMAL, concurrency=None, batch_size=None, fetch_timeout_ms=None, fetch_max_linger_ms=None, fetch_max_bytes=None, connection_name=None, idempotency_settings=None, idempotency_key_resolver=None, enable_lineage=True)

This function collects the configuration metadata and registers it with the global decorated_consumer_registry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | Union[str, List[str]] | The topic name (str) or a list of topic names. | required |
| target_type | Optional[Type[Any]] | Optional Pydantic model or class used to deserialize. | None |
| group_id | Optional[str] | Optional consumer group identifier. | None |
| priority | MessagePriority | The priority queue this handler will consume from. | NORMAL |
| concurrency | Optional[int] | The maximum number of messages processed concurrently. | None |
| batch_size | Optional[int] | If > 0, enables "true batch" processing mode. | None |
| fetch_timeout_ms | Optional[int] | Optional override for batch fetch timeout. | None |
| fetch_max_linger_ms | Optional[int] | Optional override for batch linger time. | None |
| fetch_max_bytes | Optional[int] | Optional override for batch size in bytes. | None |
| connection_name | Optional[str] | The specific messaging connection. | None |
| idempotency_settings | Optional[IdempotencySettings] | Optional configuration for idempotency. | None |
| idempotency_key_resolver | Optional[Callable[..., str]] | Optional function to resolve idempotency key. | None |
| enable_lineage | bool | If False, disables automatic lineage tracking for this consumer. Use this for internal infrastructure consumers (e.g., Lineage Collector). | True |

Returns:

| Type | Description |
| --- | --- |
| Callable | The decorator function. |
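
To make the registration behavior concrete, here is a minimal sketch of the decorator-plus-registry pattern. It is a simplified stand-in and not the actual subscribe_to: the SketchHandlerConfig dataclass, the list-based registry, the handler signatures, and the topic names are assumptions for illustration, and only a few of the documented parameters are modeled.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Union


@dataclass
class SketchHandlerConfig:
    """Hypothetical metadata record; the real config object may differ."""

    topics: List[str]
    handler: Callable[..., Any]
    group_id: Optional[str] = None
    concurrency: Optional[int] = None
    batch_size: Optional[int] = None


# Stand-in for the global decorated_consumer_registry.
registry: List[SketchHandlerConfig] = []


def subscribe_to(
    topic: Union[str, List[str]],
    *,
    group_id: Optional[str] = None,
    concurrency: Optional[int] = None,
    batch_size: Optional[int] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    # Accept a single topic name or a list of names, as documented.
    topics = [topic] if isinstance(topic, str) else list(topic)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # Only record the metadata; the function itself is returned unchanged.
        registry.append(
            SketchHandlerConfig(
                topics=topics,
                handler=func,
                group_id=group_id,
                concurrency=concurrency,
                batch_size=batch_size,
            )
        )
        return func

    return decorator


@subscribe_to("orders.created", group_id="billing", concurrency=4)
def handle_order_created(message: dict) -> str:
    return f"processed {message['id']}"


@subscribe_to(["payments.settled", "payments.failed"])
def handle_payment(message: dict) -> str:
    return f"payment {message['id']}"
```

In this sketch nothing connects to a broker at decoration time. A manager can later walk the registry and build one consumer per entry, which matches the discovery step described under How It Works.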

nala.athomic.integration.messaging.producers.factory.ProducerFactory

A Factory class that creates concrete Producer instances.

This factory is responsible for the low-level assembly of a single producer instance. It resolves all necessary dependencies, such as the payload processor and publishing strategies, and injects them into the concrete producer class retrieved from the messaging_producer_registry.

This factory is stateless and creates a new instance on every call.

clear() classmethod

Kept for backward compatibility. Because the factory is now stateless, there are no class-level caches to clear and this method does nothing.

create(settings) staticmethod

Creates a new producer instance for a specific connection.

This is the primary public interface for obtaining a new producer. It handles dependency resolution and instance creation based on the provided settings. The ProducerManager (a BaseManager) uses this factory to build all producer instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| settings | MessagingSettings | The fully resolved messaging settings for a specific connection. This object must contain the connection_name. | required |

Returns:

| Type | Description |
| --- | --- |
| BaseProducer | A new, fully configured and initialized producer instance. |
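
The split between a stateless factory and a manager that owns the instances can be illustrated as follows. This is a sketch under stated assumptions: SketchProducer, SketchProducerFactory, and SketchProducerManager are hypothetical names, and the per-connection caching in the manager is an assumption about how a manager might reuse producers, not documented behavior of ProducerManager.

```python
from typing import Dict


class SketchProducer:
    """Hypothetical producer bound to one named connection."""

    def __init__(self, connection_name: str) -> None:
        self.connection_name = connection_name


class SketchProducerFactory:
    """Stateless: holds no cache and builds a fresh instance on every call."""

    @staticmethod
    def create(connection_name: str) -> SketchProducer:
        return SketchProducer(connection_name)


class SketchProducerManager:
    """Assumed behavior: keeps one producer per connection name."""

    def __init__(self) -> None:
        self._producers: Dict[str, SketchProducer] = {}

    def get(self, connection_name: str) -> SketchProducer:
        # Build through the factory on first use, then reuse the instance.
        if connection_name not in self._producers:
            self._producers[connection_name] = SketchProducerFactory.create(
                connection_name
            )
        return self._producers[connection_name]


first = SketchProducerFactory.create("default")
second = SketchProducerFactory.create("default")
manager = SketchProducerManager()
```

Here first and second are distinct objects, while repeated manager.get("default") calls return the same producer. Keeping instance ownership out of the factory is what lets the factory stay stateless.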

nala.athomic.integration.messaging.consumers.factory.ConsumerFactory

Builds a fully configured consumer instance by assembling components retrieved from registries based on configuration.

This factory acts as the main assembler for the messaging consumer subsystem. It adheres to the Open/Closed Principle (OCP) by retrieving concrete classes (Consumer, Strategies) from registries rather than using explicit conditional logic (e.g., if-elif-else on backend type).

create(settings, handler_config, connection_manager=None, serializer=None, dlq_handler=None) staticmethod

Assembles and returns a fully configured concrete consumer instance.

The assembly process involves resolving the Consumer class, the DLQ Handler, the Serializer, the Consumption Strategy, and the Message Processor before injecting them into the final BaseConsumer implementation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| settings | MessagingSettings | The global messaging configuration. | required |
| handler_config | ConsumerHandlerConfig | The configuration for the specific handler (topic, group, batch mode). | required |
| connection_manager | Optional[ConnectionManager] | Optional manager for databases/KV stores, required for ordering logic. | None |
| serializer | Optional[SerializerProtocol] | Optional pre-resolved serializer instance. | None |
| dlq_handler | Optional[DLQHandler] | Optional pre-resolved Dead Letter Queue handler. | None |

Returns:

| Type | Description |
| --- | --- |
| BaseConsumer | A concrete instance of BaseConsumer (e.g., KafkaConsumer). |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If ordering is enabled but connection_manager is not provided or connected. |
| KeyError | If the specified backend or strategy creator is not found in the registries. |
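
The registry-driven assembly and the two documented error cases can be sketched as below. This is an illustrative model and not the library's code: SketchSettings, its backend and ordering_enabled fields, the consumer_registry dict, and create_consumer are hypothetical names, and the real factory resolves more components (serializer, DLQ handler, strategy, processor) than are shown here.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type


@dataclass
class SketchSettings:
    """Hypothetical stand-in for MessagingSettings."""

    backend: str
    ordering_enabled: bool = False


class SketchBaseConsumer:
    def __init__(self, settings: SketchSettings, connection_manager: Optional[Any]) -> None:
        self.settings = settings
        self.connection_manager = connection_manager


class KafkaLikeConsumer(SketchBaseConsumer):
    """Stand-in for a concrete backend consumer."""


# New backends are added by registering a class, not by editing the factory.
consumer_registry: Dict[str, Type[SketchBaseConsumer]] = {
    "kafka": KafkaLikeConsumer,
}


def create_consumer(
    settings: SketchSettings,
    connection_manager: Optional[Any] = None,
) -> SketchBaseConsumer:
    # Ordering needs a connection manager, so fail before assembling anything.
    if settings.ordering_enabled and connection_manager is None:
        raise RuntimeError("ordering is enabled but no connection_manager was provided")
    # A plain dict lookup replaces if-elif-else on the backend type;
    # an unregistered backend surfaces as KeyError.
    consumer_cls = consumer_registry[settings.backend]
    return consumer_cls(settings, connection_manager)


consumer = create_consumer(SketchSettings(backend="kafka"))
```

Supporting another broker in this sketch means adding one entry to consumer_registry, which is the Open/Closed property the factory description refers to.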