
Internal Event Bus

Overview

The Internal Event Bus provides a lightweight, in-process publish/subscribe mechanism that allows different components within your application to communicate without being directly coupled to one another. It is a powerful tool for building modular and maintainable systems.

Important Distinction: The Internal Event Bus is designed for intra-service communication: within a single service instance, or across instances of the same service when the Redis provider is used. For durable, guaranteed communication between different microservices, use the Messaging Module instead.

Key Features

  • Decoupled Communication: Allows components to react to events happening elsewhere in the application without needing a direct reference.
  • Automatic Context Propagation: The event bus captures the current ExecutionContext (trace_id, tenant_id, etc.) when an event is published and restores it when the subscriber callback runs. Tracing and multi-tenancy information therefore carries over from publisher to subscriber, even in an event-driven flow.
  • Multiple Backends: Supports both an in-memory provider for single-instance applications and a Redis Pub/Sub provider for multi-instance scenarios.
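The context propagation described above can be pictured with the standard contextvars module. The sketch below is an assumption about the general mechanism, not the library's implementation: trace_id_var, attach_context, and run_with_context are hypothetical names, and only the '_nala_context' key is taken from the API reference.

```python
import asyncio
import contextvars
from typing import Any, Awaitable, Callable, Dict

# Hypothetical stand-in for wherever the ExecutionContext is stored.
trace_id_var = contextvars.ContextVar("trace_id", default="")

# Key name taken from the API reference; everything else is illustrative.
CONTEXT_KEY = "_nala_context"

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


def attach_context(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload that carries the publisher's context."""
    return {**payload, CONTEXT_KEY: {"trace_id": trace_id_var.get()}}


async def run_with_context(callback: Callback, payload: Dict[str, Any]) -> None:
    """Restore the captured context, run the callback, then reset it."""
    captured = payload.get(CONTEXT_KEY, {})
    token = trace_id_var.set(captured.get("trace_id", ""))
    try:
        await callback(payload)
    finally:
        trace_id_var.reset(token)


async def propagation_demo() -> str:
    seen = []

    async def subscriber(payload: Dict[str, Any]) -> None:
        # The subscriber reads the context, not the payload.
        seen.append(trace_id_var.get())

    trace_id_var.set("trace-abc")            # publisher side
    message = attach_context({"id": "user-123"})
    trace_id_var.set("")                     # simulate a different context
    await run_with_context(subscriber, message)
    return seen[0]


print(asyncio.run(propagation_demo()))  # trace-abc
```

Carrying the context inside the payload is what would let the same idea work across processes, where an in-memory context variable cannot be shared.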

How It Works

The module implements the classic Publish/Subscribe pattern:

  1. Subscribe: A component (the "subscriber") registers an asynchronous callback function for a specific event name (e.g., "user.created").
  2. Publish: Another component (the "publisher") publishes an event with that name and a data payload.
  3. Dispatch: The event bus finds all registered callbacks for that event name and executes them concurrently, passing the payload to each one.
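The three steps above can be sketched with plain asyncio. TinyEventBus is a hypothetical, simplified stand-in written for illustration; it is not the library's LocalEventBus and it omits context propagation and error handling.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


class TinyEventBus:
    """Illustrative in-memory bus (hypothetical, not the library class)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, event_name: str, callback: Callback) -> None:
        # Step 1: register an async callback under an event name.
        self._subscribers[event_name].append(callback)

    async def publish(self, event_name: str, payload: Dict[str, Any]) -> None:
        # Steps 2 and 3: look up the callbacks and run them concurrently.
        callbacks = list(self._subscribers.get(event_name, []))
        if callbacks:
            await asyncio.gather(*(cb(payload) for cb in callbacks))


async def dispatch_demo() -> List[str]:
    bus = TinyEventBus()
    received: List[str] = []

    async def audit(payload: Dict[str, Any]) -> None:
        received.append(f"audit:{payload['id']}")

    async def welcome_email(payload: Dict[str, Any]) -> None:
        received.append(f"email:{payload['id']}")

    bus.subscribe("user.created", audit)
    bus.subscribe("user.created", welcome_email)
    await bus.publish("user.created", {"id": "user-123"})
    return received


print(sorted(asyncio.run(dispatch_demo())))
```

Because the callbacks run concurrently, subscribers should not rely on being invoked in registration order.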

Available Providers

  • LocalEventBus: The default, in-memory provider. It dispatches events in-process and requires no external dependencies, which makes it a good fit for decoupling components within a single service instance.
  • RedisEventBus: This provider uses Redis Pub/Sub as its backend. This allows multiple instances of the same service to share the same event bus, enabling simple real-time communication across processes without the overhead of a full message broker like Kafka.

Usage Example

Imagine a UserService that needs to notify an AuditService whenever a new user is created, without the two services knowing about each other.

Subscriber (AuditService)

# In your_app/services/audit_service.py
from nala.athomic.events import get_event_bus
from nala.athomic.observability import get_logger

logger = get_logger(__name__)

class AuditService:
    def __init__(self):
        # Get the singleton event bus instance
        event_bus = get_event_bus()
        # Subscribe the `on_user_created` method to the "user.created" event
        event_bus.subscribe("user.created", self.on_user_created)

    async def on_user_created(self, payload: dict):
        # This method is executed when the event is published.
        # It runs with the same context (trace_id, etc.) as the publisher.
        user_id = payload.get("id")
        logger.info(f"Auditing creation of user {user_id}")

Publisher (UserService)

# In your_app/services/user_service.py
from nala.athomic.events import get_event_bus

class UserService:
    def __init__(self):
        self.event_bus = get_event_bus()

    async def create_user(self, name: str) -> dict:
        user = {"id": "user-123", "name": name}

        # ... logic to save the user to the database ...

        # Publish an event with the user data as the payload
        await self.event_bus.publish("user.created", payload=user)

        return user

Configuration

The event bus is configured under the events section of your settings.toml. The examples below place it in the [default.events] table.

Local Provider (Default)

[default.events]
enabled = true

  [default.events.provider]
  backend = "local"

Redis Provider

[default.events]
enabled = true

  [default.events.provider]
  backend = "redis"

    # The redis provider reuses a KVStore connection configuration
    [default.events.provider.provider]
    # The key_prefix is used to namespace the Redis channels
    key_prefix = "app_events"

      [default.events.provider.provider.provider]
      backend = "redis"
      uri = "redis://localhost:6379/2"

API Reference

nala.athomic.events.protocol.EventBusProtocol

Bases: Protocol

Defines the standard interface for an internal event bus.

Implementations of this protocol handle publishing events and managing subscriptions within the application, enabling decoupled, in-process (or cross-process with backends like Redis) communication.
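For orientation, the methods documented in this section can be written down as a typing.Protocol. This is a sketch reconstructed from the signatures listed here, not the library's source: EventBusLike and NullEventBus are hypothetical names, and the return types are assumed to be None.

```python
from typing import Any, Awaitable, Callable, Dict, Protocol, runtime_checkable

EventCallback = Callable[[Dict[str, Any]], Awaitable[None]]


@runtime_checkable
class EventBusLike(Protocol):
    """Hypothetical mirror of the documented EventBusProtocol methods."""

    async def start(self) -> None: ...

    async def close(self) -> None: ...

    async def publish(self, event_name: str, payload: Dict[str, Any]) -> None: ...

    def subscribe(self, event_name: str, callback: EventCallback) -> None: ...

    def unsubscribe(self, event_name: str, callback: Callable) -> None: ...


class NullEventBus:
    """A do-nothing bus that satisfies the protocol structurally."""

    async def start(self) -> None:
        pass

    async def close(self) -> None:
        pass

    async def publish(self, event_name: str, payload: Dict[str, Any]) -> None:
        pass

    def subscribe(self, event_name: str, callback: EventCallback) -> None:
        pass

    def unsubscribe(self, event_name: str, callback: Callable) -> None:
        pass


# Protocols are structural: no inheritance is required.
print(isinstance(NullEventBus(), EventBusLike))  # True
```

A stub like NullEventBus can serve as a test double for code that only needs something shaped like an event bus.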

close() async

Closes any open connections or releases resources.

publish(event_name, payload) async

Publishes an event to all registered subscribers for that event name.

Implementations should automatically capture and include the current ExecutionContext within the payload (e.g., under a '_nala_context' key) before dispatching to subscribers.

Parameters:

  • event_name (str, required): The string identifier of the event being published.
  • payload (Dict[str, Any], required): A dictionary containing the event data.

start() async

Initializes the event bus backend if necessary (e.g., connections).

subscribe(event_name, callback)

Registers an async callback to be executed when an event is published.

Parameters:

  • event_name (str, required): The name of the event to subscribe to.
  • callback (Callable[[Dict[str, Any]], Awaitable[None]], required): An async function that will receive the event payload (including context). Callbacks should handle their own exceptions to prevent disrupting the event bus.
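One way to follow the advice that callbacks handle their own exceptions is a small wrapper applied to each subscriber. The sketch below uses only the standard library; safe_callback is a hypothetical helper, not part of the event bus API.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger(__name__)

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


def safe_callback(func: Callback) -> Callback:
    """Wrap a subscriber so that its exceptions are logged, not raised."""

    @functools.wraps(func)
    async def wrapper(payload: Dict[str, Any]) -> None:
        try:
            await func(payload)
        except Exception:
            # Log with traceback and swallow, so other subscribers and the
            # bus itself are not affected by this subscriber's failure.
            logger.exception("Subscriber %s failed", func.__name__)

    return wrapper


@safe_callback
async def on_user_created(payload: Dict[str, Any]) -> None:
    raise ValueError("audit store unavailable")


# The error is logged; nothing propagates to the caller.
asyncio.run(on_user_created({"id": "user-123"}))
```

For a bound method, the wrapper can be applied at subscription time instead, for example by passing safe_callback(self.on_user_created) to subscribe. Keep a reference to the wrapped callable if you intend to unsubscribe it later.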

unsubscribe(event_name, callback)

Unregisters a previously subscribed callback for a specific event.

Parameters:

  • event_name (str, required): The name of the event from which to unsubscribe.
  • callback (Callable, required): The specific callback function instance to remove.
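The requirement to pass the specific callback instance matters most when the subscriber was wrapped. The sketch below illustrates the underlying Python behaviour with a plain list standing in for the bus's internal registry. That list is an assumption: the documentation does not state how the bus stores or compares callbacks.

```python
from typing import Any, Dict


class AuditService:
    async def on_user_created(self, payload: Dict[str, Any]) -> None:
        pass


service = AuditService()

# Hypothetical registry; assumed to compare callbacks with ==.
subscribers = [service.on_user_created]

# Bound methods of the same object compare equal, so removal succeeds
# even though each attribute access creates a new bound-method object.
subscribers.remove(service.on_user_created)
print(subscribers)  # []

# A lambda is a distinct object every time it is written out ...
subscribers.append(lambda payload: service.on_user_created(payload))
try:
    # ... so an identical-looking lambda does not match the stored one.
    subscribers.remove(lambda payload: service.on_user_created(payload))
except ValueError:
    print("not found: keep a reference to the wrapper you subscribed")
```

In practice this means storing any lambda, functools.partial, or decorator-produced wrapper in a variable or attribute and passing that same object to both subscribe and unsubscribe.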

nala.athomic.events.factory.get_event_bus()

Convenience function that delegates to the factory and returns the event bus instance (a singleton, per the usage example above).

nala.athomic.events.providers.local_events_provider.LocalEventBus

Bases: EventBusBase

An in-memory event bus implementation for in-process communication.

This provider executes subscribed callbacks immediately and concurrently upon publication, avoiding the need for a separate continuous polling loop (_run_loop). It is designed for high-speed, decoupled communication within a single application instance, making it ideal for testing and development environments.

__init__(settings)

Initializes the LocalEventBus.

Parameters:

  • settings (EventsSettings, required): The event bus configuration settings.

nala.athomic.events.providers.redis_events_provider.RedisEventBus

Bases: EventBusBase

An event bus provider implementation using Redis Pub/Sub for cross-process communication.

This provider enables multiple application instances to share the same event bus. It manages the connection to Redis, dynamically subscribes to channels based on active subscribers, and processes incoming events in a continuous background loop.

Attributes:

  • redis_events_settings (RedisEventsSettings): The Redis-specific configuration model.
  • kvstore_settings (KVStoreSettings): The underlying KVStore configuration used to connect to Redis.
  • kvstore (KVStoreProtocol): The instance of the KVStore client.
  • _redis_client (Optional[Redis]): The raw asynchronous Redis client instance.
  • _pubsub_client (Optional[PubSub]): The Redis PubSub client used for listening to channels.

__init__(settings, kv_store=None)

Initializes the RedisEventBus.

Parameters:

  • settings (EventsSettings, required): The event bus configuration settings.
  • kv_store (Optional[KVStoreProtocol], default: None): Optional pre-configured KV store instance for dependency injection.

before_start() async

Establishes the connection to Redis and initializes the PubSub client.

This hook is called by the BaseService lifecycle before the main processing loop starts.

Raises:

  • EventBusError: If the connection to Redis fails.

before_stop() async

Performs cleanup before the service stops. Currently a no-op, because closing the connection is left to the KVStore client when that client is managed globally.