Document Ingestion (RAG ETL)
Overview
The Document Ingestion module provides a high-level orchestration service for transforming unstructured documents (PDF, plain-text, and Markdown files) into semantic vectors persisted in a Vector Store. It implements a complete Extract-Transform-Load (ETL) pipeline designed for Retrieval-Augmented Generation (RAG).
Key Features
- Modular Loaders: Support for multiple file formats (.txt, .pdf, .md) via a plugin-like registry.
- Semantic Splitting: Uses RecursiveCharacterTextSplitter to break documents into chunks while preserving semantic context and metadata.
- Managed Pipeline: Coordinates loading, splitting, embedding, and upserting in a single method call.
- Observability: Metrics track pages loaded, chunks created, and processing latency.
How It Works
- Extract (Load): The service detects the file extension and selects the appropriate DocumentLoader (e.g., PDFLoader using pypdf) to extract raw text and metadata (page numbers, filenames).
- Transform (Split): The raw text is passed to a TextSplitter, which divides it into chunks based on the configurable chunk_size and chunk_overlap.
- Load (Persist): The chunks are sent to the SemanticMemoryService, which generates embeddings and upserts them into the configured VectorStore.
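The three stages above can be sketched end to end in plain Python. This is a minimal illustration, not the module's implementation: Chunk, PlainTextLoader, LOADERS, split, and InMemoryStore are hypothetical stand-ins, the splitter uses fixed windows with overlap omitted for brevity, and the store skips embedding entirely.

```python
import asyncio
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List


@dataclass
class Chunk:
    """Hypothetical container for a piece of text plus its metadata."""
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class PlainTextLoader:
    """Stand-in loader: decodes the raw bytes as UTF-8 text."""

    def load(self, data: bytes, filename: str) -> List[Chunk]:
        return [Chunk(data.decode("utf-8"), {"filename": filename})]


# Stand-in registry: file extension -> loader class.
LOADERS = {".txt": PlainTextLoader, ".md": PlainTextLoader}


def split(doc: Chunk, chunk_size: int) -> List[Chunk]:
    """Fixed-size windows; every chunk inherits the parent's metadata."""
    return [
        Chunk(doc.text[i:i + chunk_size], {**doc.metadata, "chunk_index": n})
        for n, i in enumerate(range(0, len(doc.text), chunk_size))
    ]


class InMemoryStore:
    """Stand-in for the memory service; a real one would embed each chunk."""

    def __init__(self) -> None:
        self.records: Dict[str, Chunk] = {}

    async def upsert(self, chunk: Chunk) -> str:
        record_id = f"mem-{len(self.records)}"
        self.records[record_id] = chunk
        return record_id


async def ingest(data: bytes, filename: str, store: InMemoryStore) -> List[str]:
    # Extract: pick a loader from the file extension.
    loader_cls = LOADERS.get(Path(filename).suffix.lower())
    if loader_cls is None:
        raise ValueError(f"no loader registered for {filename!r}")
    documents = loader_cls().load(data, filename)
    # Transform: split every loaded document into chunks.
    chunks = [c for doc in documents for c in split(doc, chunk_size=20)]
    # Load: persist each chunk and collect the generated IDs.
    return [await store.upsert(chunk) for chunk in chunks]


store = InMemoryStore()
ids = asyncio.run(ingest(b"a" * 50, "notes.txt", store))
print(ids)  # ['mem-0', 'mem-1', 'mem-2']
```

Fifty characters with a window of 20 yield three chunks, and each stored record carries the filename from the loader plus its own chunk index.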
Usage Example
    from nala.athomic.ai.documents import DocumentIngestionFactory


    async def ingest_knowledge_base(file_path: str):
        # 1. Get the pre-configured service
        ingestion_service = DocumentIngestionFactory.create_default()
        await ingestion_service.connect()

        # 2. Ingest the file
        # Handles loading, splitting, embedding, and storage automatically.
        memory_ids = await ingestion_service.ingest(
            source=file_path,
            filename="company_policy.pdf",
            metadata={"category": "hr_policy", "version": "1.0"},
        )
        print(f"Successfully created {len(memory_ids)} vector records.")
Configuration
Configure ingestion parameters under [ai.documents] in settings.toml.
    [default.ai.documents]
    enabled = true
    # Chunking strategy parameters
    default_chunk_size = 1000
    default_chunk_overlap = 200
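To get a feel for how the two chunking parameters interact, the sketch below computes the character windows that a plain sliding-window splitter would produce. It is an approximation under stated assumptions: chunk_windows is a hypothetical helper, and a recursive splitter prefers to cut at separators such as paragraph or line breaks, so real chunk boundaries will usually differ.

```python
from typing import List, Tuple


def chunk_windows(
    length: int, chunk_size: int = 1000, chunk_overlap: int = 200
) -> List[Tuple[int, int]]:
    """Return (start, end) offsets for overlapping character windows."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    windows: List[Tuple[int, int]] = []
    start = 0
    while start < length:
        end = min(start + chunk_size, length)
        windows.append((start, end))
        if end == length:
            break  # the text is fully covered
        # The next window re-reads the last `chunk_overlap` characters.
        start = end - chunk_overlap
    return windows


print(chunk_windows(2500))  # [(0, 1000), (800, 1800), (1600, 2500)]
```

With the defaults shown in the configuration, each new window advances by 800 characters, so a 2,500-character document produces three chunks.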
API Reference
nala.athomic.ai.documents.service.DocumentIngestionService
Bases: BaseService
Orchestrates the document ingestion pipeline.
This service coordinates the loading, splitting, and persisting of documents into the semantic memory system. It acts as a Facade over the specialized components (Loaders, Splitters, Memory).
Flow:
1. Resolve Loader based on file extension.
2. Load raw Documents.
3. Split Documents into Chunks.
4. Persist Chunks to Semantic Memory (Embed + Store).
__init__(memory_service)
Initialize the ingestion service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| memory_service | SemanticMemoryService | The initialized SemanticMemoryService instance used for embedding and storing the document chunks. | required |
ingest(source, filename, metadata=None, collection_name=None)
async
Ingests a document into the semantic memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | Union[str, Path, bytes] | The file path or raw bytes. | required |
| filename | str | Name of the file (critical for extension detection). | required |
| metadata | Optional[Dict[str, Any]] | Additional context tags (e.g., user_id, category). | None |
| collection_name | Optional[str] | Target vector store collection (currently unused due to MemoryService limitations). | None |
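Because source may be a path or raw bytes while the extension always comes from filename, a caller-side helper along these lines can make the contract concrete. This is only a sketch of one plausible normalization: resolve_source is a hypothetical name, and the service's actual handling (including whether it lowercases the extension) is not shown in this reference.

```python
from pathlib import Path
from typing import Tuple, Union


def resolve_source(source: Union[str, Path, bytes], filename: str) -> Tuple[bytes, str]:
    """Return (raw bytes, lowercase extension) for a path or bytes input."""
    # Raw bytes are used as-is; anything else is treated as a file path.
    data = source if isinstance(source, bytes) else Path(source).read_bytes()
    # The extension is taken from `filename`, never from the bytes themselves.
    extension = Path(filename).suffix.lower()
    return data, extension


data, extension = resolve_source(b"%PDF-1.7 ...", "Company_Policy.PDF")
print(extension)  # .pdf
```

When passing bytes (for example, an upload held in memory), the filename argument is the only hint about the format, which is why it is required.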
Returns:
| Type | Description |
|---|---|
| List[str] | List of generated Memory IDs. |
Raises:
| Type | Description |
|---|---|
| UnsupportedFileFormatError | If no loader exists for the file extension. |
| DocumentIngestionError | If any step of the pipeline fails. |
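A caller will usually want to treat the two exceptions differently: an unsupported format can be skipped, while a pipeline failure should surface. The sketch below shows one way to do that. The exception classes and FakeIngestionService are local stand-ins defined only so the example runs on its own; in real code the exceptions would be imported from the library, and their import path and class hierarchy are not stated in this reference.

```python
import asyncio
from typing import List, Optional


class DocumentIngestionError(Exception):
    """Local stand-in for the library exception of the same name."""


class UnsupportedFileFormatError(Exception):
    """Local stand-in for the library exception of the same name."""


class FakeIngestionService:
    """Hypothetical service that mimics the documented ingest() contract."""

    async def ingest(self, source: bytes, filename: str) -> List[str]:
        if not filename.lower().endswith((".txt", ".pdf", ".md")):
            raise UnsupportedFileFormatError(filename)
        if not source:
            raise DocumentIngestionError("nothing to ingest")
        return ["mem-0"]


async def ingest_or_skip(service, source: bytes, filename: str) -> Optional[List[str]]:
    try:
        return await service.ingest(source=source, filename=filename)
    except UnsupportedFileFormatError:
        # Caught first, so this works whether or not it subclasses
        # DocumentIngestionError in the real library.
        print(f"skipping {filename}: unsupported format")
        return None
    except DocumentIngestionError as exc:
        print(f"ingestion of {filename} failed: {exc}")
        raise


service = FakeIngestionService()
print(asyncio.run(ingest_or_skip(service, b"hello", "notes.txt")))
print(asyncio.run(ingest_or_skip(service, b"hello", "slides.docx")))
```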
nala.athomic.ai.documents.factory.DocumentIngestionFactory
Factory responsible for creating fully configured DocumentIngestionService instances.
It acts as a composition root, wiring together the Vector Store, Embedding Model (via Manager), and Memory Service.
create_default()
classmethod
Creates a DocumentIngestionService using the default application settings.
This method leverages the global Managers (EmbeddingManager) to resolve shared resources, ensuring consistent configuration across the app.
Returns:
| Type | Description |
|---|---|
| DocumentIngestionService | A configured and ready-to-connect DocumentIngestionService. |
Raises:
| Type | Description |
|---|---|
| ValueError | If required configurations or providers are missing. |
| RuntimeError | If modules are disabled. |
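The composition-root idea and the two failure modes can be illustrated with a small, self-contained sketch. Everything here is hypothetical: the real create_default() takes no arguments and resolves its dependencies from global managers, whereas this version receives a settings dictionary and an embedder explicitly so that it can run in isolation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

Embedder = Callable[[str], List[float]]


@dataclass
class IngestionService:
    """Hypothetical stand-in for the wired-up service."""
    embed: Embedder
    chunk_size: int
    chunk_overlap: int


def create_default(settings: Dict[str, Any], embedder: Optional[Embedder]) -> IngestionService:
    # A disabled module is a runtime condition, not a configuration mistake.
    if not settings.get("enabled", False):
        raise RuntimeError("document ingestion is disabled")
    # A missing provider means the configuration is incomplete.
    if embedder is None:
        raise ValueError("no embedding provider configured")
    return IngestionService(
        embed=embedder,
        chunk_size=settings.get("default_chunk_size", 1000),
        chunk_overlap=settings.get("default_chunk_overlap", 200),
    )


service = create_default({"enabled": True}, lambda text: [float(len(text))])
print(service.chunk_size, service.chunk_overlap)  # 1000 200
```

Keeping all wiring in one place means callers never construct the memory service or embedding model themselves, which is the consistency benefit described above.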
nala.athomic.ai.documents.registry.DocumentLoaderRegistry
Bases: BaseRegistry[Type[DocumentLoaderProtocol]]
Registry for Document Loaders mapped by file extension. Allows runtime resolution of the correct strategy to read a file.
get_loader_for_extension(extension)
Instantiates a loader for the given extension.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| extension | str | File extension (e.g., '.pdf', '.txt'). | required |
Returns:
| Type | Description |
|---|---|
| Optional[DocumentLoaderProtocol] | An instance of the specific loader, or None if not supported. |
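An extension-keyed registry of this kind can be sketched in a few lines. The classes below are illustrative stand-ins rather than the library's code: the register method, the shape of the loader protocol, and the normalization of extensions (lowercasing, adding a leading dot) are all assumptions made for the example.

```python
from typing import Dict, Optional, Protocol, Type


class DocumentLoaderProtocol(Protocol):
    """Assumed minimal loader interface for this sketch."""

    def load(self, data: bytes) -> str: ...


class TextLoader:
    def load(self, data: bytes) -> str:
        return data.decode("utf-8")


class LoaderRegistry:
    def __init__(self) -> None:
        self._loaders: Dict[str, Type[DocumentLoaderProtocol]] = {}

    @staticmethod
    def _normalize(extension: str) -> str:
        # Assumed normalization: lowercase with a leading dot.
        extension = extension.strip().lower()
        return extension if extension.startswith(".") else f".{extension}"

    def register(self, extension: str, loader_cls: Type[DocumentLoaderProtocol]) -> None:
        self._loaders[self._normalize(extension)] = loader_cls

    def get_loader_for_extension(self, extension: str) -> Optional[DocumentLoaderProtocol]:
        # Classes are stored; a fresh instance is created on each lookup.
        loader_cls = self._loaders.get(self._normalize(extension))
        return loader_cls() if loader_cls is not None else None


registry = LoaderRegistry()
registry.register(".txt", TextLoader)
registry.register("md", TextLoader)

loader = registry.get_loader_for_extension(".TXT")
print(type(loader).__name__)                       # TextLoader
print(registry.get_loader_for_extension(".docx"))  # None
```

Returning None instead of raising leaves the decision to the caller, which matches the documented behavior where the service turns a missing loader into an UnsupportedFileFormatError.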