Payload Processing Pipeline
Overview
The Payload Processing module provides a flexible and powerful implementation of the Pipes and Filters architectural pattern. Its core component, the PayloadProcessor, orchestrates a configurable pipeline of transformation steps that are applied to data payloads, primarily within the messaging system.
This architecture allows complex processing sequences—such as serialization, encryption, compression, and large message handling (Claim Check)—to be composed declaratively in your configuration file. It decouples the messaging producers and consumers from the specifics of the payload format, making the system highly extensible.
How The Pipeline Works
The PayloadProcessor manages an ordered list of "steps," where each step is a component that implements the ProcessingStepProtocol (defining encode and decode methods).
Encoding Flow (On Publish)
When a message is published, the pipeline is executed in the forward order as defined in your configuration. The output of one step becomes the input for the next.
Domain Object -> [Serializer] -> Bytes -> [Crypto] -> Encrypted Bytes -> [Compression] -> Compressed Bytes -> Message Broker
Decoding Flow (On Consume)
When a message is consumed, the pipeline is executed in reverse order.
Message Broker -> Compressed Bytes -> [Decompression] -> Encrypted Bytes -> [Decryption] -> Bytes -> [Deserializer] -> Domain Object
This symmetric design ensures that transformations are applied and reverted correctly and transparently.
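As a quick orientation, here is a minimal round trip through the processor from application code. The single JsonStep is an illustrative stand-in (not the built-in serializer step); the PayloadProcessor constructor and the async encode/decode calls follow the API reference further down.

```python
import asyncio
import json

from nala.athomic.payload.processor import PayloadProcessor


class JsonStep:
    """Illustrative single step: dict <-> JSON bytes (not the built-in serializer)."""

    async def encode(self, data, **kwargs):
        return json.dumps(data).encode("utf-8")

    async def decode(self, data, **kwargs):
        return json.loads(data)


async def main():
    processor = PayloadProcessor(pipeline_steps=[JsonStep()])

    raw = await processor.encode({"order_id": 42})  # forward: object -> bytes
    obj = await processor.decode(raw)               # reverse: bytes -> object
    assert obj == {"order_id": 42}


asyncio.run(main())
```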
Available Pipeline Steps
Athomic provides several built-in steps that you can include in your payload pipeline:
- serializer: (Required first step for encoding.) Converts Python objects (such as Pydantic models) into a byte representation (e.g., JSON, Protobuf) and vice versa.
- crypto: Encrypts the byte payload using the configured cryptographic provider (e.g., Fernet) to ensure end-to-end security.
- compression: Compresses the byte payload to reduce its size, saving network bandwidth and storage costs.
- claim_check: Implements the Claim Check pattern. If a payload exceeds a size threshold, it is uploaded to a file storage provider (such as GCS) and a small reference is sent through the broker instead (see the sketch below).
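To make the Claim Check behaviour concrete, the sketch below shows the decision in isolation. It is not the built-in claim_check step: the threshold value, the storage client, and its upload method are illustrative assumptions.

```python
import uuid

CLAIM_CHECK_THRESHOLD = 256 * 1024  # illustrative threshold, in bytes


async def claim_check_encode(payload: bytes, storage) -> bytes:
    """Conceptual Claim Check: park oversized payloads in file storage and
    send only a small reference through the broker."""
    if len(payload) <= CLAIM_CHECK_THRESHOLD:
        return payload

    key = f"claims/{uuid.uuid4()}"
    await storage.upload(key, payload)  # hypothetical storage client call
    return f"claim-check:{key}".encode("utf-8")
```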
Configuration
You configure the pipeline in your settings.toml file under the [integration.messaging.payload] section. The pipeline is a list of tables, and their order matters.
Example: Full Pipeline
This example configures a pipeline that first serializes the object to JSON, then encrypts it, and finally compresses it.
```toml
[default.integration.messaging.payload]
# The pipeline is a list of steps. Order is crucial.
# Encoding runs top-to-bottom. Decoding runs bottom-to-top.

[[default.integration.messaging.payload.pipeline]]
step = "serializer"
# Optional settings for this specific step can be provided.
[default.integration.messaging.payload.pipeline.settings]
backend = "orjson"

[[default.integration.messaging.payload.pipeline]]
step = "crypto"
# No settings needed here; it will use the global [security.crypto] config.

[[default.integration.messaging.payload.pipeline]]
step = "compression"
```
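In application code, the configured pipeline is typically obtained through the PayloadProcessorFactory documented below. In this sketch, payload_settings stands for the PayloadSettings parsed from the section above, and event is whatever object you publish; both are assumptions about your application wiring.

```python
from nala.athomic.payload.factory import PayloadProcessorFactory


async def to_wire(event, payload_settings):
    # payload_settings is assumed to be the PayloadSettings parsed from the
    # [default.integration.messaging.payload] section shown above.
    processor = PayloadProcessorFactory.create(settings=payload_settings)

    # Forward pipeline: serializer -> crypto -> compression -> transport bytes.
    return await processor.encode(event)
```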
Extending the Pipeline (Custom Steps)
The pipeline is fully extensible. To add your own custom processing step (e.g., for data validation or transformation):
- Create a class that implements the ProcessingStepProtocol.
- Create a factory class that implements the StepCreatorProtocol.
- Register an instance of your creator in the payload_step_registry with a unique name.
Once registered, your custom step can be added to the pipeline in your configuration just like the built-in ones.
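A minimal sketch of those three steps follows. The step and creator mirror the protocols in the API reference; the import path of payload_step_registry and its register method are assumptions, so adapt the commented lines to the actual registry API.

```python
from typing import Any, Dict, Optional


class VersionTagStep:
    """Illustrative custom step (ProcessingStepProtocol): tag outbound bytes."""

    PREFIX = b"v1:"

    async def encode(self, data: bytes, **kwargs: Any) -> bytes:
        return self.PREFIX + data

    async def decode(self, data: bytes, **kwargs: Any) -> bytes:
        return data.removeprefix(self.PREFIX)


class VersionTagStepCreator:
    """Factory for the step (StepCreatorProtocol)."""

    def create(self, settings: Optional[Dict[str, Any]]) -> VersionTagStep:
        # `settings` carries any step-specific options from the pipeline config.
        return VersionTagStep()


# Assumed registration call -- check the actual payload_step_registry API:
# from nala.athomic.payload.registry import payload_step_registry  # assumed path
# payload_step_registry.register("version_tag", VersionTagStepCreator())
```

After registration, `step = "version_tag"` can be listed in the pipeline configuration exactly like the built-in steps.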
API Reference
nala.athomic.payload.processor.PayloadProcessor
An orchestrator that executes a dynamic pipeline of processing steps (Pipes and Filters pattern).
This class is agnostic to the nature of the steps (serialization, encryption, compression), focusing only on the correct order of execution for both outbound (encode) and inbound (decode) flows.
__init__(pipeline_steps)
Initializes the processor with an ordered list of pipeline steps.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pipeline_steps | List[ProcessingStepProtocol] | An ordered list of steps to be executed during encoding. | required |
decode(raw_data, **kwargs)
async
Executes the decoding pipeline in reverse order (backward).
The flow is typically: Raw Bytes -> [Decompress -> Decrypt -> Deserialize] -> Domain Object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| raw_data | bytes | The initial payload (transport bytes). | required |
| **kwargs | Any | Contextual arguments. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| Any | Any | The final domain object or data structure. |
encode(data_object, **kwargs)
async
Executes the encoding pipeline in the defined order (forward).
The flow is typically: Domain Object -> [Serialize -> Encrypt -> Compress] -> Raw Bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_object | Any | The initial payload (usually a domain object or model). | required |
| **kwargs | Any | Contextual arguments to be passed to each step. | {} |
Raises:
| Type | Description |
|---|---|
| TypeError | If the final output of the pipeline is not bytes. |
Returns:
| Name | Type | Description |
|---|---|---|
| bytes | bytes | The final, transport-ready payload. |
nala.athomic.payload.protocol.ProcessingStepProtocol
Bases: Protocol
Defines the contract for a single, bidirectional (encode/decode) step in a payload processing pipeline.
Examples of steps include serialization (JSON/Protobuf), compression (Gzip), or encryption (AES).
decode(data, **kwargs)
async
Processes data in the inbound flow (e.g., preparing data for consumption).
The flow direction is typically from raw bytes/transport format to the final domain object expected by the consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Any | The received payload (can be bytes or a transport format). | required |
| **kwargs | Any | Contextual arguments. | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| Any | Any | The decoded payload (usually the final domain object). |
encode(data, **kwargs)
async
Processes data in the outbound flow (e.g., preparing data for transmission).
The flow direction is typically from domain object to raw bytes/transport format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Any | The payload to be processed (can be an object or bytes). | required |
| **kwargs | Any | Contextual arguments (e.g., schema version, compression level). | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| Any | Any | The processed payload (usually bytes for intermediate steps). |
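As a concrete reference point, here is a minimal compression step that satisfies this protocol using only the standard library. It is an illustrative sketch, not the built-in compression step.

```python
import gzip
from typing import Any


class GzipStep:
    """Minimal ProcessingStepProtocol implementation backed by stdlib gzip."""

    def __init__(self, level: int = 6) -> None:
        self.level = level

    async def encode(self, data: bytes, **kwargs: Any) -> bytes:
        # Outbound: shrink the payload before it reaches the broker.
        return gzip.compress(data, compresslevel=self.level)

    async def decode(self, data: bytes, **kwargs: Any) -> bytes:
        # Inbound: restore the original bytes for the next step.
        return gzip.decompress(data)
```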
nala.athomic.payload.factory.PayloadProcessorFactory
A Singleton Factory responsible for assembling the final PayloadProcessor.
It dynamically retrieves step creators from a central registry based on a structured pipeline configuration (Pipes and Filters pattern), ensuring the pipeline assembly logic runs only once.
clear()
classmethod
Clears the singleton instance. Used primarily for test isolation.
create(settings)
classmethod
Retrieves or creates the singleton instance of the fully assembled PayloadProcessor pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | PayloadSettings | The configuration specifying the order and type of steps in the processing pipeline. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| PayloadProcessor | PayloadProcessor | The shared, single instance of the processor. |
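Because create() caches a single instance, tests that vary the pipeline configuration should reset it between runs. Below is a sketch using pytest (an assumption about your test framework) built on the documented clear() classmethod.

```python
import pytest

from nala.athomic.payload.factory import PayloadProcessorFactory


@pytest.fixture(autouse=True)
def reset_payload_processor():
    """Ensure each test assembles its own pipeline from its own settings."""
    yield
    PayloadProcessorFactory.clear()
```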
nala.athomic.payload.creators_protocol.StepCreatorProtocol
Bases: Protocol
Defines the contract for an object (a specialized Factory) that knows how to create a specific ProcessingStepProtocol implementation.
This protocol is used by the PayloadProcessorFactory to dynamically construct the processing pipeline based on configuration, enabling Dependency Injection for complex steps.
create(settings)
Creates and returns a concrete instance of a ProcessingStepProtocol.
The creation logic within the implementer should handle the necessary setup and dependency injection (e.g., retrieving a key for an EncryptionStep).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | Optional[Dict[str, Any]] | The specific configuration dictionary for this step from the main PayloadSettings. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| ProcessingStepProtocol | ProcessingStepProtocol | The fully initialized processing step. |