Every RAG implementation must provide a query_for_evaluation() function:
def query_for_evaluation(
    question: str,
    llm_model: Optional[str] = None,
    custom_llm: Optional[BaseChatModel] = None,
) -> dict:
    """
    Process a question and return results for evaluation.

    Args:
        question: The question to process
        llm_model: Model name string (e.g., "gpt-4o")
        custom_llm: Pre-configured language model instance

    Returns:
        dict: {
            "question": str,
            "answer": str,
            "contexts": List[str],
            "metadata": dict
        }
    """
# Example: QA prompt for answer generation.
# NOTE(review): the template text was reconstructed from a version whose
# line breaks had been stripped; the wording is unchanged.
qa_template = """You are a medical expert specializing in pregnancy and childbirth.
Your task is to analyze the provided medical context and answer the user's question accurately and concisely.

STRICT INSTRUCTIONS:
1. Base your answer exclusively on the MEDICAL CONTEXT section.
2. The context is ordered by relevance. Prioritize early documents.
3. Provide a direct and integrated answer in a single paragraph.
4. If insufficient information exists, state that clearly.
5. Always answer in Spanish.

MEDICAL CONTEXT (ordered by relevance):
{context}

QUESTION: {question}

DETAILED MEDICAL ANSWER:"""

qa_prompt = ChatPromptTemplate.from_template(qa_template)
4
Implement Core Processing Function
Create the main processing function for your strategy:
def format_docs(docs: List[Document]) -> str:
    """Format retrieved documents for the prompt.

    Renders each document with its source/page metadata and a running
    index so the LLM can prioritize by relevance order.
    """
    formatted_docs = []
    for i, doc in enumerate(docs):
        source = doc.metadata.get('source', 'N/A')
        page = doc.metadata.get('page_number', 'N/A')
        formatted_doc = (
            f"--- Document {i+1} ---\n"
            f"Source: {source}, Page: {page}\n"
            f"Content: {doc.page_content}"
        )
        formatted_docs.append(formatted_doc)
    return "\n\n".join(formatted_docs)


def process_my_rag_query(
    query: str,
    custom_llm: Optional[ChatOpenAI] = None,
) -> Dict[str, Any]:
    """
    Process a query using your novel RAG strategy.

    Args:
        query: The user's question
        custom_llm: Optional custom LLM to use instead of the module-level one

    Returns:
        Dictionary with answer, contexts, retrieved documents, and
        token/cost metrics.
    """
    # 1. Implement your retrieval strategy here
    # Example: standard retrieval
    retrieved_docs = retriever.invoke(query)

    # 2. Format context for the prompt
    formatted_context = format_docs(retrieved_docs)

    # 3. Generate answer (fall back to the module-level LLM when no
    #    custom model is supplied)
    current_llm = custom_llm if custom_llm is not None else llm
    response = current_llm.invoke(qa_prompt.format_messages(
        context=formatted_context,
        question=query,
    ))

    # 4. Extract usage metrics from the provider response
    usage = extract_usage_from_ai_message(response)
    provider_cost = extract_cost_from_ai_message(response)

    # 5. Return results; cost defaults to 0.0 when the provider did not
    #    report one.
    return {
        'answer': response.content,
        'contexts': [doc.page_content for doc in retrieved_docs],
        'retrieved_documents': retrieved_docs,
        'metrics': {
            'input_tokens': int(usage['input_tokens']),
            'output_tokens': int(usage['output_tokens']),
            'total_tokens': int(usage['total_tokens']),
            'usage_source': str(usage['usage_source']),
            'cost': (float(provider_cost['total_cost'])
                     if provider_cost['total_cost'] is not None else 0.0),
            'cost_source': str(provider_cost['cost_source']),
        },
    }
5
Implement Evaluation Wrapper
Create the required query_for_evaluation() function:
def query_for_evaluation(
    question: str,
    llm_model: Optional[str] = None,
    custom_llm: Optional[BaseChatModel] = None,
) -> dict:
    """
    Wrapper function for RAG evaluation frameworks like RAGAS.

    This function ensures compatibility with the evaluation pipeline.

    Args:
        question: The question to process
        llm_model: Model name string
        custom_llm: Pre-configured language model

    Returns:
        Dictionary structured for evaluation
    """
    start_time = time.time()

    # Determine which LLM to use: an explicit instance wins over a model
    # name; otherwise fall back to the module-level default ("gpt-4o").
    if custom_llm:
        result = process_my_rag_query(question, custom_llm)
        model_identity = get_model_identity(llm=custom_llm)
    elif llm_model:
        custom_llm_instance = ChatOpenAI(model_name=llm_model, temperature=0)
        result = process_my_rag_query(question, custom_llm_instance)
        model_identity = get_model_identity(
            model_name=llm_model, llm=custom_llm_instance
        )
    else:
        result = process_my_rag_query(question)
        model_identity = get_model_identity(model_name="gpt-4o", llm=llm)

    execution_time = time.time() - start_time

    # Resolve costs: prefer the provider-reported cost, falling back to a
    # token-based estimate inside resolve_total_cost().
    input_tokens = result["metrics"]["input_tokens"]
    output_tokens = result["metrics"]["output_tokens"]
    resolved_cost = resolve_total_cost(
        provider=model_identity["provider"],
        model_name=model_identity["model_name"],
        model_id=model_identity["model_id"],
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        provider_reported_cost=result["metrics"]["cost"],
        provider_cost_source=result["metrics"]["cost_source"],
        execution_time_seconds=execution_time,
    )

    return {
        "question": question,
        "answer": result["answer"],
        "contexts": result["contexts"],
        "source_documents": result["retrieved_documents"],
        "metadata": {
            "num_contexts": len(result["contexts"]),
            "retrieval_method": "my_novel_strategy",
            "llm_model": model_identity["model_name"],
            "provider": model_identity["provider"],
            "model_id": model_identity["model_id"],
            "embedding_model": "text-embedding-3-small",
            "execution_time": execution_time,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_cost": resolved_cost["total_cost"],
            "tokens_used": input_tokens + output_tokens,
            "usage_source": result["metrics"]["usage_source"],
            "cost_source": resolved_cost["cost_source"],
        },
    }
6
Integrate with Evaluator
Register your RAG architecture in src/evaluation/ragas_evaluator.py:
# Add import at top of filefrom src.rag.my_rag import query_for_evaluation as my_rag_query_for_evaluation# In RAGASEvaluator.__init__() method, add new case:elif rag_type.lower() == "my-rag": self.query_function = my_rag_query_for_evaluation self.rag_name = "My Novel RAG Strategy" self.rag_type = "my-rag" self.llm_model = "gpt-4o"
# tests/test_my_rag.py
import pytest

from src.rag.my_rag import query_for_evaluation


def test_query_for_evaluation():
    """Test basic query processing."""
    result = query_for_evaluation(
        "¿Cuál es la cantidad ideal de controles prenatales?"
    )
    # Verify the evaluation contract: required keys, non-empty contexts,
    # and the strategy identifier recorded in metadata.
    assert "question" in result
    assert "answer" in result
    assert "contexts" in result
    assert "metadata" in result
    assert len(result["contexts"]) > 0
    assert result["metadata"]["retrieval_method"] == "my_novel_strategy"