@lorenzejay
Created February 2, 2026 18:31
crewai output evals
"""
Opik Evaluation Module - Quality scoring for research outputs.
Uses Opik's open source evaluation metrics to score research outputs:
- Hallucination: Detects unsupported claims (lower is better)
- AnswerRelevance: Measures relevance to the question (higher is better)
- ContextRecall: Evaluates accuracy based on provided context (higher is better)
- GEval: Custom LLM-as-judge criteria for comprehensiveness
Reference: https://www.comet.com/docs/opik/python-sdk-reference/
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, TypedDict


class MetricResult(TypedDict):
    """Result from a single Opik metric evaluation."""

    score: float
    reason: str
def run_opik_evaluation(
    question: str,
    output: str,
    sources: List[str],
    context_content: Optional[str] = None,
    agent_context: Optional[List[str]] = None,
) -> Dict[str, MetricResult]:
    """
    Run Opik evaluation on research output and return scores with reasoning.

    OPTIMIZED: Uses parallel execution via asyncio for ~4x speedup.

    Args:
        question: The original research question
        output: The final research report
        sources: List of source URLs used
        context_content: Optional aggregated context from research (for context recall)
        agent_context: Optional list of agent messages/outputs as context strings

    Returns:
        Dictionary mapping metric names to MetricResult with score and reason
    """
    # Use the parallel async version for speed
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # If already in async context, use thread pool
            return _run_opik_evaluation_parallel_sync(
                question, output, sources, context_content, agent_context
            )
        else:
            return loop.run_until_complete(
                run_opik_evaluation_parallel(question, output, sources, context_content, agent_context)
            )
    except RuntimeError:
        # No event loop, create new one
        return asyncio.run(
            run_opik_evaluation_parallel(question, output, sources, context_content, agent_context)
        )
def _extract_result(result: Any, metric_name: str) -> MetricResult:
    """Extract score and reason from an Opik ScoreResult."""
    score = float(result.value) if result and hasattr(result, 'value') else 0.0
    reason = ""
    if result and hasattr(result, 'reason'):
        reason = result.reason if isinstance(result.reason, str) else str(result.reason)
    return {"score": score, "reason": reason}
def _run_opik_evaluation_parallel_sync(
    question: str,
    output: str,
    sources: List[str],
    context_content: Optional[str] = None,
    agent_context: Optional[List[str]] = None,
) -> Dict[str, MetricResult]:
    """
    Synchronous parallel evaluation using ThreadPoolExecutor.

    Fallback when an asyncio event loop is already running.
    """
    try:
        from opik.evaluation.metrics import (
            Hallucination,
            AnswerRelevance,
            ContextRecall,
            GEval,
        )
    except ImportError:
        print("Warning: opik not installed. Install with: pip install opik")
        return _placeholder_scores()

    # Build context from all sources including agent messages
    context_list = _build_context_list(context_content, sources, agent_context)

    def score_hallucination():
        try:
            metric = Hallucination()
            result = metric.score(input=question, output=output, context=context_list if context_list else [output])
            metric_result = _extract_result(result, "hallucination")
            print(f" [Hallucination] Score: {metric_result['score']:.3f}")
            return ("hallucination", metric_result)
        except Exception as e:
            print(f" [Hallucination] Error: {e}")
            return ("hallucination", {"score": 0.0, "reason": f"Error: {e}"})

    def score_relevance():
        try:
            # AnswerRelevance requires context by default
            metric = AnswerRelevance(require_context=False) if not context_list else AnswerRelevance()
            result = metric.score(
                input=question, output=output,
                context=context_list if context_list else None
            )
            metric_result = _extract_result(result, "answer_relevance")
            print(f" [AnswerRelevance] Score: {metric_result['score']:.3f}")
            return ("answer_relevance", metric_result)
        except Exception as e:
            print(f" [AnswerRelevance] Error: {e}")
            return ("answer_relevance", {"score": 0.0, "reason": f"Error: {e}"})

    def score_context_recall():
        try:
            metric = ContextRecall()
            result = metric.score(
                input=question, output=output, expected_output=question,
                context=context_list if context_list else [output]
            )
            metric_result = _extract_result(result, "context_recall")
            print(f" [ContextRecall] Score: {metric_result['score']:.3f}")
            return ("context_recall", metric_result)
        except Exception as e:
            print(f" [ContextRecall] Error: {e}")
            return ("context_recall", {"score": 0.0, "reason": f"Error: {e}"})

    def score_comprehensiveness():
        try:
            metric = GEval(
                task_introduction="You are an expert judge evaluating the comprehensiveness of a research report.",
                evaluation_criteria=(
                    "The output must comprehensively address the research question. "
                    "Consider: Does it cover multiple perspectives? Does it provide depth of analysis? "
                    "Does it address relevant sub-topics? Does it cite sources appropriately?"
                ),
            )
            # Package question and output together as recommended by Opik docs
            payload = f"QUESTION: {question}\nOUTPUT: {output}"
            result = metric.score(output=payload)
            metric_result = _extract_result(result, "comprehensiveness")
            print(f" [Comprehensiveness] Score: {metric_result['score']:.3f}")
            return ("comprehensiveness", metric_result)
        except Exception as e:
            print(f" [Comprehensiveness] Error: {e}")
            return ("comprehensiveness", {"score": 0.0, "reason": f"Error: {e}"})

    # Run all metrics in parallel using ThreadPoolExecutor
    print(" Running 4 metrics in parallel...")
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(score_hallucination),
            executor.submit(score_relevance),
            executor.submit(score_context_recall),
            executor.submit(score_comprehensiveness),
        ]
        results = [f.result() for f in futures]
    return dict(results)
async def run_opik_evaluation_parallel(
    question: str,
    output: str,
    sources: List[str],
    context_content: Optional[str] = None,
    agent_context: Optional[List[str]] = None,
) -> Dict[str, MetricResult]:
    """
    OPTIMIZED: Run all 4 Opik metrics in parallel using asyncio.gather().

    This provides ~4x speedup compared to sequential evaluation since
    each metric is an independent LLM call.

    Args:
        question: The original research question
        output: The final research report
        sources: List of source URLs used
        context_content: Optional aggregated context from research
        agent_context: Optional list of agent messages/outputs as context strings

    Returns:
        Dictionary mapping metric names to MetricResult with score and reason
    """
    try:
        from opik.evaluation.metrics import (
            Hallucination,
            AnswerRelevance,
            ContextRecall,
            GEval,
        )
    except ImportError:
        print("Warning: opik not installed. Returning placeholder scores.")
        return _placeholder_scores()

    # Build context from all sources including agent messages
    context_list = _build_context_list(context_content, sources, agent_context)

    # Define async scoring functions
    async def score_hallucination():
        try:
            metric = Hallucination()
            result = await metric.ascore(
                input=question, output=output,
                context=context_list if context_list else [output]
            )
            metric_result = _extract_result(result, "hallucination")
            print(f" [Hallucination] Score: {metric_result['score']:.3f}")
            return ("hallucination", metric_result)
        except Exception as e:
            print(f" [Hallucination] Error: {e}")
            return ("hallucination", {"score": 0.0, "reason": f"Error: {e}"})

    async def score_relevance():
        try:
            # AnswerRelevance requires context by default
            metric = AnswerRelevance(require_context=False) if not context_list else AnswerRelevance()
            result = await metric.ascore(
                input=question, output=output,
                context=context_list if context_list else None
            )
            metric_result = _extract_result(result, "answer_relevance")
            print(f" [AnswerRelevance] Score: {metric_result['score']:.3f}")
            return ("answer_relevance", metric_result)
        except Exception as e:
            print(f" [AnswerRelevance] Error: {e}")
            return ("answer_relevance", {"score": 0.0, "reason": f"Error: {e}"})

    async def score_context_recall():
        try:
            metric = ContextRecall()
            result = await metric.ascore(
                input=question, output=output, expected_output=question,
                context=context_list if context_list else [output]
            )
            metric_result = _extract_result(result, "context_recall")
            print(f" [ContextRecall] Score: {metric_result['score']:.3f}")
            return ("context_recall", metric_result)
        except Exception as e:
            print(f" [ContextRecall] Error: {e}")
            return ("context_recall", {"score": 0.0, "reason": f"Error: {e}"})

    async def score_comprehensiveness():
        try:
            metric = GEval(
                task_introduction="You are an expert judge evaluating the comprehensiveness of a research report.",
                evaluation_criteria=(
                    "The output must comprehensively address the research question. "
                    "Consider: Does it cover multiple perspectives? Does it provide depth of analysis? "
                    "Does it address relevant sub-topics? Does it cite sources appropriately?"
                ),
            )
            # Package question and output together as recommended by Opik docs
            payload = f"QUESTION: {question}\nOUTPUT: {output}"
            result = await metric.ascore(output=payload)
            metric_result = _extract_result(result, "comprehensiveness")
            print(f" [Comprehensiveness] Score: {metric_result['score']:.3f}")
            return ("comprehensiveness", metric_result)
        except Exception as e:
            print(f" [Comprehensiveness] Error: {e}")
            return ("comprehensiveness", {"score": 0.0, "reason": f"Error: {e}"})

    # Run all 4 metrics in parallel
    print(" Running 4 metrics in parallel (async)...")
    results = await asyncio.gather(
        score_hallucination(),
        score_relevance(),
        score_context_recall(),
        score_comprehensiveness(),
    )
    return dict(results)
def _build_context_list(
    context_content: Optional[str],
    sources: List[str],
    agent_context: Optional[List[str]] = None,
) -> List[str]:
    """
    Build context list from all available sources.

    Args:
        context_content: Aggregated text content from research
        sources: List of source URLs
        agent_context: List of formatted agent messages/outputs

    Returns:
        Combined list of context strings for Opik evaluation
    """
    context_list = []

    # Add aggregated content (research findings text)
    if context_content:
        context_list.append(context_content)

    # Add agent messages/outputs (includes reasoning, tool usage, etc.)
    if agent_context:
        context_list.extend(agent_context[:20])  # Limit to 20 agent context items

    # Add source URLs
    if sources:
        context_list.extend([f"Source: {url}" for url in sources[:10]])

    return context_list
def _placeholder_scores() -> Dict[str, MetricResult]:
    """Return placeholder scores when opik is not available."""
    return {
        "hallucination": {"score": 0.0, "reason": "Opik not installed"},
        "answer_relevance": {"score": 0.0, "reason": "Opik not installed"},
        "context_recall": {"score": 0.0, "reason": "Opik not installed"},
        "comprehensiveness": {"score": 0.0, "reason": "Opik not installed"},
    }


# Keep the old async function as an alias for backward compatibility
run_opik_evaluation_async = run_opik_evaluation_parallel
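

# ---------------------------------------------------------------------------
# Illustrative smoke test, not part of the original gist: a minimal sketch
# assuming Opik is installed and credentials for its configured LLM judge are
# set in the environment. The question/output/sources values are hypothetical
# placeholders.
if __name__ == "__main__":
    demo_scores = run_opik_evaluation(
        question="What are the main applications of graph neural networks?",
        output="Graph neural networks are used for recommendation, molecular property prediction, and fraud detection.",
        sources=["https://example.com/gnn-survey"],
        context_content="GNNs are applied to recommendation systems, molecules, and fraud graphs.",
    )
    for metric_name, metric_result in demo_scores.items():
        print(f"{metric_name}: {metric_result['score']:.3f} - {metric_result['reason']}")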
@lorenzejay (Author)
example usage:

raw_scores = run_opik_evaluation(
    question=self.state.research_question,
    output=self.state.final_report,
    sources=self.state.all_sources,
    context_content=context_content,
    agent_context=agent_context,
)
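
The call returns a dict mapping each metric name ("hallucination", "answer_relevance", "context_recall", "comprehensiveness") to a {"score": float, "reason": str} entry, so downstream code can log or persist it directly. A minimal sketch of consuming the result:

for metric_name, result in raw_scores.items():
    print(f"{metric_name}: {result['score']:.3f}")
    if result["reason"]:
        print(f"  reason: {result['reason']}")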
