Serializer
Overview
The Serializer module is a core component of the Athomic Layer. It converts Python objects into a byte representation (serialization) and back (deserialization), which is required by any operation that involves data persistence or network transmission, such as:
- Storing data in a Key-Value store (e.g., Redis).
- Publishing messages to a message broker (e.g., Kafka).
- Caching function results.
The module is designed with extensibility in mind, built upon a protocol-centric architecture. This allows developers to easily add new serialization formats without modifying the core components that rely on it.
Core Concepts
The Serializer module is built on three key components that work together:
1. SerializerProtocol
This is the abstract contract that every serializer provider must implement. It defines the standard interface for all serialization and deserialization operations, ensuring that any component can use any serializer interchangeably. The key methods include:
- serialize_value(value): Converts a Python object into bytes.
- deserialize_value(value): Converts bytes back into a Python object.
- serialize_key(key): A dedicated method for serializing message keys.
- serialize_headers(headers): A dedicated method for serializing message headers.
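The contract described above can be sketched as an abstract base class with one toy implementation. This is a minimal illustration, not the library's code: SketchSerializerProtocol, SketchJsonSerializer, and round_trip are hypothetical names, and the methods are written as coroutines only because the API reference lists them as async.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class SketchSerializerProtocol(ABC):
    """Hypothetical stand-in for SerializerProtocol (not the library's class)."""

    @abstractmethod
    async def serialize_value(self, value: Any, **kwargs: Any) -> Optional[bytes]: ...

    @abstractmethod
    async def deserialize_value(self, value: bytes, **kwargs: Any) -> Any: ...

    @abstractmethod
    async def serialize_key(self, key: Optional[Any], **kwargs: Any) -> Optional[bytes]: ...

    @abstractmethod
    async def serialize_headers(
        self, headers: Optional[Dict[str, str]], **kwargs: Any
    ) -> Optional[List[Tuple[str, bytes]]]: ...


class SketchJsonSerializer(SketchSerializerProtocol):
    """Toy provider built on the standard json module."""

    async def serialize_value(self, value, **kwargs):
        # None passes through untouched; everything else becomes UTF-8 JSON.
        return None if value is None else json.dumps(value).encode("utf-8")

    async def deserialize_value(self, value, **kwargs):
        return json.loads(value)

    async def serialize_key(self, key, **kwargs):
        return None if key is None else str(key).encode("utf-8")

    async def serialize_headers(self, headers, **kwargs):
        if headers is None:
            return None
        return [(name, text.encode("utf-8")) for name, text in headers.items()]


async def round_trip(serializer: SketchSerializerProtocol, payload: Any) -> Any:
    """Works with any implementation, because it only relies on the contract."""
    raw = await serializer.serialize_value(payload)
    return await serializer.deserialize_value(raw)
```

Because round_trip depends only on the abstract interface, swapping in another provider would not require changing the caller, which is the interchangeability the protocol is meant to guarantee.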
2. SerializerRegistry
This is a central registry that maps string identifiers (backend names) to the concrete serializer classes that implement the SerializerProtocol. For example, it maps the name "orjson" to the OrjsonSerializer class. This allows the factory to be decoupled from the implementations.
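A name-to-class registry of this kind can be sketched in a few lines. The class and method names below (SketchRegistry, register, get) are assumptions made for illustration and may not match the real SerializerRegistry interface.

```python
from typing import Dict, Optional


class SketchRegistry:
    """Hypothetical registry mapping backend names to serializer classes."""

    def __init__(self) -> None:
        self._providers: Dict[str, type] = {}

    def register(self, name: str, provider_cls: type) -> None:
        # Later registrations under the same name replace earlier ones.
        self._providers[name] = provider_cls

    def get(self, name: str, default: Optional[type] = None) -> Optional[type]:
        # Returning a default leaves the fallback policy to the caller.
        return self._providers.get(name, default)


class FakeOrjsonSerializer:
    """Placeholder standing in for OrjsonSerializer."""


registry = SketchRegistry()
registry.register("orjson", FakeOrjsonSerializer)
```

The factory only needs the string key, so new formats can be added by registering another class without touching the factory itself.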
3. SerializerFactory
This is the primary entry point for the rest of the application to obtain a serializer instance. It reads the application's configuration, looks up the configured backend name in the SerializerRegistry, and instantiates the matching class. Instances are cached per backend name, so the same serializer is reused throughout the application.
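The lookup-then-cache flow can be sketched as follows. All names here (SketchSettings, SketchBaseSerializer, SketchOrjsonSerializer, REGISTRY, SketchFactory) are hypothetical; the only behaviours taken from this page are the create and clear class methods, the per-backend cache, and the fallback to a base serializer when nothing is registered.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class SketchSettings:
    """Hypothetical settings object; only the backend name matters here."""
    backend: str = "orjson"


class SketchBaseSerializer:
    def __init__(self, settings: SketchSettings) -> None:
        self.settings = settings


class SketchOrjsonSerializer(SketchBaseSerializer):
    """Placeholder standing in for a concrete provider."""


# Stand-in for the registry: backend name -> serializer class.
REGISTRY: Dict[str, type] = {"orjson": SketchOrjsonSerializer}


class SketchFactory:
    _instances: Dict[str, SketchBaseSerializer] = {}

    @classmethod
    def create(cls, settings: Optional[SketchSettings] = None) -> SketchBaseSerializer:
        settings = settings or SketchSettings()
        if settings.backend not in cls._instances:
            # Unknown backends fall back to the base class.
            serializer_cls = REGISTRY.get(settings.backend, SketchBaseSerializer)
            cls._instances[settings.backend] = serializer_cls(settings)
        return cls._instances[settings.backend]

    @classmethod
    def clear(cls) -> None:
        # Drops cached instances, e.g. between tests.
        cls._instances.clear()
```

Calling create twice with the same backend returns the same object, while clear forces the next call to build a fresh instance.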
Available Providers
The Athomic Layer comes with several built-in serializer providers:
- JsonPydanticSerializer: A robust JSON serializer that enhances deserialization by validating the incoming data against a target Pydantic model. This is great for ensuring data integrity at the boundaries of your application.
- OrjsonSerializer: A high-performance JSON serializer that uses the orjson library for significant speed improvements over the standard json library.
- ProtobufSerializer: A highly efficient, schema-based serializer for Google Protocol Buffers. It can optionally integrate with a Schema Registry to handle schema validation and wire format encapsulation, making it ideal for high-throughput messaging systems.
Configuration
The serializer is configured under the serializer section in your settings file (e.g., settings.toml). You can select the backend and provide specific configurations for features like Schema Registry.
[default.serializer]
# The name of the specific serializer implementation to use (e.g., 'json_pydantic', 'protobuf', 'orjson').
backend = "orjson"
# The fundamental data format that this serializer handles.
format = "json"
# --- Schema Registry (for Protobuf/Avro) ---
schema_validation_enabled = false
schema_registry_url = "http://localhost:8081"
handler = "confluent_protobuf"
API Reference
The following sections are auto-generated from the source code docstrings.
SerializerProtocol
nala.athomic.serializer.protocol.SerializerProtocol
Bases: ABC
Defines the bidirectional contract for message serializers.
This protocol handles the conversion of payloads, keys, and headers between Python objects/dictionaries and the raw byte format suitable for transport-specific backends (e.g., Kafka, Pub/Sub) or storage layers (e.g., Cache, Outbox). It is a critical component of the message processing pipeline.
__init__(settings)
Initializes the serializer with its required configuration settings.
deserialize_headers(headers, **kwargs)
abstractmethod
async
Deserializes the message headers from the backend's raw format into a dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| headers | Optional[Any] | The serialized headers from the broker, which may be a list of tuples or a raw object. | required |
| **kwargs | Any | Contextual arguments. | {} |
Returns:
| Type | Description |
|---|---|
| Optional[Dict[str, str]] | The deserialized headers as a dictionary of key-value strings, or None. |
Raises:
| Type | Description |
|---|---|
| Exception | If deserialization fails. |
deserialize_key(key, **kwargs)
abstractmethod
async
Deserializes the raw byte message key back into a string or object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | bytes | The serialized message key as bytes. | required |
| **kwargs | Any | Contextual arguments. | {} |
Returns:
| Type | Description |
|---|---|
| Optional[str] | The deserialized key (typically a string), or None. |
Raises:
| Type | Description |
|---|---|
| Exception | If deserialization fails. |
deserialize_value(value, **kwargs)
abstractmethod
async
Deserializes the raw byte message payload back into a Python object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes | The serialized message payload as bytes. | required |
| **kwargs | Any | Contextual arguments, potentially including a | {} |
Returns:
| Type | Description |
|---|---|
| Any | The deserialized message value (e.g., dictionary, Pydantic model instance). |
Raises:
| Type | Description |
|---|---|
| Exception | If deserialization fails (e.g., invalid JSON/Protobuf format, schema mismatch). |
serialize_headers(headers, **kwargs)
abstractmethod
async
Serializes the message headers into a list of backend-compatible byte tuples.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| headers | Optional[Dict[str, str]] | Optional dictionary of string-based headers. | required |
| **kwargs | Any | Contextual arguments. | {} |
Returns:
| Type | Description |
|---|---|
| Optional[list[tuple[str, bytes]]] | A list of (key, value_bytes) tuples, or None. |
Raises:
| Type | Description |
|---|---|
| Exception | If serialization fails (e.g., non-string key or value). |
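The header conversion described by serialize_headers and deserialize_headers can be illustrated with two plain functions. This is a sketch under assumptions: encode_headers and decode_headers are hypothetical helpers, UTF-8 is an assumed encoding, and TypeError is used here only as one concrete kind of Exception.

```python
from typing import Dict, List, Optional, Tuple


def encode_headers(headers: Optional[Dict[str, str]]) -> Optional[List[Tuple[str, bytes]]]:
    """Turn a str -> str mapping into a list of (key, value_bytes) tuples."""
    if headers is None:
        return None
    encoded: List[Tuple[str, bytes]] = []
    for key, value in headers.items():
        # Reject non-string keys or values instead of coercing them silently.
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError(f"Header {key!r} must map a str key to a str value")
        encoded.append((key, value.encode("utf-8")))
    return encoded


def decode_headers(raw: Optional[List[Tuple[str, bytes]]]) -> Optional[Dict[str, str]]:
    """Inverse of encode_headers for the list-of-tuples form."""
    if raw is None:
        return None
    return {key: value.decode("utf-8") for key, value in raw}
```

Keeping keys as strings and only encoding the values matches the Optional[list[tuple[str, bytes]]] return type listed above.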
serialize_key(key, **kwargs)
abstractmethod
async
Serializes the message key for partitioning or routing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | Optional[Any] | The optional message key. | required |
| **kwargs | Any | Contextual arguments (e.g., topic name). | {} |
Returns:
| Type | Description |
|---|---|
| Optional[bytes] | The serialized key as raw bytes, or None. |
Raises:
| Type | Description |
|---|---|
| Exception | If serialization fails. |
serialize_value(value, **kwargs)
abstractmethod
async
Serializes the message payload for outbound transmission or storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | Any | The original message payload (e.g., Pydantic model, dictionary). | required |
| **kwargs | Any | Contextual arguments (e.g., topic name, schema version). | {} |
Returns:
| Type | Description |
|---|---|
| Optional[bytes] | The serialized payload as raw bytes, or None if the input was None. |
Raises:
| Type | Description |
|---|---|
| Exception | If serialization fails (e.g., unhandled data type, schema incompatibility). |
SerializerFactory
nala.athomic.serializer.factory.SerializerFactory
Factory for instantiating message serializers based on messaging backend. Falls back to BaseSerializer if no specific implementation is registered. Caches instances by backend name for singleton-like behavior per backend.
clear()
classmethod
Clears the singleton cache of serializer instances.
create(settings=None)
classmethod
Creates and returns a singleton instance of the configured SerializerProtocol by delegating to a registered creator.
JsonPydanticSerializer
nala.athomic.serializer.providers.json_pydantic_serializer.JsonPydanticSerializer
Bases: BaseSerializer
A JSON serializer implementation optimized for data validation using Pydantic models.
This serializer inherits the core functionality for header and key handling,
as well as default serialization (serialize_value), from BaseSerializer.
Its specialized role is to enhance the deserialization process by enforcing
the schema defined by a target Pydantic model.
deserialize_value(value, target_type=dict, **kwargs)
async
Deserializes a JSON payload from bytes and validates it against a target Pydantic model.
If a target_type (that is a subclass of BaseModel) is provided, the data
is validated against that schema before being returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | bytes \| None | The raw JSON payload as bytes. | required |
| target_type | Any | The expected class for the deserialized result, typically a | dict |
Returns:
| Type | Description |
|---|---|
| Any \| None | The validated |
Raises:
| Type | Description |
|---|---|
| DeserializationError | If JSON decoding fails or if Pydantic validation fails. |
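The decode-then-validate flow can be sketched with the standard library alone. Note the substitution: a dataclass stands in for the Pydantic model so the example has no third-party dependency, which means it only checks field names, not field types. SketchDeserializationError, UserEvent, and deserialize_and_validate are hypothetical names, and returning None for a None payload is an assumption based on the signature above.

```python
import json
from dataclasses import dataclass, is_dataclass
from typing import Any, Optional


class SketchDeserializationError(Exception):
    """Local stand-in for the library's DeserializationError."""


@dataclass
class UserEvent:
    """Dataclass used in place of a Pydantic model for this sketch."""
    user_id: int
    action: str


def deserialize_and_validate(value: Optional[bytes], target_type: Any = dict) -> Any:
    if value is None:
        return None
    try:
        data = json.loads(value)
    except ValueError as exc:  # covers invalid JSON and invalid UTF-8
        raise SketchDeserializationError("invalid JSON payload") from exc
    if not is_dataclass(target_type):
        # No schema requested: return the decoded object as-is.
        return data
    try:
        # Rejects missing or unexpected fields (but does not check types).
        return target_type(**data)
    except TypeError as exc:
        raise SketchDeserializationError(
            f"payload does not match {target_type.__name__}"
        ) from exc
```

Both failure modes surface as the same error type, so callers at the application boundary need only one except clause.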
OrjsonSerializer
nala.athomic.serializer.providers.orjson_serializer.OrjsonSerializer
Bases: BaseSerializer
A high-performance serializer that uses the orjson library to encode and decode JSON.
This class provides a fast, dedicated JSON implementation for message payloads,
inheriting all common features (logging, tracing, header/key handling) from
BaseSerializer.
__init__(settings)
Initializes the OrjsonSerializer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | SerializerSettings | The configuration for this serializer instance. | required |
deserialize_value(data, **kwargs)
async
Deserializes a JSON payload from bytes back into a Python object (dict/list).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Any | The serialized message payload, expected as bytes. | required |
Returns:
| Type | Description |
|---|---|
| Any | The deserialized Python object. |
Raises:
| Type | Description |
|---|---|
| DeserializationError | If orjson fails to decode the payload (e.g., invalid JSON structure). |
serialize_value(value, **kwargs)
async
Serializes a value into a JSON byte string using orjson.
It uses the custom default encoder to handle complex types like Pydantic models.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | Any | The original message payload. | required |
Returns:
| Type | Description |
|---|---|
| Optional[bytes] | The serialized payload as raw bytes. |
Raises:
| Type | Description |
|---|---|
| SerializationError | If orjson fails to serialize the object (e.g., non-serializable type). |
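The role of a custom default encoder can be illustrated with the standard json module, which accepts a default callable in the same spirit as orjson.dumps. This is a stand-in rather than the library's encoder: default_encoder, to_json_bytes, SketchSerializationError, and FakeModel are hypothetical, and the model_dump check assumes Pydantic v2 style models.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional


class SketchSerializationError(Exception):
    """Local stand-in for the library's SerializationError."""


class FakeModel:
    """Mimics a Pydantic v2 model by exposing model_dump()."""

    def model_dump(self) -> dict:
        return {"id": 1}


def default_encoder(obj: Any) -> Any:
    """Called only for objects the JSON encoder cannot handle natively."""
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj).__name__} is not JSON serializable")


def to_json_bytes(value: Any) -> Optional[bytes]:
    if value is None:
        return None
    try:
        return json.dumps(value, default=default_encoder).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise SketchSerializationError(str(exc)) from exc
```

With orjson the final encode step would be unnecessary, since orjson.dumps returns bytes directly.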
ProtobufSerializer
nala.athomic.serializer.providers.protobuf_serializer.ProtobufSerializer
Bases: BaseSerializer
Serializes and deserializes messages using Google Protocol Buffers.
This serializer is optimized for performance and type safety. When configured, it delegates schema validation and wire format encapsulation (Confluent Wire Format) to a specialized Schema Handler (e.g., Avro or Protobuf). This keeps the class focused on Protobuf operations, in line with the Single Responsibility Principle (SRP).
Attributes:
| Name | Type | Description |
|---|---|---|
| handler | Optional[SchemaHandlerProtocol] | The component responsible for Schema Registry interaction and wire format handling. |
__init__(settings)
Initializes the serializer.
It attempts to inject a schema handler if schema validation is enabled in the configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | SerializerSettings | The specific configuration for this serializer instance. | required |
deserialize_value(data, **kwargs)
async
Deserializes raw bytes into a Protobuf message instance.
If the payload is in the Confluent Wire Format (starts with MAGIC_BYTE), it delegates the process to the schema handler for schema ID lookup. Otherwise, it attempts standard Protobuf deserialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Any | The raw message data, expected as bytes. | required |
| **kwargs | Any | Contextual arguments (must include 'target_type' and 'topic' if schema handler is active). | {} |
Returns:
| Type | Description |
|---|---|
| Any | The deserialized Protobuf message instance. |
Raises:
| Type | Description |
|---|---|
| DeserializationError | If 'target_type' is missing, data is malformed, or standard deserialization fails. |
serialize_value(value, **kwargs)
async
Serializes a Protobuf message into raw bytes.
Delegates to the schema handler if configured to include the wire format (magic byte + schema ID). Otherwise, performs standard Protobuf serialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | Any | The Protobuf message instance to serialize. | required |
| **kwargs | Any | Contextual arguments (must include 'topic' if schema handler is active). | {} |
Returns:
| Type | Description |
|---|---|
| Optional[bytes] | The serialized payload. |
Raises:
| Type | Description |
|---|---|
| SerializationError | If the value is not a Protobuf message or if the required 'topic' is missing when a schema handler is used. |