Distributed Workload Sharding

Overview

Distributed Workload Sharding (also known as partitioning) is a pattern for dynamically distributing a set of tasks or data items across a cluster of active worker instances. It is a fundamental technique for achieving horizontal scalability and high availability for background processing services.

The Athomic implementation uses Consistent Hashing to ensure that the workload is distributed in a stable and predictable way.

Core Use Case

The primary use case for sharding in the Athomic Layer is the OutboxPublisher. When you have multiple instances of your service running, sharding ensures that all instances can share the work of publishing outbox events without duplicating effort or creating conflicts. Each publisher instance will claim and process only a specific subset of the pending events.


Core Technology: Consistent Hashing

Traditional sharding (e.g., assigning items with a modulo over the worker count) is fragile: whenever a worker is added or removed, the divisor changes and nearly every item is remapped to a different worker, causing massive reshuffling.

Athomic uses Consistent Hashing (via the uhashring library) to solve this. With consistent hashing, when a worker joins or leaves the cluster, only a small fraction of the workload is redistributed, roughly the share owned by the arriving or departing worker. This minimizes disruption and keeps the system stable during scaling events or deployments.
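
To see the difference concretely, the demonstration below (illustrative only; the worker IDs and item keys are invented) compares modulo sharding with a uhashring ring when one of three workers departs:

from uhashring import HashRing

items = [f"aggregate-{i}" for i in range(1000)]
workers = ["worker-a", "worker-b", "worker-c"]

# Modulo sharding: the owner is hash(item) % worker_count, so shrinking the
# divisor from 3 to 2 remaps the majority of items.
def modulo_owner(item: str, worker_count: int) -> str:
    return workers[hash(item) % worker_count]

before = {item: modulo_owner(item, 3) for item in items}
after = {item: modulo_owner(item, 2) for item in items}
print(sum(before[i] != after[i] for i in items), "items moved under modulo")

# Consistent hashing: removing a node remaps only that node's own share.
ring = HashRing(nodes=workers)
before = {item: ring.get_node(item) for item in items}
ring.remove_node("worker-c")
after = {item: ring.get_node(item) for item in items}
print(sum(before[i] != after[i] for i in items), "items moved under consistent hashing")

Under modulo sharding most items change owner; under consistent hashing only roughly a third (worker-c's former share) are reassigned.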


How It Works (Integration with Service Discovery)

The sharding mechanism is tightly integrated with a Service Discovery backend (like Consul).

  1. Registration: At startup, each worker instance that participates in a sharding group (e.g., each OutboxPublisher service) registers itself with a specific group_name (e.g., "outbox-publishers") in the service discovery backend. Each instance gets a unique ID.

  2. Discovery: When it's time to process work, the ShardingService queries the service discovery backend to get a list of all currently healthy workers registered for its group.

  3. Hashing & Filtering: A consistent hash ring is built from the IDs of the active workers. The service then takes the total list of work items (e.g., all pending aggregate_keys from the outbox) and passes them through the hash ring, claiming and processing only the items that hash to its own unique instance ID (a simplified sketch of this step follows the list).

  4. Deregistration: On graceful shutdown, the worker deregisters itself. On the next processing cycle, the hash ring will be rebuilt by the remaining workers without the departed instance, and its workload will be automatically and stably redistributed.
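
That hashing-and-filtering step can be expressed in a few lines. The sketch below is a simplified illustration of the idea, not Athomic's actual implementation; get_healthy_instances is a hypothetical stand-in for whatever the real ServiceDiscoveryProtocol exposes.

from uhashring import HashRing

async def filter_owned_items_sketch(
    discovery, group_name: str, instance_id: str, items: list[str]
) -> list[str]:
    # Discover the currently healthy workers registered for this group.
    # `get_healthy_instances` is a hypothetical stand-in for the real call.
    worker_ids = await discovery.get_healthy_instances(group_name)

    # Build a consistent hash ring keyed by the active worker IDs.
    ring = HashRing(nodes=worker_ids)

    # Keep only the items whose hash lands on this instance's ID.
    return [item for item in items if ring.get_node(item) == instance_id]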


Usage Example

While the OutboxPublisher uses this internally, you could use the ShardingService for any custom distributed worker.

from nala.athomic.resilience.sharding import ShardingFactory

# Get the sharding service for a specific policy defined in settings.toml
sharding_service = ShardingFactory.create(policy_name="my_custom_workers")

async def process_all_tenants():
    # 1. Get the complete list of work from a central source.
    #    `db.get_all_tenant_ids` is an application-specific placeholder.
    all_tenant_ids = await db.get_all_tenant_ids()  # e.g., ["tenant-a", "tenant-b", ...]

    # 2. Filter the list down to the items this instance owns. This assumes
    #    register_self() was called at startup (see the lifecycle sketch below).
    my_tenant_ids = await sharding_service.filter_owned_items(all_tenant_ids)

    print(f"This worker owns {len(my_tenant_ids)} tenants.")

    # 3. Process only the assigned workload; `process_data_for` is a placeholder.
    for tenant_id in my_tenant_ids:
        await process_data_for(tenant_id)
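
The example above assumes the worker has already joined the sharding group. A fuller lifecycle sketch, with registration at startup and deregistration at shutdown, might look like this (the polling loop and 30-second interval are illustrative choices, not library requirements):

import asyncio

async def run_worker():
    # Announce this instance to the sharding group before claiming any work.
    await sharding_service.register_self()
    try:
        while True:
            await process_all_tenants()
            await asyncio.sleep(30)  # illustrative polling interval
    finally:
        # Deregister on shutdown so the remaining workers absorb this share.
        await sharding_service.deregister_self()

asyncio.run(run_worker())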

Configuration

You configure sharding policies under the [resilience.sharding] section of your settings.toml (the examples below use the default. settings layer). A policy simply defines a group_name, which must match the service name used for service discovery.

[default.resilience.sharding]
enabled = true

  # A dictionary of named sharding policies
  [default.resilience.sharding.policies]
    # This policy is used by the OutboxPublisher
    [default.resilience.sharding.policies.outbox_publishers]
    # This name must match the service name registered in your discovery backend (e.g., Consul)
    group_name = "outbox-publisher-service"

    # You can define other policies for other worker types
    [default.resilience.sharding.policies.my_custom_workers]
    group_name = "custom-worker-service"
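
If Consul is your discovery backend, you can check that a policy's group_name matches reality by querying Consul's standard health API for passing instances. This sketch uses the requests library and assumes a local Consul agent on its default port:

import requests

# List healthy instances of the service; the service name must equal the
# policy's group_name for the sharding group to see these workers.
resp = requests.get(
    "http://localhost:8500/v1/health/service/outbox-publisher-service",
    params={"passing": "true"},
    timeout=5,
)
resp.raise_for_status()
worker_ids = [entry["Service"]["ID"] for entry in resp.json()]
print(f"{len(worker_ids)} healthy workers: {worker_ids}")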

API Reference

nala.athomic.resilience.sharding.protocol.ShardingProtocol

Bases: Protocol

Defines the contract for a generic service that distributes items among a dynamic group of workers.

This protocol abstracts the mechanism for load balancing workloads (like processing message aggregates or tasks) using consistent hashing across multiple service instances.

deregister_self() async

Deregisters the current service instance from the sharding group, allowing its workload to be redistributed to remaining active workers.

This must be called during graceful service shutdown.

filter_owned_items(items) async

Given a list of all potential items (e.g., aggregate keys), returns the subset that belongs to the current worker instance based on a consistent hashing algorithm and the current set of active workers.

Parameters:

    items (List[str], required): A list of all item identifiers.

Returns:

    List[str]: The subset of items assigned to this worker instance.

register_self() async

Registers the current service instance as an active worker in the sharding group via a service discovery mechanism (e.g., Consul).

This must be called during service startup.
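
Because ShardingProtocol is a structural Protocol, any object implementing these three coroutines satisfies it, which makes it straightforward to swap in a deterministic fake for unit tests. The class below is a hypothetical test double, not part of the library:

from typing import List

class StaticSharding:
    """Test double for ShardingProtocol that owns a fixed set of items."""

    def __init__(self, owned: set[str]):
        self._owned = owned

    async def register_self(self) -> None:
        pass  # no discovery backend involved in a test double

    async def deregister_self(self) -> None:
        pass

    async def filter_owned_items(self, items: List[str]) -> List[str]:
        return [item for item in items if item in self._owned]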

nala.athomic.resilience.sharding.service.ShardingService

Bases: ShardingProtocol

Orchestrates distributed workload assignment for a group of workers using consistent hashing and a Service Discovery backend.

This service ensures that a set of items (e.g., message aggregate keys or tasks) is distributed stably among currently active service instances (workers).

Attributes:

    policy_settings (ShardingPolicySettings): Configuration defining the target worker group name.

    server_settings (ServerSettings): Configuration defining the host and port for registration.

    group_name (str): The name used for service discovery (e.g., 'outbox-publishers').

    instance_id (str): A unique identifier for this specific worker instance.

    discovery (ServiceDiscoveryProtocol): The client for service discovery operations.

__init__(policy_settings, server_settings)

Initializes the ShardingService with policy and network configuration.

Parameters:

    policy_settings (ShardingPolicySettings, required): The sharding policy configuration.

    server_settings (ServerSettings, required): The server host and port configuration.

deregister_self() async

Deregisters the current service instance from the sharding group during graceful shutdown.

filter_owned_items(items) async

Filters a list of items, returning only those deterministically assigned to this worker instance by the consistent hash ring.

Parameters:

    items (List[str], required): The list of all item identifiers (e.g., aggregate keys).

Returns:

    List[str]: The subset of items assigned to this worker.

register_self() async

Registers the current service instance as an active worker in the group via the Service Discovery provider.

nala.athomic.resilience.sharding.factory.ShardingFactory

Manages singleton instances of ShardingService, one per configured policy.

clear() classmethod

Clears all cached instances. Used for testing.

create(policy_name) classmethod

Creates and returns a singleton instance of the ShardingService for a specific, named policy defined in the configuration.
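
In practice, repeated create() calls with the same policy name return the same cached object, and test suites should reset the cache between tests. A brief sketch (the pytest fixture is an assumption about your test setup, not library guidance):

from nala.athomic.resilience.sharding import ShardingFactory

# Repeated calls with the same policy name reuse one cached instance.
a = ShardingFactory.create(policy_name="my_custom_workers")
b = ShardingFactory.create(policy_name="my_custom_workers")
assert a is b

# In a pytest suite, clear the cache between tests to avoid state leaking:
import pytest

@pytest.fixture(autouse=True)
def reset_sharding_cache():
    yield
    ShardingFactory.clear()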