@inqueue
Created December 11, 2025 06:45
Elastic Advent calendar 2025
#!/usr/bin/env -S uv run
"""
Asynchronous Bulk Indexing into Elasticsearch with Multi-Processing
1. Install uv
2. Update ESURL and API_KEY with your Elasticsearch details
3. Run with ./async_bulk.py
"""
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "aiohttp>=3.8.0",
#     "elasticsearch>=8.0.0",
#     "numpy>=1.26.0",
# ]
# ///
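
# A minimal invocation sketch (assuming the file is saved as async_bulk.py, as
# the docstring suggests): `uv run` reads the inline metadata above and
# resolves these dependencies into a managed environment on first run.
#
#   chmod +x async_bulk.py && ./async_bulk.py
#   # or, without relying on the shebang:
#   uv run async_bulk.py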
import asyncio
import json
import logging
import multiprocessing
import random
import sys
import time
from datetime import datetime, timezone
from concurrent.futures import ProcessPoolExecutor
import numpy as np
from elasticsearch import AsyncElasticsearch, helpers
# Configuration
DEBUG = False # Set to True to enable debug output
ESURL = "https://advent-1-2025-fc87fa.es.us-central1.gcp.elastic.cloud:443"
API_KEY = "XXXXXXXXX"
ESINDEX = "the_list_2025" # Elasticsearch index name. Can be set to a data stream name.
TOTAL_DOCS = 100_000_000 # Total documents to index
BULK_SIZE = 1000 # Documents per bulk request
MEASURE_BULK_BYTES = False # Set to True to measure bulk request sizes (CPU intensive).
NUM_WORKERS = 100 # Parallel bulk workers per process
QUEUE_MAXSIZE = 10000 # Max in-flight documents
BULK_TIMEOUT = 240 # Bulk request timeout in seconds
SET_PROCESSES = 1 # Set to None to auto-detect based on CPU cores.
# Do not take all the cores to keep the system responsive
NUM_PROCESSES = (
    multiprocessing.cpu_count() // 2 if SET_PROCESSES is None else SET_PROCESSES
)
MAX_CONNECTIONS = NUM_WORKERS * NUM_PROCESSES
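# With the defaults above (NUM_WORKERS = 100, SET_PROCESSES = 1) this comes to
# 100 connections per node for each process's client, i.e. up to 100 bulk
# requests of BULK_SIZE = 1000 documents in flight at a time.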
# Configure logging
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Suppress elasticsearch and elastic_transport module logging
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.getLogger("elastic_transport").setLevel(logging.WARNING)


async def producer(queue, total_docs, stats):
    """Produce documents and put them into the queue."""
    names = [
        "Emma",
        "Liam",
        "Olivia",
        "Noah",
        "Ava",
        "Ethan",
        "Sophia",
        "Mason",
        "Isabella",
        "Lucas",
    ]
    countries = [
        "USA",
        "UK",
        "Canada",
        "Australia",
        "Germany",
        "France",
        "Japan",
        "Brazil",
        "India",
        "Mexico",
    ]
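    # Each bulk action uses the "create" op type: data streams only accept
    # "create", so the script works whether ESINDEX names a regular index or a
    # data stream.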
    for _ in range(total_docs):
        doc = {
            "_op_type": "create",
            "_index": ESINDEX,
            "_source": {
                "@timestamp": datetime.now(timezone.utc)
                .replace(microsecond=0)
                .isoformat(),
                "name": random.choice(names),
                "age": random.randint(1, 100),
                "country": random.choice(countries),
                "naughty_or_nice": random.choice(["naughty", "nice"]),
            },
        }
        await queue.put(doc)
        # Track maximum queue depth after adding
        stats["max_queue_depth"] = max(stats["max_queue_depth"], queue.qsize())
    # Signal consumers to stop
    for _ in range(NUM_WORKERS):
        await queue.put(None)


async def consumer(es, queue, chunk_size, stats, total_docs_for_progress):
    """Consume documents from the queue and send bulk requests."""
    buffer = []
    while True:
        doc = await queue.get()
        if doc is None:
            if buffer:
                if MEASURE_BULK_BYTES:
                    chunk_bytes = sys.getsizeof(
                        json.dumps([d["_source"] for d in buffer])
                    )
                    stats["total_bytes"] += chunk_bytes
                start_time = time.perf_counter()
                success, failed = await helpers.async_bulk(
                    es, buffer, raise_on_error=False
                )
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                logger.debug(
                    "Bulk response: success=%s, failed=%s, time=%.2fms",
                    success,
                    len(failed) if failed else 0,
                    elapsed_ms,
                )
                if failed:
                    logger.debug("Failed items: %s", json.dumps(failed, indent=2))
                stats["max_response_time_ms"] = max(
                    stats["max_response_time_ms"], elapsed_ms
                )
                stats["min_response_time_ms"] = min(
                    stats["min_response_time_ms"], elapsed_ms
                )
                stats["response_times"].append(elapsed_ms)
                stats["num_requests"] += 1
                stats["docs_indexed"] += len(buffer)
                if total_docs_for_progress > 0:
                    progress = (stats["docs_indexed"] / total_docs_for_progress) * 100
                    print(
                        f"\rProgress: {progress:.1f}% ({stats['docs_indexed']:,}/{total_docs_for_progress:,} docs)",
                        end="",
                        flush=True,
                    )
            break
        buffer.append(doc)
        if len(buffer) >= chunk_size:
            if MEASURE_BULK_BYTES:
                chunk_bytes = sys.getsizeof(json.dumps([d["_source"] for d in buffer]))
                stats["total_bytes"] += chunk_bytes
            start_time = time.perf_counter()
            success, failed = await helpers.async_bulk(es, buffer, raise_on_error=False)
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.debug(
                "Bulk response: success=%s, failed=%s, time=%.2fms",
                success,
                len(failed) if failed else 0,
                elapsed_ms,
            )
            if failed:
                logger.debug("Failed items: %s", json.dumps(failed, indent=2))
            stats["max_response_time_ms"] = max(
                stats["max_response_time_ms"], elapsed_ms
            )
            stats["min_response_time_ms"] = min(
                stats["min_response_time_ms"], elapsed_ms
            )
            stats["response_times"].append(elapsed_ms)
            stats["num_requests"] += 1
            stats["docs_indexed"] += len(buffer)
            if total_docs_for_progress > 0:
                progress = (stats["docs_indexed"] / total_docs_for_progress) * 100
                print(
                    f"\rProgress: {progress:.1f}% ({stats['docs_indexed']:,}/{total_docs_for_progress:,} docs)",
                    end="",
                    flush=True,
                )
            buffer.clear()


async def run_indexing(docs_per_process):
    """Run the indexing process for a given number of documents."""
    client = AsyncElasticsearch(
        [ESURL], api_key=API_KEY, connections_per_node=MAX_CONNECTIONS, max_retries=10
    )
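    # Note: each worker process builds its own AsyncElasticsearch client; the
    # async client is tied to a single event loop and cannot be shared across
    # the processes spawned by ProcessPoolExecutor.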
    es = client.options(request_timeout=BULK_TIMEOUT)
    queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
    # Shared stats dictionary
    stats = {
        "max_response_time_ms": 0,
        "min_response_time_ms": float("inf"),
        "response_times": [],
        "num_requests": 0,
        "start_time": time.perf_counter(),
        "docs_indexed": 0,
        "max_queue_depth": 0,
        "total_bytes": 0,
    }
    # Start producer and consumers
    # Disable progress indicator in multi-process mode (set to 0)
    progress_total = 0 if NUM_PROCESSES > 1 else docs_per_process
    producer_task = asyncio.create_task(
        producer(queue, total_docs=docs_per_process, stats=stats)
    )
    consumer_tasks = [
        asyncio.create_task(
            consumer(
                es,
                queue,
                chunk_size=BULK_SIZE,
                stats=stats,
                total_docs_for_progress=progress_total,
            )
        )
        for _ in range(NUM_WORKERS)
    ]
    await asyncio.gather(producer_task, *consumer_tasks)
    await client.close()
    return stats


def run_process(docs_per_process):
    """Wrapper to run async indexing in a separate process."""
    return asyncio.run(run_indexing(docs_per_process))


def main():
    """Main function to set up multi-process indexing."""
    start_time = time.perf_counter()
    docs_per_process = TOTAL_DOCS // NUM_PROCESSES
    print(f"Starting indexing with {NUM_PROCESSES} processes...")
    print(f"Documents per process: {docs_per_process:,}")
    print(f"Workers per process: {NUM_WORKERS}")
    print()
    # Run indexing across multiple processes
    with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as executor:
        futures = [
            executor.submit(run_process, docs_per_process) for _ in range(NUM_PROCESSES)
        ]
        results = [future.result() for future in futures]
    # Aggregate stats from all processes
    total_time = time.perf_counter() - start_time
    total_requests = sum(s["num_requests"] for s in results)
    total_docs_indexed = sum(s["docs_indexed"] for s in results)
    total_bytes = sum(s["total_bytes"] for s in results)
    all_response_times = []
    for s in results:
        all_response_times.extend(s["response_times"])
    max_response = max((s["max_response_time_ms"] for s in results), default=0)
    min_response = min(
        (
            s["min_response_time_ms"]
            for s in results
            if s["min_response_time_ms"] != float("inf")
        ),
        default=0,
    )
    max_queue_depth = max((s["max_queue_depth"] for s in results), default=0)
    indexing_rate = total_docs_indexed / total_time if total_time > 0 else 0
    avg_bulk_size = (
        total_bytes / total_requests if total_requests > 0 and MEASURE_BULK_BYTES else 0
    )
    # Print statistics
    print(f"\n{'='*50}")
    print("Bulk Indexing Statistics (Multi-Process)")
    print(f"{'='*50}")
    print(f"Number of processes: {NUM_PROCESSES}")
    print(f"Total documents indexed: {total_docs_indexed:,}")
    print(f"Total bulk requests: {total_requests}")
    print(f"Elapsed time: {total_time:.2f} seconds")
    print(f"Indexing rate: {indexing_rate:,.2f} docs/sec")
    if MEASURE_BULK_BYTES:
        print(f"Average chunk size: {avg_bulk_size:,.2f} bytes")
    print(f"Max queue depth: {max_queue_depth:,} documents")
    # Calculate median response time
    median = np.median(all_response_times) if all_response_times else 0
    print(f"Min bulk response time: {min_response:.2f} ms")
    print(f"Median bulk response time: {median:.2f} ms")
    print(f"Max bulk response time: {max_response:.2f} ms")
    print(f"{'='*50}\n")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Exiting gracefully...")
        sys.exit(0)
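
After a run completes, a quick sanity check is to count what actually landed in the target index. A minimal sketch using the synchronous client from the same elasticsearch package (endpoint, API key, and index name are the placeholders from the script above; the count should approach TOTAL_DOCS):

from elasticsearch import Elasticsearch

es = Elasticsearch(
    ["https://advent-1-2025-fc87fa.es.us-central1.gcp.elastic.cloud:443"],
    api_key="XXXXXXXXX",
)
es.indices.refresh(index="the_list_2025")  # make freshly indexed docs visible to the count
print(es.count(index="the_list_2025")["count"])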