Distributed Leasing
Overview
Distributed Leasing is an advanced resilience pattern used for leader election or to ensure a single service instance has exclusive ownership of a long-running task or resource. It is a more sophisticated form of distributed locking.
The key difference between a lease and a lock is that a lease is time-based and requires the holder to actively maintain ownership by sending periodic "heartbeats" to renew it. If the leaseholder crashes or becomes unresponsive, its lease automatically expires after a configured duration, allowing another healthy instance to acquire it and take over the task.
This pattern is critical for building fault-tolerant, active-passive systems. A primary use case within the Athomic Layer is in the OutboxPublisher, where leasing ensures that only one publisher instance is processing events for a specific aggregate_key at any given time.
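The lease-versus-lock difference described above can be modeled in a few lines. The sketch below is a toy, single-process illustration, not the Athomic implementation and not distributed. `Lease`, `InMemoryLeaseStore`, and `FakeClock` are hypothetical names, and a fake clock is injected so that expiry can be simulated deterministically. Acquisition succeeds only when no live lease exists. Renewal succeeds only for the current owner. A lease that is not renewed stops counting once its expiry passes, which lets a second worker take over without any manual cleanup.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Lease:
    owner_id: str
    expires_at: float


class InMemoryLeaseStore:
    """Toy, single-process model of time-based leases (not distributed)."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock  # injected so expiry can be simulated deterministically
        self._leases: Dict[str, Lease] = {}

    def _live(self, resource_id: str) -> Optional[Lease]:
        lease = self._leases.get(resource_id)
        if lease is not None and lease.expires_at <= self._clock():
            del self._leases[resource_id]  # expired: the resource is free again
            return None
        return lease

    def acquire(self, resource_id: str, owner_id: str, duration: float) -> bool:
        # Succeeds only if nobody holds a live lease on the resource.
        if self._live(resource_id) is not None:
            return False
        self._leases[resource_id] = Lease(owner_id, self._clock() + duration)
        return True

    def renew(self, resource_id: str, owner_id: str, duration: float) -> bool:
        # Heartbeat: only the current owner may push the expiry forward.
        lease = self._live(resource_id)
        if lease is None or lease.owner_id != owner_id:
            return False
        lease.expires_at = self._clock() + duration
        return True


class FakeClock:
    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
store = InMemoryLeaseStore(clock)
assert store.acquire("job", "worker-a", duration=60)
assert not store.acquire("job", "worker-b", duration=60)  # a lock would behave the same way
clock.now = 30
assert store.renew("job", "worker-a", duration=60)  # heartbeat: expiry moves to 90
clock.now = 90  # worker-a stops renewing (e.g. it crashed), so its lease lapses
assert store.acquire("job", "worker-b", duration=60)  # takeover, no manual cleanup
```

A plain lock has no `expires_at`. If worker-a crashed while holding a lock, the resource would stay blocked until someone cleaned it up by hand.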
Key Features
- Exclusive Ownership: Guarantees that only one worker in a cluster can operate on a leased resource.
- Fault Tolerance: Leases automatically expire, allowing a new worker to take over if the current owner fails.
- Automatic Renewal: An active lease is automatically renewed in a background task (heartbeat) as long as the worker is healthy.
- Simple Interface: Implemented via a clean `async with` context manager.
How It Works
- Acquisition: A worker requests a lease on a specific resource by calling `lease_manager.acquire("resource_id")` inside an `async with` block. The manager attempts to acquire the lease atomically from the backend (Redis) using a `SET NX EX` command.
- Heartbeat: If the lease is acquired successfully, the `LeaseManager` yields a `LeaseHolder` object. This object immediately starts a background `asyncio.Task` that periodically sends a heartbeat to the backend, renewing the lease before it expires.
- Execution: The code inside the `async with` block executes. The worker can now perform its task while holding exclusive ownership of the resource, for as long as the heartbeat keeps renewing the lease.
- Release: When the `async with` block is exited, whether it completes successfully or raises an exception, the heartbeat task is stopped automatically and the lease is explicitly released from the backend.
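The four steps above can be sketched end to end with plain asyncio. This is an illustrative single-process model under stated assumptions, not Athomic's implementation. `InMemoryBackend` stands in for Redis, and its `set_nx_ex` only mimics `SET NX EX` semantics. The names `acquire`, `poll_interval`, `demo`, and the local `LeaseAcquisitionError` class are all hypothetical. The sketch retries acquisition until a timeout, renews the lease from a background task, and uses `finally` to stop the heartbeat before releasing the lease.

```python
import asyncio
import contextlib
import time
import uuid
from typing import AsyncIterator, Optional


class LeaseAcquisitionError(Exception):
    """Hypothetical stand-in for the library's exception of the same name."""


class InMemoryBackend:
    """Toy backend mimicking SET NX EX semantics; single process only, not Redis."""

    def __init__(self) -> None:
        # resource_id -> (owner_id, expires_at on the monotonic clock)
        self._data: dict[str, tuple[str, float]] = {}

    def _owner(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None or entry[1] <= time.monotonic():
            return None  # missing or expired: treated as free
        return entry[0]

    async def set_nx_ex(self, key: str, owner: str, ttl: float) -> bool:
        # Set only if no live lease exists, with an expiry (like SET NX EX).
        if self._owner(key) is not None:
            return False
        self._data[key] = (owner, time.monotonic() + ttl)
        return True

    async def renew(self, key: str, owner: str, ttl: float) -> bool:
        # Extend the expiry, but only for the current owner.
        if self._owner(key) != owner:
            return False
        self._data[key] = (owner, time.monotonic() + ttl)
        return True

    async def release(self, key: str, owner: str) -> None:
        # Delete only if we still own it, so we never free a successor's lease.
        if self._owner(key) == owner:
            del self._data[key]


@contextlib.asynccontextmanager
async def acquire(backend: InMemoryBackend, resource_id: str, *, duration: float,
                  renewal_interval: float, acquire_timeout: float,
                  poll_interval: float = 0.05) -> AsyncIterator[str]:
    owner_id = uuid.uuid4().hex
    deadline = time.monotonic() + acquire_timeout
    # 1. Acquisition: retry the atomic set-if-absent until the timeout passes.
    while not await backend.set_nx_ex(resource_id, owner_id, duration):
        if time.monotonic() >= deadline:
            raise LeaseAcquisitionError(resource_id)
        await asyncio.sleep(poll_interval)

    # 2. Heartbeat: renew every renewal_interval, well before expiry.
    async def heartbeat() -> None:
        while True:
            await asyncio.sleep(renewal_interval)
            await backend.renew(resource_id, owner_id, duration)

    task = asyncio.create_task(heartbeat())
    try:
        yield owner_id  # 3. Execution: the caller's `async with` body runs here.
    finally:
        # 4. Release: stop the heartbeat, then free the lease, even on error.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
        await backend.release(resource_id, owner_id)


async def demo() -> list[str]:
    backend = InMemoryBackend()
    events: list[str] = []

    async def worker(name: str) -> None:
        try:
            async with acquire(backend, "daily-reports", duration=0.3,
                               renewal_interval=0.1, acquire_timeout=0.45):
                events.append(f"{name} acquired")
                # Hold longer than `duration`: only the heartbeat keeps the lease alive.
                await asyncio.sleep(0.6)
        except LeaseAcquisitionError:
            events.append(f"{name} timed out")

    await asyncio.gather(worker("a"), worker("b"))
    # After release, the resource is immediately acquirable again.
    freed = await backend.set_nx_ex("daily-reports", "someone-else", 1.0)
    events.append("free" if freed else "held")
    return events
```

In `asyncio.run(demo())`, one worker holds the lease for longer than `duration`, so only the heartbeat keeps it alive. The other worker gives up after `acquire_timeout`, and the final check shows that the lease was released. A real backend must also perform the owner check in `release` atomically; with Redis this is commonly done with a small Lua script. Without that, a worker whose lease had already expired could delete its successor's lease.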
Usage Example
Here is a conceptual example of how a background worker could use leasing to ensure only one instance is processing a specific job at a time.
```python
import asyncio

from nala.athomic.resilience.leasing import LeaseFactory, LeaseAcquisitionError

lease_manager = LeaseFactory.create()
job_id = "process-daily-reports"


async def generate_reports() -> None:
    # Placeholder for your own job logic; not part of the leasing API.
    ...


async def run_daily_report_job() -> None:
    try:
        # Try to acquire an exclusive lease for this job.
        async with lease_manager.acquire(resource_id=job_id):
            # This code is only executed by the single worker
            # that successfully acquired the lease.
            print("Lease acquired! Running the daily report job...")
            await generate_reports()
            print("Job finished, releasing lease.")
    except LeaseAcquisitionError:
        # Raised if the lease could not be acquired within the acquisition
        # timeout, typically because another worker already holds it.
        print("Could not acquire lease. Another worker is already running the job.")
    except Exception as e:
        # Errors raised inside the block still release the lease on exit.
        print(f"An error occurred during the job: {e}")


if __name__ == "__main__":
    asyncio.run(run_daily_report_job())
```
Configuration
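The leasing system is configured under the `resilience.leasing` section of your `settings.toml`. In the example below, this section is nested under `default` as `[default.resilience.leasing]`. Leasing requires a KV store connection (Redis) to manage the lease state.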
```toml
[default.resilience.leasing]
enabled = true
# The name of the KVStore connection (from [database.kvstore]) to use.
kv_store_connection_name = "default_redis"
# The duration in seconds a lease is held before it expires if not renewed.
duration_seconds = 60
# How often (in seconds) the background heartbeat task renews the lease.
# This MUST be shorter than duration_seconds.
renewal_interval_seconds = 20
# The default time in seconds a process will wait to acquire a lease.
default_acquire_timeout_seconds = 5
```
API Reference
nala.athomic.resilience.leasing.manager.LeaseManager
Orchestrates the acquisition, heartbeat renewal, and release of distributed leases.
This class provides a high-level async context manager interface for the
leasing mechanism, handling the complex distributed locking logic, continuous
renewal in the background, and graceful cleanup.
Attributes:
| Name | Type | Description |
|---|---|---|
| provider | LeaseProtocol | The underlying, concrete distributed lease provider (e.g., Redis). |
| settings | LeasingSettings | The configuration for lease duration and timeouts. |
| owner_id | str | A unique identifier for this service instance, used to claim ownership of leases. |
__init__(provider, settings)
Initializes the LeaseManager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| provider | LeaseProtocol | The underlying distributed lease provider. | required |
| settings | LeasingSettings | The configuration settings. | required |
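acquire(resource_id) (async)
Attempts to acquire an exclusive lease on a specified resource.
It is used as an async context manager (`async with lease_manager.acquire(...)`). Acquisition does not wait indefinitely; it is bounded by the configured acquisition timeout. If acquisition succeeds,
it yields a LeaseHolder that automatically renews the lease in a background task (heartbeat).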
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_id | str | The unique identifier of the resource to lock (e.g., a message aggregate key). | required |
Yields:
| Name | Type | Description |
|---|---|---|
| LeaseHolder | AsyncGenerator[LeaseHolder, None] | An object representing the active lease, providing renewal management. |
Raises:
| Type | Description |
|---|---|
| LeaseAcquisitionError | If the lease cannot be acquired within the configured acquisition timeout. |
nala.athomic.resilience.leasing.holder.LeaseHolder
Represents an acquired lease and manages its renewal heartbeat.
An instance of this class is yielded by the LeaseManager's context manager,
encapsulating the active lease and the background task responsible for
periodically renewing its expiration time.
is_acquired (property)
Returns True if the lease was successfully acquired.
__init__(lease, provider, settings)
Initializes the LeaseHolder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| lease | Optional[Lease] | The acquired lease object. None if acquisition failed. | required |
| provider | LeaseProtocol | The concrete lease provider used for renewal/release operations. | required |
| settings | LeasingSettings | The configuration defining renewal intervals and durations. | required |
start_heartbeat()
Starts the background renewal task if a lease was acquired.
stop_heartbeat() (async)
Stops the background renewal task gracefully and waits for it to finish.
nala.athomic.resilience.leasing.exceptions.LeaseAcquisitionError
Bases: LeaseError
Raised when a worker fails to acquire a lease on a resource.