Serializer

Overview

The Serializer module is a core component of the Athomic Layer. It converts Python objects into a byte representation (serialization) and back (deserialization). Any operation that persists data or sends it over the network depends on it, for example:

  • Storing data in a Key-Value store (e.g., Redis).
  • Publishing messages to a message broker (e.g., Kafka).
  • Caching function results.

The module follows a protocol-centric architecture, so developers can add a new serialization format without modifying the core components that consume serializers.


Core Concepts

The Serializer module is built on three key components that work together:

1. SerializerProtocol

This is the abstract contract that every serializer provider must implement. It defines the standard interface for all serialization and deserialization operations, so any component can use any serializer interchangeably. All methods are asynchronous; the key ones include:

  • serialize_value(value): Converts a Python object into bytes.
  • deserialize_value(value): Converts bytes back into a Python object.
  • serialize_key(key): A dedicated method for serializing message keys.
  • serialize_headers(headers): A dedicated method for serializing message headers.
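To make the contract concrete, here is a minimal sketch of an abstract base class with the four methods listed above, plus a toy provider built on the standard json module. The names SerializerSketch, PlainJsonSerializer, and round_trip are hypothetical and exist only for illustration; the real SerializerProtocol also defines the matching deserialize_key and deserialize_headers methods, and UTF-8 encoding is an assumption.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class SerializerSketch(ABC):
    """Hypothetical stand-in for SerializerProtocol (not the real class)."""

    @abstractmethod
    async def serialize_value(self, value: Any, **kwargs: Any) -> Optional[bytes]: ...

    @abstractmethod
    async def deserialize_value(self, value: bytes, **kwargs: Any) -> Any: ...

    @abstractmethod
    async def serialize_key(self, key: Optional[Any], **kwargs: Any) -> Optional[bytes]: ...

    @abstractmethod
    async def serialize_headers(
        self, headers: Optional[Dict[str, str]], **kwargs: Any
    ) -> Optional[List[Tuple[str, bytes]]]: ...


class PlainJsonSerializer(SerializerSketch):
    """Illustrative provider; UTF-8 encoding is an assumption of this sketch."""

    async def serialize_value(self, value, **kwargs):
        if value is None:
            return None
        return json.dumps(value).encode("utf-8")

    async def deserialize_value(self, value, **kwargs):
        return json.loads(value)

    async def serialize_key(self, key, **kwargs):
        return None if key is None else str(key).encode("utf-8")

    async def serialize_headers(self, headers, **kwargs):
        if headers is None:
            return None
        return [(name, text.encode("utf-8")) for name, text in headers.items()]


async def round_trip(serializer: SerializerSketch, payload: Any) -> Any:
    """Serialize and immediately deserialize a payload through any provider."""
    raw = await serializer.serialize_value(payload)
    return await serializer.deserialize_value(raw)
```

Because round_trip depends only on the abstract interface, any provider that implements the same methods could be passed in without changing the caller.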

2. SerializerRegistry

This is a central registry that maps string identifiers (backend names) to the concrete serializer classes that implement the SerializerProtocol. For example, it maps the name "orjson" to the OrjsonSerializer class. This allows the factory to be decoupled from the implementations.
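The name-to-class mapping can be pictured with the sketch below. It is an assumption-laden illustration, not the real SerializerRegistry API: the class name, the register and get methods, and the placeholder FakeOrjsonSerializer are all hypothetical.

```python
from typing import Dict


class SerializerRegistrySketch:
    """Hypothetical registry that maps backend names to serializer classes."""

    def __init__(self) -> None:
        self._classes: Dict[str, type] = {}

    def register(self, name: str, serializer_cls: type) -> None:
        """Associate a backend name with a serializer class."""
        self._classes[name] = serializer_cls

    def get(self, name: str) -> type:
        """Look up a class by backend name, with a readable error on a miss."""
        try:
            return self._classes[name]
        except KeyError:
            known = ", ".join(sorted(self._classes)) or "none"
            raise KeyError(
                f"Unknown serializer backend {name!r}; registered: {known}"
            ) from None


class FakeOrjsonSerializer:
    """Placeholder standing in for OrjsonSerializer."""


registry = SerializerRegistrySketch()
registry.register("orjson", FakeOrjsonSerializer)
```

A factory that only calls registry.get(name) never needs to import a concrete provider, which is the decoupling described above.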

3. SerializerFactory

This is the primary entry point for the rest of the application to obtain a serializer instance. It reads the application's configuration, looks up the configured backend name in the SerializerRegistry, and instantiates the matching class. It caches one instance per backend name, so the same serializer is reused throughout the application.
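The lookup-then-cache flow might look roughly like the following sketch. The REGISTRY dict, DummySerializer, and the plain-dict settings are hypothetical simplifications. Unlike the real create(settings=None), this version requires settings, and it does not model the fallback to BaseSerializer.

```python
from typing import Any, Dict


class DummySerializer:
    """Placeholder provider that just remembers its settings."""

    def __init__(self, settings: Dict[str, Any]) -> None:
        self.settings = settings


# Hypothetical registry contents: backend name -> serializer class.
REGISTRY: Dict[str, type] = {"orjson": DummySerializer}


class SerializerFactorySketch:
    """Illustrative factory that keeps one instance per backend name."""

    _instances: Dict[str, Any] = {}

    @classmethod
    def create(cls, settings: Dict[str, Any]) -> Any:
        backend = settings["backend"]
        if backend not in cls._instances:
            serializer_cls = REGISTRY[backend]  # resolve the name to a class
            cls._instances[backend] = serializer_cls(settings)
        return cls._instances[backend]

    @classmethod
    def clear(cls) -> None:
        """Drop cached instances, for example between tests."""
        cls._instances.clear()
```

Calling create twice with the same backend returns the same object until clear() is called.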


Available Providers

The Athomic Layer comes with several built-in serializer providers:

  • JsonPydanticSerializer: A JSON serializer that validates incoming data against a target Pydantic model during deserialization. This helps ensure data integrity at the boundaries of your application.
  • OrjsonSerializer: A high-performance JSON serializer that uses the orjson library, which is faster than the standard json library.
  • ProtobufSerializer: A highly efficient, schema-based serializer for Google Protocol Buffers. It can optionally integrate with a Schema Registry to handle schema validation and wire format encapsulation, making it ideal for high-throughput messaging systems.

Configuration

The serializer is configured under the serializer section in your settings file (e.g., settings.toml). You can select the backend and provide specific configurations for features like Schema Registry.

[default.serializer]
# The name of the specific serializer implementation to use (e.g., 'json_pydantic', 'protobuf', 'orjson').
backend = "orjson"

# The fundamental data format that this serializer handles.
format = "json"

# --- Schema Registry (for Protobuf/Avro) ---
schema_validation_enabled = false
schema_registry_url = "http://localhost:8081"
handler = "confluent_protobuf"
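As a rough illustration of how these keys could map onto a typed settings object, the sketch below mirrors them in a frozen dataclass. SerializerSettingsSketch and its active_handler helper are hypothetical; the real SerializerSettings class may be structured differently. The dict stands in for what a TOML parser would return for the section above.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class SerializerSettingsSketch:
    """Hypothetical mirror of the [default.serializer] keys."""

    backend: str = "orjson"
    format: str = "json"
    schema_validation_enabled: bool = False
    schema_registry_url: Optional[str] = None
    handler: Optional[str] = None

    def active_handler(self) -> Optional[str]:
        """Return the handler name only when schema validation is enabled."""
        return self.handler if self.schema_validation_enabled else None


# Equivalent of the parsed TOML section (assumed shape).
parsed: Dict[str, Any] = {
    "backend": "orjson",
    "format": "json",
    "schema_validation_enabled": False,
    "schema_registry_url": "http://localhost:8081",
    "handler": "confluent_protobuf",
}
settings = SerializerSettingsSketch(**parsed)
```

With schema_validation_enabled set to false, the handler and registry URL are present in the configuration but remain unused in this sketch.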

API Reference

The following sections are auto-generated from the source code docstrings.

SerializerProtocol

nala.athomic.serializer.protocol.SerializerProtocol

Bases: ABC

Defines the bidirectional contract for message serializers.

This protocol handles the conversion of payloads, keys, and headers between Python objects/dictionaries and the raw byte format suitable for transport-specific backends (e.g., Kafka, Pub/Sub) or storage layers (e.g., Cache, Outbox). It is a critical component of the message processing pipeline.

__init__(settings)

Initializes the serializer with its required configuration settings.

deserialize_headers(headers, **kwargs) abstractmethod async

Deserializes the message headers from the backend's raw format into a dictionary.

Parameters:

  • headers (Optional[Any], required): The serialized headers from the broker, which may be a list of tuples or a raw object.
  • **kwargs (Any, default {}): Contextual arguments.

Returns:

  • Optional[Dict[str, str]]: The deserialized headers as a dictionary of key-value strings, or None.

Raises:

  • Exception: If deserialization fails.

deserialize_key(key, **kwargs) abstractmethod async

Deserializes the raw byte message key back into a string or object.

Parameters:

  • key (bytes, required): The serialized message key as bytes.
  • **kwargs (Any, default {}): Contextual arguments.

Returns:

  • Optional[str]: The deserialized key (typically a string), or None.

Raises:

  • Exception: If deserialization fails.

deserialize_value(value, **kwargs) abstractmethod async

Deserializes the raw byte message payload back into a Python object.

Parameters:

  • value (bytes, required): The serialized message payload as bytes.
  • **kwargs (Any, default {}): Contextual arguments, potentially including a target_type (e.g., a Pydantic model) for validation.

Returns:

  • Any: The deserialized message value (e.g., dictionary, Pydantic model instance).

Raises:

  • Exception: If deserialization fails (e.g., invalid JSON/Protobuf format, schema mismatch).

serialize_headers(headers, **kwargs) abstractmethod async

Serializes the message headers into a list of backend-compatible byte tuples.

Parameters:

  • headers (Optional[Dict[str, str]], required): Optional dictionary of string-based headers.
  • **kwargs (Any, default {}): Contextual arguments.

Returns:

  • Optional[list[tuple[str, bytes]]]: A list of (key, value_bytes) tuples, or None.

Raises:

  • Exception: If serialization fails (e.g., non-string key or value).
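The header shapes described for serialize_headers and deserialize_headers can be illustrated with the synchronous sketch below. It assumes UTF-8 encoding and raises TypeError on non-string entries; the real methods are asynchronous, and the documentation does not specify their encoding or exception types.

```python
from typing import Dict, List, Optional, Tuple

HeaderList = List[Tuple[str, bytes]]


def serialize_headers(headers: Optional[Dict[str, str]]) -> Optional[HeaderList]:
    """Turn a str->str dict into (key, value_bytes) tuples; None passes through."""
    if headers is None:
        return None
    pairs: HeaderList = []
    for name, text in headers.items():
        if not isinstance(name, str) or not isinstance(text, str):
            raise TypeError(f"Header {name!r} must map a str key to a str value")
        pairs.append((name, text.encode("utf-8")))  # UTF-8 is an assumption
    return pairs


def deserialize_headers(headers: Optional[HeaderList]) -> Optional[Dict[str, str]]:
    """Inverse of serialize_headers for the list-of-tuples form."""
    if headers is None:
        return None
    return {name: raw.decode("utf-8") for name, raw in headers}
```

The list-of-tuples form matches what Kafka-style clients commonly expect for record headers, which is presumably why the protocol returns it instead of a dictionary.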

serialize_key(key, **kwargs) abstractmethod async

Serializes the message key for partitioning or routing.

Parameters:

  • key (Optional[Any], required): The optional message key.
  • **kwargs (Any, default {}): Contextual arguments (e.g., topic name).

Returns:

  • Optional[bytes]: The serialized key as raw bytes, or None.

Raises:

  • Exception: If serialization fails.

serialize_value(value, **kwargs) abstractmethod async

Serializes the message payload for outbound transmission or storage.

Parameters:

  • value (Any, required): The original message payload (e.g., Pydantic model, dictionary).
  • **kwargs (Any, default {}): Contextual arguments (e.g., topic name, schema version).

Returns:

  • Optional[bytes]: The serialized payload as raw bytes, or None if the input was None.

Raises:

  • Exception: If serialization fails (e.g., unhandled data type, schema incompatibility).

SerializerFactory

nala.athomic.serializer.factory.SerializerFactory

Factory for instantiating message serializers based on the configured backend. Falls back to BaseSerializer if no specific implementation is registered. Caches instances by backend name for singleton-like behavior per backend.

clear() classmethod

Clears the singleton cache of serializer instances.

create(settings=None) classmethod

Creates and returns a singleton instance of the configured SerializerProtocol by delegating to a registered creator.

JsonPydanticSerializer

nala.athomic.serializer.providers.json_pydantic_serializer.JsonPydanticSerializer

Bases: BaseSerializer

A JSON serializer implementation optimized for data validation using Pydantic models.

This serializer inherits the core functionality for header and key handling, as well as default serialization (serialize_value), from BaseSerializer. Its specialized role is to enhance the deserialization process by enforcing the schema defined by a target Pydantic model.

deserialize_value(value, target_type=dict, **kwargs) async

Deserializes a JSON payload from bytes and validates it against a target Pydantic model.

If a target_type (that is a subclass of BaseModel) is provided, the data is validated against that schema before being returned.

Parameters:

  • value (bytes | None, required): The raw JSON payload as bytes.
  • target_type (Any, default dict): The expected class for the deserialized result, typically a pydantic.BaseModel subclass.

Returns:

  • Any | None: The validated target_type instance or the raw data (dict/list).

Raises:

  • DeserializationError: If JSON decoding fails or if Pydantic validation fails.
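The decode-then-validate flow can be sketched with the standard library alone. Note the substitution: a dataclass stands in for the Pydantic model so the example has no third-party dependency, which means only field names are checked here, whereas Pydantic also validates and coerces field types. DeserializationErrorSketch and OrderCreated are hypothetical names, and returning None for a None payload is an assumption.

```python
import json
from dataclasses import dataclass, is_dataclass
from typing import Any, Optional


class DeserializationErrorSketch(Exception):
    """Hypothetical stand-in for the library's DeserializationError."""


@dataclass
class OrderCreated:
    """Example target type; a dataclass replaces a Pydantic model here."""

    order_id: int
    customer: str


def deserialize_value(value: Optional[bytes], target_type: Any = dict) -> Any:
    """Decode JSON bytes, then build target_type from the data if requested."""
    if value is None:
        return None  # assumption: an empty payload passes through
    try:
        data = json.loads(value)
    except ValueError as exc:  # covers JSONDecodeError and UnicodeDecodeError
        raise DeserializationErrorSketch(f"invalid JSON: {exc}") from exc
    if not is_dataclass(target_type):
        return data  # no schema requested: return the raw dict/list
    try:
        return target_type(**data)
    except TypeError as exc:  # missing or unexpected fields, or non-mapping data
        raise DeserializationErrorSketch(f"validation failed: {exc}") from exc
```

Both failure modes surface as the same error type, mirroring how the documented method reports decoding and validation failures through a single DeserializationError.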

OrjsonSerializer

nala.athomic.serializer.providers.orjson_serializer.OrjsonSerializer

Bases: BaseSerializer

A high-performance serializer that uses the orjson library to encode and decode JSON.

This class provides a fast, dedicated JSON implementation for message payloads, inheriting all common features (logging, tracing, header/key handling) from BaseSerializer.

__init__(settings)

Initializes the OrjsonSerializer.

Parameters:

  • settings (SerializerSettings, required): The configuration for this serializer instance.

deserialize_value(data, **kwargs) async

Deserializes a JSON payload from bytes back into a Python object (dict/list).

Parameters:

  • data (Any, required): The serialized message payload, expected as bytes.

Returns:

  • Any: The deserialized Python object.

Raises:

  • DeserializationError: If orjson fails to decode the payload (e.g., invalid JSON structure).

serialize_value(value, **kwargs) async

Serializes a value into a JSON byte string using orjson.

It uses the custom default encoder to handle complex types like Pydantic models.

Parameters:

  • value (Any, required): The original message payload.

Returns:

  • Optional[bytes]: The serialized payload as raw bytes.

Raises:

  • SerializationError: If orjson fails to serialize the object (e.g., non-serializable type).
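The default-encoder idea can be shown with the standard json module, which is swapped in here for orjson to avoid a third-party dependency. Both json.dumps and orjson.dumps accept a default callable that is invoked for objects the encoder cannot handle natively. The default_encoder function and the Event dataclass are hypothetical; the serializer's actual encoder is not shown in this documentation.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime, timezone
from typing import Any


def default_encoder(obj: Any) -> Any:
    """Fallback called only for objects the JSON encoder cannot handle itself."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)  # dataclass instance -> plain dict
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj).__name__} is not JSON serializable")


@dataclass
class Event:
    """Example payload standing in for a richer model type."""

    name: str
    at: datetime


def serialize_value(value: Any) -> bytes:
    """Encode a value as compact JSON bytes, using the fallback for rich types."""
    text = json.dumps(value, default=default_encoder, separators=(",", ":"))
    return text.encode("utf-8")
```

Raising TypeError from the fallback is what signals an unsupported type to the encoder; a serializer could catch it and re-raise its own SerializationError.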

ProtobufSerializer

nala.athomic.serializer.providers.protobuf_serializer.ProtobufSerializer

Bases: BaseSerializer

Serializes and deserializes messages using Google Protocol Buffers.

This serializer is optimized for performance and type safety. When configured, it delegates schema validation and wire format encapsulation (Confluent Wire Format) to a specialized Schema Handler (e.g., Avro or Protobuf). This design keeps the class decoupled, in line with the single responsibility principle (SRP), and focused on Protobuf operations.

Attributes:

  • handler (Optional[SchemaHandlerProtocol]): The component responsible for Schema Registry interaction and wire format handling.

__init__(settings)

Initializes the serializer.

It attempts to inject a schema handler if schema validation is enabled in the configuration.

Parameters:

  • settings (SerializerSettings, required): The specific configuration for this serializer instance.

deserialize_value(data, **kwargs) async

Deserializes raw bytes into a Protobuf message instance.

If the payload is in the Confluent Wire Format (starts with MAGIC_BYTE), it delegates the process to the schema handler for schema ID lookup. Otherwise, it attempts standard Protobuf deserialization.

Parameters:

  • data (Any, required): The raw message data, expected as bytes.
  • **kwargs (Any, default {}): Contextual arguments (must include 'target_type' and 'topic' if schema handler is active).

Returns:

  • Any: The deserialized Protobuf message instance.

Raises:

  • DeserializationError: If 'target_type' is missing, data is malformed, or standard deserialization fails.
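The magic-byte check can be illustrated with a small framing sketch. In the Confluent wire format, a payload starts with a zero magic byte followed by a 4-byte big-endian schema ID. The frame and unframe helpers are hypothetical and are not the schema handler's API. The sketch also ignores the message-index list that Confluent's Protobuf framing places after the schema ID.

```python
import struct
from typing import Optional, Tuple

MAGIC_BYTE = 0  # first byte of a Confluent-framed payload
_HEADER = struct.Struct(">bI")  # magic byte + 4-byte big-endian schema ID


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix a payload with the magic byte and schema ID."""
    return _HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(data: bytes) -> Tuple[Optional[int], bytes]:
    """Return (schema_id, payload); schema_id is None when data is not framed."""
    if len(data) < _HEADER.size or data[0] != MAGIC_BYTE:
        return None, data  # treat as plain, unframed bytes
    _, schema_id = _HEADER.unpack_from(data)
    return schema_id, data[_HEADER.size:]
```

A deserializer could branch on the first element of the result: a schema ID routes the payload to the schema handler, while None falls through to standard Protobuf parsing.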

serialize_value(value, **kwargs) async

Serializes a Protobuf message into raw bytes.

Delegates to the schema handler if configured to include the wire format (magic byte + schema ID). Otherwise, performs standard Protobuf serialization.

Parameters:

  • value (Any, required): The Protobuf message instance to serialize.
  • **kwargs (Any, default {}): Contextual arguments (must include 'topic' if schema handler is active).

Returns:

  • Optional[bytes]: The serialized payload.

Raises:

  • SerializationError: If the value is not a Protobuf message or if the required 'topic' is missing when a schema handler is used.