Document Ingestion (RAG ETL)

Overview

The Document Ingestion module provides a high-level orchestration service for transforming unstructured data (PDFs, Text files) into semantic vectors persisted in a Vector Store. It implements a complete Extract-Transform-Load (ETL) pipeline designed for Retrieval-Augmented Generation (RAG).

Key Features

  • Modular Loaders: Support for multiple file formats (.txt, .pdf, .md) via a plugin-like registry.
  • Semantic Splitting: Uses RecursiveCharacterTextSplitter to break documents into chunks while preserving semantic context and metadata (see the splitter sketch after this list).
  • Managed Pipeline: Coordinates loading, splitting, embedding, and upserting in a single method call.
  • Observability: Built-in metrics track pages loaded, chunks created, and processing latency.
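
The splitter parameters determine how much text ends up in each vector record. The class name matches LangChain's RecursiveCharacterTextSplitter, so the sketch below assumes that implementation purely for illustration; the module may ship its own equivalent with the same chunk_size / chunk_overlap semantics.

# Sketch only: assumes a LangChain-style RecursiveCharacterTextSplitter.
# The module may provide its own splitter with the same parameters.
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,    # maximum characters per chunk
    chunk_overlap=200,  # characters shared between adjacent chunks
)

with open("company_policy.txt", encoding="utf-8") as fh:
    chunks = splitter.split_text(fh.read())

print(f"Produced {len(chunks)} chunks")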

How It Works

  1. Extract (Load): The service detects the file extension and selects the appropriate DocumentLoader (e.g., PDFLoader using pypdf) to extract raw text and metadata (page numbers, filenames).
  2. Transform (Split): The raw text is passed to a TextSplitter which divides it into chunks based on configurable chunk_size and chunk_overlap.
  3. Load (Persist): The chunks are sent to the SemanticMemoryService, which generates embeddings and upserts them into the configured VectorStore.
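
The three stages map onto separate components. The stand-in functions below are illustrative only (the real work is done by the DocumentLoader, TextSplitter, and SemanticMemoryService classes), but they make the data flow and the chunk-overlap arithmetic concrete.

# Conceptual sketch of the Extract -> Transform -> Load flow.
# These stand-ins are not the real API; DocumentIngestionService.ingest()
# performs all three steps internally.

def extract(path: str) -> str:
    # Stand-in for a DocumentLoader: read raw text from disk.
    with open(path, encoding="utf-8") as fh:
        return fh.read()

def transform(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    # Stand-in for a TextSplitter: fixed-size windows with overlap.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

def load(chunks: list[str]) -> list[int]:
    # Stand-in for SemanticMemoryService: pretend to embed and upsert.
    return list(range(len(chunks)))

memory_ids = load(transform(extract("notes.txt")))
print(f"Persisted {len(memory_ids)} chunks")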

Usage Example

from nala.athomic.ai.documents import DocumentIngestionFactory

async def ingest_knowledge_base(file_path: str):
    # 1. Get the pre-configured service
    ingestion_service = DocumentIngestionFactory.create_default()
    await ingestion_service.connect()

    # 2. Ingest the file
    # Handles loading, splitting, embedding, and storage automatically.
    memory_ids = await ingestion_service.ingest(
        source=file_path,
        filename="company_policy.pdf",
        metadata={"category": "hr_policy", "version": "1.0"}
    )

    print(f"Successfully created {len(memory_ids)} vector records.")

Configuration

Configure ingestion parameters in the [default.ai.documents] section of settings.toml.

[default.ai.documents]
enabled = true

# Chunking strategy parameters
default_chunk_size = 1000
default_chunk_overlap = 200
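
The [default.*] layout suggests a Dynaconf-style layered configuration. Assuming that (the settings object and access path below are an assumption, not a documented API), the values can be inspected like this:

# Assumes Dynaconf-style settings loading; adjust to the project's actual config API.
from dynaconf import Dynaconf

settings = Dynaconf(settings_files=["settings.toml"], environments=True)

print(settings.ai.documents.enabled)                 # True
print(settings.ai.documents.default_chunk_size)      # 1000
print(settings.ai.documents.default_chunk_overlap)   # 200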

API Reference

nala.athomic.ai.documents.service.DocumentIngestionService

Bases: BaseService

Orchestrates the document ingestion pipeline.

This service coordinates the loading, splitting, and persisting of documents into the semantic memory system. It acts as a Facade over the specialized components (Loaders, Splitters, Memory).

Flow:

  1. Resolve Loader based on file extension.
  2. Load raw Documents.
  3. Split Documents into Chunks.
  4. Persist Chunks to Semantic Memory (Embed + Store).

__init__(memory_service)

Initialize the ingestion service.

Parameters:

  • memory_service (SemanticMemoryService, required): The initialized SemanticMemoryService instance used for embedding and storing the document chunks.
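
For manual wiring outside the factory, the constructor only needs an already-initialized memory service. A minimal sketch, assuming a SemanticMemoryService instance is already available (its construction is application-specific and omitted here):

from nala.athomic.ai.documents.service import DocumentIngestionService

async def build_service(memory_service) -> DocumentIngestionService:
    # memory_service must be an initialized SemanticMemoryService; how it is
    # built (vector store, embedding model) is application-specific.
    service = DocumentIngestionService(memory_service=memory_service)
    await service.connect()
    return service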

ingest(source, filename, metadata=None, collection_name=None) async

Ingests a document into the semantic memory.

Parameters:

  • source (Union[str, Path, bytes], required): The file path or raw bytes.
  • filename (str, required): Name of the file (critical for extension detection).
  • metadata (Optional[Dict[str, Any]], default None): Additional context tags (e.g., user_id, category).
  • collection_name (Optional[str], default None): Target vector store collection (currently unused due to MemoryService limitations).

Returns:

  • List[str]: List of generated Memory IDs.

Raises:

  • UnsupportedFileFormatError: If no loader exists for the file extension.
  • DocumentIngestionError: If any step of the pipeline fails.
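
Since source also accepts raw bytes, ingest() can be fed content that never touches disk, such as an HTTP upload. A hedged sketch; the exceptions' import path is an assumption based on the module layout shown above:

import logging

# Assumed import path for the documented exceptions; adjust to the actual module.
from nala.athomic.ai.documents import (
    DocumentIngestionError,
    UnsupportedFileFormatError,
)

logger = logging.getLogger(__name__)

async def ingest_upload(service, payload: bytes) -> list[str]:
    try:
        return await service.ingest(
            source=payload,             # raw bytes, never written to disk
            filename="upload.pdf",      # the extension drives loader selection
            metadata={"origin": "api_upload"},
        )
    except UnsupportedFileFormatError:
        logger.warning("No loader registered for this file extension")
        return []
    except DocumentIngestionError:
        logger.exception("Pipeline failed while loading, splitting, or persisting")
        raise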

nala.athomic.ai.documents.factory.DocumentIngestionFactory

Factory responsible for creating fully configured DocumentIngestionService instances.

It acts as a composition root, wiring together the Vector Store, Embedding Model (via Manager), and Memory Service.

create_default() classmethod

Creates a DocumentIngestionService using the default application settings.

This method leverages the global Managers (EmbeddingManager) to resolve shared resources, ensuring consistent configuration across the app.

Returns:

  • DocumentIngestionService: A configured and ready-to-connect DocumentIngestionService.

Raises:

  • ValueError: If required configurations or providers are missing.
  • RuntimeError: If modules are disabled.
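
Callers that prefer a soft failure when ingestion is misconfigured or disabled can guard the factory call; a brief sketch:

from nala.athomic.ai.documents import DocumentIngestionFactory

def try_create_ingestion_service():
    # Returns a service, or None when ingestion cannot be wired up.
    try:
        return DocumentIngestionFactory.create_default()
    except ValueError as exc:
        print(f"Missing configuration or provider: {exc}")
    except RuntimeError as exc:
        print(f"Document ingestion module is disabled: {exc}")
    return None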

nala.athomic.ai.documents.registry.DocumentLoaderRegistry

Bases: BaseRegistry[Type[DocumentLoaderProtocol]]

Registry for Document Loaders mapped by file extension. Allows runtime resolution of the correct strategy to read a file.

get_loader_for_extension(extension)

Instantiates a loader for the given extension.

Parameters:

  • extension (str, required): File extension (e.g., '.pdf', '.txt').

Returns:

  • Optional[DocumentLoaderProtocol]: An instance of the specific loader, or None if not supported.
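
A minimal sketch of resolving a loader directly from the registry; instantiating the registry like this is an assumption, since the service normally performs the resolution internally during ingest():

from nala.athomic.ai.documents.registry import DocumentLoaderRegistry

# Assumption: a registry instance can be constructed directly like this.
registry = DocumentLoaderRegistry()

loader = registry.get_loader_for_extension(".pdf")
if loader is None:
    # The ingestion service would raise UnsupportedFileFormatError here.
    print("No loader registered for '.pdf'")
else:
    print(f"Resolved loader: {type(loader).__name__}")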