Skip to content

Publishing Messages (Producer)

Overview

The Message Producer is the component responsible for sending messages to the configured message broker (e.g., Kafka). It provides a high-level, abstract interface that handles complex underlying tasks transparently, allowing you to focus on the data you want to send.

When you publish a message, the producer automatically orchestrates: - Payload Processing: The message is passed through the configured Payload Processing Pipeline for serialization, encryption, and compression. - Context Propagation: W3C Trace Context and other execution context variables are automatically injected into the message headers. - Publishing Strategy: The producer intelligently selects the correct strategy for dispatching the message, supporting both immediate and delayed delivery.


Getting a Producer Instance

The Producer is managed as a singleton to ensure efficient use of resources like connection pools. The correct way to obtain the producer instance is through the ProducerFactory.

from nala.athomic.integration.messaging import ProducerFactory

# The factory will create and cache a singleton instance based on your configuration.
producer = ProducerFactory.create()

Publishing a Message

The producer.publish() method is the single entry point for sending all messages.

Basic (Immediate) Publishing

To send a message for immediate delivery, you provide the topic and the message payload. The payload can be a Pydantic model, a dictionary, or any other data type your configured serializer can handle.

from your_app.schemas import UserCreatedEvent

# The message can be a Pydantic model
event_payload = UserCreatedEvent(user_id="user-123", email="test@example.com")

await producer.publish(
    topic="user.events.v1",
    message=event_payload,
    key="user-123"  # The key is used for partitioning in Kafka
)

Publishing with a Delay

This is a key feature of the Athomic producer. To send a message that should only be processed after a delay, simply add the delay_seconds argument to the publish call.

# This message will be sent to an intermediary delay topic and only
# republished to its final destination after 300 seconds (5 minutes).
await producer.publish(
    topic="notifications.v1",
    message={"email": "test@example.com", "template": "follow_up"},
    key="user-123",
    delay_seconds=300
)

Behind the scenes, the DelayedPublishStrategy wraps your message in an "envelope" and sends it to a pre-configured delay topic. A separate background service, the republisher, is responsible for consuming from these delay topics and sending the message to its final destination when the time is up. See the Delayed Messages documentation for more details.


API Reference

nala.athomic.integration.messaging.producers.protocol.ProducerProtocol

Bases: Protocol

Defines the contract for sending messages to a messaging system.

publish(message=None, topic='default_topic', key=None, headers=None, delay_seconds=None, priority=MessagePriority.NORMAL) async

Publishes a message, deciding the strategy based on parameters.

Parameters:

Name Type Description Default
message Optional[Any]

The content of the message to be sent.

None
topic str

The target topic (or exchange).

'default_topic'
key Optional[str]

The routing or partitioning key for the message.

None
headers Optional[MessageHeaders]

A MessageHeaders object with metadata.

None
delay_seconds Optional[int]

If provided and > 0, the message will be sent using a delayed publishing strategy.

None
priority MessagePriority

The priority level for processing the message.

NORMAL

Returns:

Name Type Description
PublishingOutcome PublishingOutcome

An enum indicating the result of the operation, e.g., SUCCESS_DIRECT or SUCCESS_DELAYED.

Raises:

Type Description
PublishError

If publishing fails after all internal retries.

MessagingConnectionError

If there are connection problems with the broker.

NotImplementedError

If a requested strategy (e.g., 'delayed') is not configured.

nala.athomic.integration.messaging.producers.factory.ProducerFactory

A Factory class that creates concrete Producer instances.

This factory is responsible for the low-level assembly of a single producer instance. It resolves all necessary dependencies, such as the payload processor and publishing strategies, and injects them into the concrete producer class retrieved from the messaging_producer_registry.

This factory is stateless and creates a new instance on every call.

clear() classmethod

Clears any potential class-level caches (if any were used). This method is kept for compatibility but does nothing as the factory is now stateless.

create(settings) staticmethod

Creates a new producer instance for a specific connection.

This is the primary public interface for obtaining a new producer. It handles dependency resolution and instance creation based on the provided settings. The ProducerManager (a BaseManager) uses this factory to build all producer instances.

Parameters:

Name Type Description Default
settings MessagingSettings

The fully resolved messaging settings for a specific connection. This object must contain the connection_name.

required

Returns:

Type Description
BaseProducer

A new, fully configured and initialized producer instance.

nala.athomic.integration.messaging.producers.strategies.direct.DirectPublishStrategy

Bases: BasePublishingStrategy, PublishingStrategyProtocol

Implements the strategy for publishing a message directly to its final destination topic without any delay or intermediate processing route.

This is the default, high-performance path for message delivery, focusing on immediate serialization, telemetry capture, and dispatch.

__init__(lineage_settings=None)

Initializes the strategy with the specific lineage configuration.

Parameters:

Name Type Description Default
lineage_settings Optional[LineageSettings]

The application's global lineage configuration, injected by the factory.

None

publish(producer, message, topic, key, headers, delay_seconds) async

Processes and publishes the message immediately to the broker. It handles the injection of tracing context, lineage headers, payload processing (serialization, compression), and metrics recording.

Parameters:

Name Type Description Default
producer BaseProducer

The producer instance used for transport and processing.

required
message Optional[Any]

The message payload.

required
topic str

The destination topic name.

required
key Optional[Any]

The message key.

required
headers Optional[MessageHeaders]

The message headers.

required
delay_seconds Optional[int]

Ignored by this strategy.

required

Returns:

Name Type Description
PublishingOutcome PublishingOutcome

SUCCESS_DIRECT if the message was successfully dispatched.

nala.athomic.integration.messaging.producers.strategies.delayed.DelayedPublishStrategy

Bases: BasePublishingStrategy, PublishingStrategyProtocol

Implements the strategy for publishing a message with a delay.

This strategy works by: 1. Finding the closest matching delay topic bucket based on delay_seconds. 2. Wrapping the original message and its metadata in an envelope. 3. Publishing the envelope to the intermediary delay topic.

A separate consumer service (the republisher) will later unwrap and send the original message to the final destination after the delay.

__init__(lineage_settings)

Initializes the strategy with the specific lineage configuration.

Parameters:

Name Type Description Default
lineage_settings Optional[LineageSettings]

The application's global lineage configuration, injected by the factory.

required

publish(producer, message, topic, key, headers, delay_seconds) async

Processes the message for delayed delivery and dispatches it to the appropriate delay topic.

Parameters:

Name Type Description Default
producer BaseProducer

The producer instance used for transport.

required
message Any

The original message payload.

required
topic str

The final destination topic.

required
key Optional[Any]

The message key.

required
headers Optional[MessageHeaders]

The message headers.

required
delay_seconds Optional[int]

The duration of the delay.

required

Returns:

Name Type Description
PublishingOutcome PublishingOutcome

SUCCESS_DELAYED if the message was successfully published to the delay topic.