
Retrieval-Augmented Generation (RAG)

Overview

The RAG module provides an orchestration service that combines semantic search (Memory) with generative capabilities (LLM). It implements a Stream-First architecture designed for responsive user experiences: retrieval sources ("citations") are surfaced immediately so users can verify them while the answer is still being generated.

Key Features

  • True Streaming: Yields retrieval sources immediately, then streams the LLM answer token-by-token.
  • Deterministic Generation: Enforces strict LLM parameters (temperature=0.0) so answers stay grounded in the retrieved context and hallucinations are minimized.
  • Strategy Pattern: Decouples Retrieval (getting data) from Generation (synthesizing answer), allowing easy swapping of algorithms (e.g., "Stuff", "Map-Reduce").
  • Observability: Full pipeline tracing (rag.retrieve -> rag.generate) with granular metrics for retrieval quality and generation latency.

Architecture

The RAG Pipeline

  1. Retrieval Phase:
    • The RetrievalStrategy executes (e.g., Vector Search).
    • Converts raw memory results into standardized RAGSource objects.
  2. Augmentation Phase:
    • The GenerationStrategy.augment_prompt() method constructs the final prompt, injecting the sources into the template (e.g., XML formatting <doc id='1'>...</doc>); see the sketch after this list.
  3. Generation Phase:
    • The service invokes the LLM using the augmented prompt.
    • Response chunks are yielded to the client.
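
To make the augmentation step concrete, here is a minimal sketch of how a "stuff"-style strategy could wrap sources in XML tags before prompting the LLM. The RAGSource stand-in below (with document_id and content fields) and the helper function are illustrative assumptions, not the module's actual implementation.

from typing import List

# Hypothetical, simplified stand-in for the real RAGSource schema.
class RAGSource:
    def __init__(self, document_id: str, content: str):
        self.document_id = document_id
        self.content = content

def stuff_augment(query: str, sources: List[RAGSource]) -> str:
    # Wrap each retrieved snippet in an XML tag so the LLM can cite it by id.
    context = "\n".join(
        f"<doc id='{s.document_id}'>{s.content}</doc>" for s in sources
    )
    return (
        "Answer the question using only the documents below.\n\n"
        f"{context}\n\n"
        f"Question: {query}"
    )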

Usage Example

from nala.athomic.ai.rag.factory import RAGFactory
from nala.athomic.ai.schemas.rag import RAGRequest

async def ask_knowledge_base(question: str):
    # 1. Initialize Service
    rag_service = RAGFactory.create()
    await rag_service.connect()

    request = RAGRequest(query=question)
    print(f"Question: {question}")

    # 2. Iterate over RAGResponseChunk objects
    async for chunk in rag_service.stream_response(request):

        # A. Sources arrive first (Immediate Feedback)
        if chunk.sources:
            doc_ids = [s.document_id for s in chunk.sources]
            print(f"n[Sources Found]: {doc_ids}")

        # B. Content streams in real-time
        if chunk.content_delta:
            print(chunk.content_delta, end="", flush=True)

        # C. Final metadata (Latency, Tokens, etc.)
        if chunk.is_final:
            print(f"\n\n[Done] Latency: {chunk.metadata['latency_ms']}ms")

Configuration

RAG is configured under [ai.rag] in settings.toml.

[default.ai.rag]
enabled = true

# Strategies
retrieval_strategy = "vector_similarity"
generation_strategy = "stuff" # Concatenates context into prompt

# Prompting
default_prompt_template = "rag/default_qa"

# Constraints
max_context_length = 4000
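
As a rough illustration of how these settings could drive the Strategy Pattern, a factory might resolve the configured names to concrete strategy classes. The registry, the placeholder classes, and the "map_reduce" key below are hypothetical, not the actual RAGFactory internals.

# Hypothetical placeholders standing in for the real strategy implementations.
class StuffGenerationStrategy: ...
class MapReduceGenerationStrategy: ...

GENERATION_STRATEGIES = {
    "stuff": StuffGenerationStrategy,
    "map_reduce": MapReduceGenerationStrategy,
}

def build_generation_strategy(name: str):
    # Resolve the name configured as `generation_strategy` in settings.toml.
    try:
        return GENERATION_STRATEGIES[name]()
    except KeyError:
        raise ValueError(f"Unknown generation_strategy: {name!r}")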

API Reference

nala.athomic.ai.rag.service.RAGService

Bases: BaseService

Concrete implementation of the RAG pipeline with full observability.

Integrates:

  • Semantic Memory (Retrieval) via Strategies.
  • Prompt Engineering (Augmentation) via PromptService.
  • LLM (Generation) via Strategies.
  • Prometheus Metrics & OpenTelemetry Tracing.

generate_response(request) async

Executes the full RAG pipeline: Retrieve -> Augment -> Generate (Non-streaming).

Parameters:

  • request (RAGRequest, required): The RAG request containing the query and optional model override.
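
A minimal non-streaming call might look like the sketch below; the exact shape of the returned response object is not documented here, so it is simply returned as-is.

from nala.athomic.ai.rag.factory import RAGFactory
from nala.athomic.ai.schemas.rag import RAGRequest

async def ask_once(question: str):
    rag_service = RAGFactory.create()
    await rag_service.connect()

    # Single round trip: Retrieve -> Augment -> Generate, no streaming.
    return await rag_service.generate_response(RAGRequest(query=question))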

retrieve_context(request) async

Executes only the retrieval step (public API). Delegates to an internal helper that handles metrics and error handling.

Returns:

  • List[RAGSource]: The retrieved document snippets.
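
For example, to inspect retrieval quality without invoking the LLM at all, the method can be called on its own (a sketch reusing the imports from the Usage Example above):

async def preview_sources(question: str):
    rag_service = RAGFactory.create()
    await rag_service.connect()

    # Retrieval only: returns List[RAGSource]; no augmentation or generation happens.
    sources = await rag_service.retrieve_context(RAGRequest(query=question))
    for source in sources:
        print(source.document_id)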

stream_response(request) async

Executes the pipeline and yields RAGResponseChunks for real-time streaming.

Parameters:

  • request (RAGRequest, required): The RAG request containing the query and optional model override.

Yields:

  • AsyncIterator[RAGResponseChunk]: Stream of chunks including sources, content deltas, and final metadata.

nala.athomic.ai.schemas.rag.RAGRequest

Bases: BaseModel

Standard input payload for RAG operations. This is the complete replacement for the legacy 'RAGRequest'.
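
Only the query field appears in this documentation; the per-request model override mentioned under the service methods is shown below with an assumed field name.

from nala.athomic.ai.schemas.rag import RAGRequest

request = RAGRequest(query="How is retrieval separated from generation?")

# The service methods accept "an optional model override"; the exact field name
# is not documented here, so `model` below is an assumption.
# request = RAGRequest(query="...", model="my-llm-model")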

nala.athomic.ai.schemas.rag.RAGResponseChunk

Bases: BaseModel

Represents an incremental chunk of the final RAG response during streaming.

nala.athomic.ai.rag.generation.protocol.GenerationStrategyProtocol

Bases: Protocol

Protocol defining the contract for RAG generation strategies (e.g., Stuff, Map-Reduce).

augment_prompt(query, sources, prompt_service, template_name, **kwargs)

Augments the raw user query with the retrieved sources to create the final, context-aware prompt string. This is a synchronous operation.

Parameters:

  • query (str, required): The user's original query.
  • sources (List[RAGSource], required): The retrieved context documents (RAGSource objects).
  • prompt_service (PromptService, required): Service to render the final prompt template.
  • template_name (str, required): The template to use (e.g., 'rag/default_qa').
  • **kwargs (Any): Additional parameters. Default: {}.

Returns:

  • str: The final prompt string ready to be sent to the LLM.

generate(query, sources, llm, prompt_service, template_name, **kwargs) async

Generates the final answer based on the query and retrieved sources.

Parameters:

  • query (str, required): The user's original query.
  • sources (List[RAGSource], required): The retrieved context documents (RAGSource objects).
  • llm (BaseLLM, required): The LLM provider instance to use for completion.
  • prompt_service (PromptService, required): Service to render the final prompt template.
  • template_name (str, required): The template to use (e.g., 'rag/default_qa').
  • **kwargs (Any): Additional generation parameters. Default: {}.

Returns:

  • str: The final generated answer string.
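
To illustrate the contract, a minimal custom strategy could look like the following sketch. It assumes a prompt_service.render(template_name, **context) method, an awaitable llm.complete(prompt, **kwargs) method, and a content field on RAGSource; all three are placeholders for whatever the real PromptService, BaseLLM, and RAGSource interfaces actually expose.

from typing import Any, List

class SimpleStuffStrategy:
    """Illustrative strategy satisfying GenerationStrategyProtocol via duck typing."""

    def augment_prompt(self, query: str, sources: List[Any], prompt_service: Any,
                       template_name: str, **kwargs: Any) -> str:
        # Wrap each source in an XML tag and render the configured prompt template.
        context = "\n".join(
            f"<doc id='{s.document_id}'>{s.content}</doc>" for s in sources
        )
        return prompt_service.render(template_name, query=query, context=context)

    async def generate(self, query: str, sources: List[Any], llm: Any,
                       prompt_service: Any, template_name: str, **kwargs: Any) -> str:
        prompt = self.augment_prompt(query, sources, prompt_service, template_name, **kwargs)
        # Hand the augmented prompt to the LLM; `complete` is an assumed method name.
        return await llm.complete(prompt, **kwargs)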