Publishing Messages (Producer)
Overview
The Message Producer sends messages to the configured message broker (e.g., Kafka). It exposes a high-level interface that handles payload processing, context propagation, and dispatch for you, so you can focus on the data you want to send.
When you publish a message, the producer automatically orchestrates:
- Payload Processing: The message is passed through the configured Payload Processing Pipeline for serialization, encryption, and compression.
- Context Propagation: W3C Trace Context and other execution context variables are automatically injected into the message headers.
- Publishing Strategy: The producer selects the dispatch strategy from the arguments you pass, supporting both immediate and delayed delivery.
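The first two steps can be pictured with a minimal, standalone sketch. It does not use the Athomic API: `process_payload`, `inject_trace_context`, and the JSON-plus-zlib pipeline are hypothetical stand-ins (encryption is omitted), and only the `traceparent` header layout follows the W3C Trace Context format.

```python
import json
import secrets
import zlib


def process_payload(message: dict) -> bytes:
    """Hypothetical pipeline: serialize to JSON, then compress with zlib."""
    serialized = json.dumps(message, sort_keys=True).encode("utf-8")
    return zlib.compress(serialized)


def inject_trace_context(headers: dict, trace_id: str, span_id: str) -> dict:
    """Return a copy of `headers` with a W3C `traceparent` entry added.

    Layout: version "00", 32-hex trace id, 16-hex parent id, flags "01" (sampled).
    """
    enriched = dict(headers)
    enriched["traceparent"] = f"00-{trace_id}-{span_id}-01"
    return enriched


trace_id = secrets.token_hex(16)  # 16 random bytes -> 32 hex characters
span_id = secrets.token_hex(8)    # 8 random bytes -> 16 hex characters
body = process_payload({"user_id": "user-123"})
headers = inject_trace_context({"content-type": "application/json"}, trace_id, span_id)
```

In the real producer both steps happen inside `publish()`, so application code normally never builds these values by hand.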
Getting a Producer Instance
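Producer instances are built by the ProducerFactory. Reuse one instance per connection so that resources like connection pools are shared. Note that, according to the API Reference below, the factory itself is stateless: create(settings) requires a resolved MessagingSettings object and returns a new instance on every call, so keep a reference to the producer instead of calling the factory repeatedly.

```python
from nala.athomic.integration.messaging import ProducerFactory

# `settings` is the resolved MessagingSettings for your connection;
# how you load it depends on your application.
# The factory builds a new producer on every call, so create it once and reuse it.
producer = ProducerFactory.create(settings)
```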
Publishing a Message
The producer.publish() method is the single entry point for sending all messages.
Basic (Immediate) Publishing
To send a message for immediate delivery, you provide the topic and the message payload. The payload can be a Pydantic model, a dictionary, or any other data type your configured serializer can handle.
```python
from your_app.schemas import UserCreatedEvent

# The message can be a Pydantic model
event_payload = UserCreatedEvent(user_id="user-123", email="test@example.com")

await producer.publish(
    topic="user.events.v1",
    message=event_payload,
    key="user-123",  # The key is used for partitioning in Kafka
)
```
Publishing with a Delay
Delayed delivery is a key feature of the Athomic producer. To send a message that should only be processed after a delay, add the delay_seconds argument to the publish call.
```python
# This message will be sent to an intermediary delay topic and only
# republished to its final destination after 300 seconds (5 minutes).
await producer.publish(
    topic="notifications.v1",
    message={"email": "test@example.com", "template": "follow_up"},
    key="user-123",
    delay_seconds=300,
)
```
Behind the scenes, the DelayedPublishStrategy wraps your message in an "envelope" and sends it to a pre-configured delay topic. A separate background service, the republisher, is responsible for consuming from these delay topics and sending the message to its final destination when the time is up. See the Delayed Messages documentation for more details.
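The envelope-and-republisher flow described above can be sketched in a few lines. This is an illustration only: `DelayEnvelope`, `wrap`, and `split_due` are hypothetical names, and the real envelope fields and republisher logic may differ.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class DelayEnvelope:
    """Hypothetical envelope: the original message plus routing metadata."""
    target_topic: str
    payload: Any
    key: Optional[str]
    deliver_at: float  # Unix timestamp after which the message may be republished


def wrap(topic: str, payload: Any, key: Optional[str],
         delay_seconds: int, now: float) -> DelayEnvelope:
    """Wrap the original message and record when it becomes due."""
    return DelayEnvelope(topic, payload, key, deliver_at=now + delay_seconds)


def split_due(envelopes: List[DelayEnvelope],
              now: float) -> Tuple[List[DelayEnvelope], List[DelayEnvelope]]:
    """Republisher step: separate envelopes that are due from those still waiting."""
    due = [e for e in envelopes if e.deliver_at <= now]
    pending = [e for e in envelopes if e.deliver_at > now]
    return due, pending


envelope = wrap("notifications.v1", {"template": "follow_up"}, "user-123",
                delay_seconds=300, now=1000.0)
early_due, early_pending = split_due([envelope], now=1299.0)
late_due, late_pending = split_due([envelope], now=1300.0)
```

Under these assumptions the envelope stays pending at `now=1299.0` and becomes due at `now=1300.0`, at which point a republisher would send `payload` to `target_topic`.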
API Reference
nala.athomic.integration.messaging.producers.protocol.ProducerProtocol
Bases: Protocol
Defines the contract for sending messages to a messaging system.
publish(message=None, topic='default_topic', key=None, headers=None, delay_seconds=None, priority=MessagePriority.NORMAL)
async
Publishes a message, deciding the strategy based on parameters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Optional[Any] | The content of the message to be sent. | None |
| topic | str | The target topic (or exchange). | 'default_topic' |
| key | Optional[str] | The routing or partitioning key for the message. | None |
| headers | Optional[MessageHeaders] | A MessageHeaders object with metadata. | None |
| delay_seconds | Optional[int] | If provided and > 0, the message will be sent using a delayed publishing strategy. | None |
| priority | MessagePriority | The priority level for processing the message. | NORMAL |
Returns:
| Name | Type | Description |
|---|---|---|
| PublishingOutcome | PublishingOutcome | An enum indicating the result of the operation, e.g., SUCCESS_DIRECT or SUCCESS_DELAYED. |
Raises:
| Type | Description |
|---|---|
| PublishError | If publishing fails after all internal retries. |
| MessagingConnectionError | If there are connection problems with the broker. |
| NotImplementedError | If a requested strategy (e.g., 'delayed') is not configured. |
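The contract above (strategy chosen from delay_seconds, an outcome enum returned, NotImplementedError when the delayed strategy is missing) can be mirrored by a small in-memory stand-in, which may also be handy as a test double. Everything here is hypothetical: `SketchProducer` and its `delayed_enabled` flag are invented for illustration, the `PublishingOutcome` enum is a local copy with only the two members named in this document, and the headers and priority parameters are omitted for brevity.

```python
import asyncio
from enum import Enum
from typing import Any, List, Optional, Tuple


class PublishingOutcome(Enum):
    """Local stand-in for the real enum; only the two documented members."""
    SUCCESS_DIRECT = "success_direct"
    SUCCESS_DELAYED = "success_delayed"


class SketchProducer:
    """Hypothetical in-memory producer mirroring the documented publish() contract."""

    def __init__(self, delayed_enabled: bool = True) -> None:
        self.delayed_enabled = delayed_enabled
        self.sent: List[Tuple[str, str, Optional[str], Any]] = []

    async def publish(
        self,
        message: Optional[Any] = None,
        topic: str = "default_topic",
        key: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ) -> PublishingOutcome:
        # A positive delay selects the delayed strategy; anything else is direct.
        if delay_seconds is not None and delay_seconds > 0:
            if not self.delayed_enabled:
                raise NotImplementedError("delayed strategy is not configured")
            self.sent.append(("delayed", topic, key, message))
            return PublishingOutcome.SUCCESS_DELAYED
        self.sent.append(("direct", topic, key, message))
        return PublishingOutcome.SUCCESS_DIRECT


fake = SketchProducer()
direct_outcome = asyncio.run(fake.publish({"id": 1}, topic="user.events.v1"))
delayed_outcome = asyncio.run(
    fake.publish({"id": 2}, topic="notifications.v1", delay_seconds=300)
)
```

Callers can branch on the returned outcome, and code that depends on delayed delivery can catch NotImplementedError to detect a missing delay configuration early.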
nala.athomic.integration.messaging.producers.factory.ProducerFactory
A Factory class that creates concrete Producer instances.
This factory is responsible for the low-level assembly of a single producer
instance. It resolves all necessary dependencies, such as the payload
processor and publishing strategies, and injects them into the concrete
producer class retrieved from the messaging_producer_registry.
This factory is stateless and creates a new instance on every call.
clear()
classmethod
Clears any potential class-level caches (if any were used). This method is kept for compatibility but does nothing as the factory is now stateless.
create(settings)
staticmethod
Creates a new producer instance for a specific connection.
This is the primary public interface for obtaining a new producer. It handles
dependency resolution and instance creation based on the provided settings.
The ProducerManager (a BaseManager) uses this factory to build
all producer instances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | MessagingSettings | The fully resolved messaging settings for a specific connection. This object must contain the | required |
Returns:
| Type | Description |
|---|---|
| BaseProducer | A new, fully configured and initialized producer instance. |
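Because the factory is stateless, any reuse of producer instances has to happen in the caller. The sketch below shows one way a per-connection cache could sit in front of a stateless builder. `ProducerCache` and its `build` callable are hypothetical; this is not a description of how ProducerManager is implemented.

```python
from typing import Callable, Dict


class ProducerCache:
    """Hypothetical per-connection cache in front of a stateless factory."""

    def __init__(self, build: Callable[[str], object]) -> None:
        # `build` stands in for a call such as ProducerFactory.create(settings).
        self._build = build
        self._instances: Dict[str, object] = {}

    def get(self, connection_name: str) -> object:
        # Build on first use, then hand back the same instance.
        if connection_name not in self._instances:
            self._instances[connection_name] = self._build(connection_name)
        return self._instances[connection_name]

    def clear(self) -> None:
        """Drop all cached instances so the next get() rebuilds them."""
        self._instances.clear()


build_calls = []


def build_stub(connection_name: str) -> object:
    """Stub builder that records each invocation."""
    build_calls.append(connection_name)
    return object()


cache = ProducerCache(build_stub)
first = cache.get("default")
second = cache.get("default")
```

With this arrangement the builder runs once per connection name, and repeated lookups return the same object.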
nala.athomic.integration.messaging.producers.strategies.direct.DirectPublishStrategy
Bases: BasePublishingStrategy, PublishingStrategyProtocol
Implements the strategy for publishing a message directly to its final destination topic without any delay or intermediate processing route.
This is the default, high-performance path for message delivery, focusing on immediate serialization, telemetry capture, and dispatch.
__init__(lineage_settings=None)
Initializes the strategy with the specific lineage configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| lineage_settings | Optional[LineageSettings] | The application's global lineage configuration, injected by the factory. | None |
publish(producer, message, topic, key, headers, delay_seconds)
async
Processes and publishes the message immediately to the broker. It handles the injection of tracing context, lineage headers, payload processing (serialization, compression), and metrics recording.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| producer | BaseProducer | The producer instance used for transport and processing. | required |
| message | Optional[Any] | The message payload. | required |
| topic | str | The destination topic name. | required |
| key | Optional[Any] | The message key. | required |
| headers | Optional[MessageHeaders] | The message headers. | required |
| delay_seconds | Optional[int] | Ignored by this strategy. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| PublishingOutcome | PublishingOutcome | SUCCESS_DIRECT if the message was successfully dispatched. |
nala.athomic.integration.messaging.producers.strategies.delayed.DelayedPublishStrategy
Bases: BasePublishingStrategy, PublishingStrategyProtocol
Implements the strategy for publishing a message with a delay.
This strategy works by:
1. Finding the closest matching delay topic bucket based on delay_seconds.
2. Wrapping the original message and its metadata in an envelope.
3. Publishing the envelope to the intermediary delay topic.
A separate consumer service (the republisher) will later unwrap and send the original message to the final destination after the delay.
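Step 1 can be illustrated with a short sketch. How the real strategy defines "closest" is not specified here, so the code assumes the smallest absolute difference in seconds, with ties going to the smaller bucket. The bucket values and the `delay.<n>s` topic naming scheme are hypothetical.

```python
from typing import Sequence


def closest_bucket(delay_seconds: int, buckets: Sequence[int]) -> int:
    """Return the configured bucket (in seconds) nearest to the requested delay.

    Ties resolve to the smaller bucket.
    """
    if not buckets:
        raise ValueError("at least one delay bucket must be configured")
    return min(buckets, key=lambda bucket: (abs(bucket - delay_seconds), bucket))


def delay_topic_for(delay_seconds: int, buckets: Sequence[int]) -> str:
    """Map a requested delay to a delay topic name (naming scheme is hypothetical)."""
    return f"delay.{closest_bucket(delay_seconds, buckets)}s"


BUCKETS = [60, 300, 900]  # example bucket sizes in seconds
topic_for_five_minutes = delay_topic_for(300, BUCKETS)
topic_for_200_seconds = delay_topic_for(200, BUCKETS)
```

A consequence of bucketing is that the effective delay is the bucket's duration rather than the exact value requested, so bucket sizes bound the timing precision.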
__init__(lineage_settings)
Initializes the strategy with the specific lineage configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| lineage_settings | Optional[LineageSettings] | The application's global lineage configuration, injected by the factory. | required |
publish(producer, message, topic, key, headers, delay_seconds)
async
Processes the message for delayed delivery and dispatches it to the appropriate delay topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| producer | BaseProducer | The producer instance used for transport. | required |
| message | Any | The original message payload. | required |
| topic | str | The final destination topic. | required |
| key | Optional[Any] | The message key. | required |
| headers | Optional[MessageHeaders] | The message headers. | required |
| delay_seconds | Optional[int] | The duration of the delay. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| PublishingOutcome | PublishingOutcome | SUCCESS_DELAYED if the message was successfully published to the delay topic. |