Data Lineage

Overview

The Data Lineage module provides a robust, asynchronous system for generating, transporting, and persisting lineage events that comply with the OpenLineage standard.

Data lineage tracks the origin, movement, and transformation of data. In the Athomic Layer, this is achieved through a Provider-Collector Architecture, which decouples the application (Producer) from the storage backends (Collector).

Architecture & Components

The system is split into two cooperating components to ensure high performance and scalability:

1. The Producer (Application Side)

The LineageProducer is a lightweight service embedded in the application.

  • Responsibility: Constructs LineageEvent models and publishes them to a messaging topic (platform.lineage.events).
  • Performance: It does not connect to databases or external APIs directly; it only publishes a single message to the broker, adding negligible overhead to the application's request latency.
  • Identity: Configured with the "Producer URI" and "Namespace" to identify the source of events.

2. The Collector (Worker Side)

The LineageCollectorService is a dedicated background service (or separate microservice).

  • Responsibility: Consumes events from the lineage topic, hydrates them, and delegates persistence to the LineageManager.
  • Composite Storage: Supports writing to multiple backends simultaneously (e.g., Document DB for audit + Graph DB for analysis).
  • Fail-Fast: If writing to any configured store fails, the collector raises, triggering the messaging layer's retry and dead-letter-queue (DLQ) flow so events are redelivered rather than silently dropped, keeping the stores consistent.
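The fail-fast broadcast can be sketched as follows; `CompositeLineageStore` and the `write` signature here are illustrative stand-ins, not the library's actual classes:

```python
from typing import Any, Dict, List, Protocol


class LineageStore(Protocol):
    """Minimal stand-in for the library's store protocol."""

    async def write(self, event: Dict[str, Any]) -> None: ...


class CompositeLineageStore:
    """Broadcasts each event to every configured backend.

    If any backend raises, the exception propagates to the consumer,
    which leaves the message unacknowledged so the broker retries it
    (the DLQ flow described above).
    """

    def __init__(self, backends: List[LineageStore]) -> None:
        self._backends = backends

    async def write(self, event: Dict[str, Any]) -> None:
        for backend in self._backends:
            # Fail-fast: the first failing backend aborts the broadcast.
            await backend.write(event)
```

Note that a partial write (first store succeeds, second fails) is resolved by redelivery, which assumes the individual stores tolerate duplicate events (idempotent writes).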

How It Works

  1. Instrumentation: When a function decorated with @track_lineage executes, or a message is consumed, the LineageProducer is invoked.
  2. Emission: The producer creates a START, COMPLETE, or FAIL event and publishes it to the message broker.
  3. Ingestion: The LineageCollectorService picks up the message asynchronously.
  4. Persistence: The LineageManager routes the event to the configured Composite Store, which broadcasts it to all active backends (e.g., MongoDB, Neo4j, Logging).
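Steps 1–2 can be illustrated with a toy decorator (synchronous for brevity; the real @track_lineage wraps async functions and publishes to the broker, and the `emit` callback here is a hypothetical stand-in for the LineageProducer):

```python
import functools
from typing import Callable


def track_lineage_sketch(job_name: str, emit: Callable[[str, str], None]):
    """Illustrative stand-in for @track_lineage: emits START before the
    call and COMPLETE or FAIL after, via the supplied emit() callback."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            emit(job_name, "START")
            try:
                result = func(*args, **kwargs)
            except Exception:
                # The FAIL event is emitted, then the error is re-raised
                # so the application still sees the original exception.
                emit(job_name, "FAIL")
                raise
            emit(job_name, "COMPLETE")
            return result

        return wrapper

    return decorator
```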

Storage Backends

The system supports multiple storage backends via the LineageStoreProtocol.

  • DocumentLineageStore (MongoDB): Stores the raw OpenLineage event payload. Ideal for audit trails and historical debugging.
  • Neo4jLineageStore (Graph): Projects the event into a property graph (Nodes: Job, Run, Dataset). Ideal for impact analysis and visualization.
  • LoggingLineageStore: Writes the lineage event as a structured JSON log entry.
  • OpenLineageStore: Sends the event directly to an external HTTP API (like Marquez).
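For intuition, a logging backend along the lines of LoggingLineageStore might look like this sketch (the class and method names are assumptions, not the library's implementation):

```python
import json
import logging
from typing import Any, Dict


class LoggingLineageStoreSketch:
    """Illustrative logging backend: serializes the OpenLineage event
    payload as a single structured JSON log line."""

    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger

    async def write(self, event: Dict[str, Any]) -> None:
        # sort_keys keeps log lines stable and diff-friendly;
        # default=str handles datetimes and other non-JSON types.
        self._logger.info(json.dumps(event, sort_keys=True, default=str))
```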

Usage

The @track_lineage Decorator

To track a business operation, apply the decorator to your function.

from nala.athomic.integration.messaging.lineage import track_lineage
from nala.athomic.lineage.models import LineageDataset

input_data = LineageDataset(namespace="mongo", name="users_collection")

@track_lineage(
    job_name="process_user_report",
    inputs=[input_data]
)
async def generate_report(user_id: str):
    # ... business logic ...
    # Automatic START event emitted before execution.
    # Automatic COMPLETE/FAIL event emitted after execution.
    return "report_generated"

Configuration

Configuration is split between the Producer (Identity) and the Store (Persistence).

# 1. Enable Lineage Globally
[lineage]
enabled = true
collector_topic = "platform.lineage.events"

# 2. Configure Identity (Who am I?)
[lineage.producer]
job_namespace = "my-service-ns"
producer_uri = "https://github.com/my-org/my-service"

# 3. Configure Storage Backends (Where does data go?)
[lineage.store]
# Use 'composite' to enable multiple backends
backend = "composite"
backends = ["document", "graph"]

    # --- MongoDB Store Settings ---
    [lineage.store.document]
    collection_name = "audit_lineage_events"
    document_connection_name = "default_mongo"

    # --- Neo4j Store Settings ---
    [lineage.store.graph]
    graph_connection_name = "analytics_graph"

API Reference

nala.athomic.integration.messaging.lineage.producer.LineageProducer

Bases: BaseService

Service responsible for constructing and dispatching Data Lineage events.

record_consumption(event_type, job_name, job_namespace=None, run_id=None, inputs=None, outputs=None, producer_uri=None) async

Records a lineage event by constructing the model and publishing it.

record_message_consumption(topic, group_id, source_namespace) async

Helper method specifically for Consumer Processors. Maps messaging concepts (Topic, Group) to Lineage concepts (Dataset, Job).
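The Topic/Group to Dataset/Job mapping can be sketched as follows; the `consume_` job-name prefix and the `Dataset` shape are assumptions for illustration, not the method's exact output:

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Dataset:
    namespace: str
    name: str


def map_consumption(topic: str, group_id: str,
                    source_namespace: str) -> Tuple[str, List[Dataset]]:
    """Illustrates the mapping record_message_consumption performs:
    the consumed topic becomes the input Dataset, and the consumer
    group identifies the job doing the consuming."""
    job_name = f"consume_{group_id}"  # hypothetical naming convention
    inputs = [Dataset(namespace=source_namespace, name=topic)]
    return job_name, inputs
```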

nala.athomic.lineage.collector.service.LineageCollectorService

Bases: BaseService

Service responsible for consuming LineageEvents from the messaging system and delegating their persistence to the underlying lineage store(s) managed by the LineageManager.

This service uses the standard BaseService lifecycle hooks (_connect/_close) to manage the Orchestrator dependency safely.

__init__(lineage_manager)

Parameters:

  • lineage_manager (LineageManager, required): The orchestrator responsible for initializing and providing access to the configured lineage stores.

handle_lineage_event(event, raw_message=None, **kwargs) async

Handles an incoming LineageEvent and persists it using the active lineage store provided by the LineageManager.

Parameters:

  • event (Union[LineageEvent, Dict[str, Any]], required): The deserialized payload (LineageEvent model or raw dict).
  • raw_message (Optional[Any], default None): The raw message object.
  • **kwargs (default {}): Captures additional metadata injected by the messaging framework (e.g., message_key, raw_headers).
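The Union handling can be sketched like this; `EventModelSketch` and `coerce_event` are minimal stand-ins for the real LineageEvent model and the handler's hydration step:

```python
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class EventModelSketch:
    """Minimal stand-in for the LineageEvent model."""
    eventType: str
    job_name: str


def coerce_event(
    event: Union[EventModelSketch, Dict[str, Any]],
) -> EventModelSketch:
    """The handler accepts either the parsed model or a raw dict
    (depending on the messaging framework's deserialization) and
    normalizes to the model before delegating to the store."""
    if isinstance(event, EventModelSketch):
        return event
    return EventModelSketch(**event)
```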

nala.athomic.lineage.manager.LineageManager

Bases: BaseService

Orchestrates the lineage storage subsystem.

This manager follows the Provider pattern similar to the Database module. It dynamically discovers enabled storage backends based on the provided configuration and the registered store creators.

It is responsible for:

  1. Initializing active stores based on configuration.
  2. Managing the lifecycle (startup/shutdown) of these stores.
  3. Providing a unified interface (Composite or Single) to the collector.

get_store()

Returns the active lineage store instance.
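A plausible resolution strategy, assuming the config keys shown earlier (backend / backends) and a registry of store creators; this is a sketch, not the manager's actual code:

```python
from typing import Any, Callable, Dict


def resolve_active_store(store_cfg: Dict[str, Any],
                         creators: Dict[str, Callable[[], Any]]) -> Any:
    """Sketch of get_store() resolution: 'composite' fans out to every
    backend listed in 'backends'; any other value selects one store."""
    if store_cfg["backend"] == "composite":
        return tuple(creators[name]() for name in store_cfg["backends"])
    return creators[store_cfg["backend"]]()
```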

nala.athomic.lineage.models.LineageEvent

Bases: BaseModel

Standard OpenLineage-compatible event structure.

This model acts as the Data Transfer Object (DTO) for the lineage collector and storage layers. It captures the 'who', 'what', 'when', and 'where' of data processing.
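An illustrative, flattened version of such an event (field names follow the OpenLineage run-event vocabulary, but the real model's structure may differ):

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DatasetRef:
    namespace: str
    name: str


@dataclass
class RunEventSketch:
    """Flattened, illustrative OpenLineage-style run event."""
    eventType: str      # START | COMPLETE | FAIL          ("what")
    eventTime: str      # ISO-8601 timestamp               ("when")
    producer: str       # producer URI                     ("who")
    job_namespace: str  # logical grouping for the job
    job_name: str
    run_id: str
    inputs: List[DatasetRef] = field(default_factory=list)   # "where" (from)
    outputs: List[DatasetRef] = field(default_factory=list)  # "where" (to)
```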