Last active
November 4, 2025 14:11
-
-
Save ondrejsojka/2d6c8baddbcdf2a42a553c9ea51dc816 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "openai>=2.7.1", | |
| # ] | |
| # /// | |
| """ | |
| OpenAI Batch API script that uploads input.md, polls for completion, and writes to output.md | |
| """ | |
| import json | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from openai import OpenAI | |
def create_batch_input_file(input_content: str) -> str:
    """Write a one-request batch JSONL file and return its filesystem path.

    Builds a single /v1/responses request (model gpt-5-nano) wrapping
    *input_content*, serializes it as one JSON line, and stores it in a
    temporary ``.jsonl`` file. The caller is responsible for deleting the
    returned file when done.
    """
    request_line = json.dumps(
        {
            "custom_id": "request-1",
            "method": "POST",
            "url": "/v1/responses",
            "body": {"model": "gpt-5-nano", "input": input_content},
        }
    )
    # delete=False so the file survives this function; main() uploads it
    # and unlinks it afterwards.
    tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
    with tmp:
        tmp.write(request_line + "\n")
    return tmp.name
def _extract_output_text(result: dict) -> str:
    """Extract the model's text from one parsed batch-output JSONL record.

    Handles the Responses API shape (``response.body.output`` -> list of
    "message" items containing "output_text" parts), falls back to the
    chat-completions shape (``body.choices``), and finally dumps the raw
    payload as indented JSON so the user always gets something inspectable.
    """
    response = result.get("response", {})
    if "body" not in response:
        # No body at all (e.g. a malformed or error record) -- dump what we got.
        return json.dumps(response, indent=2)
    body = response["body"]
    if "output" in body:
        output_content = body["output"]
        if not isinstance(output_content, list):
            return str(output_content)
        text_parts: list[str] = []
        for item in output_content:
            # Only "message" items carry user-visible text; other item
            # types (e.g. reasoning) are skipped.
            if item.get("type") == "message" and "content" in item:
                content = item["content"]
                if isinstance(content, list):
                    text_parts.extend(
                        part.get("text", "")
                        for part in content
                        if part.get("type") == "output_text"
                    )
                elif isinstance(content, str):
                    text_parts.append(content)
        return "\n".join(text_parts)
    if "choices" in body:
        # Fallback to the chat-completions response format.
        return body["choices"][0]["message"]["content"]
    return json.dumps(body, indent=2)


def main() -> None:
    """Upload input.md as a one-request batch job, poll until it finishes,
    and write the extracted model text to output.md.

    Exits non-zero on bad usage, a missing input file, a failed/expired/
    cancelled batch, or a completed batch with no result for our request.
    Reads OPENAI_API_KEY from the environment (via the OpenAI client).
    """
    if len(sys.argv) != 3:
        print("Usage: uv run batchapi.py input.md output.md")
        sys.exit(1)
    input_file = Path(sys.argv[1])
    output_file = Path(sys.argv[2])
    # Read input file
    if not input_file.exists():
        print(f"Error: {input_file} not found")
        sys.exit(1)
    input_content = input_file.read_text()
    print(f"Read {len(input_content)} characters from {input_file}")
    # Initialize OpenAI client
    client = OpenAI()
    # Create batch input JSONL file
    print("Creating batch input file...")
    batch_input_path = create_batch_input_file(input_content)
    try:
        # Upload the batch input file
        print("Uploading batch input file...")
        with open(batch_input_path, "rb") as f:
            batch_input_file = client.files.create(file=f, purpose="batch")
        print(f"Uploaded file ID: {batch_input_file.id}")
        # Create batch job
        print("Creating batch job...")
        batch = client.batches.create(
            input_file_id=batch_input_file.id,
            endpoint="/v1/responses",
            completion_window="24h",
        )
        print(f"Batch ID: {batch.id}")
        print(f"Status: {batch.status}")
        # Poll until the batch reaches a terminal state
        print("\nPolling for completion...")
        while batch.status not in ["completed", "failed", "expired", "cancelled"]:
            time.sleep(5)  # Poll every 5 seconds
            batch = client.batches.retrieve(batch.id)
            print(f"Status: {batch.status}", end="\r")
        print(f"\nFinal status: {batch.status}")
        if batch.status == "completed":
            # Download output file
            print("Downloading results...")
            output_file_id = batch.output_file_id
            if not output_file_id:
                # A completed batch should always carry an output file;
                # bail loudly rather than crash on a None file id.
                print("Error: completed batch has no output file")
                sys.exit(1)
            file_response = client.files.content(output_file_id)
            # Parse JSONL output and write the text for our single request
            for line in file_response.text.strip().split("\n"):
                result = json.loads(line)
                if result.get("custom_id") == "request-1":
                    output_file.write_text(_extract_output_text(result))
                    print(f"Result written to {output_file}")
                    break
            else:
                # Fix: previously a missing result line exited 0 without
                # ever writing output.md -- a silent failure.
                print("Error: no result found for request-1 in batch output")
                sys.exit(1)
        else:
            print(f"Batch failed with status: {batch.status}")
            if batch.error_file_id:
                error_response = client.files.content(batch.error_file_id)
                print("Errors:")
                print(error_response.text)
            sys.exit(1)
    finally:
        # Clean up temporary file, even when the batch fails mid-way
        Path(batch_input_path).unlink(missing_ok=True)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment