Context Propagation in Messaging
Overview
A fundamental challenge in distributed, event-driven architectures is maintaining a consistent "context" as a request flows from one service to another through a message broker. This context includes critical observability data like the trace_id for distributed tracing and application-level data like the tenant_id for multi-tenancy.
The Athomic messaging module solves this problem with automatic context propagation. It ensures that when you consume a message in a background worker, you have access to the same execution context that existed in the service that originally published the message.
How It Works
The entire process is automated and built upon the Context Management module.
Injection (Producer Side)
- When you call
producer.publish(), theBaseProducerautomatically invokes theMessagingTelemetryAdapter. - This adapter calls the
capture_context()function, which serializes all context variables marked for propagation (liketrace_id,tenant_id,request_id, etc.) into a dictionary. - The W3C Trace Context (
traceparent,tracestate) is injected into this dictionary for OpenTelemetry. - Finally, this context dictionary is injected as a special header into the message before it is sent to the broker.
Extraction (Consumer Side)
- When a consumer receives a message, the
BaseMessageProcessorreceives the raw message with its headers. - It uses the
MessagingTelemetryAdapterto extract the W3C Trace Context, which creates and activates a new OpenTelemetry span that is correctly parented to the producer's span. - The
@run_task_with_contextdecorator (for background tasks) or internal consumer logic callsrestore_context(), which takes the context dictionary from the message headers and applies its values to thecontextvarsof the current execution scope.
The result is seamless. When your consumer handler function is executed, a call to context_vars.get_tenant_id() will return the same tenant_id from the original publisher, and your logs and traces will be perfectly correlated.
What is Propagated?
By default, the following context variables are automatically propagated in message headers:
request_idtrace_idspan_idtenant_iduser_idrolelocalecorrelation_idfeature_flags
This list is fully configurable in your settings.toml file under the [context] section.
API Reference
nala.athomic.context.propagation.capture_context()
Captures the current values of all context variables marked for propagation.
This function iterates through the centrally registered context variables
in the context_var_manager, collects the values of those flagged with
propagate=True, and returns them as a dictionary. This dictionary is
suitable for serialization and passing to background tasks or events.
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
A dictionary containing the current context values to be propagated. |
nala.athomic.context.propagation.restore_context(context_dict)
A context manager to temporarily set context variables from a dictionary.
This is used on the worker side (e.g., in a background task or event handler)
to re-establish the context that existed when the job was enqueued.
It safely resets all variables to their previous state upon exiting the with block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context_dict
|
Optional[Dict[str, Any]]
|
The dictionary of context values captured before the task was enqueued. |
required |
nala.athomic.observability.telemetry.adapters.messaging_adapter.MessagingTelemetryAdapter
Bases: TelemetryAdapterProtocol
A concrete Telemetry Adapter for messaging systems.
This adapter handles the injection and extraction of W3C Trace Context into and from the Athomic MessageHeaders object, enabling end-to-end tracing across message broker boundaries.
activate_from_headers(headers)
A context manager that extracts the trace context from incoming message headers and activates it for the duration of the message processing.
This is the primary method used by consumer implementations to continue the distributed trace from the producer.
extract_context(headers)
Extracts the trace context from an incoming MessageHeaders object and returns the active OpenTelemetry Context.
inject_context(headers)
Injects the current trace context and a unique 'message_id' into a MessageHeaders object for an outgoing message.