# Internal Event Bus

## Overview
The Internal Event Bus provides a lightweight, in-process publish/subscribe mechanism that allows different components within your application to communicate without being directly coupled to one another. It is a powerful tool for building modular and maintainable systems.
**Important Distinction:** The Internal Event Bus is designed for intra-service communication: within a single service instance, or across instances of the same service when using the Redis backend. For durable, guaranteed inter-service communication between different microservices, you should use the more robust Messaging Module.
## Key Features
- Decoupled Communication: Allows components to react to events happening elsewhere in the application without needing a direct reference.
- Automatic Context Propagation: This is a critical feature. The event bus automatically captures the `ExecutionContext` (`trace_id`, `tenant_id`, etc.) when an event is published and restores it when the subscriber callback is executed. This ensures continuity for tracing and multi-tenancy, even in an event-driven flow.
- Multiple Backends: Supports both an in-memory provider for single-instance applications and a Redis Pub/Sub provider for multi-instance scenarios.
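The context-propagation behavior described above can be sketched with Python's standard `contextvars` module. Everything here is an illustrative stand-in, not the library's real implementation: `ContextAwareBus` and the single `trace_id` variable approximate how a bus might capture the publisher's context at publish time and restore it for each subscriber.

```python
import asyncio
import contextvars

# Hypothetical stand-in for one field of the ExecutionContext; the
# real bus propagates the full context (trace_id, tenant_id, etc.).
trace_id = contextvars.ContextVar("trace_id", default=None)

class ContextAwareBus:
    """Minimal sketch of publish-time context capture and restore."""

    def __init__(self):
        self._subscribers = {}

    def subscribe(self, event_name, callback):
        self._subscribers.setdefault(event_name, []).append(callback)

    async def publish(self, event_name, payload):
        # Capture the publisher's context at publish time...
        ctx = contextvars.copy_context()
        tasks = [
            # ...and create each callback task inside that context, so the
            # subscriber sees the same trace_id as the publisher did.
            ctx.run(asyncio.ensure_future, callback(dict(payload)))
            for callback in self._subscribers.get(event_name, [])
        ]
        if tasks:
            await asyncio.gather(*tasks)

async def main():
    bus = ContextAwareBus()
    seen = []

    async def on_user_created(payload):
        # Reads the trace_id that was current when publish() was called.
        seen.append((trace_id.get(), payload["id"]))

    bus.subscribe("user.created", on_user_created)
    trace_id.set("trace-abc")
    await bus.publish("user.created", {"id": "user-123"})
    return seen

print(asyncio.run(main()))  # [('trace-abc', 'user-123')]
```

The key detail is that the subscriber task is created inside the captured context, so context set by the publisher is visible even though the callback runs concurrently.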
## How It Works
The module implements the classic Publish/Subscribe pattern:
- Subscribe: A component (the "subscriber") registers an asynchronous callback function for a specific event name (e.g., `"user.created"`).
- Publish: Another component (the "publisher") publishes an event with that name and a data payload.
- Dispatch: The event bus finds all registered callbacks for that event name and executes them concurrently, passing the payload to each one.
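Stripped of error handling and context propagation, the three steps above can be sketched in a few lines of plain `asyncio`. The module-level `subscribers` registry and the `subscribe`/`publish` functions are illustrative, not the library's API.

```python
import asyncio

# Step 1's registry: event name -> list of async callbacks.
subscribers: dict = {}

def subscribe(event_name, callback):
    # Subscribe: register an async callback under the event name.
    subscribers.setdefault(event_name, []).append(callback)

async def publish(event_name, payload):
    # Publish + Dispatch: find every callback registered for this event
    # name and execute them concurrently with the same payload.
    callbacks = subscribers.get(event_name, [])
    await asyncio.gather(*(cb(payload) for cb in callbacks))

async def main():
    hits = []

    async def audit(payload):
        hits.append(("audit", payload["id"]))

    async def send_welcome_email(payload):
        hits.append(("email", payload["id"]))

    subscribe("user.created", audit)
    subscribe("user.created", send_welcome_email)
    await publish("user.created", {"id": "user-123"})
    return hits

print(asyncio.run(main()))  # [('audit', 'user-123'), ('email', 'user-123')]
```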
## Available Providers

- LocalEventBus: The default, in-memory provider. It is extremely fast and requires no external dependencies, making it the right choice for decoupling components within a single service instance.
- RedisEventBus: Uses Redis Pub/Sub as its backend, allowing multiple instances of the same service to share the same event bus. This enables simple real-time communication across processes without the overhead of a full message broker such as Kafka.
## Usage Example
Imagine a UserService that needs to notify an AuditService whenever a new user is created, without the two services knowing about each other.
### Subscriber (AuditService)
```python
# In your_app/services/audit_service.py
from nala.athomic.events import get_event_bus
from nala.athomic.observability import get_logger

logger = get_logger(__name__)

class AuditService:
    def __init__(self):
        # Get the singleton event bus instance
        event_bus = get_event_bus()
        # Subscribe the `on_user_created` method to the "user.created" event
        event_bus.subscribe("user.created", self.on_user_created)

    async def on_user_created(self, payload: dict):
        # This method is executed when the event is published.
        # It runs with the same context (trace_id, etc.) as the publisher.
        user_id = payload.get("id")
        logger.info(f"Auditing creation of user {user_id}")
```
### Publisher (UserService)
```python
# In your_app/services/user_service.py
from nala.athomic.events import get_event_bus

class UserService:
    def __init__(self):
        self.event_bus = get_event_bus()

    async def create_user(self, name: str) -> dict:
        user = {"id": "user-123", "name": name}
        # ... logic to save the user to the database ...

        # Publish an event with the user data as the payload
        await self.event_bus.publish("user.created", payload=user)
        return user
```
## Configuration

The event bus is configured under the `[events]` section of your `settings.toml`.
### Local Provider (Default)

```toml
[default.events]
enabled = true

[default.events.provider]
backend = "local"
```
### Redis Provider

```toml
[default.events]
enabled = true

[default.events.provider]
backend = "redis"

# The redis provider reuses a KVStore connection configuration
[default.events.provider.provider]
# The key_prefix is used to namespace the Redis channels
key_prefix = "app_events"

[default.events.provider.provider.provider]
backend = "redis"
uri = "redis://localhost:6379/2"
```
## API Reference

### `nala.athomic.events.protocol.EventBusProtocol`

Bases: `Protocol`

Defines the standard interface for an internal event bus.

Implementations of this protocol handle publishing events and managing subscriptions within the application, enabling decoupled, in-process (or cross-process with backends like Redis) communication.
#### `close()` (async)

Closes any open connections or releases resources.
#### `publish(event_name, payload)` (async)

Publishes an event to all registered subscribers for that event name.

Implementations should automatically capture and include the current ExecutionContext within the payload (e.g., under a `_nala_context` key) before dispatching to subscribers.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_name` | `str` | The string identifier of the event being published. | *required* |
| `payload` | `Dict[str, Any]` | A dictionary containing the event data. | *required* |
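The context-embedding behavior that `publish` implementations are expected to follow can be sketched as a pure function. Only the `_nala_context` key comes from the docstring above; `build_wire_payload` and the context field names are illustrative, not part of the library.

```python
NALA_CONTEXT_KEY = "_nala_context"

def build_wire_payload(payload: dict, execution_context: dict) -> dict:
    # Copy first, so the publisher's original dict is never mutated.
    wire = dict(payload)
    # Attach the captured context under the reserved key so subscribers
    # (and cross-process backends) can restore it on dispatch.
    wire[NALA_CONTEXT_KEY] = dict(execution_context)
    return wire

original = {"id": "user-123"}
wire = build_wire_payload(original, {"trace_id": "t-1", "tenant_id": "acme"})
print(wire["_nala_context"]["trace_id"])  # t-1
print("_nala_context" in original)        # False
```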
#### `start()` (async)

Initializes the event bus backend if necessary (e.g., opening connections).
#### `subscribe(event_name, callback)`

Registers an async callback to be executed when an event is published.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_name` | `str` | The name of the event to subscribe to. | *required* |
| `callback` | `Callable[[Dict[str, Any]], Awaitable[None]]` | An async function that will receive the event payload (including context). Callbacks should handle their own exceptions to prevent disrupting the event bus. | *required* |
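Since callbacks should handle their own exceptions, one common approach is a small wrapper applied at subscription time. The `safe` helper below is a sketch, not part of the library: it logs a failing subscriber and swallows the error so sibling subscribers and the bus itself are never disrupted.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

def safe(callback):
    """Wrap an async callback so its exceptions are logged, not raised."""
    async def wrapper(payload):
        try:
            await callback(payload)
        except Exception:
            # Log with traceback and swallow; the bus keeps dispatching.
            logger.exception("Subscriber %r failed", callback)
    return wrapper

async def flaky(payload):
    raise RuntimeError("audit store unavailable")

# The wrapped callback completes without propagating the error.
asyncio.run(safe(flaky)({"id": "user-123"}))
```

Subscribing with `event_bus.subscribe("user.created", safe(on_user_created))` then ensures a failing handler only produces a log entry.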
#### `unsubscribe(event_name, callback)`

Unregisters a previously subscribed callback for a specific event.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `event_name` | `str` | The name of the event from which to unsubscribe. | *required* |
| `callback` | `Callable` | The specific callback function instance to remove. | *required* |
### `nala.athomic.events.factory.get_event_bus()`

Convenience function that delegates to the factory.
### `nala.athomic.events.providers.local_events_provider.LocalEventBus`

Bases: `EventBusBase`

An in-memory event bus implementation for in-process communication.

This provider executes subscribed callbacks immediately and concurrently upon publication, avoiding the need for a separate continuous polling loop (`_run_loop`). It is designed for high-speed, decoupled communication within a single application instance, making it ideal for testing and development environments.
#### `__init__(settings)`

Initializes the LocalEventBus.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `settings` | `EventsSettings` | The event bus configuration settings. | *required* |
### `nala.athomic.events.providers.redis_events_provider.RedisEventBus`

Bases: `EventBusBase`

An event bus provider implementation using Redis Pub/Sub for cross-process communication.

This provider enables multiple application instances to share the same event bus. It manages the connection to Redis, dynamically subscribes to channels based on active subscribers, and processes incoming events in a continuous background loop.
Attributes:

| Name | Type | Description |
|---|---|---|
| `redis_events_settings` | `RedisEventsSettings` | The Redis-specific configuration model. |
| `kvstore_settings` | `KVStoreSettings` | The underlying KVStore configuration used to connect to Redis. |
| `kvstore` | `KVStoreProtocol` | The instance of the KVStore client. |
| `_redis_client` | `Optional[Redis]` | The raw asynchronous Redis client instance. |
| `_pubsub_client` | `Optional[PubSub]` | The Redis PubSub client used for listening to channels. |
#### `__init__(settings, kv_store=None)`

Initializes the RedisEventBus.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `settings` | `EventsSettings` | The event bus configuration settings. | *required* |
| `kv_store` | `Optional[KVStoreProtocol]` | Optional pre-configured KV store instance for dependency injection. | `None` |
#### `before_start()` (async)

Establishes the connection to Redis and initializes the PubSub client.

This hook is called by the BaseService lifecycle before the main processing loop starts.

Raises:

| Type | Description |
|---|---|
| `EventBusError` | If the connection to Redis fails. |
#### `before_stop()` (async)

Performs cleanup before the service stops. Currently a no-op, as connection closure is handled by the KVStore client if it is managed globally.