
File Storage Abstraction

Overview

The Storage module provides a unified, protocol-based interface for interacting with various file and object storage backends, such as the local filesystem or cloud providers like Google Cloud Storage (GCS).

This abstraction is essential for decoupling application logic from the underlying storage technology. Its primary use cases include:

- Handling large message payloads via the Claim Check pattern.
- General application file storage (e.g., user uploads, generated reports).


Core Concepts

StorageProtocol

This is the abstract contract that all storage providers must implement. It defines a standard set of asynchronous operations for object storage:

- upload(source_path, destination_path)
- download(source_path, destination_path)
- delete(path)
- get_url(path)
- exists(path)

StorageFactory

A singleton factory that creates the configured storage provider instance. It ensures that a single, shared client is used throughout the application, managing resources efficiently.
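
For example, application code typically obtains the provider through the factory rather than constructing it directly. A minimal sketch follows; the report paths are illustrative, and the import path follows the module names listed in the API Reference below.

from nala.athomic.storage.factory import StorageFactory


async def archive_report() -> None:
    # The factory returns the shared, already-configured provider instance.
    storage = StorageFactory.create()
    # Upload a local file to the configured backend (local disk, GCS, ...).
    await storage.upload("./report.pdf", "reports/report.pdf")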


Available Providers

LocalStorageProvider

This provider stores files on the local filesystem and is ideal for local development and testing. A key feature is its built-in path traversal protection, which ensures that all file operations remain securely contained within the configured base_path.
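
The exact guard lives inside the provider, but the idea can be sketched as follows; the helper name _resolve_safe_path and the error type are illustrative, not part of the public API.

from pathlib import Path


class PathTraversalError(Exception):
    """Illustrative error; the provider raises its own exception type."""


def _resolve_safe_path(base_path: Path, relative_path: str) -> Path:
    # Resolve the requested path and refuse anything that escapes base_path,
    # e.g. "../../etc/passwd".
    candidate = (base_path / relative_path).resolve()
    if not candidate.is_relative_to(base_path.resolve()):
        raise PathTraversalError(f"Path escapes storage root: {relative_path}")
    return candidate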

GcsStorageProvider

This is a production-ready provider for Google Cloud Storage (GCS). It handles authentication using either a service account JSON file or Application Default Credentials (ADC). It also supports generating temporary, secure signed URLs for accessing private objects.
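
For example, a consumer of the provider can request a short-lived link to a private object. This is a sketch; the object path and expiry are illustrative.

from typing import Optional

from nala.athomic.storage.factory import StorageFactory


async def share_invoice() -> Optional[str]:
    storage = StorageFactory.create()
    # Returns a signed URL valid for 15 minutes, or None if the object is
    # missing or the provider cannot generate URLs.
    return await storage.get_url("exports/invoice-123.pdf", expires_in=900)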


Integration with Messaging: The Claim Check Pattern

A powerful feature built on the Storage module is the implementation of the Claim Check pattern for the messaging system. This pattern solves the problem of sending large messages through brokers that have size limits.

Here’s how it works:

1. You add the "claim_check" step to your messaging payload processing pipeline in your configuration.
2. When a producer sends a message, the ClaimCheckStepAdapter intercepts it.
3. If the message's size exceeds a configured threshold (e.g., 250 KB), the adapter automatically uploads the large payload to the configured storage provider (e.g., GCS).
4. The original message payload is replaced with a small JSON object (the "claim check") that contains a reference to the storage path.
5. On the consumer side, the ClaimCheckStepAdapter detects the claim check, downloads the original large payload from storage, and transparently passes it to the next step in the processing pipeline.

This entire process is seamless to the developer and provides a robust solution for handling large payloads.
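
The adapter itself is documented in the API Reference below, but the producer-side step can be sketched roughly like this. The threshold constant, temporary-file handling, envelope key, and header name are illustrative and do not reflect the adapter's actual wire format.

import json
import tempfile
import uuid

from nala.athomic.storage.protocol import StorageProtocol

CLAIM_CHECK_THRESHOLD = 250 * 1024  # illustrative 250 KB threshold


async def encode_payload(storage: StorageProtocol, data: bytes, headers: dict) -> bytes:
    # Small payloads pass through the pipeline untouched.
    if len(data) <= CLAIM_CHECK_THRESHOLD:
        return data

    # Persist the large payload to blob storage and publish only a reference.
    blob_path = f"claim-checks/{uuid.uuid4()}.bin"
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(data)
    await storage.upload(tmp.name, blob_path)
    headers["x-claim-check"] = "true"  # illustrative header name
    return json.dumps({"claim_check_path": blob_path}).encode()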


Configuration

The storage provider is configured under the [storage] section in your settings.toml.

Local Storage Example

[default.storage]
enabled = true

  [default.storage.provider]
  backend = "local"
  # The base directory where files will be stored.
  base_path = "./.storage"

Google Cloud Storage (GCS) Example

[default.storage]
enabled = true

  [default.storage.provider]
  backend = "gcs"
  bucket_name = "my-gcs-bucket-name"

  # Optional: Path to your service account JSON key file.
  # If omitted, Application Default Credentials (ADC) are used.
  credentials_path = "path/to/your/gcs-credentials.json"

API Reference

nala.athomic.storage.protocol.StorageProtocol

Bases: Protocol

Protocol defining the standard interface for storage operations. Implementations will handle interaction with specific backends like local filesystem, S3, GCS, Azure Blob, etc.
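
Taken together, the operations below amount to an interface along these lines. This is a condensed sketch of the documented signatures; the None return annotations on upload, download, and delete are assumptions.

from typing import Dict, Optional, Protocol


class StorageProtocol(Protocol):
    async def upload(
        self,
        source_path: str,
        destination_path: str,
        metadata: Optional[Dict[str, str]] = None,
    ) -> None: ...

    async def download(self, source_path: str, destination_path: str) -> None: ...

    async def delete(self, path: str) -> None: ...

    async def exists(self, path: str) -> bool: ...

    async def get_url(self, path: str, expires_in: Optional[int] = 3600) -> Optional[str]: ...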

delete(path) async

Deletes an object from the storage backend.

Parameters:

- path (str, required): The path (key or blob name) of the object to delete.

Raises:

- StorageOperationError: If the deletion fails (often idempotent if the object doesn't exist).

download(source_path, destination_path) async

Downloads an object from the storage backend to a local destination path.

Parameters:

- source_path (str, required): The path (key or blob name) of the object in the storage backend.
- destination_path (str, required): The local path where the file should be saved.

Raises:

- ObjectNotFoundError: If the source_path does not exist in the storage.
- StorageOperationError: If the download fails for other reasons.

exists(path) async

Checks if an object exists at the specified path in the storage backend.

Parameters:

- path (str, required): The path (key or blob name) to check.

Returns:

- bool: True if the object exists, False otherwise.

Raises:

- StorageOperationError: If the check fails due to connection or permission issues.

get_url(path, expires_in=3600) async

Generates a public or pre-signed URL to access an object.

Parameters:

- path (str, required): The path (key or blob name) of the object.
- expires_in (Optional[int], default 3600): Optional duration (in seconds) for which the URL should be valid (relevant for pre-signed URLs). Defaults to 1 hour.

Returns:

- Optional[str]: The accessible URL as a string, or None if the provider cannot generate URLs or the object doesn't exist.

Raises:

- StorageOperationError: If URL generation fails.

upload(source_path, destination_path, metadata=None) async

Uploads a file from a local source path to a destination path in the storage backend.

Parameters:

- source_path (str, required): The local path of the file to upload.
- destination_path (str, required): The target path (key or blob name) in the storage backend.
- metadata (Optional[Dict[str, str]], default None): Optional dictionary of metadata to associate with the object.

Raises:

- FileNotFoundError: If the source_path does not exist.
- StorageOperationError: If the upload fails for other reasons (permissions, connection, etc.).

nala.athomic.storage.factory.StorageFactory

A factory responsible for creating and managing the singleton instance of the configured StorageProvider.

This factory ensures that only one instance of the storage provider (e.g., LocalStorageProvider, GcsStorageProvider) is created during the application's lifecycle, providing efficient resource usage and centralized configuration access.

clear() classmethod

Clears the cached singleton instance. Intended for use in test cleanup.
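
For example, a test suite can reset the singleton between tests so each test builds a fresh provider. This sketch assumes pytest.

import pytest

from nala.athomic.storage.factory import StorageFactory


@pytest.fixture(autouse=True)
def reset_storage_singleton():
    # Drop the cached provider after each test so the next test does not
    # reuse a stale instance or configuration.
    yield
    StorageFactory.clear()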

create(settings=None) classmethod

Creates or retrieves the singleton instance of the configured storage provider.

The creation process validates settings, retrieves the correct provider class from the storage_registry, and instantiates it with the corresponding configuration.

Parameters:

- settings (Optional[StorageSettings], default None): Optional explicit settings to override global configuration.

Returns:

- StorageProtocol: The fully initialized singleton instance of the storage provider.

Raises:

- RuntimeError: If the storage module is disabled or configuration is missing/invalid.
- ValueError: If the configured storage backend is not registered.

nala.athomic.storage.providers.local_storage.LocalStorageProvider

Bases: StorageBase

A storage provider implementation that saves and retrieves files from the local filesystem.

This provider is primarily intended for development, testing, and single-instance deployments. It inherits base lifecycle management and observability from StorageBase. All synchronous file I/O operations are wrapped using asyncio.to_thread to prevent blocking the asynchronous event loop.

It includes critical path traversal defense to ensure all operations are contained within the configured base_path.
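
The asyncio.to_thread wrapping mentioned above follows the standard pattern for offloading blocking filesystem calls. This is an illustrative sketch, not the provider's actual code.

import asyncio
import shutil
from pathlib import Path


async def copy_without_blocking(source: Path, destination: Path) -> None:
    # Run the blocking filesystem copy in a worker thread so the event loop
    # stays responsive while the file is written.
    await asyncio.to_thread(shutil.copy2, source, destination)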

Attributes:

- base_path (Path): The resolved absolute path to the root storage directory.

__init__(config)

Initializes the provider and ensures the base storage directory exists.

Parameters:

- config (LocalStorageProviderSettings, required): The configuration settings for the local storage.

Raises:

- StorageOperationError: If the base directory cannot be created.

nala.athomic.storage.providers.gcs_storage.GcsStorageProvider

Bases: StorageBase

A storage provider implementation for Google Cloud Storage (GCS).

This provider uses the synchronous GCS client (google.cloud.storage) and wraps all blocking I/O operations with asyncio.to_thread to maintain compatibility with the asynchronous event loop. It handles initialization with service account credentials or Application Default Credentials (ADC).
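
The credential handling described above corresponds to the standard constructors of the google-cloud-storage client. The following is a sketch of the likely initialization, not the provider's verbatim source.

from typing import Optional

from google.cloud import storage


def build_gcs_client(credentials_path: Optional[str]) -> storage.Client:
    # An explicit service-account key file takes precedence; otherwise fall
    # back to Application Default Credentials (ADC).
    if credentials_path:
        return storage.Client.from_service_account_json(credentials_path)
    return storage.Client()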

Attributes:

- bucket_name (str): The name of the target GCS bucket.
- client (Client): The underlying synchronous GCS client instance.
- bucket (Bucket): The reference to the configured GCS bucket.

__init__(config)

Initializes the GcsStorageProvider.

It attempts to create the synchronous GCS client and resolve the target bucket.

Parameters:

- config (GcsStorageProviderSettings, required): The configuration settings for GCS.

Raises:

- StorageOperationError: If the GCS client fails to initialize (e.g., due to invalid credentials).

nala.athomic.storage.payload.adapter.ClaimCheckStepAdapter

Bases: ProcessingStepProtocol

An adapter that implements the Claim Check pattern as a payload processing step.

This component is integrated into the messaging payload pipeline to handle messages whose size exceeds a configurable threshold. It stores the large payload in external blob storage (StorageProtocol) and replaces the original message body with a small "claim check" (a reference to the storage location).

Attributes:

- settings: Configuration for the Claim Check pattern, including the size threshold.
- storage (Optional[StorageProtocol]): The configured instance of the StorageProtocol (e.g., GCS, S3).

__init__(settings=None)

Initializes the adapter and resolves the storage dependency and configuration settings.

decode(data, **kwargs) async

Executes the Claim Check logic for inbound messages.

If the message contains the claim check header and payload structure, it downloads the original, large payload from storage.

Parameters:

- data (bytes, required): The raw payload bytes received from the message broker.
- **kwargs (Any, default {}): Contextual arguments (must include headers).

Returns:

- bytes: The original, large payload bytes.

Raises:

- StorageOperationError: If the download fails or the claim check path is missing.

encode(data, **kwargs) async

Executes the Claim Check logic for outbound messages.

If the payload size exceeds the threshold, the data is uploaded to storage, and the original payload is replaced by a JSON envelope containing the path.

Parameters:

- data (bytes, required): The raw payload bytes from the previous pipeline step.
- **kwargs (Any, default {}): Contextual arguments (must include headers).

Returns:

- bytes: The original payload or the small claim check JSON envelope.

Raises:

- StorageOperationError: If the upload operation fails.