Elastic Advent calendar 2025
#!/usr/bin/env -S uv run
"""
Asynchronous Bulk Indexing into Elasticsearch with Multi-Processing

1. Install uv
2. Update ESURL and API_KEY with your Elasticsearch details
3. Run with ./async_bulk.py
"""
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "aiohttp>=3.8.0",
#     "elasticsearch>=8.0.0",
#     "numpy>=1.26.0",
# ]
# ///
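# Example invocation, assuming uv is already on PATH and understands PEP 723
# inline script metadata:
#
#   chmod +x async_bulk.py && ./async_bulk.py
#   # or, equivalently:
#   uv run async_bulk.py
#
# uv resolves the dependencies declared above into a managed environment
# before executing the script.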
import asyncio
import json
import logging
import multiprocessing
import random
import sys
import time
from datetime import datetime, timezone
from concurrent.futures import ProcessPoolExecutor

import numpy as np
from elasticsearch import AsyncElasticsearch, helpers
# Configuration
DEBUG = False  # Set to True to enable debug output
ESURL = "https://advent-1-2025-fc87fa.es.us-central1.gcp.elastic.cloud:443"
API_KEY = "XXXXXXXXX"
ESINDEX = "the_list_2025"  # Elasticsearch index name. Can be set to a data stream name.
TOTAL_DOCS = 100_000_000  # Total documents to index
BULK_SIZE = 1000  # Documents per bulk request
MEASURE_BULK_BYTES = False  # Set to True to measure bulk request sizes (CPU intensive).
NUM_WORKERS = 100  # Parallel bulk workers per process
QUEUE_MAXSIZE = 10000  # Max in-flight documents
BULK_TIMEOUT = 240  # Bulk request timeout in seconds
SET_PROCESSES = 1  # Set to None to auto-detect based on CPU cores.

# Do not take all the cores to keep the system responsive
NUM_PROCESSES = (
    multiprocessing.cpu_count() // 2 if SET_PROCESSES is None else SET_PROCESSES
)
MAX_CONNECTIONS = NUM_WORKERS * NUM_PROCESSES
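# At the defaults above, the run issues TOTAL_DOCS / BULK_SIZE = 100,000 bulk
# requests in total, with up to NUM_WORKERS * NUM_PROCESSES requests in flight
# at once (100 with a single process).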
# Configure logging
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Suppress elasticsearch and elastic_transport module logging
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.getLogger("elastic_transport").setLevel(logging.WARNING)
async def producer(queue, total_docs, stats):
    """Produce documents and put them into the queue."""
    names = [
        "Emma",
        "Liam",
        "Olivia",
        "Noah",
        "Ava",
        "Ethan",
        "Sophia",
        "Mason",
        "Isabella",
        "Lucas",
    ]
    countries = [
        "USA",
        "UK",
        "Canada",
        "Australia",
        "Germany",
        "France",
        "Japan",
        "Brazil",
        "India",
        "Mexico",
    ]
    for _ in range(total_docs):
        doc = {
            "_op_type": "create",
            "_index": ESINDEX,
            "_source": {
                "@timestamp": datetime.now(timezone.utc)
                .replace(microsecond=0)
                .isoformat(),
                "name": random.choice(names),
                "age": random.randint(1, 100),
                "country": random.choice(countries),
                "naughty_or_nice": random.choice(["naughty", "nice"]),
            },
        }
        await queue.put(doc)
        # Track maximum queue depth after adding
        stats["max_queue_depth"] = max(stats["max_queue_depth"], queue.qsize())

    # Signal consumers to stop
    for _ in range(NUM_WORKERS):
        await queue.put(None)
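# The producer enqueues exactly one None sentinel per worker, so every
# consumer coroutine below receives its own shutdown signal, flushes whatever
# is left in its buffer, and exits. The "create" op type used above also works
# when ESINDEX names a data stream, which only accepts create operations.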
async def consumer(es, queue, chunk_size, stats, total_docs_for_progress):
    """Consume documents from the queue and send bulk requests."""
    buffer = []
    while True:
        doc = await queue.get()
        if doc is None:
            if buffer:
                if MEASURE_BULK_BYTES:
                    chunk_bytes = sys.getsizeof(
                        json.dumps([d["_source"] for d in buffer])
                    )
                    stats["total_bytes"] += chunk_bytes
                start_time = time.perf_counter()
                success, failed = await helpers.async_bulk(
                    es, buffer, raise_on_error=False
                )
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                logger.debug(
                    "Bulk response: success=%s, failed=%s, time=%.2fms",
                    success,
                    len(failed) if failed else 0,
                    elapsed_ms,
                )
                if failed:
                    logger.debug("Failed items: %s", json.dumps(failed, indent=2))
                stats["max_response_time_ms"] = max(
                    stats["max_response_time_ms"], elapsed_ms
                )
                stats["min_response_time_ms"] = min(
                    stats["min_response_time_ms"], elapsed_ms
                )
                stats["response_times"].append(elapsed_ms)
                stats["num_requests"] += 1
                stats["docs_indexed"] += len(buffer)
                if total_docs_for_progress > 0:
                    progress = (stats["docs_indexed"] / total_docs_for_progress) * 100
                    print(
                        f"\rProgress: {progress:.1f}% ({stats['docs_indexed']:,}/{total_docs_for_progress:,} docs)",
                        end="",
                        flush=True,
                    )
            break
        buffer.append(doc)
        if len(buffer) >= chunk_size:
            if MEASURE_BULK_BYTES:
                chunk_bytes = sys.getsizeof(json.dumps([d["_source"] for d in buffer]))
                stats["total_bytes"] += chunk_bytes
            start_time = time.perf_counter()
            success, failed = await helpers.async_bulk(es, buffer, raise_on_error=False)
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.debug(
                "Bulk response: success=%s, failed=%s, time=%.2fms",
                success,
                len(failed) if failed else 0,
                elapsed_ms,
            )
            if failed:
                logger.debug("Failed items: %s", json.dumps(failed, indent=2))
            stats["max_response_time_ms"] = max(
                stats["max_response_time_ms"], elapsed_ms
            )
            stats["min_response_time_ms"] = min(
                stats["min_response_time_ms"], elapsed_ms
            )
            stats["response_times"].append(elapsed_ms)
            stats["num_requests"] += 1
            stats["docs_indexed"] += len(buffer)
            if total_docs_for_progress > 0:
                progress = (stats["docs_indexed"] / total_docs_for_progress) * 100
                print(
                    f"\rProgress: {progress:.1f}% ({stats['docs_indexed']:,}/{total_docs_for_progress:,} docs)",
                    end="",
                    flush=True,
                )
            buffer.clear()
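# Note on accounting: because async_bulk() is called with raise_on_error=False,
# rejected documents are only reported in the DEBUG "failed" output above;
# stats["docs_indexed"] counts documents submitted per bulk request, not the
# number Elasticsearch actually accepted.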
async def run_indexing(docs_per_process):
    """Run the indexing process for a given number of documents."""
    client = AsyncElasticsearch(
        [ESURL], api_key=API_KEY, connections_per_node=MAX_CONNECTIONS, max_retries=10
    )
    es = client.options(request_timeout=BULK_TIMEOUT)
    queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)

    # Shared stats dictionary
    stats = {
        "max_response_time_ms": 0,
        "min_response_time_ms": float("inf"),
        "response_times": [],
        "num_requests": 0,
        "start_time": time.perf_counter(),
        "docs_indexed": 0,
        "max_queue_depth": 0,
        "total_bytes": 0,
    }

    # Start producer and consumers
    # Disable progress indicator in multi-process mode (set to 0)
    progress_total = 0 if NUM_PROCESSES > 1 else docs_per_process
    producer_task = asyncio.create_task(
        producer(queue, total_docs=docs_per_process, stats=stats)
    )
    consumer_tasks = [
        asyncio.create_task(
            consumer(
                es,
                queue,
                chunk_size=BULK_SIZE,
                stats=stats,
                total_docs_for_progress=progress_total,
            )
        )
        for _ in range(NUM_WORKERS)
    ]
    await asyncio.gather(producer_task, *consumer_tasks)
    await client.close()
    return stats
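# Each worker process runs the coroutine above in its own event loop, with its
# own AsyncElasticsearch client and stats dict; main() aggregates the
# per-process stats once all processes finish.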
def run_process(docs_per_process):
    """Wrapper to run async indexing in a separate process."""
    return asyncio.run(run_indexing(docs_per_process))
def main():
    """Main function to set up multi-process indexing."""
    start_time = time.perf_counter()
    docs_per_process = TOTAL_DOCS // NUM_PROCESSES

    print(f"Starting indexing with {NUM_PROCESSES} processes...")
    print(f"Documents per process: {docs_per_process:,}")
    print(f"Workers per process: {NUM_WORKERS}")
    print()

    # Run indexing across multiple processes
    with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as executor:
        futures = [
            executor.submit(run_process, docs_per_process) for _ in range(NUM_PROCESSES)
        ]
        results = [future.result() for future in futures]

    # Aggregate stats from all processes
    total_time = time.perf_counter() - start_time
    total_requests = sum(s["num_requests"] for s in results)
    total_docs_indexed = sum(s["docs_indexed"] for s in results)
    total_bytes = sum(s["total_bytes"] for s in results)
    all_response_times = []
    for s in results:
        all_response_times.extend(s["response_times"])
    max_response = max((s["max_response_time_ms"] for s in results), default=0)
    min_response = min(
        (
            s["min_response_time_ms"]
            for s in results
            if s["min_response_time_ms"] != float("inf")
        ),
        default=0,
    )
    max_queue_depth = max((s["max_queue_depth"] for s in results), default=0)
    indexing_rate = total_docs_indexed / total_time if total_time > 0 else 0
    avg_bulk_size = (
        total_bytes / total_requests if total_requests > 0 and MEASURE_BULK_BYTES else 0
    )

    # Print statistics
    print(f"\n{'='*50}")
    print("Bulk Indexing Statistics (Multi-Process)")
    print(f"{'='*50}")
    print(f"Number of processes: {NUM_PROCESSES}")
    print(f"Total documents indexed: {total_docs_indexed:,}")
    print(f"Total bulk requests: {total_requests}")
    print(f"Elapsed time: {total_time:.2f} seconds")
    print(f"Indexing rate: {indexing_rate:,.2f} docs/sec")
    if MEASURE_BULK_BYTES:
        print(f"Average chunk size: {avg_bulk_size:,.2f} bytes")
    print(f"Max queue depth: {max_queue_depth:,} documents")
    # Calculate median response time
    median = np.median(all_response_times) if all_response_times else 0
    print(f"Min bulk response time: {min_response:.2f} ms")
    print(f"Median bulk response time: {median:.2f} ms")
    print(f"Max bulk response time: {max_response:.2f} ms")
    print(f"{'='*50}\n")
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Exiting gracefully...")
        sys.exit(0)