Key-Value Stores
Overview
The Key-Value (KV) Store module provides a powerful and extensible abstraction layer for interacting with key-value data stores like Redis. It is a fundamental building block used by many other Athomic modules, including:
- Caching: For storing the results of expensive operations.
- Rate Limiting: For tracking request counts.
- Feature Flags: For fetching flag configurations.
- Distributed Locking: For managing lock states.
- Sagas & Schedulers: For persisting state and scheduled tasks.
The architecture is designed to be highly flexible, featuring a protocol-based design, multiple backend providers, and a powerful wrapper system that allows for adding cross-cutting functionality declaratively.
Core Concepts
KVStoreProtocol
This is the contract that all KV store providers must implement. It defines a rich set of asynchronous operations, including not only basic CRUD (get, set, delete, exists) but also advanced commands for sorted sets (zadd, zrem, zpopbyscore), which are critical for implementing components like the task scheduler.
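As a rough illustration of what conforming to such a contract means, the sketch below declares a small subset of the operations with `typing.Protocol` and checks a dictionary-backed class against it. `MiniKVProtocol` and `DictStore` are hypothetical names invented for this example; the real KVStoreProtocol declares many more methods, and the return type of `delete` is an assumption.

```python
import asyncio
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class MiniKVProtocol(Protocol):
    """Hypothetical subset of the contract: basic async CRUD only."""

    async def get(self, key: str) -> Optional[Any]: ...

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool: ...

    async def delete(self, key: str) -> None: ...

    async def exists(self, key: str) -> bool: ...


class DictStore:
    """Illustrative provider backed by a dict. The ttl argument is accepted but ignored."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool:
        if nx and key in self._data:
            return False  # nx=True refuses to overwrite an existing key
        self._data[key] = value
        return True

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)

    async def exists(self, key: str) -> bool:
        return key in self._data


async def demo() -> tuple:
    store = DictStore()
    first = await store.set("greeting", "hello")
    second = await store.set("greeting", "bye", nx=True)
    # isinstance() only checks that the methods exist (structural typing).
    return isinstance(store, MiniKVProtocol), first, second, await store.get("greeting")
```

Because the check is structural, `DictStore` never inherits from the protocol class; any object exposing the same methods would pass.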
KVStoreFactory
This is the main entry point for creating a KV store client. The factory is responsible for:
1. Reading the configuration for a named connection.
2. Instantiating the correct base provider (e.g., RedisKVClient) using a registry.
3. Applying a chain of wrappers to the base provider, adding functionality like multi-tenancy and default TTLs.
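The three steps above can be sketched as follows. This is a minimal illustration, not the framework's code: `PROVIDERS`, `WRAPPERS`, `DictProvider`, `NamedWrapper`, `create_store`, and `layer_names` are hypothetical names, and the sketch assumes wrappers are applied in list order, so the last one listed ends up outermost. The real factory may order them differently.

```python
from typing import Any, Callable, Dict, List


class DictProvider:
    """Hypothetical base provider; stands in for something like RedisKVClient."""

    def __init__(self, provider_cfg: Dict[str, Any]) -> None:
        self.uri = provider_cfg.get("uri")
        self.data: Dict[str, Any] = {}


class NamedWrapper:
    """Hypothetical wrapper that only records its name and the client it wraps."""

    def __init__(self, client: Any, name: str, config: Dict[str, Any]) -> None:
        self.client = client
        self.name = name
        self.config = config


# Registries map the strings used in configuration to constructors.
PROVIDERS: Dict[str, Callable[[Dict[str, Any]], Any]] = {"dict": DictProvider}
WRAPPERS: Dict[str, Callable[..., Any]] = {
    "default_ttl": NamedWrapper,
    "key_resolver": NamedWrapper,
}


def create_store(settings: Dict[str, Any]) -> Any:
    """Read the settings, build the base provider, then apply wrappers in order."""
    provider_cfg = settings["provider"]                          # step 1
    client = PROVIDERS[provider_cfg["backend"]](provider_cfg)    # step 2
    for wrapper_cfg in settings.get("wrappers", []):             # step 3
        if not wrapper_cfg.get("enabled", True):
            continue  # disabled wrappers are skipped
        wrapper_cls = WRAPPERS[wrapper_cfg["name"]]
        client = wrapper_cls(client, wrapper_cfg["name"], wrapper_cfg.get("config", {}))
    return client


def layer_names(client: Any) -> List[str]:
    """Walk from the outermost wrapper down to the base provider."""
    names: List[str] = []
    while isinstance(client, NamedWrapper):
        names.append(client.name)
        client = client.client
    names.append(type(client).__name__)
    return names


EXAMPLE_SETTINGS = {
    "provider": {"backend": "dict", "uri": "memory://"},
    "wrappers": [
        {"name": "default_ttl", "enabled": True, "config": {"default_ttl_seconds": 3600}},
        {"name": "key_resolver", "enabled": True},
    ],
}
```

Calling `layer_names(create_store(EXAMPLE_SETTINGS))` lists the layers from the outside in, which is a convenient way to check the order a given configuration produces.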
Wrapper (Decorator) Pattern
A key architectural feature of the KV Store module is its use of the Decorator pattern. You can "wrap" a base client with additional functionality defined in your configuration. This makes the system extremely flexible. For example, you can add multi-tenant key resolution to any KV store (Redis, in-memory, or a future DynamoDB provider) just by adding a wrapper to the configuration.
Available Providers
- RedisKVClient: The primary, production-ready provider for Redis. It uses the high-performance redis-py async client and supports the full KVStoreProtocol, including atomic sorted set operations implemented with Lua scripting for maximum safety.
- LocalKVClient: A simple, in-memory provider that simulates a KV store using Python dictionaries. It is perfect for fast, dependency-free unit and integration testing.
Available Wrappers
- KeyResolvingKVClient: This wrapper automatically prepends contextual prefixes to all keys before they hit the database. It uses the ContextKeyGenerator to include the namespace, tenant_id, and user_id (if configured), making multi-tenancy transparent.
- DefaultTTLKvClient: This wrapper automatically applies a default Time-To-Live (TTL) to all set operations if one is not explicitly provided in the method call. This is useful for ensuring that cache keys, for example, always expire.
Configuration
The configuration is where these pieces come together: you define a base provider and then list the wrappers to apply to it, in order.
# In settings.toml, under [default.database.kvstore]
[default.database.kvstore.default_redis]
enabled = true
# The namespace is used by the KeyResolvingKVClient wrapper
namespace = "cache"
# 1. Define the base provider (e.g., Redis)
[default.database.kvstore.default_redis.provider]
backend = "redis"
uri = "redis://localhost:6379/0"
# 2. Define an ordered list of wrappers to apply
[[default.database.kvstore.default_redis.wrappers]]
name = "default_ttl" # This must match a name in the KVStoreWrapperRegistry
enabled = true
# Custom settings for this wrapper instance
[default.database.kvstore.default_redis.wrappers.config]
default_ttl_seconds = 3600 # 1 hour
[[default.database.kvstore.default_redis.wrappers]]
name = "key_resolver"
enabled = true
# The 'namespace' from the top level will be used by this wrapper
[default.database.kvstore.default_redis.wrappers.config]
# No extra config needed here, it uses the parent's namespace
In this example, a call to set("my-key", "value") writes to a final Redis key that looks something like nala:tenant-123:cache:my-key, and the key expires after the default TTL of 1 hour (3600 seconds) because no explicit TTL was passed.
API Reference
nala.athomic.database.kvstore.protocol.KVStoreProtocol
Bases: BaseServiceProtocol, Protocol
Defines the contract for an asynchronous Key-Value store provider.
This protocol specifies the standard interface for all key-value store implementations within the framework. It includes basic CRUD operations, sorted set commands, and lifecycle management. Any class that conforms to this protocol can be used as a KV store backend for features like caching, batch operations, rate limiting, and feature flags.
clear()
async
Removes all keys from the current database.
close()
async
Closes the connection to the backend service.
connect()
async
Establishes the connection to the backend service.
delete(key)
async
Deletes a key from the store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key to delete. | required |
delete_many(keys)
async
Deletes multiple keys in a single operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | List[str] | A list of keys to delete. | required |
Returns:
| Type | Description |
|---|---|
| int | The number of keys actually deleted. |
exists(key)
async
Checks if a key exists in the store.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key to check. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the key exists, False otherwise. |
get(key)
async
Retrieves a value by its key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the item to retrieve. | required |
Returns:
| Type | Description |
|---|---|
| Optional[Any] | The deserialized value, or None if the key is not found. |
get_final_client()
async
Returns the underlying raw asynchronous client instance.
This provides an escape hatch to access provider-specific features not covered by the protocol (e.g., calling a unique Redis command).
Returns:
| Type | Description |
|---|---|
| Any | The raw provider-specific async client instance (e.g., |
get_sync_client()
Returns an underlying raw synchronous client instance, if applicable.
Returns:
| Type | Description |
|---|---|
| Any | The raw provider-specific sync client instance (e.g., |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | If a synchronous client is not available or supported by the provider. |
hdel(key, fields)
async
Removes the specified fields from the hash stored at key.
hgetall(key)
async
Returns all fields and values of the hash stored at key.
hset(key, field, value)
async
Sets field in the hash stored at key to value.
increment(key, amount=1)
async
Atomically increments the integer value of a key by a given amount.
If the key does not exist, it is set to 0 before performing the operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the item to increment. | required |
| amount | int | The amount to increment by. Defaults to 1. | 1 |
Returns:
| Type | Description |
|---|---|
| int | The value of the key after the increment. |
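One way an atomic increment supports request counting is a fixed-window rate limiter, sketched below. `CounterStore` is a hypothetical in-memory stand-in that implements only `increment`, and the `ratelimit:` key layout and `allow_request` helper are assumptions made for this example, not the framework's rate limiter.

```python
import asyncio
from typing import Dict, List


class CounterStore:
    """In-memory stand-in exposing only an increment(key, amount) coroutine."""

    def __init__(self) -> None:
        self._counts: Dict[str, int] = {}

    async def increment(self, key: str, amount: int = 1) -> int:
        # A missing key is treated as 0 before the increment, as described above.
        self._counts[key] = self._counts.get(key, 0) + amount
        return self._counts[key]


async def allow_request(store: CounterStore, client_id: str, window: int, limit: int) -> bool:
    """Count this request in the given window and compare against the limit."""
    count = await store.increment(f"ratelimit:{client_id}:{window}")
    return count <= limit


async def demo() -> List[bool]:
    store = CounterStore()
    # Five requests in the same window with a limit of three.
    return [await allow_request(store, "client-a", window=0, limit=3) for _ in range(5)]
```

Because the increment returns the new value, the caller needs no separate read. A real limiter would also need the window key to expire, which this sketch leaves out.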
is_available()
async
Checks if the provider is connected and ready to accept operations.
Returns:
| Type | Description |
|---|---|
| bool | True if the service is ready, False otherwise. |
set(key, value, ttl=None, nx=False)
async
Sets a key-value pair, with an optional TTL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the item to set. | required |
| value | Any | The value to store. It will be serialized by the provider. | required |
| ttl | Optional[int] | Optional time-to-live for the key in seconds. | None |
| nx | bool | If True, set the key only if it does not already exist. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if the operation was successful. |
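The combination of `nx=True` and a `ttl` is the usual basis for a simple lock: the first caller creates the key, later callers fail until it is deleted or expires. The sketch below shows the idea against a hypothetical in-memory `ExpiringStore`. It assumes `set` returns False when `nx` prevents the write, which the reference above does not state explicitly, and `try_acquire` and the `lock:` prefix are illustrative names.

```python
import asyncio
import time
from typing import Any, Dict, Optional, Tuple


class ExpiringStore:
    """In-memory stand-in that honours ttl and nx on set()."""

    def __init__(self) -> None:
        # key -> (value, absolute expiry on the monotonic clock, or None)
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def _alive(self, key: str) -> bool:
        entry = self._data.get(key)
        if entry is None:
            return False
        _, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[key]  # lazily drop expired entries
            return False
        return True

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool:
        if nx and self._alive(key):
            return False  # assumption: a blocked nx write reports False
        expires_at = time.monotonic() + ttl if ttl is not None else None
        self._data[key] = (value, expires_at)
        return True

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)


async def try_acquire(store: ExpiringStore, name: str, owner: str, ttl: int = 30) -> bool:
    """Try to take the lock; the ttl bounds how long a crashed owner can hold it."""
    return await store.set(f"lock:{name}", owner, ttl=ttl, nx=True)


async def demo() -> Tuple[bool, bool, bool]:
    store = ExpiringStore()
    first = await try_acquire(store, "nightly-job", "worker-1")
    second = await try_acquire(store, "nightly-job", "worker-2")  # already held
    await store.delete("lock:nightly-job")                        # release
    third = await try_acquire(store, "nightly-job", "worker-2")
    return first, second, third
```

The release here is an unconditional delete; a production lock would normally verify ownership before releasing.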
set_many(mapping, ttl=None, nx=False)
async
Sets multiple key-value pairs in a single operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapping | Dict[str, Any] | A dictionary of key-value pairs to store. | required |
| ttl | Optional[int] | Optional TTL in seconds applied to all keys in this batch. | None |
| nx | bool | If True, only sets keys that do not exist (atomic per key). | False |
Returns:
| Type | Description |
|---|---|
| Dict[str, bool] | A dictionary mapping each key to a boolean indicating success. |
zadd(key, mapping)
async
Adds one or more members with scores to a sorted set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the sorted set. | required |
| mapping | Mapping[str, float] | A dictionary of member-score pairs to add. | required |
zpopbyscore(key, max_score)
async
Atomically removes and returns the member with the lowest score.
The member returned is the one with the lowest score that is less than
or equal to the max_score.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the sorted set. | required |
| max_score | float | The maximum score to consider for popping a member. | required |
Returns:
| Type | Description |
|---|---|
| Optional[str] | The member name as a string, or None if no qualifying members exist. |
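The semantics described here map naturally onto a scheduler that stores each task with its due time as the score. The sketch below imitates that behaviour with a hypothetical in-memory `SortedSetStore`; it is not the Redis implementation, which the provider documentation says uses a Lua script for atomicity. The `drain_due` helper and the task names are invented for the example.

```python
import asyncio
from typing import Dict, List, Mapping, Optional


class SortedSetStore:
    """In-memory stand-in for zadd and zpopbyscore on named sorted sets."""

    def __init__(self) -> None:
        self._zsets: Dict[str, Dict[str, float]] = {}

    async def zadd(self, key: str, mapping: Mapping[str, float]) -> None:
        self._zsets.setdefault(key, {}).update(mapping)

    async def zpopbyscore(self, key: str, max_score: float) -> Optional[str]:
        zset = self._zsets.get(key, {})
        # Candidates are members whose score is <= max_score.
        due = [(score, member) for member, score in zset.items() if score <= max_score]
        if not due:
            return None
        _, member = min(due)  # lowest score first; ties fall back to member name
        del zset[member]      # no await between read and delete in this sketch
        return member


async def drain_due(store: SortedSetStore, key: str, now: float) -> List[str]:
    """Pop every task whose due time (the score) is <= now, earliest first."""
    popped: List[str] = []
    while (member := await store.zpopbyscore(key, now)) is not None:
        popped.append(member)
    return popped


async def demo() -> List[str]:
    store = SortedSetStore()
    await store.zadd("tasks", {"send-email": 100.0, "rebuild-index": 50.0, "cleanup": 500.0})
    return await drain_due(store, "tasks", now=120.0)
```

Popping one member per call means that competing workers each receive a different task, provided the backend performs the read and the removal as a single atomic step.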
zrangebyscore(key, min_score, max_score)
async
Returns members of a sorted set within a score range.
Retrieves all members in a sorted set with scores between min_score
and max_score (inclusive).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the sorted set. | required |
| min_score | float | The minimum score of the range. | required |
| max_score | float | The maximum score of the range. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of member names. |
zrem(key, members)
async
Removes one or more members from a sorted set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the sorted set. | required |
| members | list[str] | A list of members to remove. | required |
Returns:
| Type | Description |
|---|---|
| int | The number of members that were removed from the set. |
nala.athomic.database.kvstore.factory.KVStoreFactory
Bases: FactoryProtocol
create(settings)
classmethod
Factory that creates a KVStore client based on the provided KVStoreSettings. Applies wrappers as defined INSIDE the kv_config.
nala.athomic.database.kvstore.providers.redis.client.RedisKVClient
Bases: BaseKVStore, CredentialResolve
A concrete KVStoreProvider implementation for Redis using the aioredis library.
This class manages the asynchronous connection lifecycle, handles low-level Redis commands (including atomic operations via Lua scripts), and securely resolves connection credentials.
Attributes:
| Name | Type | Description |
|---|---|---|
| settings | RedisSettings | The Redis-specific configuration settings. |
| _client | Redis \| None | The underlying asynchronous Redis client instance. |
| _claim_script_sha | Optional[str] | The SHA hash of the Lua script used for ZPOPBYSCORE. |
__init__(settings=None, serializer=None)
Initializes the RedisKVClient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | Optional[KVStoreSettings] | The configuration for this KV store instance. | None |
| serializer | Optional[SerializerProtocol] | The serializer used for values. | None |
get_final_client()
async
Returns the raw asynchronous Redis client instance (Public Contract).
get_sync_client()
Creates and returns a NEW instance of the synchronous Redis client (Public Contract).
This instance is not managed by the provider's lifecycle.
Raises:
| Type | Description |
|---|---|
| ValueError | If the Redis URI is not available for connection string construction. |
| RuntimeError | If the synchronous client creation fails. |
nala.athomic.database.kvstore.wrappers.key_resolver_kv_client.KeyResolvingKVClient
Bases: WrapperBase
A decorator (Wrapper) for the KVStore client that dynamically resolves keys using a ContextKeyGenerator.
This wrapper applies a contextual prefix (e.g., namespace, tenant ID, user ID) to the logical key provided by the caller before forwarding the operation to the underlying KV store. This is essential for supporting multi-tenancy and avoiding key collisions in shared storage systems.
Attributes:
| Name | Type | Description |
|---|---|---|
| resolver | ContextKeyGenerator | The utility used to generate context-aware keys. |
__init__(client, settings=None, wrapper_settings=None, context_settings=None)
Initializes the KeyResolvingKVClient and sets up the ContextKeyGenerator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | KVStoreProtocol | The underlying KVStore client being wrapped. | required |
| settings | Optional[KVStoreSettings] | The global KVStore settings. | None |
| wrapper_settings | Optional[KeyResolvingWrapperSettings] | Specific settings for this wrapper. | None |
| context_settings | Optional[ContextSettings] | Optional settings for the key generator context. | None |
delete(key)
async
Deletes a key after resolving the contextual key.
delete_many(keys)
async
Deletes multiple keys after resolving their contextual keys.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | List[str] | The list of logical keys to delete. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | The number of keys successfully deleted. |
exists(key)
async
Checks for key existence after resolving the contextual key.
get(key)
async
Retrieves a value after resolving the contextual key.
hdel(key, fields)
async
Delegates the hdel operation after resolving the contextual key.
hgetall(key)
async
Delegates the hgetall operation after resolving the contextual key.
hset(key, field, value)
async
Delegates the hset operation after resolving the contextual key.
set(key, value, ttl=None, nx=False)
async
Sets a value after resolving the contextual key.
set_many(mapping, ttl=None, nx=False)
async
Sets multiple values after resolving their contextual keys.
It resolves all keys in the batch, forwards the operation to the client, and then maps the results back to the original keys so the caller remains agnostic of the prefixes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapping | Dict[str, Any] | A dictionary of logical key-value pairs. | required |
| ttl | Optional[int] | Time-To-Live in seconds. | None |
| nx | bool | If True, set only if the key does not exist. | False |
Returns:
| Type | Description |
|---|---|
| Dict[str, bool] | A map of logical keys to success status. |
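The resolve, forward, and map-back flow described for set_many can be sketched as follows. `DictBackend` and `PrefixingClient` are hypothetical stand-ins, and the fixed string prefix replaces the ContextKeyGenerator, whose actual key format is not reproduced here.

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


class DictBackend:
    """Stand-in for the wrapped client; it stores whatever keys it is given."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    async def set_many(
        self, mapping: Dict[str, Any], ttl: Optional[int] = None, nx: bool = False
    ) -> Dict[str, bool]:
        results: Dict[str, bool] = {}
        for key, value in mapping.items():
            if nx and key in self.data:
                results[key] = False  # key already present, nx blocks the write
                continue
            self.data[key] = value
            results[key] = True
        return results


class PrefixingClient:
    """Illustrative wrapper: prefixes keys on the way in, strips them on the way out."""

    def __init__(self, client: DictBackend, prefix: str) -> None:
        self.client = client
        self.prefix = prefix

    def _resolve(self, key: str) -> str:
        return f"{self.prefix}:{key}"

    async def set_many(
        self, mapping: Dict[str, Any], ttl: Optional[int] = None, nx: bool = False
    ) -> Dict[str, bool]:
        # Remember which resolved key belongs to which logical key.
        resolved_to_logical = {self._resolve(key): key for key in mapping}
        resolved_mapping = {rk: mapping[lk] for rk, lk in resolved_to_logical.items()}
        raw = await self.client.set_many(resolved_mapping, ttl=ttl, nx=nx)
        # Translate the backend's result keys back to what the caller passed in.
        return {resolved_to_logical[rk]: ok for rk, ok in raw.items()}


async def demo() -> Tuple[Dict[str, bool], Dict[str, bool], set]:
    backend = DictBackend()
    client = PrefixingClient(backend, "nala:tenant-123:cache")
    first = await client.set_many({"a": 1, "b": 2})
    second = await client.set_many({"a": 9, "c": 3}, nx=True)
    return first, second, set(backend.data)
```

The caller only ever sees logical keys in the result, while the backend only ever sees prefixed ones.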
zadd(key, mapping)
async
Delegates the zadd operation after resolving the contextual key.
zpopbyscore(key, max_score)
async
Delegates the zpopbyscore operation after resolving the contextual key.
zrangebyscore(key, min_score, max_score)
async
Delegates the zrangebyscore operation after resolving the contextual key.
zrem(key, members)
async
Delegates the zrem operation after resolving the contextual key.
nala.athomic.database.kvstore.wrappers.default_ttl_kv_client.DefaultTTLKvClient
Bases: WrapperBase
A decorator (Wrapper) for the KVStore client that automatically applies a default Time-To-Live (TTL).
This class intercepts the set operation and injects a configured default TTL
if no explicit TTL is provided by the caller. It ensures that keys are automatically
expired, preventing cache overflow and maintaining consistency.
Attributes:
| Name | Type | Description |
|---|---|---|
| default_ttl | Optional[int] | The default TTL in seconds extracted from configuration, or None. |
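The interception described above amounts to choosing between the caller's TTL and the configured default before delegating. A minimal sketch, assuming a hypothetical `RecordingStore` backend and a `DefaultTTLSketch` class that takes the default directly rather than reading it from wrapper settings:

```python
import asyncio
from typing import Any, Dict, Optional


class RecordingStore:
    """Stand-in backend that records the TTL each key was stored with."""

    def __init__(self) -> None:
        self.ttls: Dict[str, Optional[int]] = {}

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool:
        self.ttls[key] = ttl
        return True


class DefaultTTLSketch:
    """Illustrative wrapper: fills in a default TTL only when none was passed."""

    def __init__(self, client: RecordingStore, default_ttl: Optional[int]) -> None:
        self.client = client
        self.default_ttl = default_ttl

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None, nx: bool = False
    ) -> bool:
        # An explicit TTL from the caller always wins over the default.
        effective_ttl = ttl if ttl is not None else self.default_ttl
        return await self.client.set(key, value, ttl=effective_ttl, nx=nx)


async def demo() -> Dict[str, Optional[int]]:
    backend = RecordingStore()
    client = DefaultTTLSketch(backend, default_ttl=3600)
    await client.set("report", "cached")        # no TTL given, default applies
    await client.set("token", "abc", ttl=60)    # explicit TTL is kept
    return backend.ttls
```

Comparing against None rather than relying on truthiness keeps the two cases distinct; whether the real wrapper treats a TTL of 0 the same way is not stated in this reference.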
__init__(client, settings=None, wrapper_settings=None)
Initializes the wrapper, resolving the configured default TTL value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | KVStoreProtocol | The underlying KVStore client being wrapped. | required |
| settings | Optional[KVStoreSettings] | Optional global KVStore settings. | None |
| wrapper_settings | Optional[DefaultTTLWrapperSettings] | Specific settings for this wrapper instance. | None |
set(key, value, ttl=None, nx=False)
async
Sets a key-value pair, applying the default TTL if no explicit TTL is given.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The key of the item to set. | required |
| value | Any | The value to store. | required |
| ttl | Optional[int] | Explicit time-to-live in seconds provided by the caller. | None |
| nx | bool | If True, set the key only if it does not already exist. | False |
Returns:
| Name | Type | Description |
|---|---|---|
| bool | bool | True if the operation was successful. |
set_many(mapping, ttl=None, nx=False)
async
Sets multiple key-value pairs, applying the default TTL if no explicit TTL is given.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mapping | Dict[str, Any] | The dictionary of keys and values. | required |
| ttl | Optional[int] | Explicit time-to-live in seconds for the batch. | None |
| nx | bool | If True, set keys only if they do not exist. | False |
Returns:
| Type | Description |
|---|---|
| Dict[str, bool] | The result map from the underlying client. |
zadd(key, mapping)
async
Delegates the zadd operation to the wrapped client.
zpopbyscore(key, max_score)
async
Delegates the zpopbyscore operation to the wrapped client.
zrangebyscore(key, min_score, max_score)
async
Delegates the zrangebyscore operation to the wrapped client.
zrem(key, members)
async
Delegates the zrem operation to the wrapped client.