# Retrieval-Augmented Generation (RAG)

## Overview
The RAG module provides an orchestration service for combining semantic search (Memory) with generative capabilities (LLM). It implements a Stream-First architecture designed for high-performance user experiences, where users need to verify sources ("citations") immediately while the answer is being generated.
## Key Features

- True Streaming: Yields retrieval sources immediately, then streams the LLM answer token-by-token.
- Deterministic Generation: Enforces strict parameters (`temperature=0.0`) on the LLM to minimize hallucinations and keep answers grounded in the retrieved context.
- Strategy Pattern: Decouples Retrieval (getting data) from Generation (synthesizing the answer), allowing easy swapping of algorithms (e.g., "Stuff", "Map-Reduce").
- Observability: Full pipeline tracing (`rag.retrieve` -> `rag.generate`) with granular metrics for retrieval quality and generation latency.
## Architecture

### The RAG Pipeline

- Retrieval Phase:
  - The `RetrievalStrategy` executes (e.g., Vector Search).
  - Converts raw memory results into standardized `RAGSource` objects.
- Augmentation Phase:
  - The `GenerationStrategy.augment_prompt()` method constructs the final prompt, injecting the sources into the template (e.g., XML formatting `<doc id='1'>...</doc>`).
- Generation Phase:
  - The service invokes the LLM using the augmented prompt.
  - Response chunks are yielded to the client.
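The three phases map onto a small amount of orchestration code. The sketch below is illustrative only: `retrieve(...)` and `complete(...)` are hypothetical method names for the retrieval strategy and the LLM, while `augment_prompt(...)` follows the protocol documented in the API reference further down.

```python
from typing import List


async def rag_pipeline(query: str, retrieval, generation, prompt_service, llm) -> str:
    # 1. Retrieval Phase: the retrieval strategy returns standardized RAGSource
    #    objects. `retrieve` is a hypothetical method name used for illustration.
    sources: List["RAGSource"] = await retrieval.retrieve(query)

    # 2. Augmentation Phase: the generation strategy renders the final prompt,
    #    injecting the sources into the template (signature as documented below).
    prompt = generation.augment_prompt(
        query=query,
        sources=sources,
        prompt_service=prompt_service,
        template_name="rag/default_qa",
    )

    # 3. Generation Phase: the LLM is invoked with the augmented prompt.
    #    `complete` is a hypothetical BaseLLM method; temperature=0.0 mirrors
    #    the deterministic-generation default from Key Features.
    return await llm.complete(prompt, temperature=0.0)
```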
## Usage Example

### Streaming Response (Recommended)

```python
from nala.athomic.ai.rag.factory import RAGFactory
from nala.athomic.ai.schemas.rag import RAGRequest


async def ask_knowledge_base(question: str):
    # 1. Initialize Service
    rag_service = RAGFactory.create()
    await rag_service.connect()

    request = RAGRequest(query=question)
    print(f"Question: {question}")

    # 2. Iterate over RAGResponseChunk objects
    async for chunk in rag_service.stream_response(request):
        # A. Sources arrive first (Immediate Feedback)
        if chunk.sources:
            doc_ids = [s.document_id for s in chunk.sources]
            print(f"\n[Sources Found]: {doc_ids}")

        # B. Content streams in real-time
        if chunk.content_delta:
            print(chunk.content_delta, end="", flush=True)

        # C. Final metadata (Latency, Tokens, etc.)
        if chunk.is_final:
            print(f"\n\n[Done] Latency: {chunk.metadata['latency_ms']}ms")
```
## Configuration

RAG is configured under `[ai.rag]` in `settings.toml`.

```toml
[default.ai.rag]
enabled = true

# Strategies
retrieval_strategy = "vector_similarity"
generation_strategy = "stuff"  # Concatenates context into prompt

# Prompting
default_prompt_template = "rag/default_qa"

# Constraints
max_context_length = 4000
```
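A quick way to sanity-check these values outside the application is to parse the file with the standard library (Python 3.11+ for `tomllib`). This is only a local validation sketch, not how the service itself loads its settings.

```python
import tomllib

# Read settings.toml and pull out the [default.ai.rag] table.
with open("settings.toml", "rb") as f:
    settings = tomllib.load(f)

rag_cfg = settings["default"]["ai"]["rag"]
print(rag_cfg["retrieval_strategy"])   # "vector_similarity"
print(rag_cfg["generation_strategy"])  # "stuff"
print(rag_cfg["max_context_length"])   # 4000
```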
## API Reference

### nala.athomic.ai.rag.service.RAGService

Bases: `BaseService`

Concrete implementation of the RAG pipeline with full observability.

Integrates:

- Semantic Memory (Retrieval) via Strategies.
- Prompt Engineering (Augmentation) via PromptService.
- LLM (Generation) via Strategies.
- Prometheus Metrics & OpenTelemetry Tracing.
#### generate_response(request)

*async*

Executes the full RAG pipeline: Retrieve -> Augment -> Generate (Non-streaming).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `RAGRequest` | The RAG request containing the query and optional model override. | *required* |
#### retrieve_context(request)

*async*

Executes ONLY the retrieval step (Public API). Delegates to the internal helper which handles metrics/errors.

Returns:

| Type | Description |
|---|---|
| `List[RAGSource]` | The retrieved document snippets. |
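Calling the retrieval step on its own is useful for inspecting ranking quality before involving the LLM. The sketch below constructs the service as in the usage example; `document_id` is the only `RAGSource` field shown in this document, so nothing else is accessed.

```python
from nala.athomic.ai.rag.factory import RAGFactory
from nala.athomic.ai.schemas.rag import RAGRequest


async def inspect_sources(question: str):
    rag_service = RAGFactory.create()
    await rag_service.connect()

    # Retrieval only: no prompt augmentation, no LLM call.
    sources = await rag_service.retrieve_context(RAGRequest(query=question))
    for source in sources:
        print(source.document_id)
    return sources
```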
#### stream_response(request)

*async*

Executes the pipeline and yields `RAGResponseChunk` objects for real-time streaming.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `RAGRequest` | The RAG request containing the query and optional model override. | *required* |

Yields:

`AsyncIterator[RAGResponseChunk]`: Stream of chunks including sources, content delta, and final metadata.
### nala.athomic.ai.schemas.rag.RAGRequest

Bases: `BaseModel`

Standard input payload for RAG operations. This is the complete replacement for the legacy `RAGRequest`.
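Construction follows the usual `BaseModel` pattern. The service methods above mention that the request carries the query and an optional model override; the override field name (`model`) and the value below are assumptions for illustration, not documented fields.

```python
from nala.athomic.ai.schemas.rag import RAGRequest

# Minimal request: only the query is required.
request = RAGRequest(query="How does the billing retry policy work?")

# Hypothetical override: the exact field name for the model override is not
# documented here; `model` is assumed for illustration only.
request_with_override = RAGRequest(
    query="How does the billing retry policy work?",
    model="gpt-4o-mini",
)
```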
### nala.athomic.ai.schemas.rag.RAGResponseChunk

Bases: `BaseModel`

Represents an incremental chunk of the final RAG response during streaming.
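The fields exercised in the usage example (`sources`, `content_delta`, `is_final`, `metadata`) are enough to reassemble a complete answer on the client side. The accumulator below is a consumer-side sketch, not part of the module.

```python
from typing import List


async def collect_answer(rag_service, request) -> tuple[str, List[str]]:
    """Accumulate streamed chunks into a full answer plus cited document ids."""
    answer_parts: List[str] = []
    cited_ids: List[str] = []

    async for chunk in rag_service.stream_response(request):
        if chunk.sources:
            cited_ids.extend(s.document_id for s in chunk.sources)
        if chunk.content_delta:
            answer_parts.append(chunk.content_delta)
        if chunk.is_final:
            break  # the final chunk carries metadata such as latency_ms

    return "".join(answer_parts), cited_ids
```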
### nala.athomic.ai.rag.generation.protocol.GenerationStrategyProtocol

Bases: `Protocol`

Protocol defining the contract for RAG generation strategies (e.g., Stuff, Map-Reduce).

#### augment_prompt(query, sources, prompt_service, template_name, **kwargs)

Augments the raw user query with the retrieved sources to create the final, context-aware prompt string. This is a synchronous operation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The user's original query. | *required* |
| `sources` | `List[RAGSource]` | The retrieved context documents (`RAGSource` objects). | *required* |
| `prompt_service` | `PromptService` | Service to render the final prompt template. | *required* |
| `template_name` | `str` | The template to use (e.g., 'rag/default_qa'). | *required* |
| `**kwargs` | `Any` | Additional parameters. | `{}` |

Returns:

| Type | Description |
|---|---|
| `str` | The final prompt string ready to be sent to the LLM. |
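Because `augment_prompt` is synchronous, it can be called on its own to preview the exact prompt a strategy will produce before any LLM call. The helper below is a sketch: the strategy and prompt service instances are supplied by the caller, and only the parameter names come from the table above.

```python
def preview_prompt(strategy, retrieved_sources, prompt_service) -> str:
    """Render the augmented prompt without calling the LLM (augment_prompt is sync)."""
    prompt = strategy.augment_prompt(
        query="What is our refund window?",
        sources=retrieved_sources,        # List[RAGSource], e.g. from retrieve_context()
        prompt_service=prompt_service,
        template_name="rag/default_qa",
    )
    print(prompt)  # inspect the final prompt before it reaches the LLM
    return prompt
```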
#### generate(query, sources, llm, prompt_service, template_name, **kwargs)

*async*

Generates the final answer based on the query and retrieved sources.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | The user's original query. | *required* |
| `sources` | `List[RAGSource]` | The retrieved context documents (`RAGSource` objects). | *required* |
| `llm` | `BaseLLM` | The LLM provider instance to use for completion. | *required* |
| `prompt_service` | `PromptService` | Service to render the final prompt template. | *required* |
| `template_name` | `str` | The template to use (e.g., 'rag/default_qa'). | *required* |
| `**kwargs` | `Any` | Additional generation parameters. | `{}` |

Returns:

| Type | Description |
|---|---|
| `str` | The final generated answer string. |
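Putting the two protocol methods together, a minimal "Stuff"-style strategy might look like the sketch below. The method signatures follow the protocol documented above; the `RAGSource.content` attribute, the `PromptService.render(...)` call, and the `BaseLLM.complete(...)` call are assumptions for illustration, as is the XML-style context formatting taken from the architecture overview.

```python
from typing import Any, List


class StuffGenerationStrategy:
    """Sketch of a protocol-conforming strategy that stuffs all sources into one prompt."""

    def augment_prompt(
        self,
        query: str,
        sources: List["RAGSource"],
        prompt_service: "PromptService",
        template_name: str,
        **kwargs: Any,
    ) -> str:
        # Format sources as XML-style <doc> blocks, mirroring the example in the
        # architecture overview. `content` is an assumed RAGSource attribute, and
        # `render(...)` is an assumed PromptService method.
        context = "\n".join(
            f"<doc id='{i}'>{source.content}</doc>"
            for i, source in enumerate(sources, start=1)
        )
        return prompt_service.render(template_name, question=query, context=context)

    async def generate(
        self,
        query: str,
        sources: List["RAGSource"],
        llm: "BaseLLM",
        prompt_service: "PromptService",
        template_name: str,
        **kwargs: Any,
    ) -> str:
        # Build the context-aware prompt, then call the LLM. `complete(...)` is an
        # assumed BaseLLM method; temperature=0.0 mirrors the deterministic
        # generation default described in Key Features.
        prompt = self.augment_prompt(query, sources, prompt_service, template_name)
        return await llm.complete(prompt, temperature=0.0)
```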