Delayed Messages
Overview
The Messaging module provides a robust mechanism for publishing messages that should only be processed after a specified delay. This is a common requirement in many applications, for example:
- Sending a follow-up email 24 hours after a user signs up.
- Retrying a failed operation with an exponential backoff delay.
- Scheduling a task to be executed at a specific time in the future.
Athomic implements this using a Delay Topics Strategy, which is a reliable and scalable pattern for handling message delays in high-throughput systems like Kafka.
How the Delay Topics Strategy Works
Instead of relying on a scheduler or timers within the application, this strategy uses a set of dedicated, intermediary topics in the message broker.
- Configuration: You define a set of "delay buckets" in your configuration: a mapping of a delay duration in seconds to a specific topic name.

  ```toml
  # In settings.toml
  [default.integration.messaging.republisher.delay_topics]
  300 = "platform.internal.delay-5m.v1"   # 5-minute bucket
  900 = "platform.internal.delay-15m.v1"  # 15-minute bucket
  3600 = "platform.internal.delay-1h.v1"  # 1-hour bucket
  ```

- Publishing with Delay: When you call `producer.publish(..., delay_seconds=X)`, the `DelayedPublishStrategy` is activated. It finds the smallest delay bucket that is greater than or equal to your requested delay.
- Envelope Wrapping: The producer wraps your original message (including its payload, headers, and final destination topic) inside a special "envelope" message.
- Publish to Delay Topic: This envelope is then published to the chosen delay topic (e.g., `platform.internal.delay-5m.v1`).
- The Republisher Service: Athomic runs a dedicated, internal consumer service (the "republisher"). This service is automatically configured to subscribe to all of your configured delay topics.
- Unwrap and Republish: When the republisher consumes a message from a delay topic, it unwraps the envelope, extracts the original message and its metadata, and publishes it to its final destination topic.
This approach is highly scalable and resilient because it leverages the underlying message broker's infrastructure for storing and delivering the delayed messages.
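The bucket selection rule described above (the smallest bucket greater than or equal to the requested delay) can be illustrated with a short sketch. This is not Athomic's implementation: `select_delay_bucket` and `DELAY_TOPICS` are hypothetical names, and raising an error when the delay exceeds the largest bucket is an assumption, since the behavior in that case is not documented here.

```python
from bisect import bisect_left
from typing import Dict, Tuple

# Hypothetical in-memory copy of the delay_topics mapping (seconds -> topic).
DELAY_TOPICS: Dict[int, str] = {
    300: "platform.internal.delay-5m.v1",
    900: "platform.internal.delay-15m.v1",
    3600: "platform.internal.delay-1h.v1",
}


def select_delay_bucket(delay_seconds: int, delay_topics: Dict[int, str]) -> Tuple[int, str]:
    """Return (bucket_seconds, topic) for the smallest bucket >= delay_seconds."""
    if delay_seconds <= 0:
        raise ValueError("delay_seconds must be positive")
    buckets = sorted(delay_topics)
    # bisect_left finds the first bucket that is not smaller than the request.
    index = bisect_left(buckets, delay_seconds)
    if index == len(buckets):
        # Assumption: the real strategy may handle this case differently.
        raise ValueError(f"no delay bucket can hold {delay_seconds}s")
    bucket = buckets[index]
    return bucket, delay_topics[bucket]
```

With these buckets, a request for 600 seconds maps to the 15-minute topic, and a request for exactly 300 seconds maps to the 5-minute topic.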
Usage
Using the delayed message feature is as simple as adding the delay_seconds parameter to a standard publish call.
```python
from nala.athomic.integration.messaging import ProducerFactory

producer = ProducerFactory.create()


async def schedule_follow_up_email(user_id: str):
    # This message will be processed in approximately 1 hour.
    await producer.publish(
        topic="user.follow-ups.v1",
        message={"user_id": user_id, "email_type": "onboarding_d1"},
        key=user_id,
        delay_seconds=3600,  # 1 hour
    )
```
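The retry use case from the overview follows the same pattern: compute a delay, then pass it as `delay_seconds`. The sketch below is illustrative only. `backoff_delay_seconds`, `RecordingProducer`, the topic name, and the payload shape are all hypothetical, and the stub producer stands in for the real one so the snippet runs without a broker.

```python
import asyncio
from typing import Any, Dict, List


def backoff_delay_seconds(attempt: int, base: int = 60, cap: int = 3600) -> int:
    """Exponential backoff: base * 2**attempt, capped at `cap` seconds."""
    return min(base * (2 ** attempt), cap)


class RecordingProducer:
    """Hypothetical stand-in that records publish calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    async def publish(self, topic: str, message: Any, key: Any = None,
                      delay_seconds: int = 0) -> None:
        self.calls.append(
            {"topic": topic, "message": message, "key": key, "delay_seconds": delay_seconds}
        )


async def schedule_retry(producer: RecordingProducer, order_id: str, attempt: int) -> None:
    # Topic name and payload shape are illustrative only.
    await producer.publish(
        topic="orders.retries.v1",
        message={"order_id": order_id, "attempt": attempt + 1},
        key=order_id,
        delay_seconds=backoff_delay_seconds(attempt),
    )
```

Because the strategy picks the smallest configured bucket that is greater than or equal to the requested delay, it helps to choose `base` and `cap` values that line up with the configured buckets.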
Configuration
The delayed message system is configured under the [integration.messaging.republisher] section.
```toml
[default.integration.messaging.republisher]
enabled = true
# This must be set to use the delay topics strategy.
delay_strategy_backend = "kafka_topic_delay_strategy"

# The consumer and producer settings for the internal republisher service.
# This service needs its own configuration to connect to Kafka.
[default.integration.messaging.republisher.consumer]
bootstrap_servers = ["localhost:9092"]
group_id = "nala-global-republisher-group"

[default.integration.messaging.republisher.producer]
bootstrap_servers = ["localhost:9092"]

# The mapping of delay durations (in seconds) to topic names.
[default.integration.messaging.republisher.delay_topics]
60 = "platform.internal.delay-1m.v1"
300 = "platform.internal.delay-5m.v1"
3600 = "platform.internal.delay-1h.v1"
```
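TOML table keys are always strings, so a parser returns the `delay_topics` keys as `"60"`, `"300"`, and so on rather than integers. The following sketch shows one way such a table could be normalized before use. `normalize_delay_topics` is a hypothetical helper, not part of Athomic's settings loader.

```python
from typing import Dict, Mapping


def normalize_delay_topics(raw: Mapping[str, str]) -> Dict[int, str]:
    """Convert string keys to positive integer seconds, sorted ascending."""
    parsed: Dict[int, str] = {}
    for key, topic in raw.items():
        seconds = int(key)  # raises ValueError for keys such as "5m"
        if seconds <= 0:
            raise ValueError(f"delay must be positive, got {key!r}")
        if not topic:
            raise ValueError(f"empty topic name for delay {key!r}")
        parsed[seconds] = topic
    return dict(sorted(parsed.items()))


# Shape a TOML parser would produce for the delay_topics table above.
raw_table = {
    "3600": "platform.internal.delay-1h.v1",
    "60": "platform.internal.delay-1m.v1",
    "300": "platform.internal.delay-5m.v1",
}
delay_topics = normalize_delay_topics(raw_table)
```

Validating the keys at startup surfaces a typo such as `5m = "..."` immediately instead of at publish time.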
API Reference
nala.athomic.integration.messaging.producers.strategies.delayed.DelayedPublishStrategy
Bases: BasePublishingStrategy, PublishingStrategyProtocol
Implements the strategy for publishing a message with a delay.
This strategy works by:
1. Finding the closest matching delay topic bucket based on delay_seconds.
2. Wrapping the original message and its metadata in an envelope.
3. Publishing the envelope to the intermediary delay topic.
A separate consumer service (the republisher) will later unwrap and send the original message to the final destination after the delay.
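Steps 2 and 3 might look like the following sketch. The envelope field names (`target_topic`, `original_key`, `original_headers`, `payload`, `enqueued_at`) are assumptions made for illustration, since the actual envelope schema is not documented here. `build_envelope`, `publish_delayed`, and `InMemoryProducer` are likewise hypothetical.

```python
import asyncio
import time
from typing import Any, Dict, List, Optional, Tuple


def build_envelope(message: Any, topic: str, key: Optional[Any],
                   headers: Optional[Dict[str, str]], delay_seconds: int) -> Dict[str, Any]:
    """Wrap the original message and its routing metadata (hypothetical field names)."""
    return {
        "target_topic": topic,
        "original_key": key,
        "original_headers": dict(headers or {}),
        "payload": message,
        "delay_seconds": delay_seconds,
        "enqueued_at": time.time(),
    }


class InMemoryProducer:
    """Hypothetical stand-in for a real producer; stores (topic, message) pairs."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    async def publish(self, topic: str, message: Any, key: Any = None) -> None:
        self.sent.append((topic, message))


async def publish_delayed(producer: InMemoryProducer, delay_topic: str, message: Any,
                          topic: str, key: Optional[Any] = None,
                          headers: Optional[Dict[str, str]] = None,
                          delay_seconds: int = 0) -> None:
    envelope = build_envelope(message, topic, key, headers, delay_seconds)
    # The envelope goes to the delay topic, not to the final destination.
    await producer.publish(topic=delay_topic, message=envelope, key=key)
```

Keeping the final destination inside the envelope is what lets a single republisher serve every delay topic without knowing about individual destinations in advance.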
__init__(lineage_settings)
Initializes the strategy with the specific lineage configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `lineage_settings` | `Optional[LineageSettings]` | The application's global lineage configuration, injected by the factory. | required |
publish(producer, message, topic, key, headers, delay_seconds)
async
Processes the message for delayed delivery and dispatches it to the appropriate delay topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `producer` | `BaseProducer` | The producer instance used for transport. | required |
| `message` | `Any` | The original message payload. | required |
| `topic` | `str` | The final destination topic. | required |
| `key` | `Optional[Any]` | The message key. | required |
| `headers` | `Optional[MessageHeaders]` | The message headers. | required |
| `delay_seconds` | `Optional[int]` | The duration of the delay. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `PublishingOutcome` | `PublishingOutcome` | SUCCESS_DELAYED if the message was successfully published to the delay topic. |
nala.athomic.integration.messaging.delay.republisher.DelayedMessageHandler
A callable class that acts as a consumer callback for delayed messages.
It unwraps the message envelope created by a delay strategy and republishes the original payload to its final destination topic using an injected producer.
__call__(message, message_key, raw_headers)
async
Handles a message from a delay topic by republishing it to its final destination.
This method deserializes the envelope, rehydrates the original headers, and uses the injected producer to send the message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message` | `Dict[str, Any]` | The message payload from the delay topic (the envelope). | required |
| `message_key` | `Optional[str]` | The message key (unused, as the original key is in the envelope). | required |
| `raw_headers` | `MessageHeaders` | The headers from the delay topic message. | required |
Returns:
| Type | Description |
|---|---|
| `ProcessingOutcome` | ProcessingOutcome.ACK: If the message was successfully republished or if it was malformed and should be discarded. |
Raises:
| Type | Description |
|---|---|
| `Exception` | Propagates exceptions from the producer (e.g., PublishError) to trigger the consumer's retry/DLQ mechanism. |
__init__(producer)
Initializes the handler with its required producer dependency.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `producer` | `ProducerProtocol` | An active and connected producer instance that will be used to publish the unwrapped message. | required |
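The handler contract documented above (ACK on success, ACK on a malformed envelope, and propagation of producer errors) can be sketched as follows. This is a simplified illustration, not the real `DelayedMessageHandler`: the `Outcome` enum stands in for `ProcessingOutcome`, the envelope field names are assumptions, and `InMemoryProducer` is a hypothetical stub.

```python
import asyncio
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple


class Outcome(Enum):
    """Hypothetical stand-in for ProcessingOutcome."""
    ACK = "ack"


class InMemoryProducer:
    """Hypothetical stub that records what would have been republished."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any, Optional[Any]]] = []

    async def publish(self, topic, message, key=None, headers=None) -> None:
        self.sent.append((topic, message, key))


class SketchDelayedMessageHandler:
    def __init__(self, producer: InMemoryProducer) -> None:
        self._producer = producer

    async def __call__(self, message: Dict[str, Any],
                       message_key: Optional[str] = None,
                       raw_headers: Optional[Dict[str, Any]] = None) -> Outcome:
        try:
            topic = message["target_topic"]
            payload = message["payload"]
        except (KeyError, TypeError):
            # Malformed envelope: acknowledge so it is discarded, not retried.
            return Outcome.ACK
        # Producer errors are deliberately not caught, so they reach the consumer.
        await self._producer.publish(
            topic=topic,
            message=payload,
            key=message.get("original_key"),
            headers=message.get("original_headers"),
        )
        return Outcome.ACK
```

Acknowledging malformed envelopes keeps a single bad message from blocking a delay topic, while letting producer errors propagate leaves retry and dead-letter decisions to the consumer.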