Delayed Messages

Overview

The Messaging module provides a robust mechanism for publishing messages that should only be processed after a specified delay. This is a common requirement in many applications, for example:

  • Sending a follow-up email 24 hours after a user signs up.
  • Retrying a failed operation with an exponential backoff delay.
  • Scheduling a task to be executed at a specific time in the future.

Athomic implements this using a Delay Topics Strategy, which is a reliable and scalable pattern for handling message delays in high-throughput systems like Kafka.


How the Delay Topics Strategy Works

Instead of relying on a scheduler or timers within the application, this strategy uses a set of dedicated, intermediary topics in the message broker.

  1. Configuration: You define a set of "delay buckets" in your configuration—a mapping of a delay duration in seconds to a specific topic name.

    ```toml
    # In settings.toml
    [default.integration.messaging.republisher.delay_topics]
    300 = "platform.internal.delay-5m.v1"   # 5-minute bucket
    900 = "platform.internal.delay-15m.v1"  # 15-minute bucket
    3600 = "platform.internal.delay-1h.v1"  # 1-hour bucket
    ```

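  2. Publishing with Delay: When you call producer.publish(..., delay_seconds=X), the DelayedPublishStrategy is activated. It finds the smallest delay bucket that is greater than or equal to your requested delay, so the effective delay is rounded up to that bucket's duration (for example, a 600-second request lands in the 900-second bucket).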

  3. Envelope Wrapping: The producer wraps your original message (including its payload, headers, and final destination topic) inside a special "envelope" message.

  4. Publish to Delay Topic: This envelope is then published to the chosen delay topic (e.g., platform.internal.delay-5m.v1).

  5. The Republisher Service: Athomic runs a dedicated, internal consumer service (the "republisher"). This service is automatically configured to subscribe to all of your configured delay topics.

  6. Unwrap and Republish: When the republisher consumes a message from a delay topic, it unwraps the envelope, extracts the original message and its metadata, and publishes it to its final destination topic.

This approach is highly scalable and resilient because it leverages the underlying message broker's infrastructure for storing and delivering the delayed messages.
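The bucket lookup in step 2 can be sketched in a few lines. This is a minimal illustration, not Athomic's implementation: the function name pick_delay_bucket is hypothetical, and raising an error when the request exceeds the largest bucket is an assumption, since the behavior in that case is not described here.

```python
from typing import Mapping, Tuple

# Same buckets as the settings.toml example above (seconds -> topic name).
DELAY_TOPICS = {
    300: "platform.internal.delay-5m.v1",
    900: "platform.internal.delay-15m.v1",
    3600: "platform.internal.delay-1h.v1",
}


def pick_delay_bucket(delay_topics: Mapping[int, str], delay_seconds: int) -> Tuple[int, str]:
    """Return (bucket_seconds, topic) for the smallest bucket >= delay_seconds."""
    candidates = [seconds for seconds in delay_topics if seconds >= delay_seconds]
    if not candidates:
        # Assumption: this sketch rejects delays larger than every bucket.
        raise ValueError(f"no delay bucket can hold {delay_seconds} seconds")
    bucket = min(candidates)
    return bucket, delay_topics[bucket]
```

With these buckets, a request for 600 seconds resolves to the 900-second topic, and a request for exactly 300 seconds resolves to the 300-second topic.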


Usage

Using the delayed message feature is as simple as adding the delay_seconds parameter to a standard publish call.

from nala.athomic.integration.messaging import ProducerFactory

producer = ProducerFactory.create()

async def schedule_follow_up_email(user_id: str):
    # This message will be processed in approximately 1 hour.
    await producer.publish(
        topic="user.follow-ups.v1",
        message={"user_id": user_id, "email_type": "onboarding_d1"},
        key=user_id,
        delay_seconds=3600,  # 1 hour
    )
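For the retry-with-backoff use case mentioned in the overview, the delay can be computed per attempt and passed as delay_seconds. The sketch below is illustrative only: backoff_delay_seconds, publish_retry, and the base_seconds and cap_seconds defaults are hypothetical names and values, and the producer is assumed to expose the publish(topic=..., message=..., delay_seconds=...) call shown above.

```python
from typing import Any


def backoff_delay_seconds(attempt: int, base_seconds: int = 60, cap_seconds: int = 3600) -> int:
    """Delay before retry number `attempt` (1-based): base * 2**(attempt - 1), capped."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(base_seconds * 2 ** (attempt - 1), cap_seconds)


async def publish_retry(producer: Any, topic: str, message: Any, attempt: int) -> None:
    """Republish `message` to `topic` after an exponentially growing delay."""
    await producer.publish(
        topic=topic,
        message=message,
        delay_seconds=backoff_delay_seconds(attempt),
    )
```

Because requested delays are matched to the smallest bucket that is greater than or equal to them, it helps to choose backoff values that line up with the configured buckets; otherwise each retry waits for the next larger bucket.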

Configuration

The delayed message system is configured under the [integration.messaging.republisher] section.

[default.integration.messaging.republisher]
enabled = true

# This must be set to use the delay topics strategy.
delay_strategy_backend = "kafka_topic_delay_strategy"

  # The consumer and producer settings for the internal republisher service.
  # This service needs its own configuration to connect to Kafka.
  [default.integration.messaging.republisher.consumer]
  bootstrap_servers = ["localhost:9092"]
  group_id = "nala-global-republisher-group"

  [default.integration.messaging.republisher.producer]
  bootstrap_servers = ["localhost:9092"]

  # The mapping of delay durations (in seconds) to topic names.
  [default.integration.messaging.republisher.delay_topics]
  60 = "platform.internal.delay-1m.v1"
  300 = "platform.internal.delay-5m.v1"
  3600 = "platform.internal.delay-1h.v1"

API Reference

nala.athomic.integration.messaging.producers.strategies.delayed.DelayedPublishStrategy

Bases: BasePublishingStrategy, PublishingStrategyProtocol

Implements the strategy for publishing a message with a delay.

This strategy works by:

  1. Finding the matching delay topic bucket for delay_seconds (the smallest bucket greater than or equal to the requested delay).
  2. Wrapping the original message and its metadata in an envelope.
  3. Publishing the envelope to the intermediary delay topic.

A separate consumer service (the republisher) will later unwrap and send the original message to the final destination after the delay.
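To make the envelope idea concrete, here is a minimal sketch of wrapping and unwrapping. The envelope's real schema is not documented on this page, so the field names (destination_topic, key, headers, payload) and both helper functions are assumptions chosen for illustration.

```python
from typing import Any, Dict, Optional, Tuple


def wrap_in_envelope(
    message: Any,
    topic: str,
    key: Optional[Any] = None,
    headers: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Bundle the original message with the metadata needed to republish it."""
    return {
        "destination_topic": topic,      # hypothetical field name
        "key": key,                      # hypothetical field name
        "headers": dict(headers or {}),  # copied so later mutation is isolated
        "payload": message,              # hypothetical field name
    }


def unwrap_envelope(envelope: Dict[str, Any]) -> Tuple[Any, str, Optional[Any], Dict[str, Any]]:
    """Recover (payload, destination_topic, key, headers) from an envelope."""
    return (
        envelope["payload"],
        envelope["destination_topic"],
        envelope.get("key"),
        envelope.get("headers", {}),
    )
```

The point of the round trip is that the delay topic only ever carries the envelope; everything needed for final delivery travels inside it.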

__init__(lineage_settings)

Initializes the strategy with the specific lineage configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| lineage_settings | Optional[LineageSettings] | The application's global lineage configuration, injected by the factory. | required |

publish(producer, message, topic, key, headers, delay_seconds) async

Processes the message for delayed delivery and dispatches it to the appropriate delay topic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| producer | BaseProducer | The producer instance used for transport. | required |
| message | Any | The original message payload. | required |
| topic | str | The final destination topic. | required |
| key | Optional[Any] | The message key. | required |
| headers | Optional[MessageHeaders] | The message headers. | required |
| delay_seconds | Optional[int] | The duration of the delay. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| PublishingOutcome | PublishingOutcome | SUCCESS_DELAYED if the message was successfully published to the delay topic. |

nala.athomic.integration.messaging.delay.republisher.DelayedMessageHandler

A callable class that acts as a consumer callback for delayed messages.

It unwraps the message envelope created by a delay strategy and republishes the original payload to its final destination topic using an injected producer.

__call__(message, message_key, raw_headers) async

Handles a message from a delay topic by republishing it to its final destination.

This method deserializes the envelope, rehydrates the original headers, and uses the injected producer to send the message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | Dict[str, Any] | The message payload from the delay topic (the envelope). | required |
| message_key | Optional[str] | The message key (unused, as the original key is in the envelope). | required |
| raw_headers | MessageHeaders | The headers from the delay topic message. | required |

Returns:

| Type | Description |
| --- | --- |
| ProcessingOutcome | ProcessingOutcome.ACK: If the message was successfully republished or if it was malformed and should be discarded. |

Raises:

| Type | Description |
| --- | --- |
| Exception | Propagates exceptions from the producer (e.g., PublishError) to trigger the consumer's retry/DLQ mechanism. |

__init__(producer)

Initializes the handler with its required producer dependency.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| producer | ProducerProtocol | An active and connected producer instance that will be used to publish the unwrapped message. | required |
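The handler contract described above (ACK on success, ACK-and-discard on a malformed envelope, propagate producer errors) can be sketched as follows. Everything here is a stand-in for illustration: the ProcessingOutcome enum, RecordingProducer, SketchDelayedMessageHandler, and the envelope field names are assumptions, not the library's actual classes or schema.

```python
import asyncio
from enum import Enum
from typing import Any, Dict, List, Optional


class ProcessingOutcome(Enum):
    """Stand-in for the library's outcome enum."""
    ACK = "ack"


class RecordingProducer:
    """Hypothetical stub that records publish calls instead of contacting a broker."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []

    async def publish(self, topic: str, message: Any, key: Any = None, headers: Any = None) -> None:
        self.sent.append({"topic": topic, "message": message, "key": key, "headers": headers})


class SketchDelayedMessageHandler:
    """Callable consumer callback that republishes unwrapped envelopes."""

    def __init__(self, producer: Any) -> None:
        self._producer = producer

    async def __call__(
        self, message: Dict[str, Any], message_key: Optional[str], raw_headers: Any
    ) -> ProcessingOutcome:
        try:
            topic = message["destination_topic"]  # hypothetical field name
            payload = message["payload"]          # hypothetical field name
        except (KeyError, TypeError):
            # Malformed envelope: a retry cannot repair it, so acknowledge and discard.
            return ProcessingOutcome.ACK
        # Producer exceptions are deliberately not caught, so the consumer's
        # retry/DLQ mechanism can react to them.
        await self._producer.publish(
            topic=topic,
            message=payload,
            key=message.get("key"),
            headers=message.get("headers"),
        )
        return ProcessingOutcome.ACK


if __name__ == "__main__":
    producer = RecordingProducer()
    handler = SketchDelayedMessageHandler(producer)
    envelope = {"destination_topic": "user.follow-ups.v1", "payload": {"user_id": "u1"}, "key": "u1"}
    print(asyncio.run(handler(envelope, None, {})), producer.sent)
```

Acknowledging malformed envelopes keeps a single bad message from blocking the delay topic, while letting producer failures propagate leaves the decision to retry or dead-letter with the consumer.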