# Distributed Workload Sharding

## Overview
Distributed Workload Sharding (also known as partitioning) is a pattern for dynamically distributing a set of tasks or data items across a cluster of active worker instances. It is a fundamental technique for achieving horizontal scalability and high availability for background processing services.
The Athomic implementation uses Consistent Hashing to ensure that the workload is distributed in a stable and predictable way.
## Core Use Case
The primary use case for sharding in the Athomic Layer is the `OutboxPublisher`. When you have multiple instances of your service running, sharding ensures that all instances can share the work of publishing outbox events without duplicating effort or creating conflicts. Each publisher instance will claim and process only a specific subset of the pending events.
## Core Technology: Consistent Hashing
Traditional sharding (e.g., assigning items with a modulo operator such as `hash(item) % worker_count`) is brittle: whenever a worker is added or removed, almost every item maps to a different worker, causing a massive reshuffling of the entire workload.

Athomic uses Consistent Hashing (via the `uhashring` library) to solve this. With consistent hashing, when a worker joins or leaves the cluster, only a small, necessary fraction of the workload is redistributed. This minimizes disruption and ensures the system remains stable during scaling events or deployments.
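To make that difference concrete, here is a minimal, standalone sketch using `uhashring` directly. The instance IDs and item names are illustrative and not part of the Athomic API:

```python
from uhashring import HashRing

# Three active workers, identified by their (illustrative) instance IDs.
ring = HashRing(nodes=["worker-1", "worker-2", "worker-3"])

items = [f"aggregate-{i}" for i in range(1000)]
before = {item: ring.get_node(item) for item in items}

# One worker leaves the cluster; rebuild the ring without it.
ring.remove_node("worker-3")
after = {item: ring.get_node(item) for item in items}

moved = sum(1 for item in items if before[item] != after[item])
print(f"{moved} of {len(items)} items were reassigned")
```

Only the items that hashed to the departed worker move (roughly a third here); with a modulo scheme, nearly all of them would.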
## How It Works (Integration with Service Discovery)
The sharding mechanism is tightly integrated with a Service Discovery backend (like Consul).
- **Registration**: At startup, each worker instance that participates in a sharding group (e.g., each `OutboxPublisher` service) registers itself with a specific `group_name` (e.g., `"outbox-publishers"`) in the service discovery backend. Each instance gets a unique ID.
- **Discovery**: When it's time to process work, the `ShardingService` queries the service discovery backend to get a list of all currently healthy workers registered for its group.
- **Hashing & Filtering**: A consistent hash ring is built using the IDs of the active workers. The service then takes the total list of work items (e.g., all pending `aggregate_key`s from the outbox) and passes them through the hash ring. It claims and processes only the items that hash to its own unique instance ID.
- **Deregistration**: On graceful shutdown, the worker deregisters itself. On the next processing cycle, the remaining workers rebuild the hash ring without the departed instance, and its workload is automatically and stably redistributed. This lifecycle is sketched below.
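These steps map directly onto the service lifecycle. Here is a minimal sketch of a worker loop; the `fetch_pending_keys` and `handle` helpers and the polling interval are hypothetical placeholders, not part of the Athomic API:

```python
import asyncio

from nala.athomic.resilience.sharding import ShardingFactory

async def run_worker() -> None:
    sharding = ShardingFactory.create(policy_name="my_custom_workers")

    # Registration: join the sharding group so peers can see this instance.
    await sharding.register_self()
    try:
        while True:
            # Discovery plus hashing & filtering happen inside filter_owned_items.
            pending = await fetch_pending_keys()  # hypothetical work source
            for key in await sharding.filter_owned_items(pending):
                await handle(key)  # hypothetical item processor
            await asyncio.sleep(5.0)
    finally:
        # Deregistration: leave the group so the ring is rebuilt without us.
        await sharding.deregister_self()
```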
## Usage Example

While the `OutboxPublisher` uses this internally, you could use the `ShardingService` for any custom distributed worker.
```python
from nala.athomic.resilience.sharding import ShardingFactory

# Get the sharding service for a specific policy defined in settings.toml
sharding_service = ShardingFactory.create(policy_name="my_custom_workers")

async def process_all_tenants():
    # 1. Get the complete list of work from a central source
    all_tenant_ids = await db.get_all_tenant_ids()  # e.g., ["tenant-a", "tenant-b", ...]

    # 2. Filter the list to get only the items this instance is responsible for
    my_tenant_ids = await sharding_service.filter_owned_items(all_tenant_ids)
    print(f"This worker owns {len(my_tenant_ids)} tenants.")

    # 3. Process only the assigned workload
    for tenant_id in my_tenant_ids:
        await process_data_for(tenant_id)
```
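Note that `filter_owned_items` only partitions work across instances that have joined the group: each worker must call `register_self()` during startup and `deregister_self()` during graceful shutdown (see the API reference below), as in the lifecycle sketch above.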
## Configuration

You configure sharding policies in your `settings.toml` under the `[resilience.sharding]` section. A policy simply defines a `group_name`, which must correspond to the name used for service discovery.
```toml
[default.resilience.sharding]
enabled = true

# A dictionary of named sharding policies
[default.resilience.sharding.policies]

# This policy is used by the OutboxPublisher
[default.resilience.sharding.policies.outbox_publishers]
# This name must match the service name registered in your discovery backend (e.g., Consul)
group_name = "outbox-publisher-service"

# You can define other policies for other worker types
[default.resilience.sharding.policies.my_custom_workers]
group_name = "custom-worker-service"
```
## API Reference

### `nala.athomic.resilience.sharding.protocol.ShardingProtocol`

Bases: `Protocol`
Defines the contract for a generic service that distributes items among a dynamic group of workers.
This protocol abstracts the mechanism for load balancing workloads (like processing message aggregates or tasks) using consistent hashing across multiple service instances.
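Because this is a structural `Protocol`, you can supply your own implementation where service discovery isn't available, for example in local tests. A minimal fixed-membership sketch (the class name and constructor are illustrative, not part of Athomic):

```python
from typing import List

from uhashring import HashRing

class StaticSharding:
    """Structurally satisfies ShardingProtocol with a fixed worker set."""

    def __init__(self, instance_id: str, all_instance_ids: List[str]) -> None:
        self.instance_id = instance_id
        self.ring = HashRing(nodes=all_instance_ids)

    async def register_self(self) -> None:
        pass  # no-op: membership is fixed, no discovery backend involved

    async def deregister_self(self) -> None:
        pass  # no-op

    async def filter_owned_items(self, items: List[str]) -> List[str]:
        # Keep only the items whose hash lands on this instance.
        return [i for i in items if self.ring.get_node(i) == self.instance_id]
```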
#### `deregister_self()` *(async)*
Deregisters the current service instance from the sharding group, allowing its workload to be redistributed to remaining active workers.
This must be called during graceful service shutdown.
#### `filter_owned_items(items)` *(async)*
Given a list of all potential items (e.g., aggregate keys), returns the subset that belongs to the current worker instance based on a consistent hashing algorithm and the current set of active workers.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `items` | `List[str]` | A list of all item identifiers. | *required* |
Returns:

| Type | Description |
| --- | --- |
| `List[str]` | The subset of items assigned to this worker instance. |
#### `register_self()` *(async)*
Registers the current service instance as an active worker in the sharding group via a service discovery mechanism (e.g., Consul).
This must be called during service startup.
### `nala.athomic.resilience.sharding.service.ShardingService`

Bases: `ShardingProtocol`
Orchestrates distributed workload assignment for a group of workers using consistent hashing and a Service Discovery backend.
This service ensures that a set of items (e.g., message aggregate keys or tasks) is distributed stably among currently active service instances (workers).
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `policy_settings` | `ShardingPolicySettings` | Configuration defining the target worker group name. |
| `server_settings` | `ServerSettings` | Configuration defining the host and port for registration. |
| `group_name` | `str` | The name used for service discovery (e.g., `'outbox-publishers'`). |
| `instance_id` | `str` | A unique identifier for this specific worker instance. |
| `discovery` | `ServiceDiscoveryProtocol` | The client for service discovery operations. |
#### `__init__(policy_settings, server_settings)`
Initializes the ShardingService with policy and network configuration.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `policy_settings` | `ShardingPolicySettings` | The sharding policy configuration. | *required* |
| `server_settings` | `ServerSettings` | The server host and port configuration. | *required* |
#### `deregister_self()` *(async)*
Deregisters the current service instance from the sharding group during graceful shutdown.
#### `filter_owned_items(items)` *(async)*
Filters a list of items, returning only those mathematically owned by this worker instance based on the consistent hash ring.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `items` | `List[str]` | The list of all item identifiers (e.g., aggregate keys). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | The subset of items assigned to this worker. |
#### `register_self()` *(async)*
Registers the current service instance as an active worker in the group via the Service Discovery provider.
### `nala.athomic.resilience.sharding.factory.ShardingFactory`

Manages singleton instances of `ShardingService`, one per configured policy.
#### `clear()` *(classmethod)*
Clears all cached instances. Used for testing.
#### `create(policy_name)` *(classmethod)*
Creates and returns a singleton instance of the ShardingService for a specific, named policy defined in the configuration.
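Since the factory caches one instance per policy, tests that exercise different sharding configurations should reset that cache between cases. A minimal pytest sketch (the fixture name is illustrative):

```python
import pytest

from nala.athomic.resilience.sharding import ShardingFactory

@pytest.fixture(autouse=True)
def reset_sharding_singletons():
    # Run the test, then drop all cached ShardingService instances
    # so the next test builds fresh ones from its own configuration.
    yield
    ShardingFactory.clear()
```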