Database Migrations
Overview
The Database Migrations module provides a programmatic, version-controlled way to manage your database schema and data transformations over time. The engine is designed to be database-agnostic and ships with a built-in implementation for MongoDB.
When the application starts, the MigrationRunner automatically detects and applies any pending migrations, ensuring that your database schema is always in the state required by the current version of the code.
How It Works
The migration process is orchestrated by the MigrationRunner service:
- Discovery: At startup, the runner inspects the module paths defined in your configuration to find all migration script files (e.g., 001_initial.py).
- State Check: It connects to the database and queries a special history collection (e.g., athomic_migration_status) to determine the revision number of the last migration that was successfully applied.
- Execution: The runner compares the list of discovered scripts with the database's history. For every script with a revision number greater than the last applied one, it executes the script's upgrade function in sequential order.
- Recording: After each script is executed successfully, a new record is added to the history collection, marking the new revision as applied.
This entire process is wrapped with observability, providing detailed logs, metrics, and tracing for each migration operation.
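The four steps above can be sketched in a few lines. This is a simplified, in-memory illustration and not the MigrationRunner implementation: the Script dataclass, the run_pending helper, and the dict standing in for a database are all hypothetical names, and discovery is simulated by a hand-written list.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class Script:
    """A discovered migration script (hypothetical stand-in for a loaded module)."""
    revision: int
    description: str
    upgrade: Callable[[dict], Awaitable[None]]


async def run_pending(scripts: List[Script], db: dict, history: List[Dict]) -> List[int]:
    """Apply every script whose revision is greater than the last applied one."""
    # State check: the highest revision recorded in the history, or 0 if none.
    last_applied = max((rec["revision"] for rec in history), default=0)
    applied = []
    # Execution: run pending scripts in ascending revision order.
    for script in sorted(scripts, key=lambda s: s.revision):
        if script.revision <= last_applied:
            continue
        await script.upgrade(db)
        # Recording: only reached if upgrade() did not raise.
        history.append({"revision": script.revision, "description": script.description})
        applied.append(script.revision)
    return applied


async def _create_users(db: dict) -> None:
    db["users"] = []


async def _add_index(db: dict) -> None:
    db["users_indexes"] = ["email"]


# Discovery is simulated: the scripts are listed out of order on purpose.
scripts = [
    Script(2, "add_index", _add_index),
    Script(1, "create_users", _create_users),
]
db: dict = {}
history = [{"revision": 1, "description": "create_users"}]  # revision 1 already applied
applied = asyncio.run(run_pending(scripts, db, history))
print(applied)
```

Because revision 1 is already in the history, only revision 2 is executed and recorded.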
How to Create a Migration Script
Creating a new migration is straightforward.
1. Create the File
In your application's migrations directory (e.g., src/your_app/migrations/versions/), create a new Python file. The filename is important and must follow this format:
[NNN]_[description].py
- [NNN]: A three-digit, zero-padded revision number (e.g., 001, 002).
- [description]: A short, descriptive name for the migration (e.g., create_user_indexes).
Example: 001_create_user_indexes.py
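The naming convention can be checked with a small regular expression. The sketch below is illustrative only: parse_migration_filename is a hypothetical helper, and the assumption that the description consists of word characters is ours, since the runner's own parsing rules are not shown here.

```python
import re
from typing import Tuple

# Three digits, an underscore, a description, and the .py extension.
# Assumption: the description is limited to letters, digits, and underscores.
MIGRATION_FILENAME = re.compile(r"(?P<revision>\d{3})_(?P<description>\w+)\.py")


def parse_migration_filename(filename: str) -> Tuple[int, str]:
    """Split a migration filename into (revision, description)."""
    match = MIGRATION_FILENAME.fullmatch(filename)
    if match is None:
        raise ValueError(f"Not a valid migration filename: {filename!r}")
    return int(match.group("revision")), match.group("description")


print(parse_migration_filename("001_create_user_indexes.py"))
```

A filename without zero padding, such as 1_create_user_indexes.py, raises ValueError under this pattern.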
2. Define the Script Content
Each migration script must contain two key elements:
- A revision variable holding the integer revision number.
- An async def upgrade(db) function that receives the native database client instance (for MongoDB, this is a motor.core.AgnosticDatabase).
Example Migration Script
This example creates a unique index on the users collection.
# src/your_app/migrations/versions/001_create_user_indexes.py
from motor.core import AgnosticDatabase
from pymongo import ASCENDING, IndexModel

# The sequential revision number for this migration.
revision = 1


async def upgrade(db: AgnosticDatabase) -> None:
    """
    Applies the migration to create a unique index on the 'email' field
    of the 'users' collection.
    """
    users_collection = db.get_collection("users")
    email_index = IndexModel(
        [("email", ASCENDING)],
        name="user_email_unique_idx",
        unique=True,
    )
    await users_collection.create_indexes([email_index])


# (Optional) You can also define an `async def downgrade(db)` function
# to support reverting the migration, although this is not currently used by the runner.
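For the optional downgrade function mentioned in the script above, a minimal sketch could drop the index by the name given in upgrade. It assumes the db argument behaves like a Motor database, whose collections expose drop_index. The type hint is omitted so the snippet has no third-party imports.

```python
# Hypothetical addition to 001_create_user_indexes.py
revision = 1


async def downgrade(db) -> None:
    """Reverts the migration by dropping the index created in upgrade()."""
    users_collection = db.get_collection("users")
    # The name must match the one passed to IndexModel in upgrade().
    await users_collection.drop_index("user_email_unique_idx")
```

Naming the index explicitly in upgrade is what makes the revert this simple. With an auto-generated name, the downgrade would have to reconstruct the key specification instead.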
Configuration
To enable and configure the migration runner, you need to add a [database.migrations] section to your settings.toml.
[default.database.migrations]
# A master switch to enable or disable the migration engine.
enabled = true
# A dictionary of named database connections to run migrations against.
[default.database.migrations.connections.my_app_db]
# The migration engine to use.
engine = "mongodb"
# The name of the connection defined under '[database.documents]' to use.
db_connection_name = "default_mongo"
# The collection/table used to store migration history.
version_collection = "my_app_migrations"
# A list of Python module paths where your migration scripts are located.
# The runner searches these modules for migration files.
paths = ["my_app.migrations.versions"]
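To make the nesting of the configuration explicit, the same settings are shown below as a plain Python mapping, together with a hypothetical helper (iter_connections) that honors the master switch. How the runner actually loads and validates these settings is not covered here, so treat this as a reading aid only.

```python
from typing import Any, Dict, Iterator, Tuple

# The [default.database.migrations] table from settings.toml, as a plain dict.
migrations_settings: Dict[str, Any] = {
    "enabled": True,
    "connections": {
        "my_app_db": {
            "engine": "mongodb",
            "db_connection_name": "default_mongo",
            "version_collection": "my_app_migrations",
            "paths": ["my_app.migrations.versions"],
        }
    },
}


def iter_connections(settings: Dict[str, Any]) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (name, config) pairs, or nothing when the master switch is off."""
    if not settings.get("enabled", False):
        return
    yield from settings.get("connections", {}).items()


for name, conn in iter_connections(migrations_settings):
    print(name, conn["engine"], conn["paths"])
```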
API Reference
nala.athomic.database.migrations.runner.MigrationRunner
Orchestrates the database migration process with distributed locking support.
This class is the core engine for applying database migrations. It is database-agnostic and delegates database-specific operations to a configured backend provider.
To prevent race conditions in clustered environments (e.g., Kubernetes),
this runner utilizes the LockingProtocol to acquire distributed locks
for each database connection before attempting to migrate.
Attributes:
| Name | Type | Description |
|---|---|---|
| app_settings | | The root application settings object. |
| migrations_settings | | The specific configuration for the migration engine. |
| connection_manager | | The manager for obtaining database providers. |
| locking_provider | | The provider used for distributed locking (e.g., RedisLock). |
| logger | | A pre-configured logger instance for the runner. |
| tracer | | An OpenTelemetry tracer instance for the runner. |
| hostname | | The hostname of the current pod/machine. |
__init__(settings, connection_manager, locking_provider=None)
Initializes the MigrationRunner.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | AppSettings | The application's root configuration object. | required |
| connection_manager | ConnectionManager | The manager for obtaining database providers. | required |
| locking_provider | Optional[LockingProtocol] | Optional locking provider to enable distributed locking. If None, migrations run without locking protection. | None |
upgrade()
async
Runs the 'upgrade' process for all configured database connections.
Iterates through configured connections and triggers the upgrade process. Uses distributed locking if a provider is available.
nala.athomic.database.migrations.protocol.MigrationBackendProtocol
Bases: Protocol
Defines the contract for a database-specific migration backend.
This allows the MigrationRunner to be agnostic of the underlying database. All I/O operations are designed to be asynchronous.
apply_migration(source_name, revision, path)
async
Loads and executes the 'upgrade' function from a migration script.
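One way to load and execute an upgrade function from a script file is via importlib, as sketched below. This is an assumption about how such loading could work, not the backend's actual code. load_migration and apply are hypothetical helpers, and a dict stands in for the database.

```python
import asyncio
import importlib.util
import tempfile
from pathlib import Path
from types import ModuleType


def load_migration(path: Path) -> ModuleType:
    """Import a migration script from a file path."""
    # Filenames such as 001_initial.py are not valid identifiers, so the
    # module is loaded from its path rather than with an `import` statement.
    spec = importlib.util.spec_from_file_location(f"migration_{path.stem}", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load migration script: {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


async def apply(path: Path, db) -> int:
    """Run the script's upgrade(db) and return its revision."""
    module = load_migration(path)
    await module.upgrade(db)
    return module.revision


SCRIPT = '''
revision = 1

async def upgrade(db):
    db["users"] = []
'''

with tempfile.TemporaryDirectory() as tmp:
    script_path = Path(tmp) / "001_initial.py"
    script_path.write_text(SCRIPT)
    fake_db: dict = {}
    applied_revision = asyncio.run(apply(script_path, fake_db))

print(applied_revision, fake_db)
```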
connect()
async
Initializes the connection to the database.
disconnect()
async
Closes the connection to the database.
get_applied_migrations()
async
Gets the latest applied revision number for each migration source.
Returns:
| Type | Description |
|---|---|
| Dict[str, int] | A dictionary mapping a source_name to its latest revision_number. Example: {"athomic": 2, "my_app_billing": 1} |
record_migration(source_name, revision, description)
async
Records that a migration has been successfully applied.
remove_migration_record(source_name, revision)
async
Removes the record of a migration after it has been reverted.
revert_migration(source_name, revision, path)
async
Loads and executes the 'downgrade' function from a migration script.
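The contract above can be written down as a typing.Protocol, which also makes it easy to supply a fake backend in tests. The method names and parameter names follow the listing above. The parameter types (in particular path as a string) and the InMemoryBackend class are assumptions made for this sketch.

```python
import asyncio
from typing import Dict, List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class MigrationBackend(Protocol):
    """Sketch of the documented contract; parameter types are assumptions."""

    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def get_applied_migrations(self) -> Dict[str, int]: ...
    async def apply_migration(self, source_name: str, revision: int, path: str) -> None: ...
    async def revert_migration(self, source_name: str, revision: int, path: str) -> None: ...
    async def record_migration(self, source_name: str, revision: int, description: str) -> None: ...
    async def remove_migration_record(self, source_name: str, revision: int) -> None: ...


class InMemoryBackend:
    """Keeps history in a list; apply/revert only log the call."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, int, str]] = []
        self.calls: List[str] = []

    async def connect(self) -> None:
        self.calls.append("connect")

    async def disconnect(self) -> None:
        self.calls.append("disconnect")

    async def get_applied_migrations(self) -> Dict[str, int]:
        latest: Dict[str, int] = {}
        for source, revision, _ in self.records:
            latest[source] = max(latest.get(source, 0), revision)
        return latest

    async def apply_migration(self, source_name, revision, path) -> None:
        self.calls.append(f"apply:{source_name}:{revision}")

    async def revert_migration(self, source_name, revision, path) -> None:
        self.calls.append(f"revert:{source_name}:{revision}")

    async def record_migration(self, source_name, revision, description) -> None:
        self.records.append((source_name, revision, description))

    async def remove_migration_record(self, source_name, revision) -> None:
        self.records = [r for r in self.records if (r[0], r[1]) != (source_name, revision)]


async def demo() -> Dict[str, int]:
    backend = InMemoryBackend()
    await backend.connect()
    await backend.record_migration("athomic", 1, "initial")
    await backend.record_migration("athomic", 2, "indexes")
    await backend.record_migration("my_app_billing", 1, "initial")
    return await backend.get_applied_migrations()


latest = asyncio.run(demo())
print(latest)
```

Because the protocol is structural, InMemoryBackend satisfies it without inheriting from anything, which is what lets the runner stay agnostic of the concrete database.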
nala.athomic.database.migrations.providers.mongodb.MongoDbBackend
Bases: BaseMigrationBackend
A migration backend for managing schema changes in a MongoDB database.
This class implements the MigrationBackendProtocol for MongoDB, handling
database connection retrieval, migration script execution, and version history
persistence using MongoDB's aggregation and document operations. It supports
dependency injection for testing.
Attributes:
| Name | Type | Description |
|---|---|---|
| db_connection_name | str | The logical name of the database connection to use. |
| connection_manager | ConnectionManager | The manager for obtaining database providers. |
| db_provider | DocumentsDatabaseProtocol | The active MongoDB client provider. |
| migration_status_model | Type[Document] | The Beanie Document model used for storing migration history. |
__init__(settings, version_target, connection_name, connection_manager=None, db_provider=None, migration_status_model=MigrationStatus)
Initializes the MongoDB Migration Backend, resolving dependencies via factories or injection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | MigrationConnectionSettings | Specific configuration for this migration connection. | required |
| version_target | str | The name of the collection to store version history (deprecated in favor of | required |
| connection_name | str | The logical name of this migration connection. | required |
| connection_manager | Optional[ConnectionManager] | Optional injected connection manager. | None |
| db_provider | Optional[DocumentsDatabaseProtocol] | Optional injected database provider. | None |
| migration_status_model | Type[Document] | The model used for migration history persistence. | MigrationStatus |
connect()
async
Ensures the MongoDB provider is connected and ready for migration operations.
Raises:
| Type | Description |
|---|---|
| MigrationDatabaseError | If the configured database connection cannot be retrieved or fails to connect. |
disconnect()
async
This is a no-op: the MongoDB connection lifecycle is managed by the central ConnectionManager.
get_applied_migrations()
async
Fetches the latest applied revision number for each migration source from the database.
Uses a MongoDB aggregation pipeline to find the maximum revision grouped by source_name.
Returns:
| Type | Description |
|---|---|
| Dict[str, int] | A dictionary mapping each |
Raises:
| Type | Description |
|---|---|
| MigrationDatabaseError | If the database query fails. |
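The aggregation described for get_applied_migrations (maximum revision grouped by source_name) would typically be a single $group stage. The pipeline below is a sketch: the document field names source_name and revision and the output field latest_revision are assumptions, and the pure-Python function only mirrors the grouping logic so that the result can be checked without a database.

```python
from typing import Any, Dict, Iterable, List

# Pipeline shape; the field names `source_name` and `revision` are assumptions.
LATEST_REVISION_PIPELINE: List[Dict[str, Any]] = [
    {"$group": {"_id": "$source_name", "latest_revision": {"$max": "$revision"}}},
]


def latest_revisions(documents: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Pure-Python equivalent of the $group/$max stage, for illustration."""
    latest: Dict[str, int] = {}
    for doc in documents:
        source = doc["source_name"]
        latest[source] = max(latest.get(source, doc["revision"]), doc["revision"])
    return latest


history = [
    {"source_name": "athomic", "revision": 1},
    {"source_name": "athomic", "revision": 2},
    {"source_name": "my_app_billing", "revision": 1},
]
print(latest_revisions(history))
```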
record_migration(source_name, revision, description)
async
Records a successfully applied migration in the version tracking collection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_name | str | The source of the migration (e.g., 'athomic', 'app_billing'). | required |
| revision | int | The revision number of the applied migration. | required |
| description | str | A brief description of the migration. | required |
Raises:
| Type | Description |
|---|---|
| MigrationDatabaseError | If the record insertion fails. |
remove_migration_record(source_name, revision)
async
Removes the record of a migration after it has been reverted (downgraded).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_name | str | The source of the migration. | required |
| revision | int | The revision number to remove. | required |
Raises:
| Type | Description |
|---|---|
| MigrationDatabaseError | If the deletion operation fails. |