RAG Pipeline

Last updated: 2026-04-13

This document details the Retrieval-Augmented Generation (RAG) pipeline, focusing on how and why it is integrated into the agent’s decision-making process.

What is Retrieval-Augmented Generation (RAG)?

Retrieval-Augmented Generation (RAG) is a technique that enhances the capabilities of large language models (LLMs) by giving them access to an external knowledge base. Instead of relying solely on its training data, a RAG system first retrieves relevant information and then uses this information to generate a more accurate, evidence-based, and contextually relevant response.

RAG in the Monstermessenger Agent

In this project, RAG is used to ensure the agent’s advice is grounded in specific, trusted strategies for dealing with cyberviolence. It allows the agent to provide more than just generic support by referencing concrete information from our curated knowledge base.

The RAG workflow is orchestrated through a LangGraph ToolNode called research_tools. The give_advice node emits a tool call, the graph routes it to research_tools, and the results come back to give_advice via the graph state.
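The round trip between the two nodes can be illustrated with a small library-free simulation. This is only a sketch: it does not use LangGraph itself, and the state keys tool_call and answer, the route function, and the run loop are hypothetical stand-ins for what the real graph wiring does.

```python
from typing import Any, Callable, Dict

State = Dict[str, Any]


def give_advice(state: State) -> State:
    """Hypothetical node: request research first, answer once results exist."""
    if "research_result" not in state:
        # Emit a tool call instead of a final answer.
        return {**state, "tool_call": {"name": "research_educational_strategies"}}
    answer = f"Advice grounded in {len(state['research_result'])} chunk(s)."
    return {**state, "tool_call": None, "answer": answer}


def research_tools(state: State) -> State:
    """Stand-in for the ToolNode: run the requested tool and store its output."""
    chunks = ["placeholder chunk"]  # a real tool would query the vector store
    return {**state, "tool_call": None, "research_result": chunks}


def route(state: State) -> str:
    """Conditional edge: visit the tool node only while a tool call is pending."""
    return "research_tools" if state.get("tool_call") else "END"


def run(state: State) -> State:
    nodes: Dict[str, Callable[[State], State]] = {
        "give_advice": give_advice,
        "research_tools": research_tools,
    }
    current = "give_advice"
    while current != "END":
        state = nodes[current](state)
        # research_tools always hands control back to give_advice.
        current = route(state) if current == "give_advice" else "give_advice"
    return state
```

Calling run({"messages": []}) visits give_advice, then research_tools, then give_advice again, and the final state carries both research_result and answer.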

1. Triggering the RAG Query

The RAG process is initiated within the give_advice node under specific conditions:

  • First Advice: When the agent is about to give its first piece of advice after collecting context, it forces a RAG query to ensure the initial response is well-informed.
  • Explicit Need: The give_advice node can also decide to trigger a RAG query if it determines more information is needed to answer a follow-up question.
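The two trigger conditions can be expressed as a single predicate. The sketch below is illustrative only: AdviceState and all of its field names are assumptions, and in the real node the "explicit need" decision is made by the LLM rather than read from a boolean.

```python
from dataclasses import dataclass


@dataclass
class AdviceState:
    # All field names are hypothetical; the real graph state may differ.
    context_collected: bool = False
    advice_given_count: int = 0
    model_requests_research: bool = False


def should_run_research(state: AdviceState) -> bool:
    """Return True when give_advice should emit a research tool call."""
    # First Advice: context is complete and no advice has been given yet.
    first_advice = state.context_collected and state.advice_given_count == 0
    # Explicit Need: the model asked for more information on a follow-up.
    return first_advice or state.model_requests_research
```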

2. Executing the RAG Query (research_educational_strategies)

The research_tools ToolNode executes the research_educational_strategies tool:

  1. It uses the LLM to generate multiple diverse research queries from the user’s situation and context.
  2. The async RAGService runs the vector searches for all queries concurrently.
  3. Retrieved text chunks are deduplicated across queries.
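Steps 2 and 3 above can be sketched with asyncio alone. The query generation in step 1 needs an LLM and is left out; FAKE_INDEX, search, and research are hypothetical names, and the in-memory dictionary stands in for the vector store.

```python
import asyncio
from typing import Dict, List

# Hypothetical in-memory index standing in for the vector store.
FAKE_INDEX: Dict[str, List[str]] = {
    "blocking a bully": ["Block and report the account.", "Save screenshots as evidence."],
    "talking to an adult": ["Save screenshots as evidence.", "Tell a trusted adult."],
}


async def search(query: str) -> List[str]:
    """Stand-in for one vector search; a real call would hit the database."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    return FAKE_INDEX.get(query, [])


async def research(queries: List[str]) -> List[str]:
    # Dispatch every search at once; gather returns results in input order.
    per_query = await asyncio.gather(*(search(q) for q in queries))
    seen = set()
    unique: List[str] = []
    for chunks in per_query:
        for chunk in chunks:
            if chunk not in seen:  # drop chunks already returned by another query
                seen.add(chunk)
                unique.append(chunk)
    return unique
```

Running asyncio.run(research(["blocking a bully", "talking to an adult"])) yields three chunks, because the screenshot advice returned by both queries is kept only once.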

4. Relevance Assessment (Planned)

Note

Status: Inactive. The relevance assessment step is currently bypassed to reduce latency. All retrieved chunks are passed directly to the synthesis step.

Planned improvements for relevance assessment include:

  1. Online Assessment: Performing assessment during the give_advice response generation using structured LLM output.
  2. Asynchronous Assessment: Using the cached (query, docs) tuples to perform relevance scoring in the background, gradually improving cache quality.

5. Using the Retrieved Context

The retrieved research_result (currently all chunks from search/cache) is returned to the give_advice node via the LangGraph state. The enrichment process is handled declaratively:

  1. The Node Manifest (give_advice.yaml) detects the research_results_ready flag.
  2. It automatically injects the research_result from the state into the system prompt using a dedicated formatter.
  3. The LLM generates the final user-facing advice, grounded in the retrieved knowledge.
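In imperative terms, the manifest-driven enrichment amounts to something like the following. This is a sketch under assumptions: the real behaviour is declared in give_advice.yaml, and BASE_PROMPT, format_research, and build_system_prompt are hypothetical names, as is the numbered output format.

```python
from typing import Any, Dict, List

BASE_PROMPT = "You are a supportive assistant."  # placeholder prompt text


def format_research(chunks: List[str]) -> str:
    """Hypothetical formatter: render chunks as a numbered block."""
    lines = [f"[{i}] {chunk}" for i, chunk in enumerate(chunks, start=1)]
    return "Research findings:\n" + "\n".join(lines)


def build_system_prompt(state: Dict[str, Any]) -> str:
    """Append research only when the ready flag is set and results exist."""
    if state.get("research_results_ready") and state.get("research_result"):
        return BASE_PROMPT + "\n\n" + format_research(state["research_result"])
    return BASE_PROMPT
```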

The Retrieval Service (services/rag/rag_service.py)

RAGService is a multi-tenant async service:

  • Variant-Specific Collections: Automatically queries docs_youth (teenager) or docs_adult (parent) based on user_type.
  • Lazy PGEngine Initialisation: A PGEngine per collection is created on first access and cached in-process for the lifetime of the application.
  • PGVectorStore: Uses langchain_postgres.PGVectorStore (migrated from the older PGVector). Collections live in the dedicated rag schema.
  • Parallelised abatch_search: Multiple queries are dispatched concurrently using asyncio.gather, reducing total latency.
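The collection selection and lazy initialisation described above follow a common create-on-first-access pattern. The sketch below shows that pattern only: FakeStore and StoreRegistry are hypothetical stand-ins, no PGEngine or PGVectorStore is created, and the real service may structure its cache differently.

```python
from typing import Dict

COLLECTION_BY_USER_TYPE = {"teenager": "docs_youth", "parent": "docs_adult"}


class FakeStore:
    """Stand-in for an engine plus vector store bound to one collection."""

    def __init__(self, collection: str) -> None:
        self.collection = collection


class StoreRegistry:
    def __init__(self) -> None:
        self._stores: Dict[str, FakeStore] = {}
        self.created = 0  # counts constructions, to make the caching visible

    def get(self, user_type: str) -> FakeStore:
        collection = COLLECTION_BY_USER_TYPE[user_type]  # KeyError on unknown types
        if collection not in self._stores:
            # First access: build the store and keep it for later calls.
            self._stores[collection] = FakeStore(collection)
            self.created += 1
        return self._stores[collection]
```

Repeated calls to get("teenager") return the same object, so the expensive setup happens once per collection for the lifetime of the registry.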

Known Workaround: Document hashability patch

LangChain’s Document class is a Pydantic BaseModel. Pydantic v2 models are unhashable unless they are declared frozen: a mutable model defines __eq__ but no __hash__, so Python sets __hash__ = None on it. Document instances therefore cannot be used as dictionary keys or set members.

The semantic cache needs to deduplicate and key retrieved documents, so api/services/rag/__init__.py applies two complementary patches at import time:

from langchain_core.documents import Document
from langchain_core import documents

# Patch 1 — add __hash__ to the existing Document class
def doc_hash(self):
    return hash((self.page_content, frozenset(self.metadata.items())))

Document.__hash__ = doc_hash

# Patch 2 — replace Document in the module namespace with a subclass
class HashableDocument(Document):
    def __hash__(self):
        return hash((self.page_content, frozenset(self.metadata.items())))

documents.Document = HashableDocument

Why two patches? Patch 1 covers references to Document that were imported earlier and still hold the original class object. Patch 2 ensures that any code that imports Document from langchain_core.documents after this module is loaded also receives the hashable version.

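The hash is computed from (page_content, frozenset(metadata.items())). It is content-based, so two documents with identical text and metadata hash the same regardless of metadata key order, which is the correct semantics for cache deduplication. The frozenset requires every metadata value to be hashable; a list or dict value raises TypeError.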
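The effect of the patch can be reproduced without LangChain. In the sketch below, Doc is a hypothetical stand-in: a plain dataclass behaves like a mutable Pydantic model here, because defining equality without freezing the class also leaves __hash__ set to None.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass  # eq=True without frozen=True sets __hash__ to None, like a mutable model
class Doc:
    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def content_hash(self: Doc) -> int:
    # Same recipe as the patch: text plus an order-independent view of the metadata.
    return hash((self.page_content, frozenset(self.metadata.items())))


unhashable_before = Doc.__hash__ is None
Doc.__hash__ = content_hash  # mirrors Patch 1

a = Doc("Save screenshots.", {"source": "guide.pdf", "page": 3})
b = Doc("Save screenshots.", {"page": 3, "source": "guide.pdf"})  # same data, other key order
unique = {a, b}  # equal and equally hashed, so the set keeps one

try:
    hash(Doc("x", {"tags": ["a", "b"]}))  # list values cannot go into a frozenset
    list_metadata_ok = True
except TypeError:
    list_metadata_ok = False
```

The last block shows the limitation of this hashing recipe: metadata containing unhashable values such as lists cannot be hashed this way.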

Configuration

The RAG pipeline behaviour can be tuned through api/config.py:

| Setting | Default | Description |
| --- | --- | --- |
| rag.no_queries (NO_RAG_QUERIES) | 1 | Number of diverse queries generated per RAG call (max 5). More queries improve coverage at the cost of latency. |
| rag.n_chunks | 3 | Number of chunks returned per query. |
| rag.search_type | mmr | Retrieval strategy (mmr for Maximum Marginal Relevance, or similarity). |
| rag.embeddings_model | gemini-embedding-001 | Embedding model used for both indexing and retrieval. |
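One way to picture these settings is as a validated value object. The sketch below is not the contents of api/config.py: RAGSettings, its validation rules, and max_chunks_per_call are hypothetical, and the lower bounds of 1 are assumptions. Only the defaults, the maximum of 5 queries, and the two search types come from the settings listed above.

```python
from dataclasses import dataclass

MAX_QUERIES = 5  # upper bound stated for rag.no_queries


@dataclass(frozen=True)
class RAGSettings:
    no_queries: int = 1
    n_chunks: int = 3
    search_type: str = "mmr"
    embeddings_model: str = "gemini-embedding-001"

    def __post_init__(self) -> None:
        if not 1 <= self.no_queries <= MAX_QUERIES:
            raise ValueError(f"no_queries must be between 1 and {MAX_QUERIES}")
        if self.n_chunks < 1:
            raise ValueError("n_chunks must be at least 1")
        if self.search_type not in ("mmr", "similarity"):
            raise ValueError("search_type must be 'mmr' or 'similarity'")

    @property
    def max_chunks_per_call(self) -> int:
        """Upper bound on chunks per RAG call, before deduplication."""
        return self.no_queries * self.n_chunks
```

With the defaults, a single call retrieves at most 3 chunks; raising no_queries to 5 raises that ceiling to 15, which is where the latency and prompt-size cost comes from.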

Knowledge Base Indexing

Warning

The current indexing scripts are deprecated. They are preserved for reference but do not fully support the current variant naming conventions or the multi-tenant schema requirements.

The knowledge base consists of .pdf documents stored in rag/raw_files/, split into youth/ and adult/ sub-directories. To index new documents (legacy method):

cd rag
# Index documents for the teenager/youth collection
python index_documents.py --variant teenager

# Index documents for the parent/adult collection
python index_documents.py --variant parent

# Index both
python index_documents.py --variant all

The indexing script splits documents into chunks, generates embeddings via the configured model, and stores them in the appropriate PGVectorStore collection.
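The variant handling and the splitting step can be sketched as follows. This is not the code of index_documents.py: the variant-to-directory mapping is inferred from the names used in this document, and the fixed-size character splitter with its size and overlap parameters is an assumption, since the actual chunking strategy is not specified here. Embedding and storage are omitted.

```python
from pathlib import Path
from typing import Dict, List, Tuple

# variant -> (sub-directory of rag/raw_files/, target collection); inferred mapping
VARIANTS: Dict[str, Tuple[str, str]] = {
    "teenager": ("youth", "docs_youth"),
    "parent": ("adult", "docs_adult"),
}


def resolve_variants(variant: str) -> List[str]:
    """Expand 'all' into every known variant; reject unknown names."""
    if variant == "all":
        return list(VARIANTS)
    if variant not in VARIANTS:
        raise ValueError(f"unknown variant: {variant}")
    return [variant]


def source_dir(variant: str, root: Path = Path("rag/raw_files")) -> Path:
    """Directory holding the PDFs for one variant."""
    return root / VARIANTS[variant][0]


def split_text(text: str, size: int = 500, overlap: int = 50) -> List[str]:
    """Fixed-size character chunks; consecutive chunks share `overlap` characters."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")
    if not text:
        return []
    step = size - overlap
    # Stop early enough that no chunk lies entirely inside the previous one.
    return [text[i : i + size] for i in range(0, max(len(text) - overlap, 1), step)]
```

For example, split_text("abcdefghij", size=4, overlap=1) produces "abcd", "defg", and "ghij", each sharing one character with its neighbour.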