Key-Value Stores

Overview

The Key-Value (KV) Store module provides a powerful and extensible abstraction layer for interacting with key-value data stores like Redis. It is a fundamental building block used by many other Athomic modules, including:

  • Caching: For storing the results of expensive operations.
  • Rate Limiting: For tracking request counts.
  • Feature Flags: For fetching flag configurations.
  • Distributed Locking: For managing lock states.
  • Sagas & Schedulers: For persisting state and scheduled tasks.

The architecture is designed to be highly flexible, featuring a protocol-based design, multiple backend providers, and a powerful wrapper system that allows for adding cross-cutting functionality declaratively.


Core Concepts

KVStoreProtocol

This is the contract that all KV store providers must implement. It defines a rich set of asynchronous operations, including not only basic CRUD (get, set, delete, exists) but also advanced commands for sorted sets (zadd, zrem, zpopbyscore), which are critical for implementing components like the task scheduler.
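To make the shape of such a contract concrete, here is a minimal sketch using typing.Protocol. The names MiniKVProtocol and DictStore are hypothetical and cover only four of the operations; they are not the actual Athomic definitions.

```python
import asyncio
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class MiniKVProtocol(Protocol):
    """Hypothetical, trimmed-down stand-in for KVStoreProtocol."""

    async def get(self, key: str) -> Optional[Any]: ...

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool: ...

    async def delete(self, key: str) -> None: ...

    async def exists(self, key: str) -> bool: ...


class DictStore:
    """Toy provider. It matches the protocol structurally, without inheriting."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)

    async def set(self, key, value, ttl=None, nx=False) -> bool:
        if nx and key in self._data:
            return False  # nx=True: do not overwrite an existing key
        self._data[key] = value  # this toy ignores ttl
        return True

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)

    async def exists(self, key: str) -> bool:
        return key in self._data


async def demo() -> bool:
    store: MiniKVProtocol = DictStore()
    await store.set("greeting", "hello")
    return await store.exists("greeting")
```

Because the protocol is structural, any class with matching methods can act as a provider, which is what lets wrappers and backends be combined freely.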

KVStoreFactory

This is the main entry point for creating a KV store client. The factory is responsible for:

  1. Reading the configuration for a named connection.
  2. Instantiating the correct base provider (e.g., RedisKVClient) using a registry.
  3. Applying a chain of wrappers to the base provider, adding functionality like multi-tenancy and default TTLs.
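The factory's three responsibilities can be sketched roughly as follows. Everything here (the PROVIDERS and WRAPPERS dictionaries, FakeRedisClient, NamedWrapper, create_store) is a hypothetical stand-in, and the choice that the last configured wrapper ends up outermost is an assumption of this sketch, not a documented guarantee.

```python
from typing import Any, Callable, Dict

# Hypothetical registries; the real module has its own registry classes.
PROVIDERS: Dict[str, Callable[[dict], Any]] = {}
WRAPPERS: Dict[str, Callable[[Any, dict], Any]] = {}


class FakeRedisClient:
    """Stand-in base provider that only remembers its URI."""

    def __init__(self, provider_cfg: dict) -> None:
        self.uri = provider_cfg["uri"]


class NamedWrapper:
    """Stand-in wrapper that records its name, config, and wrapped client."""

    def __init__(self, name: str, inner: Any, cfg: dict) -> None:
        self.name, self.inner, self.cfg = name, inner, cfg


PROVIDERS["redis"] = FakeRedisClient
WRAPPERS["default_ttl"] = lambda inner, cfg: NamedWrapper("default_ttl", inner, cfg)
WRAPPERS["key_resolver"] = lambda inner, cfg: NamedWrapper("key_resolver", inner, cfg)


def create_store(settings: dict) -> Any:
    # Steps 1 and 2: read the connection settings and build the base provider.
    provider_cfg = settings["provider"]
    client = PROVIDERS[provider_cfg["backend"]](provider_cfg)
    # Step 3: apply the enabled wrappers in the configured order.
    for wrapper_cfg in settings.get("wrappers", []):
        if wrapper_cfg.get("enabled", True):
            factory = WRAPPERS[wrapper_cfg["name"]]
            client = factory(client, wrapper_cfg.get("config", {}))
    return client


settings = {
    "provider": {"backend": "redis", "uri": "redis://localhost:6379/0"},
    "wrappers": [
        {"name": "default_ttl", "enabled": True, "config": {"default_ttl_seconds": 3600}},
        {"name": "key_resolver", "enabled": True},
    ],
}
store = create_store(settings)
```

In this sketch, store is the key_resolver wrapper, which wraps the default_ttl wrapper, which wraps the base client.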

Wrapper (Decorator) Pattern

A key architectural feature of the KV Store module is its use of the Decorator pattern. You can "wrap" a base client with additional functionality defined in your configuration. This makes the system extremely flexible. For example, you can add multi-tenant key resolution to any KV store (Redis, in-memory, or a future DynamoDB provider) just by adding a wrapper to the configuration.


Available Providers

  • RedisKVClient: The primary, production-ready provider for Redis. It uses the redis-py async client and supports the full KVStoreProtocol. Sorted set operations that need atomicity are implemented as Lua scripts, so each one executes as a single step on the Redis server.
  • LocalKVClient: A simple, in-memory provider that simulates a KV store using Python dictionaries. It is perfect for fast, dependency-free unit and integration testing.
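As an illustration of how a dictionary-backed store can stand in for Redis in tests, here is a minimal sketch with TTL support. InMemoryKV and its injectable clock parameter are hypothetical; this is not the LocalKVClient implementation.

```python
import asyncio
import time
from typing import Any, Callable, Dict, Optional, Tuple


class InMemoryKV:
    """Hypothetical dict-backed store with lazy TTL expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so tests can control time
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def _alive(self, key: str) -> bool:
        item = self._data.get(key)
        if item is None:
            return False
        _, expires_at = item
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[key]  # expire lazily on access
            return False
        return True

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool:
        if nx and self._alive(key):
            return False
        expires_at = None if ttl is None else self._clock() + ttl
        self._data[key] = (value, expires_at)
        return True

    async def get(self, key: str) -> Optional[Any]:
        return self._data[key][0] if self._alive(key) else None

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)
```

Passing a fake clock (for example, lambda: now[0]) lets a test advance time without sleeping.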

Available Wrappers

  • KeyResolvingKVClient: This wrapper automatically prepends contextual prefixes to all keys before they hit the database. It uses the ContextKeyGenerator to include the namespace, tenant_id, and user_id (if configured), making multi-tenancy transparent.
  • DefaultTTLKvClient: This wrapper automatically applies a default Time-To-Live (TTL) to all set operations if one is not explicitly provided in the method call. This is useful for ensuring that cache keys, for example, always expire.
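The behaviour of the two wrappers can be sketched with simplified stand-ins. PrefixWrapper, TTLWrapper, and RecordingStore are hypothetical names; the real KeyResolvingKVClient derives its prefix from the ContextKeyGenerator rather than from a fixed string.

```python
import asyncio
from typing import Any, List, Optional, Tuple


class RecordingStore:
    """Toy backend that records the exact key and TTL it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[int]]] = []

    async def set(self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False) -> bool:
        self.calls.append((key, ttl))
        return True


class TTLWrapper:
    """Fills in a default TTL only when the caller passed none."""

    def __init__(self, client: Any, default_ttl: int) -> None:
        self._client = client
        self._default_ttl = default_ttl

    async def set(self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False) -> bool:
        effective = self._default_ttl if ttl is None else ttl
        return await self._client.set(key, value, ttl=effective, nx=nx)


class PrefixWrapper:
    """Prepends a fixed prefix; a stand-in for context-based key resolution."""

    def __init__(self, client: Any, prefix: str) -> None:
        self._client = client
        self._prefix = prefix

    async def set(self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False) -> bool:
        return await self._client.set(f"{self._prefix}:{key}", value, ttl=ttl, nx=nx)


backend = RecordingStore()
store = PrefixWrapper(TTLWrapper(backend, 3600), "nala:tenant-123:cache")
asyncio.run(store.set("my-key", "value"))          # no TTL given: default applies
asyncio.run(store.set("other", "value", ttl=5))    # explicit TTL wins
```

The caller only ever sees the logical key; the prefix and the default TTL are added on the way down to the backend.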

Configuration

Most of this module's flexibility is expressed through configuration: you define a base provider and then apply an ordered list of wrappers to it.

# In settings.toml, under [default.database.kvstore]

[default.database.kvstore.default_redis]
enabled = true
# The namespace is used by the KeyResolvingKVClient wrapper
namespace = "cache"

  # 1. Define the base provider (e.g., Redis)
  [default.database.kvstore.default_redis.provider]
  backend = "redis"
  uri = "redis://localhost:6379/0"

  # 2. Define an ordered list of wrappers to apply
  [[default.database.kvstore.default_redis.wrappers]]
  name = "default_ttl" # This must match a name in the KVStoreWrapperRegistry
  enabled = true
    # Custom settings for this wrapper instance
    [default.database.kvstore.default_redis.wrappers.config]
    default_ttl_seconds = 3600 # 1 hour

  [[default.database.kvstore.default_redis.wrappers]]
  name = "key_resolver"
  enabled = true
    # The 'namespace' from the top level will be used by this wrapper
    [default.database.kvstore.default_redis.wrappers.config]
    # No extra config needed here, it uses the parent's namespace

In this example, when a set("my-key", "value") call is made, the final key in Redis will look something like nala:tenant-123:cache:my-key, and it will have a default TTL of 1 hour.


API Reference

nala.athomic.database.kvstore.protocol.KVStoreProtocol

Bases: BaseServiceProtocol, Protocol

Defines the contract for an asynchronous Key-Value store provider.

This protocol specifies the standard interface for all key-value store implementations within the framework. It includes basic CRUD operations, sorted set commands, and lifecycle management. Any class that conforms to this protocol can be used as a KV store backend for features like caching, batch operations, rate limiting, and feature flags.

clear() async

Removes all keys from the current database.

close() async

Closes the connection to the backend service.

connect() async

Establishes the connection to the backend service.

delete(key) async

Deletes a key from the store.

Parameters:

Name | Type | Description | Default
key | str | The key to delete. | required

delete_many(keys) async

Deletes multiple keys in a single operation.

Parameters:

Name | Type | Description | Default
keys | List[str] | A list of keys to delete. | required

Returns:

Type | Description
int | The number of keys actually deleted.

exists(key) async

Checks if a key exists in the store.

Parameters:

Name | Type | Description | Default
key | str | The key to check. | required

Returns:

Type | Description
bool | True if the key exists, False otherwise.

get(key) async

Retrieves a value by its key.

Parameters:

Name | Type | Description | Default
key | str | The key of the item to retrieve. | required

Returns:

Type | Description
Optional[Any] | The deserialized value, or None if the key is not found.

get_final_client() async

Returns the underlying raw asynchronous client instance.

This provides an escape hatch to access provider-specific features not covered by the protocol (e.g., calling a unique Redis command).

Returns:

Type | Description
Any | The raw provider-specific async client instance (e.g., redis.asyncio.Redis).

get_sync_client()

Returns an underlying raw synchronous client instance, if applicable.

Returns:

Type | Description
Any | The raw provider-specific sync client instance (e.g., redis.Redis).

Raises:

Type | Description
NotImplementedError | If a synchronous client is not available or supported by the provider.

hdel(key, fields) async

Removes the specified fields from the hash stored at key.

hgetall(key) async

Returns all fields and values of the hash stored at key.

hset(key, field, value) async

Sets field in the hash stored at key to value.

increment(key, amount=1) async

Atomically increments the integer value of a key by a given amount.

If the key does not exist, it is set to 0 before performing the operation.

Parameters:

Name | Type | Description | Default
key | str | The key of the item to increment. | required
amount | int | The amount to increment by. Defaults to 1. | 1

Returns:

Type | Description
int | The value of the key after the increment.
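One common use of an atomic increment is a fixed-window rate limiter. The sketch below is a minimal illustration against a toy store; CounterStore, allow_request, and the key layout are hypothetical, and counter expiry is left out.

```python
import asyncio
from typing import Dict


class CounterStore:
    """Toy stand-in exposing only increment(); missing keys start at 0."""

    def __init__(self) -> None:
        self._data: Dict[str, int] = {}

    async def increment(self, key: str, amount: int = 1) -> int:
        self._data[key] = self._data.get(key, 0) + amount
        return self._data[key]


async def allow_request(store: CounterStore, client_id: str, window: int, limit: int) -> bool:
    # One counter per client and time window; the window id is supplied by the caller.
    key = f"ratelimit:{client_id}:{window}"
    count = await store.increment(key)
    return count <= limit


async def demo() -> list:
    store = CounterStore()
    # Four requests in the same window with a limit of three.
    return [await allow_request(store, "client-a", window=0, limit=3) for _ in range(4)]
```

Because the increment returns the new value, the check needs no separate read, so concurrent callers cannot both observe the same count.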

is_available() async

Checks if the provider is connected and ready to accept operations.

Returns:

Type | Description
bool | True if the service is ready, False otherwise.

set(key, value, ttl=None, nx=False) async

Sets a key-value pair, with an optional TTL.

Parameters:

Name | Type | Description | Default
key | str | The key of the item to set. | required
value | Any | The value to store. It will be serialized by the provider. | required
ttl | Optional[int] | Optional time-to-live for the key in seconds. | None
nx | bool | If True, set the key only if it does not already exist. | False

Returns:

Type | Description
bool | True if the operation was successful.
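The nx flag is the usual building block for a simple lock: only one caller can create the key. The sketch below assumes that set(..., nx=True) returns False when the key already exists, which the reference above does not state explicitly. ToyStore, try_acquire, and release are hypothetical names.

```python
import asyncio
from typing import Any, Dict, Optional


class ToyStore:
    """In-memory stand-in; set(..., nx=True) returns False if the key exists."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def set(self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False) -> bool:
        if nx and key in self._data:
            return False
        self._data[key] = value  # this toy ignores ttl
        return True

    async def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)


async def try_acquire(store: ToyStore, name: str, owner: str, ttl: int = 30) -> bool:
    # nx=True lets the write succeed for at most one contender.
    return await store.set(f"lock:{name}", owner, ttl=ttl, nx=True)


async def release(store: ToyStore, name: str, owner: str) -> bool:
    # Check-then-delete is not atomic; acceptable for a sketch only.
    if await store.get(f"lock:{name}") == owner:
        await store.delete(f"lock:{name}")
        return True
    return False
```

The TTL acts as a safety net: if the holder crashes, the lock key eventually expires instead of blocking other workers forever.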

set_many(mapping, ttl=None, nx=False) async

Sets multiple key-value pairs in a single operation.

Parameters:

Name | Type | Description | Default
mapping | Dict[str, Any] | A dictionary of key-value pairs to store. | required
ttl | Optional[int] | Optional TTL in seconds applied to all keys in this batch. | None
nx | bool | If True, only sets keys that do not exist (atomic per key). | False

Returns:

Type | Description
Dict[str, bool] | A dictionary mapping each key to a boolean indicating success.

zadd(key, mapping) async

Adds one or more members with scores to a sorted set.

Parameters:

Name | Type | Description | Default
key | str | The key of the sorted set. | required
mapping | Mapping[str, float] | A dictionary of member-score pairs to add. | required

zpopbyscore(key, max_score) async

Atomically removes and returns the member with the lowest score.

The member returned is the one with the lowest score that is less than or equal to the max_score.

Parameters:

Name | Type | Description | Default
key | str | The key of the sorted set. | required
max_score | float | The maximum score to consider for popping a member. | required

Returns:

Type | Description
Optional[str] | The member name as a string, or None if no qualifying members exist.
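The semantics described for zpopbyscore map naturally onto a scheduler, where the score is a due timestamp. The sketch below models that behaviour in memory; ToySortedSets and drain_due are hypothetical, and the toy makes no attempt at the atomicity a real provider must offer.

```python
import asyncio
from typing import Dict, List, Mapping, Optional


class ToySortedSets:
    """In-memory model of zadd and zpopbyscore as described above."""

    def __init__(self) -> None:
        self._sets: Dict[str, Dict[str, float]] = {}

    async def zadd(self, key: str, mapping: Mapping[str, float]) -> None:
        self._sets.setdefault(key, {}).update(mapping)

    async def zpopbyscore(self, key: str, max_score: float) -> Optional[str]:
        members = self._sets.get(key, {})
        due = [(score, member) for member, score in members.items() if score <= max_score]
        if not due:
            return None
        _, member = min(due)  # lowest score first; ties broken by member name
        del members[member]
        return member


async def drain_due(store: ToySortedSets, key: str, now: float) -> List[str]:
    """Pop every task whose due time (score) is at or before `now`."""
    popped: List[str] = []
    while (task := await store.zpopbyscore(key, now)) is not None:
        popped.append(task)
    return popped


async def demo() -> List[str]:
    store = ToySortedSets()
    await store.zadd("tasks", {"a": 10.0, "b": 20.0, "c": 30.0})
    return await drain_due(store, "tasks", now=25.0)
```

With several workers polling the same set, the atomic pop is what ensures each due task is claimed by exactly one of them.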

zrangebyscore(key, min_score, max_score) async

Returns members of a sorted set within a score range.

Retrieves all members in a sorted set with scores between min_score and max_score (inclusive).

Parameters:

Name | Type | Description | Default
key | str | The key of the sorted set. | required
min_score | float | The minimum score of the range. | required
max_score | float | The maximum score of the range. | required

Returns:

Type | Description
list[str] | A list of member names.

zrem(key, members) async

Removes one or more members from a sorted set.

Parameters:

Name | Type | Description | Default
key | str | The key of the sorted set. | required
members | list[str] | A list of members to remove. | required

Returns:

Type | Description
int | The number of members that were removed from the set.

nala.athomic.database.kvstore.factory.KVStoreFactory

Bases: FactoryProtocol

create(settings) classmethod

Creates a KVStore client based on the provided KVStoreSettings and applies the wrappers defined inside that configuration.

nala.athomic.database.kvstore.providers.redis.client.RedisKVClient

Bases: BaseKVStore, CredentialResolve

A concrete KVStoreProvider implementation for Redis using the redis-py asyncio client (redis.asyncio).

This class manages the asynchronous connection lifecycle, handles low-level Redis commands (including atomic operations via Lua scripts), and securely resolves connection credentials.

Attributes:

Name | Type | Description
settings | RedisSettings | The Redis-specific configuration settings.
_client | Optional[Redis] | The underlying asynchronous Redis client instance.
_claim_script_sha | Optional[str] | The SHA hash of the Lua script used for ZPOPBYSCORE.

__init__(settings=None, serializer=None)

Initializes the RedisKVClient.

Parameters:

Name | Type | Description | Default
settings | Optional[KVStoreSettings] | The configuration for this KV store instance. | None
serializer | Optional[SerializerProtocol] | The serializer used for values. | None

get_final_client() async

Returns the raw asynchronous Redis client instance (Public Contract).

get_sync_client()

Creates and returns a NEW instance of the synchronous Redis client (Public Contract).

This instance is not managed by the provider's lifecycle.

Raises:

Type | Description
ValueError | If the Redis URI is not available for connection string construction.
RuntimeError | If the synchronous client creation fails.

nala.athomic.database.kvstore.wrappers.key_resolver_kv_client.KeyResolvingKVClient

Bases: WrapperBase

A decorator (Wrapper) for the KVStore client that dynamically resolves keys using a ContextKeyGenerator.

This wrapper applies a contextual prefix (e.g., namespace, tenant ID, user ID) to the logical key provided by the caller before forwarding the operation to the underlying KV store. This is essential for supporting multi-tenancy and avoiding key collisions in shared storage systems.

Attributes:

Name | Type | Description
resolver | ContextKeyGenerator | The utility used to generate context-aware keys.

__init__(client, settings=None, wrapper_settings=None, context_settings=None)

Initializes the KeyResolvingKVClient and sets up the ContextKeyGenerator.

Parameters:

Name | Type | Description | Default
client | KVStoreProtocol | The underlying KVStore client being wrapped. | required
settings | Optional[KVStoreSettings] | The global KVStore settings. | None
wrapper_settings | Optional[KeyResolvingWrapperSettings] | Specific settings for this wrapper. | None
context_settings | Optional[ContextSettings] | Optional settings for the key generator context. | None

delete(key) async

Deletes a key after resolving the contextual key.

delete_many(keys) async

Deletes multiple keys after resolving their contextual keys.

Parameters:

Name | Type | Description | Default
keys | List[str] | The list of logical keys to delete. | required

Returns:

Type | Description
int | The number of keys successfully deleted.

exists(key) async

Checks for key existence after resolving the contextual key.

get(key) async

Retrieves a value after resolving the contextual key.

hdel(key, fields) async

Delegates the hdel operation after resolving the contextual key.

hgetall(key) async

Delegates the hgetall operation after resolving the contextual key.

hset(key, field, value) async

Delegates the hset operation after resolving the contextual key.

set(key, value, ttl=None, nx=False) async

Sets a value after resolving the contextual key.

set_many(mapping, ttl=None, nx=False) async

Sets multiple values after resolving their contextual keys.

It resolves all keys in the batch, forwards the operation to the client, and then maps the results back to the original keys so the caller remains agnostic of the prefixes.

Parameters:

Name | Type | Description | Default
mapping | Dict[str, Any] | A dictionary of logical key-value pairs. | required
ttl | Optional[int] | Time-To-Live in seconds. | None
nx | bool | If True, set only if the key does not exist. | False

Returns:

Type | Description
Dict[str, bool] | A map of logical keys to success status.
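The resolve, forward, and map-back flow described for set_many can be sketched as follows. PrefixingWrapper and ToyBackend are hypothetical, and a fixed string prefix stands in for the ContextKeyGenerator.

```python
import asyncio
from typing import Any, Dict, Optional


class ToyBackend:
    """In-memory backend that reports per-key success for set_many."""

    def __init__(self, initial: Optional[Dict[str, Any]] = None) -> None:
        self.data: Dict[str, Any] = dict(initial or {})

    async def set_many(
        self, mapping: Dict[str, Any], ttl: Optional[int] = None, nx: bool = False
    ) -> Dict[str, bool]:
        results: Dict[str, bool] = {}
        for key, value in mapping.items():
            if nx and key in self.data:
                results[key] = False  # key exists, nx forbids overwriting
            else:
                self.data[key] = value  # this toy ignores ttl
                results[key] = True
        return results


class PrefixingWrapper:
    """Resolves logical keys to physical keys and maps results back."""

    def __init__(self, client: ToyBackend, prefix: str) -> None:
        self._client = client
        self._prefix = prefix

    def _resolve(self, key: str) -> str:
        return f"{self._prefix}:{key}"

    async def set_many(
        self, mapping: Dict[str, Any], ttl: Optional[int] = None, nx: bool = False
    ) -> Dict[str, bool]:
        # Remember which physical key belongs to which logical key.
        physical_to_logical = {self._resolve(k): k for k in mapping}
        physical = {p: mapping[logical] for p, logical in physical_to_logical.items()}
        results = await self._client.set_many(physical, ttl=ttl, nx=nx)
        # Translate the result keys back so callers never see the prefix.
        return {physical_to_logical[p]: ok for p, ok in results.items()}


backend = ToyBackend({"nala:tenant-123:cache:a": "old"})
store = PrefixingWrapper(backend, "nala:tenant-123:cache")
result = asyncio.run(store.set_many({"a": 1, "b": 2}, nx=True))
```

Here result is keyed by "a" and "b", even though the backend only ever saw the prefixed keys.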

zadd(key, mapping) async

Delegates the zadd operation after resolving the contextual key.

zpopbyscore(key, max_score) async

Delegates the zpopbyscore operation after resolving the contextual key.

zrangebyscore(key, min_score, max_score) async

Delegates the zrangebyscore operation after resolving the contextual key.

zrem(key, members) async

Delegates the zrem operation after resolving the contextual key.

nala.athomic.database.kvstore.wrappers.default_ttl_kv_client.DefaultTTLKvClient

Bases: WrapperBase

A decorator (Wrapper) for the KVStore client that automatically applies a default Time-To-Live (TTL).

This class intercepts the set and set_many operations and injects a configured default TTL if the caller provides no explicit TTL. This ensures that keys expire automatically, so cached data cannot accumulate indefinitely or stay stale forever.

Attributes:

Name | Type | Description
default_ttl | Optional[int] | The default TTL in seconds extracted from configuration, or None.

__init__(client, settings=None, wrapper_settings=None)

Initializes the wrapper, resolving the configured default TTL value.

Parameters:

Name | Type | Description | Default
client | KVStoreProtocol | The underlying KVStore client being wrapped. | required
settings | Optional[KVStoreSettings] | Optional global KVStore settings. | None
wrapper_settings | Optional[DefaultTTLWrapperSettings] | Specific settings for this wrapper instance. | None

set(key, value, ttl=None, nx=False) async

Sets a key-value pair, applying the default TTL if no explicit TTL is given.

Parameters:

Name | Type | Description | Default
key | str | The key of the item to set. | required
value | Any | The value to store. | required
ttl | Optional[int] | Explicit time-to-live in seconds provided by the caller. | None
nx | bool | If True, set the key only if it does not already exist. | False

Returns:

Type | Description
bool | True if the operation was successful.

set_many(mapping, ttl=None, nx=False) async

Sets multiple key-value pairs, applying the default TTL if no explicit TTL is given.

Parameters:

Name | Type | Description | Default
mapping | Dict[str, Any] | The dictionary of keys and values. | required
ttl | Optional[int] | Explicit time-to-live in seconds for the batch. | None
nx | bool | If True, set keys only if they do not exist. | False

Returns:

Type | Description
Dict[str, bool] | The result map from the underlying client.

zadd(key, mapping) async

Delegates the zadd operation to the wrapped client.

zpopbyscore(key, max_score) async

Delegates the zpopbyscore operation to the wrapped client.

zrangebyscore(key, min_score, max_score) async

Delegates the zrangebyscore operation to the wrapped client.

zrem(key, members) async

Delegates the zrem operation to the wrapped client.