Created
March 30, 2025 18:35
-
-
Save adhishthite/905abd5e339a26809c603b2f6a117ec5 to your computer and use it in GitHub Desktop.
🧪 Benchmarking Elasticsearch kNN vector search with OpenAI embeddings: measure latency across various num_candidates to optimize performance and recall for RAG applications.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import numpy as np | |
| from elasticsearch import Elasticsearch | |
| from typing import List, Dict | |
| from tabulate import tabulate | |
# === CONFIGURATION SECTION ===
# Your Elasticsearch deployment details
ES_HOST = "PUT_HOST_HERE"  # Replace with your Elastic Cloud endpoint
ES_API_KEY = "PUT_KEY_HERE"  # Replace with your Elastic API key

# Index and field settings
INDEX_NAME = "PUT_INDEX_HERE"  # Name of your index containing dense_vector field
FIELD_NAME = "embedding"  # Field name where embeddings are stored

# Search parameters
K = 5  # Top-k nearest neighbors to retrieve
N_RUNS = 5  # Number of full benchmark runs to average out latency spikes

# Simulated query vectors (replace with real ones if available).
# 10 random vectors of dimension 1536 — the dimensionality used by OpenAI
# text-embedding-ada-002. NOTE(review): random vectors exercise latency only;
# recall numbers are meaningless without real query embeddings + GROUND_TRUTH.
QUERY_VECTORS = np.random.rand(10, 1536).tolist()

# Range of num_candidates values to test for performance vs. quality trade-off.
# Larger values consider more approximate candidates per shard before the
# top-K rerank: better recall, higher latency.
NUM_CANDIDATES_LIST = [50, 100, 200, 500, 1000, 2000, 5000, 10000]

# Optional: Ground truth mapping for recall calculation (fill if applicable).
# Maps query index (position in QUERY_VECTORS) -> list of relevant doc IDs.
GROUND_TRUTH: Dict[int, List[str]] = {}

# === CONNECT TO ELASTICSEARCH ===
# Instantiate Elasticsearch client using API key
es = Elasticsearch(
    hosts=[ES_HOST],
    api_key=ES_API_KEY
)
| # === CORE SEARCH FUNCTION === | |
def run_knn_query(query_vector: List[float], num_candidates: int):
    """
    Run a kNN vector search on Elasticsearch and measure client-side latency.

    Args:
        query_vector: The input embedding to search with.
        num_candidates: Number of approximate candidates each shard considers
            before reranking down to the top K (higher = better recall,
            slower query).

    Returns:
        Tuple of (list of hit document IDs, latency in milliseconds).
    """
    body = {
        "knn": {
            "field": FIELD_NAME,
            "query_vector": query_vector,
            "k": K,
            "num_candidates": num_candidates
        },
        "_source": True  # Set to False to exclude full document fields
    }
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # with wall-clock adjustments and would skew the measured latency.
    start = time.perf_counter()
    # NOTE(review): the `body=` kwarg is deprecated in elasticsearch-py 8.x;
    # the modern form is es.search(index=INDEX_NAME, knn=body["knn"],
    # source=True) — confirm client version before switching.
    response = es.search(index=INDEX_NAME, body=body)
    latency_ms = (time.perf_counter() - start) * 1000
    hits = [hit['_id'] for hit in response['hits']['hits']]
    return hits, latency_ms
| # === RECALL CALCULATION FUNCTION (OPTIONAL) === | |
def recall_at_k(predicted: List[str], actual: List[str], k: int) -> float:
    """
    Compute recall@k for a single query against its ground truth.

    Args:
        predicted: Retrieved document IDs, in rank order.
        actual: Ground-truth relevant document IDs.
        k: Cutoff rank; only the first k entries of each list are compared.

    Returns:
        Recall@k as a float in [0, 1]; 0.0 when there is no ground truth
        or k <= 0.
    """
    # Guard k <= 0 as well as empty ground truth: actual[:0] is empty, so
    # the denominator below would otherwise raise ZeroDivisionError.
    if not actual or k <= 0:
        return 0.0
    relevant = set(actual[:k])
    return len(set(predicted[:k]) & relevant) / len(relevant)
# === MAIN BENCHMARKING LOOP ===
# One bucket of raw latency/recall samples per num_candidates setting.
aggregated_results = {
    candidates: {"latencies": [], "recalls": []}
    for candidates in NUM_CANDIDATES_LIST
}

# Repeat the full sweep N_RUNS times so cold-start spikes average out.
for run in range(N_RUNS):
    print(f"🔁 Run {run + 1}/{N_RUNS}")
    for candidates in NUM_CANDIDATES_LIST:
        bucket = aggregated_results[candidates]
        for query_idx, vector in enumerate(QUERY_VECTORS):
            doc_ids, elapsed_ms = run_knn_query(vector, candidates)
            bucket["latencies"].append(elapsed_ms)
            # Recall is only meaningful when ground truth was supplied.
            if GROUND_TRUTH:
                bucket["recalls"].append(
                    recall_at_k(doc_ids, GROUND_TRUTH.get(query_idx, []), K)
                )
# === AGGREGATE + DISPLAY RESULTS ===
# Collapse each setting's raw samples into one averaged row per setting.
final_results = []
for candidates, samples in aggregated_results.items():
    recalls = samples["recalls"]
    mean_recall = np.mean(recalls) if recalls else None
    row = {
        "num_candidates": candidates,
        "avg_latency_ms": round(np.mean(samples["latencies"]), 2),
        # Recall column shows "N/A" when no ground truth was provided.
        "avg_recall@k": "N/A" if mean_recall is None else round(mean_recall, 4),
    }
    final_results.append(row)

print("\n📊 Final Averaged Results:")
print(tabulate(final_results, headers="keys"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment