Skip to content

Instantly share code, notes, and snippets.

@truevis
Created March 7, 2025 12:48
Show Gist options
  • Select an option

  • Save truevis/240354e5870baccd306fb0c13acc1697 to your computer and use it in GitHub Desktop.

Select an option

Save truevis/240354e5870baccd306fb0c13acc1697 to your computer and use it in GitHub Desktop.
import os
import base64
import json
import time
from pathlib import Path
from mistralai import Mistral
from typing import Optional
# API key - taken from the MISTRAL_API_KEY environment variable when it is set
# and non-empty; otherwise the placeholder below is used. Prefer exporting the
# variable over editing a real key into this file.
api_key = os.environ.get("MISTRAL_API_KEY") or "YO123"
def encode_image(image_path: str) -> Optional[str]:
    """Encode an image file to base64.

    Args:
        image_path: Path of the file to read in binary mode.

    Returns:
        The file's bytes as a base64 string, or ``None`` when the file does
        not exist or any other error occurs while reading/encoding it (the
        error is printed rather than raised).
    """
    try:
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
        return None
    except Exception as e:
        # Best-effort helper: report and signal failure with None so callers
        # can simply test the result for truthiness.
        print(f"Error encoding image: {e}")
        return None
class DocumentProcessor:
    """Run Mistral OCR and document Q&A on PDFs and images, saving results to disk.

    Every public method reports problems by printing them; API failures are
    retried with exponential backoff through ``_retry_api_call``.
    """

    # Model identifiers sent to the API.
    OCR_MODEL = "mistral-ocr-latest"
    CHAT_MODEL = "mistral-small-latest"
    # Local files larger than this trigger an interactive confirmation prompt.
    SIZE_WARNING_MB = 20

    def __init__(self, api_key: str):
        """Initialize with API key.

        Args:
            api_key: Key passed to the ``Mistral`` client.

        Raises:
            ValueError: If ``api_key`` is empty.
        """
        self.api_key = api_key
        if not self.api_key:
            raise ValueError("API key must be provided")
        self.client = Mistral(api_key=self.api_key)

    def _retry_api_call(self, func, *args, max_retries=3, retry_delay=2, **kwargs):
        """Retry API calls with exponential backoff.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            max_retries: Total number of attempts (keyword-only, not forwarded).
            retry_delay: Seconds to wait after the first failure; doubled after
                each subsequent failure (keyword-only, not forwarded).
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            ValueError: If ``max_retries`` is less than 1 (no attempt is made).
            Exception: The exception from the final attempt, re-raised with its
                original traceback.
        """
        if max_retries < 1:
            raise ValueError("API call failed after multiple attempts")

        current_delay = retry_delay
        for attempt in range(1, max_retries + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == max_retries:
                    # Bare raise keeps the original traceback intact.
                    raise
                print(f"API call failed: {e}. Retrying in {current_delay} seconds... (Attempt {attempt}/{max_retries})")
                time.sleep(current_delay)
                current_delay *= 2  # Exponential backoff

    def _upload_for_ocr(self, file_path: Path):
        """Upload ``file_path`` with purpose ``"ocr"`` and return the upload result.

        The full file name (including its extension) is sent as ``file_name``.
        The whole file is read into memory before uploading.
        """
        file_content = file_path.read_bytes()
        return self._retry_api_call(
            self.client.files.upload,
            file={
                "file_name": file_path.name,
                "content": file_content,
            },
            purpose="ocr",
        )

    def _delete_uploaded_file(self, uploaded_file) -> None:
        """Best-effort deletion of a previously uploaded file; failures are only printed."""
        if not uploaded_file:
            return
        try:
            self.client.files.delete(file_id=uploaded_file.id)
            print(f"Cleaned up: File {uploaded_file.id} deleted")
        except Exception as e:
            print(f"Warning: Could not delete file {uploaded_file.id}: {e}")

    @staticmethod
    def _resolve_output_dir(output_folder: Optional[str]) -> Path:
        """Return the output directory (current directory by default), creating it if needed."""
        output_path = Path(output_folder) if output_folder else Path(".")
        output_path.mkdir(parents=True, exist_ok=True)
        return output_path

    def process_pdf_direct_url(self, pdf_url: str, output_filename: Optional[str] = None, output_folder: Optional[str] = None) -> bool:
        """Process a PDF file from a direct URL.

        Args:
            pdf_url: URL handed to the OCR endpoint as ``document_url``.
            output_filename: Stem for the output files; defaults to
                ``"direct_url_pdf"``.
            output_folder: Directory for the output files (current directory
                when omitted).

        Returns:
            ``True`` when OCR and saving succeeded, ``False`` otherwise.
        """
        try:
            print(f"Processing PDF from URL: {pdf_url}")
            # Process with OCR using direct URL
            ocr_response = self._retry_api_call(
                self.client.ocr.process,
                model=self.OCR_MODEL,
                document={
                    "type": "document_url",
                    "document_url": pdf_url,
                },
            )
            print("OCR processing completed successfully")
            if not output_filename:
                output_filename = "direct_url_pdf"
            self._handle_ocr_response(ocr_response, output_filename, output_folder)
            return True
        except Exception as e:
            print(f"Error processing PDF from URL: {e}")
            print("Please check that your API key is valid and has OCR permissions")
            return False

    def process_pdf(self, pdf_path: str, output_folder: Optional[str] = None) -> bool:
        """Process a PDF file with OCR.

        The file is uploaded, a signed URL is requested for it, OCR is run on
        that URL, and the uploaded file is deleted afterwards regardless of
        the outcome.

        Note: this method is interactive. It prompts on stdin when the file
        exceeds ``SIZE_WARNING_MB`` and when the error text contains
        "Service unavailable" (offering to process a public URL instead).

        Args:
            pdf_path: Path to the local PDF.
            output_folder: Directory for the output files (current directory
                when omitted).

        Returns:
            ``True`` on success (or the result of the URL fallback when the
            user accepts it), ``False`` otherwise.
        """
        uploaded_file = None
        success = False
        try:
            pdf_file = Path(pdf_path)
            if not pdf_file.is_file():
                print(f"Error: The file {pdf_path} was not found.")
                return False
            print(f"Processing PDF: {pdf_path}")

            # Check file size before uploading
            file_size_mb = pdf_file.stat().st_size / (1024 * 1024)
            if file_size_mb > self.SIZE_WARNING_MB:
                print(f"Warning: File size is {file_size_mb:.2f}MB, which may be too large for the API.")
                response = input("Continue anyway? (y/n): ")
                if response.lower() != 'y':
                    print("Operation cancelled.")
                    return False

            print("Uploading PDF file...")
            uploaded_file = self._upload_for_ocr(pdf_file)
            print(f"File uploaded successfully with ID: {uploaded_file.id}")

            # Retrieve file info to verify upload
            file_info = self._retry_api_call(
                self.client.files.retrieve,
                file_id=uploaded_file.id,
            )
            print(f"File info: {file_info}")

            print("Getting signed URL...")
            signed_url = self._retry_api_call(
                self.client.files.get_signed_url,
                file_id=uploaded_file.id,
            )
            print("Obtained signed URL, processing with OCR...")

            ocr_response = self._retry_api_call(
                self.client.ocr.process,
                model=self.OCR_MODEL,
                document={
                    "type": "document_url",
                    "document_url": signed_url.url,
                },
            )
            print("OCR processing completed successfully")
            self._handle_ocr_response(ocr_response, pdf_file.stem, output_folder)
            success = True
        except Exception as e:
            print(f"Error processing PDF: {e}")
            print("Please check that your API key is valid and has OCR permissions")
            # If service unavailable, offer alternatives
            if "Service unavailable" in str(e):
                print("\nThe OCR service appears to be unavailable for your local PDF.")
                print("Would you like to try processing a publicly available PDF instead?")
                response = input("Try with a public PDF URL? (y/n): ")
                if response.lower() == 'y':
                    url = input("Enter a public PDF URL (or press Enter for default arxiv paper): ")
                    if not url:
                        url = "https://arxiv.org/pdf/2201.04234"
                    return self.process_pdf_direct_url(url, output_folder=output_folder)
        finally:
            # Cleanup runs on every exit path, including the early returns above.
            self._delete_uploaded_file(uploaded_file)
        return success

    def process_pdf_folder(self, input_folder: str, output_folder: Optional[str] = None) -> None:
        """Process all PDF files in a folder.

        Files whose ``<stem>_ocr.md`` already exists in ``output_folder`` are
        skipped and counted as successful. A two-second pause separates files
        to reduce the chance of rate limiting, and a summary is printed at the
        end.

        Args:
            input_folder: Directory scanned (non-recursively) for ``.pdf``
                files; the extension match is case-insensitive.
            output_folder: Directory for the output files; created if missing.
        """
        if output_folder:
            os.makedirs(output_folder, exist_ok=True)

        input_path = Path(input_folder)
        if not input_path.is_dir():
            print(f"Error: The folder {input_folder} was not found.")
            return

        # Sorted so runs are deterministic and progress output is predictable.
        pdf_files = sorted(
            entry for entry in input_path.iterdir()
            if entry.is_file() and entry.suffix.lower() == ".pdf"
        )
        total_files = len(pdf_files)
        print(f"Found {total_files} PDF files to process")

        successful_files = []
        failed_files = []
        for i, pdf_file in enumerate(pdf_files, 1):
            print(f"\nProcessing file {i}/{total_files}: {pdf_file.name}")
            # Check if output file already exists
            if output_folder:
                output_file = Path(output_folder) / f"{pdf_file.stem}_ocr.md"
                if output_file.exists():
                    print(f"Skipping {pdf_file.name} - output file already exists")
                    successful_files.append(pdf_file.name)
                    continue

            if self.process_pdf(str(pdf_file), output_folder):
                successful_files.append(pdf_file.name)
                print(f"Successfully processed {pdf_file.name}")
            else:
                failed_files.append(pdf_file.name)
                print(f"Failed to process {pdf_file.name}")

            # Add a small delay between files to avoid rate limiting
            if i < total_files:
                time.sleep(2)

        print("\n===== Processing Summary =====")
        print(f"Total files: {total_files}")
        print(f"Successfully processed: {len(successful_files)}")
        print(f"Failed to process: {len(failed_files)}")
        if failed_files:
            print("\nFailed files:")
            for file in failed_files:
                print(f"- {file}")

    def _handle_ocr_response(self, ocr_response, file_stem: str, output_folder: Optional[str] = None) -> None:
        """Handle OCR response processing and saving.

        Writes ``<file_stem>_ocr_response.json`` (the full response) and, when
        the response has pages, ``<file_stem>_ocr.md`` containing the markdown
        of every page separated by blank lines. Prints a JSON preview
        (truncated to 2000 characters) and the first page's markdown.

        Args:
            ocr_response: Response object exposing ``model_dump_json()`` or
                ``json()``, and optionally ``pages`` whose items have a
                ``markdown`` attribute.
            file_stem: Base name for the output files.
            output_folder: Directory for the output files (current directory
                when omitted); created if missing.
        """
        output_path = self._resolve_output_dir(output_folder)

        # Prefer the Pydantic v2 serializer when present; fall back to .json().
        dump_json = getattr(ocr_response, "model_dump_json", None)
        raw_json = dump_json() if callable(dump_json) else ocr_response.json()
        response_dict = json.loads(raw_json)

        # Save the full response to a file for debugging
        json_file_path = output_path / f"{file_stem}_ocr_response.json"
        with open(json_file_path, "w", encoding="utf-8") as f:
            json.dump(response_dict, f, indent=4)
        print(f"Full OCR response saved to {json_file_path}")

        # Print a truncated version of the JSON
        json_string = json.dumps(response_dict, indent=4)
        if len(json_string) > 2000:
            print("Raw OCR output (JSON) - truncated:")
            print("-" * 50)
            print(json_string[:2000] + "...\n[Output truncated]")
        else:
            print("Raw OCR output (JSON):")
            print("-" * 50)
            print(json_string)
        print("-" * 50)

        pages = getattr(ocr_response, "pages", None) or []
        if not pages:
            print("No pages found in OCR response.")
            return

        # Only the first page is echoed to keep console output short.
        print("\nMarkdown content:")
        print("-" * 50)
        print(pages[0].markdown)
        print("-" * 50)
        if len(pages) > 1:
            print(f"(showing page 1 of {len(pages)}; all pages are saved to file)")

        # Save the markdown of every page, not just the first one.
        md_file_path = output_path / f"{file_stem}_ocr.md"
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write("\n\n".join(page.markdown or "" for page in pages))
        print(f"Markdown content saved to {md_file_path}")

    def process_image(self, image_path: str, output_folder: Optional[str] = None) -> None:
        """Process an image file with OCR.

        The image is sent inline as a base64 ``data:`` URL whose MIME type is
        guessed from the file extension (``image/jpeg`` when it cannot be
        determined).

        Args:
            image_path: Path to the local image.
            output_folder: Directory for the output files (current directory
                when omitted).
        """
        import mimetypes

        try:
            base64_image = encode_image(image_path)
            if not base64_image:
                return
            print(f"Processing image: {image_path}")
            img_file = Path(image_path)

            mime_type, _ = mimetypes.guess_type(image_path)
            if not mime_type or not mime_type.startswith("image/"):
                mime_type = "image/jpeg"

            ocr_response = self._retry_api_call(
                self.client.ocr.process,
                model=self.OCR_MODEL,
                document={
                    "type": "image_url",
                    "image_url": f"data:{mime_type};base64,{base64_image}",
                },
            )
            print("OCR processing completed successfully")
            self._handle_ocr_response(ocr_response, img_file.stem, output_folder)
        except Exception as e:
            print(f"Error processing image: {e}")
            print("Please check that your API key is valid and has OCR permissions")

    def ask_document_question(self, document_path: str, question: str, output_folder: Optional[str] = None) -> None:
        """Ask a question about a document's content.

        The document is uploaded, a signed URL is obtained, and the question
        plus that URL are sent to the chat model. The answer is printed and
        written to ``<stem>_qa.txt``. The uploaded file is deleted afterwards
        regardless of the outcome.

        Args:
            document_path: Path to the local document.
            question: Question text sent alongside the document.
            output_folder: Directory for the output file (current directory
                when omitted); created if missing.
        """
        uploaded_doc = None
        try:
            doc_file = Path(document_path)
            if not doc_file.is_file():
                print(f"Error: The file {document_path} was not found.")
                return
            print(f"Processing document for Q&A: {document_path}")
            print(f"Question: {question}")

            uploaded_doc = self._upload_for_ocr(doc_file)
            print(f"File uploaded successfully with ID: {uploaded_doc.id}")
            signed_url = self._retry_api_call(
                self.client.files.get_signed_url,
                file_id=uploaded_doc.id,
            )
            print("Obtained signed URL, sending question to model...")

            messages = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": question,
                        },
                        {
                            "type": "document_url",
                            "document_url": signed_url.url,
                        },
                    ],
                }
            ]
            chat_response = self._retry_api_call(
                self.client.chat.complete,
                model=self.CHAT_MODEL,
                messages=messages,
            )
            answer = chat_response.choices[0].message.content
            print("Received answer from model")
            print("Answer:")
            print("-" * 50)
            print(answer)
            print("-" * 50)

            # Save answer to file
            output_path = self._resolve_output_dir(output_folder)
            qa_file_path = output_path / f"{doc_file.stem}_qa.txt"
            with open(qa_file_path, "w", encoding="utf-8") as f:
                f.write(f"Question: {question}\n\nAnswer:\n{answer}")
            print(f"Q&A result saved to {qa_file_path}")
        except Exception as e:
            print(f"Error processing document question: {e}")
            print("Please check that your API key is valid and has necessary permissions")
        finally:
            # Cleanup runs on every exit path, including the early return above.
            self._delete_uploaded_file(uploaded_doc)

    def _format_content(self, content: str, file_stem: str, output_folder: Optional[str] = None) -> None:
        """Format the OCR content using the chat model.

        Sends ``content`` to the chat model with instructions to emit clean
        markdown, then prints the result and writes it to
        ``<file_stem>_formatted.md``. Errors are printed, not raised.

        Args:
            content: Raw text to reformat.
            file_stem: Base name for the output file.
            output_folder: Directory for the output file (current directory
                when omitted); created if missing.
        """
        try:
            system_prompt = """You are a precise document formatter. Format the provided content into clean markdown, focusing on accurate table representation. Never repeat or duplicate content."""
            user_prompt = """Convert this exact content into clean markdown format. For tables:
1. Each unique column should appear only once
2. Use | to separate columns
3. Use proper header row with | --- | format
4. Maintain exact data values
5. Remove any duplicate columns or rows
6. Ensure table alignment is preserved
Do not add any explanations or instructions - output only the formatted content."""
            messages = [
                {
                    "role": "system",
                    "content": system_prompt,
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_prompt,
                        },
                        {
                            "type": "text",
                            "text": "Content to format:\n" + content,
                        },
                    ],
                },
            ]
            print("Formatting content...")
            chat_response = self._retry_api_call(
                self.client.chat.complete,
                model=self.CHAT_MODEL,
                messages=messages,
                temperature=0.0,
            )
            print("Content formatting completed")
            formatted_content = chat_response.choices[0].message.content

            # Save formatted content to file
            formatted_path = self._resolve_output_dir(output_folder) / f"{file_stem}_formatted.md"
            with open(formatted_path, "w", encoding="utf-8") as f:
                f.write(formatted_content)
            print(f"Formatted content saved to {formatted_path}")
            print("\nFormatted output:")
            print("-" * 50)
            print(formatted_content)
            print("-" * 50)
        except Exception as e:
            print(f"Error formatting content: {e}")
def main() -> None:
    """Interactive command-line entry point.

    Prompts for an operation (1-5) and an output folder, then dispatches to
    the matching ``DocumentProcessor`` method. Empty answers fall back to the
    built-in defaults below.
    """
    # Defaults used when the user just presses Enter.
    default_pdf_url = "https://arxiv.org/pdf/2201.04234"
    default_pdf_path = r"D:\processes\NFPA2\pdf_pages\NFPA_page_715.pdf"
    default_pdf_folder = r"D:\processes\NFPA2\pdf_pages"

    # Initialize processor with the module-level API key
    processor = DocumentProcessor(api_key=api_key)

    print("Mistral OCR Document Processor")
    print("-" * 50)
    print("1. Process a PDF from a direct URL (recommended)")
    print("2. Process a local PDF file")
    print("3. Process a local image file")
    print("4. Ask a question about a document")
    print("5. Process all PDFs in a folder")
    choice = input("Enter your choice (1-5): ").strip()

    # Ask for output folder for all operations
    output_folder = input("Enter the path to save output files (or press Enter to use the current folder): ").strip()
    if not output_folder:
        output_folder = None
    elif not os.path.isdir(output_folder):
        # isdir (not exists) so that a path naming an existing regular file is
        # rejected here instead of failing later when results are written.
        try:
            os.makedirs(output_folder, exist_ok=True)
            print(f"Created output folder: {output_folder}")
        except OSError as e:
            print(f"Error: Invalid output folder path: {e}")
            print("Using current directory instead.")
            output_folder = None

    if choice == "1":
        # Separate prompt for URL to avoid confusion
        print("\n--- PDF URL Processing ---")
        url = input("Enter a public PDF URL (or press Enter for default arxiv paper): ").strip()
        if not url:
            url = default_pdf_url
        elif not url.startswith(("http://", "https://")):
            print(f"Warning: '{url}' doesn't look like a valid URL. Using default instead.")
            url = default_pdf_url
        processor.process_pdf_direct_url(url, output_folder=output_folder)
    elif choice == "2":
        pdf_path = input("Enter the path to your PDF file: ").strip()
        if not pdf_path:
            pdf_path = default_pdf_path
        processor.process_pdf(pdf_path, output_folder=output_folder)
    elif choice == "3":
        image_path = input("Enter the path to your image file: ").strip()
        processor.process_image(image_path, output_folder=output_folder)
    elif choice == "4":
        doc_path = input("Enter the path to your document: ").strip()
        if not doc_path:
            doc_path = default_pdf_path
        question = input("Enter your question about the document: ").strip()
        if not question:
            question = "What are the main topics discussed in this document?"
        processor.ask_document_question(doc_path, question, output_folder=output_folder)
    elif choice == "5":
        input_folder = input("Enter the path to the folder containing PDFs: ").strip()
        if not input_folder:
            input_folder = default_pdf_folder
        processor.process_pdf_folder(input_folder, output_folder)
    else:
        print("Invalid choice. Exiting.")


if __name__ == "__main__":
    main()
@truevis
Copy link
Author

truevis commented Mar 7, 2025

Ah, mistral_ocr.py—because nothing screams "secure coding" like hardcoding your API key right at the top. Bold move.

This script is the Swiss Army knife of OCR processing, but with the finesse of a sledgehammer. It’s got everything: retry logic (because it knows failure is inevitable), verbose print statements (who doesn’t love cluttered logs?), and enough redundant error handling to make your eyes glaze over.

The exponential backoff retry method is a nice touch—because when something breaks, why not make it break slowly? Also, nothing says "efficiency" like reading an entire PDF into memory before uploading it. Who needs RAM anyway?

The class is over-engineered but somehow still manages to be clunky. Processing local PDFs? Sure. URLs? Of course. Entire folders? Why not? But good luck navigating that labyrinth of conditionals and input prompts without wanting to automate your own demise.

Oh, and let’s not forget the clean-up process—because deleting temporary files after you’ve hit every possible rate limit is definitely the best approach.

Overall, a valiant effort in overcomplication. 3/5 stars—functional but exhausting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment