Skip to content

Instantly share code, notes, and snippets.

@ondrejsojka
Last active November 4, 2025 14:11
Show Gist options
  • Select an option

  • Save ondrejsojka/2d6c8baddbcdf2a42a553c9ea51dc816 to your computer and use it in GitHub Desktop.

Select an option

Save ondrejsojka/2d6c8baddbcdf2a42a553c9ea51dc816 to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "openai>=2.7.1",
# ]
# ///
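"""
OpenAI Batch API script: wraps the contents of an input file in a single
batch request, uploads it, polls until the batch finishes, and writes the
extracted response text to an output file.

Usage: uv run batchapi.py input.md output.md
"""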
import json
import sys
import tempfile
import time
from pathlib import Path
from openai import OpenAI
def create_batch_input_file(
    input_content: str,
    *,
    model: str = "gpt-5-nano",
    custom_id: str = "request-1",
) -> str:
    """Write a one-request JSONL batch file and return its path.

    The file contains a single ``POST /v1/responses`` request whose body
    carries ``input_content`` as the model input.

    Args:
        input_content: Text sent as the ``input`` field of the request body.
        model: Model name placed in the request body.
        custom_id: Identifier stored with the request so its result can be
            matched in the batch output.

    Returns:
        Path of the temporary ``.jsonl`` file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.
    """
    batch_request = {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/responses",
        "body": {"model": model, "input": input_content},
    }
    # delete=False keeps the file on disk after the handle closes so it can be
    # reopened for upload; the encoding is pinned so the bytes written do not
    # depend on the platform's locale.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
    ) as f:
        f.write(json.dumps(batch_request) + "\n")
        return f.name
def main():
if len(sys.argv) != 3:
print("Usage: uv run batchapi.py input.md output.md")
sys.exit(1)
input_file = Path(sys.argv[1])
output_file = Path(sys.argv[2])
# Read input file
if not input_file.exists():
print(f"Error: {input_file} not found")
sys.exit(1)
input_content = input_file.read_text()
print(f"Read {len(input_content)} characters from {input_file}")
# Initialize OpenAI client
client = OpenAI()
# Create batch input JSONL file
print("Creating batch input file...")
batch_input_path = create_batch_input_file(input_content)
try:
# Upload the batch input file
print("Uploading batch input file...")
with open(batch_input_path, "rb") as f:
batch_input_file = client.files.create(file=f, purpose="batch")
print(f"Uploaded file ID: {batch_input_file.id}")
# Create batch job
print("Creating batch job...")
batch = client.batches.create(
input_file_id=batch_input_file.id,
endpoint="/v1/responses",
completion_window="24h",
)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.status}")
# Poll for completion
print("\nPolling for completion...")
while batch.status not in ["completed", "failed", "expired", "cancelled"]:
time.sleep(5) # Poll every 5 seconds
batch = client.batches.retrieve(batch.id)
print(f"Status: {batch.status}", end="\r")
print(f"\nFinal status: {batch.status}")
if batch.status == "completed":
# Download output file
print("Downloading results...")
output_file_id = batch.output_file_id
file_response = client.files.content(output_file_id)
# Parse JSONL output and extract the response
output_lines = file_response.text.strip().split("\n")
for line in output_lines:
result = json.loads(line)
if result.get("custom_id") == "request-1":
# Extract the response content
response = result.get("response", {})
# Try to get the content from different possible locations
# Based on Responses API structure
if "body" in response:
body = response["body"]
# Look for output in various formats
if "output" in body:
output_content = body["output"]
if isinstance(output_content, list):
# Extract text from output items
text_parts = []
for item in output_content:
if (
item.get("type") == "message"
and "content" in item
):
content = item["content"]
if isinstance(content, list):
for part in content:
if part.get("type") == "output_text":
text_parts.append(
part.get("text", "")
)
elif isinstance(content, str):
text_parts.append(content)
output_text = "\n".join(text_parts)
else:
output_text = str(output_content)
elif "choices" in body:
# Fallback to chat completions format
output_text = body["choices"][0]["message"]["content"]
else:
output_text = json.dumps(body, indent=2)
else:
output_text = json.dumps(response, indent=2)
# Write to output file
output_file.write_text(output_text)
print(f"Result written to {output_file}")
break
else:
print(f"Batch failed with status: {batch.status}")
if batch.error_file_id:
error_response = client.files.content(batch.error_file_id)
print("Errors:")
print(error_response.text)
sys.exit(1)
finally:
# Clean up temporary file
Path(batch_input_path).unlink(missing_ok=True)
# Run the batch workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment