Created
February 2, 2026 18:31
-
-
Save lorenzejay/85a46f363f387da3ca34b5f504f3e03d to your computer and use it in GitHub Desktop.
crewai output evals
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Opik Evaluation Module - Quality scoring for research outputs. | |
| Uses Opik's open source evaluation metrics to score research outputs: | |
| - Hallucination: Detects unsupported claims (lower is better) | |
| - AnswerRelevance: Measures relevance to the question (higher is better) | |
| - ContextRecall: Evaluates accuracy based on provided context (higher is better) | |
| - GEval: Custom LLM-as-judge criteria for comprehensiveness | |
| Reference: https://www.comet.com/docs/opik/python-sdk-reference/ | |
| """ | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| from typing import Any, Dict, List, Optional, TypedDict | |
class MetricResult(TypedDict):
    """Result from a single Opik metric evaluation.

    Produced by ``_extract_result`` and by the per-metric error paths;
    also used for placeholder entries when opik is not installed.
    """
    # Metric value as a float; 0.0 doubles as the error/unavailable sentinel.
    score: float
    # Judge-provided rationale, an "Error: ..." message, or "" when absent.
    reason: str
def run_opik_evaluation(
    question: str,
    output: str,
    sources: List[str],
    context_content: Optional[str] = None,
    agent_context: Optional[List[str]] = None,
) -> Dict[str, MetricResult]:
    """
    Run Opik evaluation on research output and return scores with reasoning.

    Dispatches to the parallel implementation: when no asyncio event loop is
    running in this thread, the async version is driven with ``asyncio.run``;
    when called from inside a running loop (where blocking on the loop would
    raise), the ThreadPoolExecutor-based synchronous variant is used instead.

    Args:
        question: The original research question
        output: The final research report
        sources: List of source URLs used
        context_content: Optional aggregated context from research (for context recall)
        agent_context: Optional list of agent messages/outputs as context strings

    Returns:
        Dictionary mapping metric names to MetricResult with score and reason
    """
    # NOTE: asyncio.get_event_loop() is deprecated outside a running loop
    # since Python 3.10 and raises in 3.12+; asyncio.get_running_loop() is
    # the supported way to detect "already inside an event loop".
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: safe to start a fresh one.
        return asyncio.run(
            run_opik_evaluation_parallel(question, output, sources, context_content, agent_context)
        )
    # A loop is already running; run_until_complete() would raise here, so
    # fall back to thread-pool based parallel scoring.
    return _run_opik_evaluation_parallel_sync(question, output, sources, context_content, agent_context)
| def _extract_result(result: Any, metric_name: str) -> MetricResult: | |
| """Extract score and reason from an Opik ScoreResult.""" | |
| score = float(result.value) if result and hasattr(result, 'value') else 0.0 | |
| reason = "" | |
| if result and hasattr(result, 'reason'): | |
| reason = result.reason if isinstance(result.reason, str) else str(result.reason) | |
| return {"score": score, "reason": reason} | |
def _run_opik_evaluation_parallel_sync(
    question: str,
    output: str,
    sources: List[str],
    context_content: Optional[str] = None,
    agent_context: Optional[List[str]] = None,
) -> Dict[str, MetricResult]:
    """
    Score all four Opik metrics concurrently on worker threads.

    Fallback path used when an asyncio event loop is already running, so
    the async variant cannot be driven to completion from this thread.
    """
    try:
        from opik.evaluation.metrics import (
            AnswerRelevance,
            ContextRecall,
            GEval,
            Hallucination,
        )
    except ImportError:
        print("Warning: opik not installed. Install with: pip install opik")
        return _placeholder_scores()

    # Context drawn from research findings, agent messages, and source URLs.
    context_list = _build_context_list(context_content, sources, agent_context)
    # When no context is available, fall back to the output itself.
    fallback_context = context_list if context_list else [output]

    def run_metric(key, label, call):
        """Execute one metric call, log its score, and trap all errors."""
        try:
            outcome = _extract_result(call(), key)
            print(f" [{label}] Score: {outcome['score']:.3f}")
        except Exception as exc:
            print(f" [{label}] Error: {exc}")
            return (key, {"score": 0.0, "reason": f"Error: {exc}"})
        return (key, outcome)

    def hallucination_call():
        return Hallucination().score(input=question, output=output, context=fallback_context)

    def relevance_call():
        # AnswerRelevance requires context by default; relax that only
        # when we truly have none.
        metric = AnswerRelevance() if context_list else AnswerRelevance(require_context=False)
        return metric.score(
            input=question, output=output,
            context=context_list if context_list else None,
        )

    def recall_call():
        return ContextRecall().score(
            input=question, output=output, expected_output=question,
            context=fallback_context,
        )

    def comprehensiveness_call():
        judge = GEval(
            task_introduction="You are an expert judge evaluating the comprehensiveness of a research report.",
            evaluation_criteria=(
                "The output must comprehensively address the research question. "
                "Consider: Does it cover multiple perspectives? Does it provide depth of analysis? "
                "Does it address relevant sub-topics? Does it cite sources appropriately?"
            ),
        )
        # Package question and output together as recommended by Opik docs.
        return judge.score(output=f"QUESTION: {question}\nOUTPUT: {output}")

    jobs = [
        ("hallucination", "Hallucination", hallucination_call),
        ("answer_relevance", "AnswerRelevance", relevance_call),
        ("context_recall", "ContextRecall", recall_call),
        ("comprehensiveness", "Comprehensiveness", comprehensiveness_call),
    ]
    print(" Running 4 metrics in parallel...")
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = [pool.submit(run_metric, *job) for job in jobs]
        scored = [future.result() for future in pending]
    return dict(scored)
async def run_opik_evaluation_parallel(
    question: str,
    output: str,
    sources: List[str],
    context_content: Optional[str] = None,
    agent_context: Optional[List[str]] = None,
) -> Dict[str, MetricResult]:
    """
    Evaluate all four Opik metrics concurrently via asyncio.gather().

    Each metric is an independent LLM call, so overlapping them yields
    roughly a 4x wall-clock speedup over sequential scoring.

    Args:
        question: The original research question
        output: The final research report
        sources: List of source URLs used
        context_content: Optional aggregated context from research
        agent_context: Optional list of agent messages/outputs as context strings

    Returns:
        Dictionary mapping metric names to MetricResult with score and reason
    """
    try:
        from opik.evaluation.metrics import (
            AnswerRelevance,
            ContextRecall,
            GEval,
            Hallucination,
        )
    except ImportError:
        print("Warning: opik not installed. Returning placeholder scores.")
        return _placeholder_scores()

    # Context drawn from research findings, agent messages, and source URLs.
    context_list = _build_context_list(context_content, sources, agent_context)
    # When no context is available, fall back to the output itself.
    fallback_context = context_list if context_list else [output]

    async def run_metric(key, label, make_awaitable):
        """Await one metric call, log its score, and trap all errors."""
        try:
            outcome = _extract_result(await make_awaitable(), key)
            print(f" [{label}] Score: {outcome['score']:.3f}")
        except Exception as exc:
            print(f" [{label}] Error: {exc}")
            return (key, {"score": 0.0, "reason": f"Error: {exc}"})
        return (key, outcome)

    def hallucination_call():
        return Hallucination().ascore(input=question, output=output, context=fallback_context)

    def relevance_call():
        # AnswerRelevance requires context by default; relax that only
        # when we truly have none.
        metric = AnswerRelevance() if context_list else AnswerRelevance(require_context=False)
        return metric.ascore(
            input=question, output=output,
            context=context_list if context_list else None,
        )

    def recall_call():
        return ContextRecall().ascore(
            input=question, output=output, expected_output=question,
            context=fallback_context,
        )

    def comprehensiveness_call():
        judge = GEval(
            task_introduction="You are an expert judge evaluating the comprehensiveness of a research report.",
            evaluation_criteria=(
                "The output must comprehensively address the research question. "
                "Consider: Does it cover multiple perspectives? Does it provide depth of analysis? "
                "Does it address relevant sub-topics? Does it cite sources appropriately?"
            ),
        )
        # Package question and output together as recommended by Opik docs.
        return judge.ascore(output=f"QUESTION: {question}\nOUTPUT: {output}")

    print(" Running 4 metrics in parallel (async)...")
    scored = await asyncio.gather(
        run_metric("hallucination", "Hallucination", hallucination_call),
        run_metric("answer_relevance", "AnswerRelevance", relevance_call),
        run_metric("context_recall", "ContextRecall", recall_call),
        run_metric("comprehensiveness", "Comprehensiveness", comprehensiveness_call),
    )
    return dict(scored)
| def _build_context_list( | |
| context_content: Optional[str], | |
| sources: List[str], | |
| agent_context: Optional[List[str]] = None, | |
| ) -> List[str]: | |
| """ | |
| Build context list from all available sources. | |
| Args: | |
| context_content: Aggregated text content from research | |
| sources: List of source URLs | |
| agent_context: List of formatted agent messages/outputs | |
| Returns: | |
| Combined list of context strings for Opik evaluation | |
| """ | |
| context_list = [] | |
| # Add aggregated content (research findings text) | |
| if context_content: | |
| context_list.append(context_content) | |
| # Add agent messages/outputs (includes reasoning, tool usage, etc.) | |
| if agent_context: | |
| context_list.extend(agent_context[:20]) # Limit to 20 agent context items | |
| # Add source URLs | |
| if sources: | |
| context_list.extend([f"Source: {url}" for url in sources[:10]]) | |
| return context_list | |
| def _placeholder_scores() -> Dict[str, MetricResult]: | |
| """Return placeholder scores when opik is not available.""" | |
| return { | |
| "hallucination": {"score": 0.0, "reason": "Opik not installed"}, | |
| "answer_relevance": {"score": 0.0, "reason": "Opik not installed"}, | |
| "context_recall": {"score": 0.0, "reason": "Opik not installed"}, | |
| "comprehensiveness": {"score": 0.0, "reason": "Opik not installed"}, | |
| } | |
# Backward-compatibility alias: older callers imported
# `run_opik_evaluation_async`; it now points at the parallel implementation.
run_opik_evaluation_async = run_opik_evaluation_parallel
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
example usage: