
Context Propagation in Messaging

Overview

A fundamental challenge in distributed, event-driven architectures is maintaining a consistent "context" as a request flows from one service to another through a message broker. This context includes critical observability data like the trace_id for distributed tracing and application-level data like the tenant_id for multi-tenancy.

The Athomic messaging module solves this problem with automatic context propagation. It ensures that when you consume a message in a background worker, you have access to the same execution context that existed in the service that originally published the message.


How It Works

The entire process is automated and built upon the Context Management module.

Injection (Producer Side)

  1. When you call producer.publish(), the BaseProducer automatically invokes the MessagingTelemetryAdapter.
  2. This adapter calls the capture_context() function, which collects all context variables marked for propagation (such as trace_id, tenant_id, and request_id) into a dictionary.
  3. The W3C Trace Context fields (traceparent, tracestate) are added to this dictionary so that OpenTelemetry can continue the trace on the consumer side.
  4. Finally, this context dictionary is injected as a special header into the message before it is sent to the broker.
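The producer-side steps above can be pictured with a minimal, standard-library sketch. It is not the Athomic implementation: the header name x-athomic-context, the build_headers helper, the PROPAGATED mapping, and the JSON encoding are all assumptions made for illustration, and the traceparent value is passed in by hand rather than taken from OpenTelemetry.

```python
import json
from contextvars import ContextVar
from typing import Any, Dict, Optional

# Hypothetical stand-ins for variables owned by the Context Management module.
tenant_id_var: ContextVar[Optional[str]] = ContextVar("tenant_id", default=None)
request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)

# Assumed: the set of variables marked for propagation.
PROPAGATED = {"tenant_id": tenant_id_var, "request_id": request_id_var}

# Assumed header name; the real header used by Athomic is not documented here.
CONTEXT_HEADER = "x-athomic-context"


def build_headers(traceparent: str) -> Dict[str, str]:
    """Mimic steps 2-4: capture values, add trace context, attach as one header."""
    context: Dict[str, Any] = {}
    for name, var in PROPAGATED.items():
        value = var.get()
        if value is not None:  # skip variables that were never set
            context[name] = value
    context["traceparent"] = traceparent
    return {CONTEXT_HEADER: json.dumps(context, sort_keys=True)}


# Simulate a request being handled in the publishing service.
tenant_id_var.set("acme")
request_id_var.set("req-42")
headers = build_headers("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(headers[CONTEXT_HEADER])
```

Packing everything into a single header keeps the broker-facing surface small, at the cost of requiring the consumer to decode it before any value is usable.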

Extraction (Consumer Side)

  1. When a message arrives, the BaseMessageProcessor receives the raw message together with its headers.
  2. It uses the MessagingTelemetryAdapter to extract the W3C Trace Context, which creates and activates a new OpenTelemetry span that is correctly parented to the producer's span.
  3. The @run_task_with_context decorator (for background tasks) or internal consumer logic calls restore_context(), which takes the context dictionary from the message headers and applies its values to the contextvars of the current execution scope.

The result is that handler code needs no extra plumbing. When your consumer handler function executes, a call to context_vars.get_tenant_id() returns the same tenant_id that was set in the original publisher, and your logs and traces are correlated with the producer's trace.
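To make the consumer-side behavior concrete, here is a rough sketch of a handler that reads the tenant without receiving it as an argument. The process and handle_order functions, the x-athomic-context header name, and the JSON encoding are hypothetical; only the use of contextvars reflects what the text describes.

```python
import json
from contextvars import Context, ContextVar
from typing import Dict, Optional

# Hypothetical stand-in for a variable owned by the Context Management module.
tenant_id_var: ContextVar[Optional[str]] = ContextVar("tenant_id", default=None)

CONTEXT_HEADER = "x-athomic-context"  # assumed header name


def handle_order(payload: dict) -> str:
    # The handler reads the tenant from the context, not from its arguments.
    return f"tenant={tenant_id_var.get()} order={payload['order_id']}"


def process(headers: Dict[str, str], payload: dict) -> str:
    """Apply the propagated values, run the handler, then undo the change."""
    propagated = json.loads(headers.get(CONTEXT_HEADER, "{}"))
    token = tenant_id_var.set(propagated.get("tenant_id"))
    try:
        return handle_order(payload)
    finally:
        tenant_id_var.reset(token)


incoming = {CONTEXT_HEADER: json.dumps({"tenant_id": "acme"})}

# Context().run() executes in an empty context, similar to a fresh worker
# that has never seen the publisher's variables.
result = Context().run(process, incoming, {"order_id": 7})
print(result)  # tenant=acme order=7
```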


What is Propagated?

By default, the following context variables are automatically propagated in message headers:

  • request_id
  • trace_id
  • span_id
  • tenant_id
  • user_id
  • role
  • locale
  • correlation_id
  • feature_flags

This list is fully configurable in your settings.toml file under the [context] section.


API Reference

nala.athomic.context.propagation.capture_context()

Captures the current values of all context variables marked for propagation.

This function iterates through the centrally registered context variables in the context_var_manager, collects the values of those flagged with propagate=True, and returns them as a dictionary. This dictionary is suitable for serialization and passing to background tasks or events.

Returns:

  • Dict[str, Any]: A dictionary containing the current context values to be propagated.
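The registry-and-flag idea described for capture_context() can be sketched as follows. RegisteredVar, REGISTRY, and capture_context_sketch are hypothetical names, not the context_var_manager API, and whether the real function includes unset (None) values is not stated in this document; the sketch simply includes them.

```python
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class RegisteredVar:
    """Hypothetical registry entry: a context variable plus its propagate flag."""

    name: str
    var: ContextVar
    propagate: bool


# Assumed registry contents, for illustration only.
REGISTRY: List[RegisteredVar] = [
    RegisteredVar("tenant_id", ContextVar("tenant_id", default=None), propagate=True),
    RegisteredVar("locale", ContextVar("locale", default=None), propagate=True),
    RegisteredVar("db_session", ContextVar("db_session", default=None), propagate=False),
]


def capture_context_sketch() -> Dict[str, Any]:
    """Collect the current value of every variable flagged propagate=True."""
    return {entry.name: entry.var.get() for entry in REGISTRY if entry.propagate}


REGISTRY[0].var.set("acme")
REGISTRY[2].var.set("session-object")  # set, but excluded from the capture
captured = capture_context_sketch()
print(captured)  # {'tenant_id': 'acme', 'locale': None}
```

Keeping the propagate flag next to the variable's registration means process-local state, such as the db_session entry here, stays out of message headers by default.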

nala.athomic.context.propagation.restore_context(context_dict)

A context manager to temporarily set context variables from a dictionary.

This is used on the worker side (e.g., in a background task or event handler) to re-establish the context that existed when the job was enqueued. It safely resets all variables to their previous state upon exiting the with block.

Parameters:

  • context_dict (Optional[Dict[str, Any]], required): The dictionary of context values captured before the task was enqueued.
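A context manager with this set-then-reset behavior can be built on ContextVar tokens. The sketch below is one possible shape under stated assumptions: VARS and restore_context_sketch are hypothetical names, and ignoring unknown keys is a choice made here, not documented behavior of restore_context().

```python
from contextlib import contextmanager
from contextvars import ContextVar, Token
from typing import Any, Dict, Iterator, List, Optional, Tuple

# Hypothetical lookup table from propagated key to context variable.
VARS: Dict[str, ContextVar] = {
    "tenant_id": ContextVar("tenant_id", default=None),
    "user_id": ContextVar("user_id", default=None),
}


@contextmanager
def restore_context_sketch(context_dict: Optional[Dict[str, Any]]) -> Iterator[None]:
    """Temporarily apply captured values; reset them when the block exits."""
    tokens: List[Tuple[ContextVar, Token]] = []
    try:
        for name, value in (context_dict or {}).items():
            var = VARS.get(name)
            if var is not None:  # assumption: unknown keys are ignored
                tokens.append((var, var.set(value)))
        yield
    finally:
        # Reset in reverse order so each variable returns to its prior value.
        for var, token in reversed(tokens):
            var.reset(token)


VARS["tenant_id"].set("outer")
with restore_context_sketch({"tenant_id": "acme", "unknown": 1}):
    inside = VARS["tenant_id"].get()
after = VARS["tenant_id"].get()

# Passing None is accepted and leaves every variable untouched.
with restore_context_sketch(None):
    untouched = VARS["user_id"].get()

print(inside, after, untouched)  # acme outer None
```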

nala.athomic.observability.telemetry.adapters.messaging_adapter.MessagingTelemetryAdapter

Bases: TelemetryAdapterProtocol

A concrete Telemetry Adapter for messaging systems.

This adapter handles the injection and extraction of W3C Trace Context into and from the Athomic MessageHeaders object, enabling end-to-end tracing across message broker boundaries.

activate_from_headers(headers)

A context manager that extracts the trace context from incoming message headers and activates it for the duration of the message processing.

This is the primary method used by consumer implementations to continue the distributed trace from the producer.

extract_context(headers)

Extracts the trace context from an incoming MessageHeaders object and returns the active OpenTelemetry Context.

inject_context(headers)

Injects the current trace context and a unique 'message_id' into a MessageHeaders object for an outgoing message.
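For readers unfamiliar with the header that carries the trace, the following sketch shows the W3C traceparent format (version, 32-hex trace ID, 16-hex parent span ID, 2-hex flags) being written and parsed with the standard library only. It does not reproduce the adapter: inject_sketch and extract_sketch are hypothetical functions, a plain dict stands in for MessageHeaders, and the UUID-based message_id is an assumption about what "unique" might mean.

```python
import re
import uuid
from typing import Dict, Optional, Tuple

# version - trace-id - parent-id - trace-flags, all lowercase hex.
_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject_sketch(headers: Dict[str, str], trace_id: str, span_id: str,
                  sampled: bool = True) -> None:
    """Write a version-00 traceparent and an assumed message_id into headers."""
    flags = "01" if sampled else "00"
    headers["traceparent"] = f"00-{trace_id}-{span_id}-{flags}"
    headers.setdefault("message_id", str(uuid.uuid4()))  # assumed format


def extract_sketch(headers: Dict[str, str]) -> Optional[Tuple[str, str, bool]]:
    """Return (trace_id, parent_span_id, sampled), or None if absent or invalid."""
    match = _TRACEPARENT.match(headers.get("traceparent", ""))
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # The specification forbids version ff and all-zero identifiers.
    if version == "ff" or trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return trace_id, span_id, bool(int(flags, 16) & 0x01)


headers: Dict[str, str] = {}
inject_sketch(headers, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7")
parent = extract_sketch(headers)
print(headers["traceparent"])
print(parent)
```

On the consumer side, the extracted trace ID and parent span ID are what allow a new span to be parented to the producer's span.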