Database Migrations

Overview

The Database Migrations module provides a programmatic, version-controlled way to manage your database schema and data transformations over time. The engine is database-agnostic, delegating database-specific work to a backend, and ships with a built-in MongoDB backend.

When the application starts, the MigrationRunner automatically detects and applies any pending migrations, ensuring that your database schema is always in the state required by the current version of the code.


How It Works

The migration process is orchestrated by the MigrationRunner service:

  1. Discovery: At startup, the runner inspects the module paths listed in the paths setting of your configuration to find all migration script files (e.g., 001_initial.py).
  2. State Check: It connects to the database and queries a dedicated history collection (e.g., athomic_migration_status) to determine, for each migration source, the revision number of the last migration that was successfully applied.
  3. Execution: The runner compares the discovered scripts with the database's history. For every script whose revision number is greater than the last applied one, it executes the script's upgrade function, in ascending revision order.
  4. Recording: After each script completes successfully, a new record is added to the history collection, marking that revision as applied.
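The four steps can be sketched in plain Python with the database replaced by an in-memory dictionary. Everything here (run_pending, the (revision, name) tuples, and treating a source with no history as revision 0) is hypothetical and only illustrates the ordering and recording rules, not the runner's actual code.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

# Hypothetical stand-in for a discovered script: (revision, script name).
Script = Tuple[int, str]


async def run_pending(
    source_name: str,
    discovered: List[Script],
    applied: Dict[str, int],
    upgrade: Callable[[str], Awaitable[None]],
) -> List[int]:
    """Apply every script newer than the last recorded revision, in order."""
    # Step 2 (state check); 0 for "nothing applied yet" is an assumption.
    last = applied.get(source_name, 0)
    newly_applied: List[int] = []
    for revision, script in sorted(discovered):  # Step 3: ascending revisions
        if revision <= last:
            continue  # already applied
        await upgrade(script)  # an exception here stops the run unrecorded
        applied[source_name] = revision  # Step 4: record only after success
        newly_applied.append(revision)
    return newly_applied


async def main() -> Tuple[List[int], List[str], Dict[str, int]]:
    history = {"my_app": 1}  # revision 1 was applied on an earlier start
    ran: List[str] = []

    async def fake_upgrade(script: str) -> None:
        ran.append(script)

    scripts = [(2, "002_add_orders"), (1, "001_create_user_indexes"), (3, "003_backfill")]
    result = await run_pending("my_app", scripts, history, fake_upgrade)
    return result, ran, history


result, ran, history = asyncio.run(main())
print(result)  # [2, 3]
```

Recording after each script, rather than once at the end, means a failure in revision 3 would still leave revision 2 marked as applied.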

The entire process is instrumented for observability, emitting detailed logs, metrics, and traces for each migration operation.


How to Create a Migration Script

Creating a new migration is straightforward.

1. Create the File

In your application's migrations directory (e.g., src/your_app/migrations/versions/), create a new Python file. The directory must correspond to one of the module paths listed in the paths setting (see Configuration). The filename must follow this format:

[NNN]_[description].py

  • [NNN]: A three-digit, zero-padded revision number (e.g., 001, 002).
  • [description]: A short, descriptive name for the migration (e.g., create_user_indexes).

Example: 001_create_user_indexes.py
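The naming rule can be expressed as a regular expression. This is a sketch: the helper parse_migration_filename is hypothetical, and restricting the description to lowercase letters, digits, and underscores is an assumption based on the example name, not a documented rule.

```python
import re
from typing import NamedTuple

# Three digits, an underscore, then a description. The character set allowed
# in the description is an assumption.
MIGRATION_FILENAME = re.compile(r"(?P<revision>\d{3})_(?P<description>[a-z0-9_]+)\.py")


class ParsedName(NamedTuple):
    revision: int
    description: str


def parse_migration_filename(filename: str) -> ParsedName:
    """Split a migration filename into its revision number and description."""
    match = MIGRATION_FILENAME.fullmatch(filename)
    if match is None:
        raise ValueError(f"not a migration filename: {filename!r}")
    return ParsedName(int(match["revision"]), match["description"])


print(parse_migration_filename("001_create_user_indexes.py"))
# ParsedName(revision=1, description='create_user_indexes')
```

Names such as 1_initial.py or 001-initial.py are rejected, which is why the zero-padded prefix matters.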

2. Define the Script Content

Each migration script must contain two key elements:

  • A revision variable holding the integer revision number.
  • An async def upgrade(db) function that receives the native database client instance (for MongoDB, this is a motor.core.AgnosticDatabase).
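A loaded migration module can be checked against these two requirements with the standard inspect module. The helper validate_migration_module is hypothetical; whether and how the runner itself validates scripts is not documented here.

```python
import inspect
import types


def validate_migration_module(module: types.ModuleType) -> int:
    """Check the two required elements of a migration script; return its revision."""
    revision = getattr(module, "revision", None)
    # bool is a subclass of int, so reject True/False explicitly.
    if not isinstance(revision, int) or isinstance(revision, bool):
        raise TypeError(f"{module.__name__}: 'revision' must be an int")
    upgrade = getattr(module, "upgrade", None)
    if not inspect.iscoroutinefunction(upgrade):
        raise TypeError(f"{module.__name__}: 'upgrade' must be defined with async def")
    return revision


# Build modules in memory instead of importing files, to stay self-contained.
good = types.ModuleType("m001_create_user_indexes")
good.revision = 1


async def _upgrade(db: object) -> None:
    pass


good.upgrade = _upgrade
print(validate_migration_module(good))  # 1

sync_module = types.ModuleType("m002_sync_upgrade")
sync_module.revision = 2
sync_module.upgrade = lambda db: None  # a plain function, so it is rejected
```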

Example Migration Script

This example creates a unique index on the users collection.

# src/your_app/migrations/versions/001_create_user_indexes.py
from motor.core import AgnosticDatabase
from pymongo import ASCENDING, IndexModel

# The sequential revision number for this migration.
revision = 1

async def upgrade(db: AgnosticDatabase) -> None:
    """
    Applies the migration to create a unique index on the 'email' field
    of the 'users' collection.
    """
    users_collection = db.get_collection("users")

    email_index = IndexModel(
        [("email", ASCENDING)],
        name="user_email_unique_idx",
        unique=True
    )

    await users_collection.create_indexes([email_index])

# (Optional) You can also define an `async def downgrade(db)` function
# to support reverting the migration, although this is not currently used by the runner.
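A matching downgrade for the example might drop the index by the name that upgrade gave it. The runner's upgrade() path does not call it, but the backend protocol's revert_migration is documented as loading a downgrade function. Motor collections provide drop_index, which accepts an index name; FakeCollection and FakeDatabase are hypothetical stand-ins so the sketch runs without a MongoDB server.

```python
import asyncio
from typing import Any, Dict, List


async def downgrade(db: Any) -> None:
    """Reverts revision 1 by dropping the unique index on users.email."""
    users_collection = db.get_collection("users")
    # Drop by the explicit name used in upgrade(), so no other index is touched.
    await users_collection.drop_index("user_email_unique_idx")


# --- Hypothetical fakes so the sketch runs without MongoDB ---
class FakeCollection:
    def __init__(self) -> None:
        self.dropped: List[str] = []

    async def drop_index(self, name: str) -> None:
        self.dropped.append(name)


class FakeDatabase:
    def __init__(self) -> None:
        self.collections: Dict[str, FakeCollection] = {}

    def get_collection(self, name: str) -> FakeCollection:
        return self.collections.setdefault(name, FakeCollection())


fake_db = FakeDatabase()
asyncio.run(downgrade(fake_db))
print(fake_db.collections["users"].dropped)  # ['user_email_unique_idx']
```

Naming the index explicitly in upgrade (name="user_email_unique_idx") is what makes this targeted drop possible.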

Configuration

To enable and configure the migration runner, add a [database.migrations] section to your settings.toml. In the example below it is nested under the default prefix, as [default.database.migrations].

[default.database.migrations]
# A master switch to enable or disable the migration engine.
enabled = true

  # A dictionary of named database connections to run migrations against.
  [default.database.migrations.connections.my_app_db]
  # The migration engine to use.
  engine = "mongodb"

  # The name of the connection defined under '[database.documents]' to use.
  db_connection_name = "default_mongo"

  # The collection/table used to store migration history.
  version_collection = "my_app_migrations"

  # A list of Python module paths where your migration scripts are located.
  # The runner searches these packages for migration files.
  paths = ["my_app.migrations.versions"]

API Reference

nala.athomic.database.migrations.runner.MigrationRunner

Orchestrates the database migration process with distributed locking support.

This class is the core engine for applying database migrations. It is database-agnostic and delegates database-specific operations to a configured backend provider.

To prevent race conditions in clustered environments (e.g., Kubernetes), this runner can use the LockingProtocol to acquire a distributed lock for each database connection before attempting to migrate it. Locking is active only when a locking provider is supplied.

Attributes:

  • app_settings: The root application settings object.
  • migrations_settings: The specific configuration for the migration engine.
  • connection_manager: The manager for obtaining database providers.
  • locking_provider: The provider used for distributed locking (e.g., RedisLock).
  • logger: A pre-configured logger instance for the runner.
  • tracer: An OpenTelemetry tracer instance for the runner.
  • hostname: The hostname of the current pod/machine.

__init__(settings, connection_manager, locking_provider=None)

Initializes the MigrationRunner.

Parameters:

  • settings (AppSettings, required): The application's root configuration object.
  • connection_manager (ConnectionManager, required): The manager for obtaining database providers.
  • locking_provider (Optional[LockingProtocol], default None): Optional locking provider to enable distributed locking. If None, migrations run without locking protection.

upgrade() async

Runs the 'upgrade' process for all configured database connections.

Iterates through configured connections and triggers the upgrade process. Uses distributed locking if a provider is available.
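The optional-locking behavior described for upgrade() and locking_provider can be sketched as a single-process illustration. SimpleLock, InMemoryLock, upgrade_all, the lock key format, and the choice to skip (rather than wait for) a connection whose lock is already held are all hypothetical; the real LockingProtocol interface is not documented on this page.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Protocol, Set


class SimpleLock(Protocol):
    """Hypothetical lock interface; the real LockingProtocol may differ."""

    async def acquire(self, key: str) -> bool: ...
    async def release(self, key: str) -> None: ...


class InMemoryLock:
    """Single-process stand-in for a distributed lock such as RedisLock."""

    def __init__(self) -> None:
        self.held: Set[str] = set()

    async def acquire(self, key: str) -> bool:
        if key in self.held:
            return False  # another holder already owns this key
        self.held.add(key)
        return True

    async def release(self, key: str) -> None:
        self.held.discard(key)


async def upgrade_all(
    connections: List[str],
    migrate: Callable[[str], Awaitable[None]],
    lock: Optional[SimpleLock] = None,
) -> Dict[str, str]:
    """Migrate each connection, holding a per-connection lock when one is given."""
    results: Dict[str, str] = {}
    for name in connections:
        if lock is None:
            await migrate(name)  # no provider: run without locking protection
            results[name] = "migrated"
            continue
        key = f"migrations:{name}"  # hypothetical key format
        if not await lock.acquire(key):
            results[name] = "skipped"  # assumed policy: another instance is migrating
            continue
        try:
            await migrate(name)
            results[name] = "migrated"
        finally:
            await lock.release(key)  # release even if the migration fails
    return results


async def main():
    lock = InMemoryLock()
    await lock.acquire("migrations:billing_db")  # simulate another pod holding it
    calls: List[str] = []

    async def migrate(name: str) -> None:
        calls.append(name)

    results = await upgrade_all(["my_app_db", "billing_db"], migrate, lock)
    return results, calls, lock


results, calls, lock = asyncio.run(main())
print(results)  # {'my_app_db': 'migrated', 'billing_db': 'skipped'}
```

Locking per connection rather than globally lets independent databases migrate even while another instance holds a different connection's lock.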

nala.athomic.database.migrations.protocol.MigrationBackendProtocol

Bases: Protocol

Defines the contract for a database-specific migration backend.

This allows the MigrationRunner to be agnostic of the underlying database. All I/O operations are designed to be asynchronous.

apply_migration(source_name, revision, path) async

Loads and executes the 'upgrade' function from a migration script.

connect() async

Initializes the connection to the database.

disconnect() async

Closes the connection to the database.

get_applied_migrations() async

Gets the latest applied revision number for each migration source.

Returns:

  • Dict[str, int]: A dictionary mapping a source_name to its latest revision_number.

Example:

{"athomic": 2, "my_app_billing": 1}

record_migration(source_name, revision, description) async

Records that a migration has been successfully applied.

remove_migration_record(source_name, revision) async

Removes the record of a migration after it has been reverted.

revert_migration(source_name, revision, path) async

Loads and executes the 'downgrade' function from a migration script.
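Because the runner depends only on this contract, a test double can stand in for a real database. The sketch mirrors the documented method names and arguments as a typing.Protocol; the str type for path, the InMemoryBackend class, and its list-based history are assumptions for illustration.

```python
import asyncio
from typing import Dict, List, Protocol, Tuple


class MigrationBackend(Protocol):
    """Mirrors the documented MigrationBackendProtocol; 'path' assumed to be a str."""

    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def get_applied_migrations(self) -> Dict[str, int]: ...
    async def apply_migration(self, source_name: str, revision: int, path: str) -> None: ...
    async def revert_migration(self, source_name: str, revision: int, path: str) -> None: ...
    async def record_migration(self, source_name: str, revision: int, description: str) -> None: ...
    async def remove_migration_record(self, source_name: str, revision: int) -> None: ...


class InMemoryBackend:
    """Hypothetical test double that keeps history in a list, not a database."""

    def __init__(self) -> None:
        self.history: List[Tuple[str, int, str]] = []
        self.executed: List[str] = []  # log of script runs, for assertions

    async def connect(self) -> None:
        pass  # nothing to open

    async def disconnect(self) -> None:
        pass  # nothing to close

    async def get_applied_migrations(self) -> Dict[str, int]:
        latest: Dict[str, int] = {}
        for source_name, revision, _ in self.history:
            latest[source_name] = max(revision, latest.get(source_name, revision))
        return latest

    async def apply_migration(self, source_name: str, revision: int, path: str) -> None:
        self.executed.append(f"upgrade {source_name}:{revision} from {path}")

    async def revert_migration(self, source_name: str, revision: int, path: str) -> None:
        self.executed.append(f"downgrade {source_name}:{revision} from {path}")

    async def record_migration(self, source_name: str, revision: int, description: str) -> None:
        self.history.append((source_name, revision, description))

    async def remove_migration_record(self, source_name: str, revision: int) -> None:
        self.history = [
            entry for entry in self.history if (entry[0], entry[1]) != (source_name, revision)
        ]


async def demo() -> Tuple[Dict[str, int], Dict[str, int]]:
    backend: MigrationBackend = InMemoryBackend()
    await backend.connect()
    await backend.record_migration("athomic", 1, "initial")
    await backend.record_migration("athomic", 2, "indexes")
    await backend.record_migration("my_app_billing", 1, "invoices")
    before = await backend.get_applied_migrations()
    await backend.remove_migration_record("athomic", 2)  # as after a revert
    after = await backend.get_applied_migrations()
    await backend.disconnect()
    return before, after


before, after = asyncio.run(demo())
print(before)  # {'athomic': 2, 'my_app_billing': 1}
print(after)   # {'athomic': 1, 'my_app_billing': 1}
```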

nala.athomic.database.migrations.providers.mongodb.MongoDbBackend

Bases: BaseMigrationBackend

A migration backend for managing schema changes in a MongoDB database.

This class implements the MigrationBackendProtocol for MongoDB, handling database connection retrieval, migration script execution, and version history persistence using MongoDB's aggregation and document operations. It supports dependency injection for testing.

Attributes:

  • db_connection_name (str): The logical name of the database connection to use.
  • connection_manager (ConnectionManager): The manager for obtaining database providers.
  • db_provider (DocumentsDatabaseProtocol): The active MongoDB client provider.
  • migration_status_model (Type[Document]): The Beanie Document model used for storing migration history.

__init__(settings, version_target, connection_name, connection_manager=None, db_provider=None, migration_status_model=MigrationStatus)

Initializes the MongoDB Migration Backend, resolving dependencies via factories or injection.

Parameters:

  • settings (MigrationConnectionSettings, required): Specific configuration for this migration connection.
  • version_target (str, required): The name of the collection to store version history (deprecated in favor of migration_status_model).
  • connection_name (str, required): The logical name of this migration connection.
  • connection_manager (Optional[ConnectionManager], default None): Optional injected connection manager.
  • db_provider (Optional[DocumentsDatabaseProtocol], default None): Optional injected database provider.
  • migration_status_model (Type[Document], default MigrationStatus): The model used for migration history persistence.

connect() async

Ensures the MongoDB provider is connected and ready for migration operations.

Raises:

  • MigrationDatabaseError: If the configured database connection cannot be retrieved or fails to connect.

disconnect() async

Does nothing (no-op): the MongoDB connection lifecycle is managed by the central ConnectionManager, so there is nothing to close here.

get_applied_migrations() async

Fetches the latest applied revision number for each migration source from the database.

Uses a MongoDB aggregation pipeline to find the maximum revision grouped by source_name.

Returns:

  • Dict[str, int]: A dictionary mapping each source_name to its latest applied revision number.

Raises:

  • MigrationDatabaseError: If the database query fails.
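The grouping described above can be written as a single $group stage that takes the $max revision per source_name. The sketch assumes the history documents store fields named source_name and revision, and it replays the stage in plain Python so it runs without a server; the backend's real pipeline may differ. With Motor, a pipeline like this would be passed to collection.aggregate(), and the returned cursor drained with await cursor.to_list(length=None).

```python
from typing import Any, Dict, Iterable, List, Mapping

# Assumed field names: 'source_name' and 'revision' on each history document.
LATEST_REVISION_PIPELINE: List[Dict[str, Any]] = [
    {"$group": {"_id": "$source_name", "latest": {"$max": "$revision"}}},
]


def to_applied_map(grouped: Iterable[Mapping[str, Any]]) -> Dict[str, int]:
    """Turn $group output rows into the {source_name: revision} shape."""
    return {row["_id"]: row["latest"] for row in grouped}


def simulate_group_max(docs: Iterable[Mapping[str, Any]]) -> List[Dict[str, Any]]:
    """Pure-Python equivalent of the $group/$max stage, for illustration only."""
    latest: Dict[str, int] = {}
    for doc in docs:
        source = doc["source_name"]
        latest[source] = max(doc["revision"], latest.get(source, doc["revision"]))
    return [{"_id": source, "latest": revision} for source, revision in latest.items()]


history = [
    {"source_name": "athomic", "revision": 1},
    {"source_name": "athomic", "revision": 2},
    {"source_name": "my_app_billing", "revision": 1},
]
print(to_applied_map(simulate_group_max(history)))
# {'athomic': 2, 'my_app_billing': 1}
```

Grouping on the server returns one row per source instead of every history record, so the result stays small as the history grows.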

record_migration(source_name, revision, description) async

Records a successfully applied migration in the version tracking collection.

Parameters:

  • source_name (str, required): The source of the migration (e.g., 'athomic', 'app_billing').
  • revision (int, required): The revision number of the applied migration.
  • description (str, required): A brief description of the migration.

Raises:

  • MigrationDatabaseError: If the record insertion fails.

remove_migration_record(source_name, revision) async

Removes the record of a migration after it has been reverted (downgraded).

Parameters:

  • source_name (str, required): The source of the migration.
  • revision (int, required): The revision number to remove.

Raises:

  • MigrationDatabaseError: If the deletion operation fails.