
Distributed Leasing

Overview

Distributed Leasing is an advanced resilience pattern used for leader election or to ensure a single service instance has exclusive ownership of a long-running task or resource. It is a more sophisticated form of distributed locking.

The key difference between a lease and a lock is that a lease is time-based and requires the holder to actively maintain ownership by sending periodic "heartbeats" to renew it. If the leaseholder crashes or becomes unresponsive, its lease automatically expires after a configured duration, allowing another healthy instance to acquire it and take over the task.
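The lease-versus-lock distinction is easiest to see with a minimal in-memory sketch. It is an illustration under stated assumptions, not the Athomic implementation. InMemoryLeaseStore and LeaseRecord are hypothetical names, and the current time is passed in explicitly so that expiry is deterministic. A lease counts only while `now` is earlier than its expiry, so a holder that stops renewing loses ownership without any explicit release.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class LeaseRecord:
    owner_id: str
    expires_at: float  # absolute time at which the lease lapses


class InMemoryLeaseStore:
    """Hypothetical lease backend for illustration (not the Athomic API).

    Time is passed in explicitly so expiry can be demonstrated deterministically.
    """

    def __init__(self) -> None:
        self._leases: Dict[str, LeaseRecord] = {}

    def _live(self, resource_id: str, now: float) -> Optional[LeaseRecord]:
        record = self._leases.get(resource_id)
        if record is not None and record.expires_at <= now:
            # An expired lease is treated as absent, like a Redis key whose TTL ran out.
            del self._leases[resource_id]
            return None
        return record

    def try_acquire(self, resource_id: str, owner_id: str, duration: float, now: float) -> bool:
        # Like SET NX EX: succeed only if no live lease exists, and attach an expiry.
        if self._live(resource_id, now) is not None:
            return False
        self._leases[resource_id] = LeaseRecord(owner_id, now + duration)
        return True

    def renew(self, resource_id: str, owner_id: str, duration: float, now: float) -> bool:
        # The heartbeat: only the current owner may push the expiry forward.
        record = self._live(resource_id, now)
        if record is None or record.owner_id != owner_id:
            return False
        record.expires_at = now + duration
        return True

    def release(self, resource_id: str, owner_id: str, now: float) -> bool:
        # Explicit release, again restricted to the current owner.
        record = self._live(resource_id, now)
        if record is None or record.owner_id != owner_id:
            return False
        del self._leases[resource_id]
        return True
```

For example, if worker-a acquires "job" at t=0 with a 60-second duration and renews at t=30, its lease runs until t=90. If worker-a then crashes, worker-b's try_acquire succeeds at t=90 with no cleanup step, and any late renewal from worker-a is rejected.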

This pattern is critical for building fault-tolerant, active-passive systems. A primary use case within the Athomic Layer is in the OutboxPublisher, where leasing ensures that only one publisher instance is processing events for a specific aggregate_key at any given time.
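Keying the lease on aggregate_key instead of on the publisher as a whole means different publisher instances can work on different keys in parallel, while each key's events are handled by a single owner. The sketch below illustrates only that granularity. claim_batches, the "outbox:" prefix, and the (aggregate_key, sequence) event shape are hypothetical, and a plain dict with setdefault stands in for the lease backend, so expiry and renewal are omitted. It also sorts each claimed key's events by sequence number, on the assumption that per-key ordering is the goal.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical event shape: (aggregate_key, sequence number).
Event = Tuple[str, int]


def claim_batches(
    publisher_id: str, pending: List[Event], owners: Dict[str, str]
) -> Dict[str, List[int]]:
    """Return the events this publisher may publish, grouped by aggregate_key.

    `owners` is a stand-in for the lease backend (resource_id -> owner_id);
    real leases would also expire and need renewal, which this sketch skips.
    """
    by_key: Dict[str, List[int]] = defaultdict(list)
    for key, seq in pending:
        by_key[key].append(seq)

    claimed: Dict[str, List[int]] = {}
    for key, seqs in by_key.items():
        resource_id = f"outbox:{key}"  # hypothetical naming scheme, one lease per key
        # setdefault only records us as owner if nobody holds the key yet.
        holder = owners.setdefault(resource_id, publisher_id)
        if holder == publisher_id:
            claimed[key] = sorted(seqs)
    return claimed
```

Two publishers sharing the same `owners` mapping never both receive events for the same aggregate_key, but each can still make progress on keys the other has not claimed.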

Key Features

  • Exclusive Ownership: Guarantees that only one worker in a cluster can operate on a leased resource.
  • Fault Tolerance: Leases automatically expire, allowing a new worker to take over if the current owner fails.
  • Automatic Renewal: An active lease is automatically renewed in a background task (heartbeat) as long as the worker is healthy.
  • Simple Interface: Exposed as an async context manager, so callers only need to wrap their work in an async with block.

How It Works

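  1. Acquisition: A worker requests a lease on a specific resource by calling lease_manager.acquire("resource_id") inside an async with block. The manager attempts to acquire the lease atomically in the backend (Redis) with a single SET command using the NX and EX options, so the key is written only if no other holder exists, and it is written with an expiry. If the lease cannot be obtained within the acquisition timeout, LeaseAcquisitionError is raised.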

  2. Heartbeat: If the lease is acquired successfully, the LeaseManager yields a LeaseHolder object. This object immediately starts a background asyncio.Task which periodically sends a heartbeat to the backend, renewing the lease before it expires.

  3. Execution: The code inside the async with block executes. The worker can now safely perform its task, confident that it has exclusive ownership of the resource.

  4. Release: When the async with block is exited (either by completing successfully or by raising an exception), the heartbeat task is automatically stopped, and the lease is explicitly released from the backend.
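The four steps can be sketched end to end with contextlib.asynccontextmanager. This is a hedged, self-contained approximation rather than the LeaseManager source. InMemoryProvider, acquire, and the ttl/renew_every/timeout/poll parameters are hypothetical stand-ins for the Redis provider and LeasingSettings, and acquisition is assumed to poll until the timeout. The try/finally is what makes step 4 run whether the body completes or raises.

```python
import asyncio
import time
import uuid
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Optional, Tuple


class LeaseAcquisitionError(Exception):
    """Local stand-in for the library exception of the same name."""


class InMemoryProvider:
    """Hypothetical single-process backend standing in for Redis."""

    def __init__(self) -> None:
        # resource_id -> (owner_id, monotonic expiry time)
        self._leases: Dict[str, Tuple[str, float]] = {}

    def _owner(self, resource_id: str) -> Optional[str]:
        entry = self._leases.get(resource_id)
        if entry is None or entry[1] <= time.monotonic():
            return None  # no lease, or its TTL has run out
        return entry[0]

    async def try_acquire(self, resource_id: str, owner: str, ttl: float) -> bool:
        if self._owner(resource_id) is not None:
            return False  # like SET NX failing on an existing key
        self._leases[resource_id] = (owner, time.monotonic() + ttl)
        return True

    async def renew(self, resource_id: str, owner: str, ttl: float) -> bool:
        if self._owner(resource_id) != owner:
            return False  # expired or taken over: never extend someone else's lease
        self._leases[resource_id] = (owner, time.monotonic() + ttl)
        return True

    async def release(self, resource_id: str, owner: str) -> None:
        if self._owner(resource_id) == owner:
            del self._leases[resource_id]


@asynccontextmanager
async def acquire(
    provider: InMemoryProvider,
    resource_id: str,
    *,
    ttl: float,
    renew_every: float,
    timeout: float,
    poll: float = 0.05,
) -> AsyncIterator[str]:
    # A per-call id here; the real LeaseManager uses a per-instance owner_id.
    owner = uuid.uuid4().hex

    # 1. Acquisition: retry until the deadline, then give up.
    deadline = time.monotonic() + timeout
    while not await provider.try_acquire(resource_id, owner, ttl):
        if time.monotonic() >= deadline:
            raise LeaseAcquisitionError(resource_id)
        await asyncio.sleep(poll)

    # 2. Heartbeat: extend the TTL periodically; stop if the lease is lost.
    async def heartbeat() -> None:
        while True:
            await asyncio.sleep(renew_every)
            if not await provider.renew(resource_id, owner, ttl):
                return

    task = asyncio.create_task(heartbeat())
    try:
        yield owner  # 3. Execution: the caller's async with body runs here.
    finally:
        # 4. Release: stop renewing, then delete the lease if we still own it.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        await provider.release(resource_id, owner)
```

A second worker calling acquire on the same resource_id while the first is inside the block keeps polling until its timeout and then raises LeaseAcquisitionError. Once the first block exits, the lease is free immediately rather than after ttl.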


Usage Example

Here is a conceptual example of how a background worker could use leasing to ensure only one instance is processing a specific job at a time.

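import asyncio

from nala.athomic.resilience.leasing import LeaseFactory, LeaseAcquisitionError

lease_manager = LeaseFactory.create()
job_id = "process-daily-reports"


async def generate_reports() -> None:
    # Hypothetical placeholder for the real job; replace with your own logic.
    await asyncio.sleep(1)


async def run_daily_report_job() -> None:
    try:
        # Try to acquire an exclusive lease for this job.
        async with lease_manager.acquire(resource_id=job_id):
            # Only the worker that acquired the lease runs this block; the lease
            # is renewed in the background until the block exits.
            print("Lease acquired! Running the daily report job...")
            await generate_reports()
            print("Job finished, releasing lease.")

    except LeaseAcquisitionError:
        # Raised when the lease could not be acquired within the acquisition
        # timeout, typically because another worker already holds it.
        print("Could not acquire lease. Another worker is already running the job.")
    except Exception as e:
        # Errors raised by the job itself; the lease has already been released here.
        print(f"An error occurred during the job: {e}")


if __name__ == "__main__":
    asyncio.run(run_daily_report_job())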

Configuration

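The leasing system is configured under the [resilience.leasing] section of your settings.toml; the example below places it under the [default] environment prefix, hence [default.resilience.leasing]. It requires a KV store connection (Redis) to manage lease state.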

[default.resilience.leasing]
enabled = true

# The name of the KVStore connection (from [database.kvstore]) to use.
kv_store_connection_name = "default_redis"

# The duration in seconds a lease is held before it expires if not renewed.
duration_seconds = 60

# How often the background heartbeat task should renew the lease.
# This MUST be shorter than duration_seconds.
renewal_interval_seconds = 20

# The default time in seconds a process will wait to acquire a lease.
default_acquire_timeout_seconds = 5
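The constraint between the two timing keys can be checked, and reasoned about, with a small sketch. LeasingSettingsSketch is a hypothetical dataclass mirroring the keys above; it is not the library's LeasingSettings class. With the defaults (60 and 20), heartbeats fire at t=20 and t=40 after the last successful renewal, so the lease survives one failed renewal but lapses at t=60 if both fail.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class LeasingSettingsSketch:
    """Hypothetical mirror of the [resilience.leasing] keys, for illustration only."""

    enabled: bool = True
    kv_store_connection_name: str = "default_redis"
    duration_seconds: float = 60
    renewal_interval_seconds: float = 20
    default_acquire_timeout_seconds: float = 5

    def __post_init__(self) -> None:
        if self.duration_seconds <= 0 or self.renewal_interval_seconds <= 0:
            raise ValueError("durations must be positive")
        if self.renewal_interval_seconds >= self.duration_seconds:
            raise ValueError("renewal_interval_seconds must be shorter than duration_seconds")
        if self.default_acquire_timeout_seconds < 0:
            raise ValueError("default_acquire_timeout_seconds must not be negative")

    @property
    def renewals_before_expiry(self) -> int:
        # Heartbeats fire at t = k * interval after the last successful renewal;
        # count the ones that land strictly before the lease lapses at t = duration.
        return math.ceil(self.duration_seconds / self.renewal_interval_seconds) - 1
```

A longer duration_seconds tolerates more transient renewal failures, at the cost of a longer wait before another instance can take over after the holder crashes.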

API Reference

nala.athomic.resilience.leasing.manager.LeaseManager

Orchestrates the acquisition, heartbeat renewal, and release of distributed leases.

This class provides a high-level async context manager interface for the leasing mechanism, handling the complex distributed locking logic, continuous renewal in the background, and graceful cleanup.

Attributes:

  • provider (LeaseProtocol): The underlying, concrete distributed lease provider (e.g., Redis).
  • settings (LeasingSettings): The configuration for lease duration and timeouts.
  • owner_id (str): A unique identifier for this service instance, used to claim ownership of leases.

__init__(provider, settings)

Initializes the LeaseManager.

Parameters:

  • provider (LeaseProtocol, required): The underlying distributed lease provider.
  • settings (LeasingSettings, required): The configuration settings.

acquire(resource_id) async

Attempts to acquire an exclusive lease on a specified resource.

The call does not block the event loop, and the acquisition attempt is bounded by the configured acquisition timeout. If successful, it yields a LeaseHolder that automatically renews the lease in a background task (heartbeat).

Parameters:

  • resource_id (str, required): The unique identifier of the resource to lock (e.g., a message aggregate key).

Yields:

  • LeaseHolder (AsyncGenerator[LeaseHolder, None]): An object representing the active lease, providing renewal management.

Raises:

  • LeaseAcquisitionError: If the lease cannot be acquired within the configured acquisition timeout.

nala.athomic.resilience.leasing.holder.LeaseHolder

Represents an acquired lease and manages its renewal heartbeat.

An instance of this class is yielded by the LeaseManager's context manager, encapsulating the active lease and the background task responsible for periodically renewing its expiration time.

is_acquired property

Returns True if the lease was successfully acquired.

__init__(lease, provider, settings)

Initializes the LeaseHolder.

Parameters:

  • lease (Optional[Lease], required): The acquired lease object, or None if acquisition failed.
  • provider (LeaseProtocol, required): The concrete lease provider used for renewal and release operations.
  • settings (LeasingSettings, required): The configuration defining renewal intervals and durations.

start_heartbeat()

Starts the background renewal task if a lease was acquired.

stop_heartbeat() async

Stops the background renewal task gracefully and waits for it to finish.
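The asymmetry between the two methods fits the usual asyncio pattern: scheduling a task with asyncio.create_task needs no await, whereas waiting for a cancelled task to finish does. HeartbeatSketch below is a hypothetical illustration of that pattern, not the LeaseHolder source. The renew callable and the interval parameter are assumptions standing in for the provider and LeasingSettings.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class HeartbeatSketch:
    """Hypothetical illustration of start_heartbeat()/stop_heartbeat() semantics."""

    def __init__(self, renew: Callable[[], Awaitable[bool]], interval: float) -> None:
        self._renew = renew  # coroutine function; returns False once the lease is lost
        self._interval = interval
        self._task: Optional[asyncio.Task] = None
        self.lost = False

    def start_heartbeat(self) -> None:
        # Synchronous: schedules the loop on the running event loop and returns at once.
        if self._task is None:
            self._task = asyncio.create_task(self._loop())

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            if not await self._renew():
                self.lost = True  # stop renewing a lease we no longer own
                return

    async def stop_heartbeat(self) -> None:
        # Cancel the loop and wait until it has actually finished.
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None
```

Awaiting the cancelled task guarantees that no renewal is still in flight when the lease is subsequently released.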

nala.athomic.resilience.leasing.exceptions.LeaseAcquisitionError

Bases: LeaseError

Raised when a worker fails to acquire a lease on a resource.