Skip to content

Instantly share code, notes, and snippets.

@adhishthite
Created March 30, 2025 18:35
Show Gist options
  • Select an option

  • Save adhishthite/905abd5e339a26809c603b2f6a117ec5 to your computer and use it in GitHub Desktop.

Select an option

Save adhishthite/905abd5e339a26809c603b2f6a117ec5 to your computer and use it in GitHub Desktop.
🧪 Benchmarking Elasticsearch kNN vector search with OpenAI embeddings: measure latency across various num_candidates to optimize performance and recall for RAG applications.
import time
import numpy as np
from elasticsearch import Elasticsearch
from typing import List, Dict
from tabulate import tabulate
# === CONFIGURATION SECTION ===
# Connection details for your Elasticsearch deployment.
ES_HOST: str = "PUT_HOST_HERE" # Replace with your Elastic Cloud endpoint URL
ES_API_KEY: str = "PUT_KEY_HERE" # Replace with your Elastic API key
# Index and field settings
INDEX_NAME: str = "PUT_INDEX_HERE" # Name of your index containing a dense_vector field
FIELD_NAME: str = "embedding" # Field name where the document embeddings are stored
# Search parameters
K: int = 5 # Top-k nearest neighbors to retrieve per query
N_RUNS: int = 5 # Number of full benchmark runs to average out latency spikes
# Simulated query vectors (replace with real ones if available).
# NOTE: these are random, so they change on every execution — recall numbers
# are only meaningful with real query vectors plus GROUND_TRUTH filled in.
QUERY_VECTORS: List[List[float]] = np.random.rand(10, 1536).tolist() # For OpenAI embeddings (e.g., text-embedding-ada-002)
# Range of num_candidates values to test for the performance vs. quality trade-off
NUM_CANDIDATES_LIST: List[int] = [50, 100, 200, 500, 1000, 2000, 5000, 10000]
# Optional: query index -> relevant document IDs, used for recall calculation (fill if applicable)
GROUND_TRUTH: Dict[int, List[str]] = {}
# === CONNECT TO ELASTICSEARCH ===
# Instantiate the Elasticsearch client authenticated via API key.
es = Elasticsearch(
hosts=[ES_HOST],
api_key=ES_API_KEY
)
# === CORE SEARCH FUNCTION ===
def run_knn_query(query_vector: List[float], num_candidates: int):
    """
    Run a kNN vector search query on Elasticsearch with the specified
    number of approximate candidates.

    Args:
        query_vector: The input embedding to search with.
        num_candidates: Number of approximate candidates considered before
            the top-k results are selected (larger = better recall, higher
            latency).

    Returns:
        Tuple of (list of hit document IDs, client-side latency in
        milliseconds). The latency includes network round-trip, not just
        Elasticsearch's internal `took` time.
    """
    body = {
        "knn": {
            "field": FIELD_NAME,
            "query_vector": query_vector,
            "k": K,
            "num_candidates": num_candidates,
        },
        "_source": True,  # Set to False to exclude full document fields
    }
    # perf_counter() is monotonic and high-resolution, which makes it the
    # correct clock for elapsed-time measurement (time.time() can jump
    # backwards/forwards on NTP adjustments and skew latency numbers).
    start = time.perf_counter()
    response = es.search(index=INDEX_NAME, body=body)
    latency_ms = (time.perf_counter() - start) * 1000
    hits = [hit["_id"] for hit in response["hits"]["hits"]]
    return hits, latency_ms
# === RECALL CALCULATION FUNCTION (OPTIONAL) ===
def recall_at_k(predicted: List[str], actual: List[str], k: int) -> float:
    """
    Compute recall@k for a single query against its ground-truth set.

    Both lists are truncated to their first *k* entries before comparison,
    and duplicates are ignored.

    Args:
        predicted: Retrieved document IDs, in ranked order.
        actual: Ground-truth relevant document IDs.
        k: Cutoff rank applied to both lists.

    Returns:
        Recall@k as a float between 0 and 1 (0.0 when no ground truth
        is available).
    """
    if not actual:
        return 0.0
    truth = set(actual[:k])
    matched = set(predicted[:k]) & truth
    return len(matched) / len(truth)
# === MAIN BENCHMARKING LOOP ===
# Per-candidate-setting accumulators for raw latencies and (optional) recalls.
aggregated_results = {
    num_cand: {"latencies": [], "recalls": []} for num_cand in NUM_CANDIDATES_LIST
}

# Repeat the whole sweep N_RUNS times so cold-start spikes average out.
for run_idx in range(N_RUNS):
    print(f"🔁 Run {run_idx + 1}/{N_RUNS}")
    for num_cand in NUM_CANDIDATES_LIST:
        bucket = aggregated_results[num_cand]
        for query_idx, query_vec in enumerate(QUERY_VECTORS):
            hits, latency = run_knn_query(query_vec, num_cand)
            bucket["latencies"].append(latency)
            # Recall is only computed when a ground-truth mapping is supplied.
            if GROUND_TRUTH:
                recall = recall_at_k(hits, GROUND_TRUTH.get(query_idx, []), K)
                bucket["recalls"].append(recall)

# === AGGREGATE + DISPLAY RESULTS ===
final_results = []
for num_cand, metrics in aggregated_results.items():
    recalls = metrics["recalls"]
    final_results.append(
        {
            "num_candidates": num_cand,
            "avg_latency_ms": round(np.mean(metrics["latencies"]), 2),
            "avg_recall@k": round(np.mean(recalls), 4) if recalls else "N/A",
        }
    )

print("\n📊 Final Averaged Results:")
print(tabulate(final_results, headers="keys"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment