Created
March 7, 2025 12:48
-
-
Save truevis/240354e5870baccd306fb0c13acc1697 to your computer and use it in GitHub Desktop.
Usage of Mistral OCR https://mistral.ai/news/mistral-ocr
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import json
import mimetypes
import os
import time
from pathlib import Path
from typing import Optional

from mistralai import Mistral
# API key: read from the MISTRAL_API_KEY environment variable when it is set,
# so a real key never has to be committed to source control. "YO123" is only
# the original placeholder and is kept as a fallback for backward compatibility.
api_key = os.environ.get("MISTRAL_API_KEY") or "YO123"
def encode_image(image_path: str) -> Optional[str]:
    """Read a file and return its bytes encoded as base64 text.

    Args:
        image_path: Path of the file to read.

    Returns:
        The base64 encoding of the file contents as a ``str`` (an empty
        string for an empty file), or ``None`` if the file could not be read.
        Failures are reported on stdout rather than raised.
    """
    # Only the file access can fail, so only that part is guarded.
    try:
        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
        return None
    except (OSError, TypeError, ValueError) as e:
        # OSError: permissions, directories, I/O failures.
        # TypeError / ValueError: a path argument that open() rejects
        # (e.g. None or an embedded NUL); kept so the function stays
        # best-effort for bad input instead of raising.
        print(f"Error encoding image: {e}")
        return None

    return base64.b64encode(raw_bytes).decode("utf-8")
class DocumentProcessor:
    """Run Mistral OCR and document Q&A over PDFs and images.

    Every public method reports progress and errors on stdout and writes its
    results (raw JSON, markdown, Q&A text) into an output folder, defaulting
    to the current directory.
    """

    # Model names passed to the Mistral client.
    OCR_MODEL = "mistral-ocr-latest"
    CHAT_MODEL = "mistral-small-latest"
    # PDF offered as a fallback when OCR of a local upload is unavailable.
    DEFAULT_PDF_URL = "https://arxiv.org/pdf/2201.04234"
    # Local PDFs larger than this trigger an interactive confirmation.
    SIZE_WARNING_MB = 20
    # Pause between files in folder mode to avoid rate limiting.
    INTER_FILE_DELAY_SECONDS = 2
    # Console preview of the raw JSON is cut off after this many characters.
    JSON_PREVIEW_CHARS = 2000

    def __init__(self, api_key: str):
        """Create the Mistral client.

        Args:
            api_key: Mistral API key.

        Raises:
            ValueError: If ``api_key`` is empty.
        """
        if not api_key:
            raise ValueError("API key must be provided")
        self.api_key = api_key
        self.client = Mistral(api_key=self.api_key)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _retry_api_call(self, func, *args, max_retries=3, retry_delay=2, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying with exponential backoff.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            max_retries: Total number of attempts (must be at least 1).
            retry_delay: Seconds to wait before the second attempt; the wait
                doubles after every further failure.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            ValueError: If ``max_retries`` is less than 1 (``func`` is never
                called in that case).
            Exception: The exception raised by the final failed attempt,
                re-raised with its original traceback.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        current_delay = retry_delay
        for attempt in range(1, max_retries + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == max_retries:
                    # Bare raise keeps the original traceback intact.
                    raise
                print(f"API call failed: {e}. Retrying in {current_delay} seconds... (Attempt {attempt}/{max_retries})")
                time.sleep(current_delay)
                current_delay *= 2  # Exponential backoff

    @staticmethod
    def _resolve_output_dir(output_folder: Optional[str] = None) -> Path:
        """Return the output directory as a ``Path``, creating it if missing."""
        output_path = Path(output_folder) if output_folder else Path(".")
        output_path.mkdir(parents=True, exist_ok=True)
        return output_path

    def _upload_for_ocr(self, source_file: Path):
        """Upload ``source_file`` with purpose ``"ocr"`` and return the upload object."""
        # The content is read into memory as bytes so that every retry sends
        # the full payload; a shared file handle would be exhausted after the
        # first attempt.
        file_content = source_file.read_bytes()
        uploaded = self._retry_api_call(
            self.client.files.upload,
            file={
                # NOTE(review): the original sent Path.stem (no extension).
                # The Mistral upload examples pass the name including its
                # extension, so the full name is sent here - confirm the API
                # relies on it.
                "file_name": source_file.name,
                "content": file_content,
            },
            purpose="ocr",
        )
        print(f"File uploaded successfully with ID: {uploaded.id}")
        return uploaded

    def _delete_uploaded(self, uploaded) -> None:
        """Best-effort deletion of a previously uploaded file (no-op for ``None``)."""
        if not uploaded:
            return
        try:
            self.client.files.delete(file_id=uploaded.id)
            print(f"Cleaned up: File {uploaded.id} deleted")
        except Exception as e:
            # Cleanup must never mask the result of the main operation.
            print(f"Warning: Could not delete file {uploaded.id}: {e}")

    def _handle_ocr_response(self, ocr_response, file_stem: str, output_folder: Optional[str] = None) -> None:
        """Save and preview an OCR response.

        Writes ``<file_stem>_ocr_response.json`` (the full response) and, when
        the response has pages, ``<file_stem>_ocr.md`` containing the markdown
        of *all* pages separated by blank lines. A truncated JSON preview and
        the first page's markdown are printed to stdout.

        Args:
            ocr_response: Response object exposing ``model_dump_json()`` or
                ``json()`` and, optionally, a ``pages`` sequence whose items
                have a ``markdown`` attribute.
            file_stem: Base name used for the output files.
            output_folder: Destination folder; the current directory if empty.
                It is created when it does not exist.
        """
        output_path = self._resolve_output_dir(output_folder)

        # Prefer the Pydantic v2 serializer; fall back to the older .json().
        if hasattr(ocr_response, "model_dump_json"):
            raw_json = ocr_response.model_dump_json()
        else:
            raw_json = ocr_response.json()
        response_dict = json.loads(raw_json)

        # Save the full response to a file for debugging
        json_file_path = output_path / f"{file_stem}_ocr_response.json"
        with open(json_file_path, "w", encoding="utf-8") as f:
            json.dump(response_dict, f, indent=4)
        print(f"Full OCR response saved to {json_file_path}")

        # Print a (possibly truncated) version of the JSON
        json_string = json.dumps(response_dict, indent=4)
        if len(json_string) > self.JSON_PREVIEW_CHARS:
            print("Raw OCR output (JSON) - truncated:")
            print("-" * 50)
            print(json_string[:self.JSON_PREVIEW_CHARS] + "...\n[Output truncated]")
        else:
            print("Raw OCR output (JSON):")
            print("-" * 50)
            print(json_string)
        print("-" * 50)

        pages = getattr(ocr_response, "pages", None) or []
        if not pages:
            print("No pages found in OCR response.")
            return

        # Only the first page is echoed to keep console output short ...
        print("\nMarkdown content:")
        print("-" * 50)
        print(pages[0].markdown)
        print("-" * 50)

        # ... but every page is saved; the original wrote page 0 only and
        # silently dropped the rest of multi-page documents.
        full_markdown = "\n\n".join(page.markdown or "" for page in pages)
        md_file_path = output_path / f"{file_stem}_ocr.md"
        # Explicit UTF-8: OCR text is frequently non-ASCII and the platform
        # default encoding (e.g. cp1252 on Windows) cannot represent it.
        with open(md_file_path, "w", encoding="utf-8") as f:
            f.write(full_markdown)
        print(f"Markdown content saved to {md_file_path}")

    def _format_content(self, content: str, file_stem: str) -> None:
        """Reformat OCR text into clean markdown using the chat model.

        The result is written to ``<file_stem>_formatted.md`` in the current
        directory and printed. Errors are reported on stdout, not raised.

        Args:
            content: Raw OCR text to reformat.
            file_stem: Base name for the output file.
        """
        try:
            system_prompt = (
                "You are a precise document formatter. Format the provided content "
                "into clean markdown, focusing on accurate table representation. "
                "Never repeat or duplicate content."
            )
            user_prompt = "\n".join([
                "Convert this exact content into clean markdown format. For tables:",
                "1. Each unique column should appear only once",
                "2. Use | to separate columns",
                "3. Use proper header row with | --- | format",
                "4. Maintain exact data values",
                "5. Remove any duplicate columns or rows",
                "6. Ensure table alignment is preserved",
                "Do not add any explanations or instructions - output only the formatted content.",
            ])
            messages = [
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_prompt},
                        {"type": "text", "text": "Content to format:\n" + content},
                    ],
                },
            ]

            print("Formatting content...")
            chat_response = self._retry_api_call(
                self.client.chat.complete,
                model=self.CHAT_MODEL,
                messages=messages,
                temperature=0.0,
            )
            print("Content formatting completed")
            formatted_content = chat_response.choices[0].message.content

            # Save formatted content to file
            with open(f"{file_stem}_formatted.md", "w", encoding="utf-8") as f:
                f.write(formatted_content)
            print(f"Formatted content saved to {file_stem}_formatted.md")

            print("\nFormatted output:")
            print("-" * 50)
            print(formatted_content)
            print("-" * 50)
        except Exception as e:
            print(f"Error formatting content: {e}")

    # ------------------------------------------------------------------
    # Public operations
    # ------------------------------------------------------------------

    def process_pdf_direct_url(self, pdf_url: str, output_filename: Optional[str] = None, output_folder: Optional[str] = None) -> bool:
        """Run OCR on a PDF reachable at ``pdf_url``.

        Args:
            pdf_url: URL of the PDF, passed to the OCR endpoint as-is.
            output_filename: Base name for the output files; defaults to
                ``"direct_url_pdf"``.
            output_folder: Destination folder; the current directory if empty.

        Returns:
            ``True`` on success, ``False`` if any step failed (the error is
            printed, not raised).
        """
        try:
            print(f"Processing PDF from URL: {pdf_url}")
            ocr_response = self._retry_api_call(
                self.client.ocr.process,
                model=self.OCR_MODEL,
                document={
                    "type": "document_url",
                    "document_url": pdf_url,
                },
            )
            print("OCR processing completed successfully")

            self._handle_ocr_response(
                ocr_response,
                output_filename or "direct_url_pdf",
                output_folder,
            )
            return True
        except Exception as e:
            print(f"Error processing PDF from URL: {e}")
            print("Please check that your API key is valid and has OCR permissions")
            return False

    def process_pdf(self, pdf_path: str, output_folder: Optional[str] = None) -> bool:
        """Upload a local PDF, run OCR on it and save the results.

        Prompts on stdin when the file exceeds ``SIZE_WARNING_MB`` and, if the
        error text contains "Service unavailable", offers to process a public
        PDF URL instead. The uploaded file is deleted afterwards in all cases.

        Args:
            pdf_path: Path of the PDF on disk.
            output_folder: Destination folder; the current directory if empty.

        Returns:
            ``True`` if OCR results were saved (including via the URL
            fallback), otherwise ``False``.
        """
        uploaded_file = None
        try:
            pdf_file = Path(pdf_path)
            if not pdf_file.is_file():
                print(f"Error: The file {pdf_path} was not found.")
                return False

            print(f"Processing PDF: {pdf_path}")

            # Check file size before uploading
            file_size_mb = pdf_file.stat().st_size / (1024 * 1024)
            if file_size_mb > self.SIZE_WARNING_MB:
                print(f"Warning: File size is {file_size_mb:.2f}MB, which may be too large for the API.")
                response = input("Continue anyway? (y/n): ")
                if response.strip().lower() != "y":
                    print("Operation cancelled.")
                    return False

            print("Uploading PDF file...")
            uploaded_file = self._upload_for_ocr(pdf_file)

            # Retrieve file info to verify upload
            file_info = self._retry_api_call(
                self.client.files.retrieve,
                file_id=uploaded_file.id,
            )
            print(f"File info: {file_info}")

            print("Getting signed URL...")
            signed_url = self._retry_api_call(
                self.client.files.get_signed_url,
                file_id=uploaded_file.id,
            )
            print("Obtained signed URL, processing with OCR...")

            ocr_response = self._retry_api_call(
                self.client.ocr.process,
                model=self.OCR_MODEL,
                document={
                    "type": "document_url",
                    "document_url": signed_url.url,
                },
            )
            print("OCR processing completed successfully")

            self._handle_ocr_response(ocr_response, pdf_file.stem, output_folder)
            return True
        except Exception as e:
            print(f"Error processing PDF: {e}")
            print("Please check that your API key is valid and has OCR permissions")

            # If service unavailable, offer alternatives
            if "Service unavailable" in str(e):
                print("\nThe OCR service appears to be unavailable for your local PDF.")
                print("Would you like to try processing a publicly available PDF instead?")
                response = input("Try with a public PDF URL? (y/n): ")
                if response.strip().lower() == "y":
                    url = input("Enter a public PDF URL (or press Enter for default arxiv paper): ").strip()
                    if not url:
                        url = self.DEFAULT_PDF_URL
                    return self.process_pdf_direct_url(url, output_folder=output_folder)
            return False
        finally:
            # Runs on every exit path above, including the early returns.
            self._delete_uploaded(uploaded_file)

    def process_pdf_folder(self, input_folder: str, output_folder: Optional[str] = None) -> None:
        """Run :meth:`process_pdf` on every PDF directly inside a folder.

        Files are handled in sorted order and matched on a case-insensitive
        ``.pdf`` suffix. When ``output_folder`` is given, a PDF whose
        ``<stem>_ocr.md`` already exists there is skipped and counted as
        successful. A summary is printed at the end.

        Args:
            input_folder: Folder containing the PDFs (not searched recursively).
            output_folder: Destination folder; created if missing.
        """
        # Create output directory if specified and doesn't exist
        if output_folder:
            os.makedirs(output_folder, exist_ok=True)

        input_path = Path(input_folder)
        if not input_path.is_dir():
            print(f"Error: The folder {input_folder} was not found.")
            return

        pdf_files = sorted(
            entry for entry in input_path.iterdir()
            if entry.is_file() and entry.suffix.lower() == ".pdf"
        )
        total_files = len(pdf_files)
        print(f"Found {total_files} PDF files to process")

        # Track success and failure
        successful_files = []
        failed_files = []

        for i, pdf_file in enumerate(pdf_files, 1):
            print(f"\nProcessing file {i}/{total_files}: {pdf_file.name}")

            # Check if output file already exists
            if output_folder:
                output_file = Path(output_folder) / f"{pdf_file.stem}_ocr.md"
                if output_file.exists():
                    print(f"Skipping {pdf_file.name} - output file already exists")
                    successful_files.append(pdf_file.name)
                    continue

            if self.process_pdf(str(pdf_file), output_folder):
                successful_files.append(pdf_file.name)
                print(f"Successfully processed {pdf_file.name}")
            else:
                failed_files.append(pdf_file.name)
                print(f"Failed to process {pdf_file.name}")

            # Add a small delay between files to avoid rate limiting
            if i < total_files:
                time.sleep(self.INTER_FILE_DELAY_SECONDS)

        # Print summary
        print("\n===== Processing Summary =====")
        print(f"Total files: {total_files}")
        print(f"Successfully processed: {len(successful_files)}")
        print(f"Failed to process: {len(failed_files)}")
        if failed_files:
            print("\nFailed files:")
            for name in failed_files:
                print(f"- {name}")

    def process_image(self, image_path: str, output_folder: Optional[str] = None) -> None:
        """Run OCR on a local image and save the results.

        The image is sent inline as a base64 ``data:`` URL whose MIME type is
        guessed from the file extension (falling back to ``image/jpeg``, the
        value the original code always used).

        Args:
            image_path: Path of the image on disk.
            output_folder: Destination folder; the current directory if empty.
        """
        try:
            base64_image = encode_image(image_path)
            if not base64_image:
                # encode_image has already reported the problem.
                return

            print(f"Processing image: {image_path}")
            img_file = Path(image_path)

            guessed_type, _ = mimetypes.guess_type(img_file.name)
            if not guessed_type or not guessed_type.startswith("image/"):
                guessed_type = "image/jpeg"

            ocr_response = self._retry_api_call(
                self.client.ocr.process,
                model=self.OCR_MODEL,
                document={
                    "type": "image_url",
                    "image_url": f"data:{guessed_type};base64,{base64_image}",
                },
            )
            print("OCR processing completed successfully")

            self._handle_ocr_response(ocr_response, img_file.stem, output_folder)
        except Exception as e:
            print(f"Error processing image: {e}")
            print("Please check that your API key is valid and has OCR permissions")

    def ask_document_question(self, document_path: str, question: str, output_folder: Optional[str] = None) -> None:
        """Ask the chat model a question about a local document.

        The document is uploaded, referenced through a signed URL in the chat
        request, and deleted again afterwards. The answer is printed and saved
        to ``<stem>_qa.txt`` in the output folder.

        Args:
            document_path: Path of the document on disk.
            question: Question to ask about the document.
            output_folder: Destination folder; the current directory if empty.
                It is created when it does not exist.
        """
        uploaded_doc = None
        try:
            doc_file = Path(document_path)
            if not doc_file.is_file():
                print(f"Error: The file {document_path} was not found.")
                return

            print(f"Processing document for Q&A: {document_path}")
            print(f"Question: {question}")

            uploaded_doc = self._upload_for_ocr(doc_file)
            signed_url = self._retry_api_call(
                self.client.files.get_signed_url,
                file_id=uploaded_doc.id,
            )
            print("Obtained signed URL, sending question to model...")

            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        {"type": "document_url", "document_url": signed_url.url},
                    ],
                }
            ]
            chat_response = self._retry_api_call(
                self.client.chat.complete,
                model=self.CHAT_MODEL,
                messages=messages,
            )
            answer = chat_response.choices[0].message.content

            print("Received answer from model")
            print("Answer:")
            print("-" * 50)
            print(answer)
            print("-" * 50)

            # Save answer to file (UTF-8 so non-ASCII answers do not fail on
            # platforms with a narrower default encoding).
            qa_file_path = self._resolve_output_dir(output_folder) / f"{doc_file.stem}_qa.txt"
            with open(qa_file_path, "w", encoding="utf-8") as f:
                f.write(f"Question: {question}\n\nAnswer:\n{answer}")
            print(f"Q&A result saved to {qa_file_path}")
        except Exception as e:
            print(f"Error processing document question: {e}")
            print("Please check that your API key is valid and has necessary permissions")
        finally:
            self._delete_uploaded(uploaded_doc)
# Defaults used when the user just presses Enter at the corresponding prompt.
DEFAULT_PDF_URL = "https://arxiv.org/pdf/2201.04234"
DEFAULT_PDF_PATH = r"D:\processes\NFPA2\pdf_pages\NFPA_page_715.pdf"
DEFAULT_PDF_FOLDER = r"D:\processes\NFPA2\pdf_pages"
DEFAULT_QUESTION = "What are the main topics discussed in this document?"


def _prompt_output_folder() -> Optional[str]:
    """Ask where to save output files.

    Returns:
        The folder path (created if necessary), or ``None`` to use the
        current directory - either because the user entered nothing or
        because the folder could not be created.
    """
    output_folder = input(
        "Enter the path to save output files (or press Enter to use the current folder): "
    ).strip()
    if not output_folder:
        return None
    if os.path.isdir(output_folder):
        return output_folder
    # Also reached when the path exists but is a regular file; makedirs then
    # raises an OSError and we fall back to the current directory.
    try:
        os.makedirs(output_folder, exist_ok=True)
    except OSError as e:
        print(f"Error: Invalid output folder path: {e}")
        print("Using current directory instead.")
        return None
    print(f"Created output folder: {output_folder}")
    return output_folder


def main() -> None:
    """Interactive command-line menu for the document processor."""
    # Initialize processor with the module-level API key
    processor = DocumentProcessor(api_key=api_key)

    print("Mistral OCR Document Processor")
    print("-" * 50)
    print("1. Process a PDF from a direct URL (recommended)")
    print("2. Process a local PDF file")
    print("3. Process a local image file")
    print("4. Ask a question about a document")
    print("5. Process all PDFs in a folder")
    choice = input("Enter your choice (1-5): ").strip()

    # Reject bad choices before prompting for (and possibly creating) an
    # output folder that would never be used.
    if choice not in {"1", "2", "3", "4", "5"}:
        print("Invalid choice. Exiting.")
        return

    output_folder = _prompt_output_folder()

    if choice == "1":
        print("\n--- PDF URL Processing ---")
        url = input("Enter a public PDF URL (or press Enter for default arxiv paper): ").strip()
        if not url:
            url = DEFAULT_PDF_URL
        elif not url.lower().startswith(("http://", "https://")):
            print(f"Warning: '{url}' doesn't look like a valid URL. Using default instead.")
            url = DEFAULT_PDF_URL
        processor.process_pdf_direct_url(url, output_folder=output_folder)
    elif choice == "2":
        pdf_path = input("Enter the path to your PDF file: ").strip() or DEFAULT_PDF_PATH
        processor.process_pdf(pdf_path, output_folder=output_folder)
    elif choice == "3":
        image_path = input("Enter the path to your image file: ").strip()
        processor.process_image(image_path, output_folder=output_folder)
    elif choice == "4":
        doc_path = input("Enter the path to your document: ").strip() or DEFAULT_PDF_PATH
        question = input("Enter your question about the document: ").strip() or DEFAULT_QUESTION
        processor.ask_document_question(doc_path, question, output_folder=output_folder)
    else:  # choice == "5"
        input_folder = input("Enter the path to the folder containing PDFs: ").strip() or DEFAULT_PDF_FOLDER
        processor.process_pdf_folder(input_folder, output_folder)


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Ah, mistral_ocr.py—because nothing screams "secure coding" like hardcoding your API key right at the top. Bold move.
This script is the Swiss Army knife of OCR processing, but with the finesse of a sledgehammer. It’s got everything: retry logic (because it knows failure is inevitable), verbose print statements (who doesn’t love cluttered logs?), and enough redundant error handling to make your eyes glaze over.
The exponential backoff retry method is a nice touch—because when something breaks, why not make it break slowly? Also, nothing says "efficiency" like reading an entire PDF into memory before uploading it. Who needs RAM anyway?
The class is over-engineered but somehow still manages to be clunky. Processing local PDFs? Sure. URLs? Of course. Entire folders? Why not? But good luck navigating that labyrinth of conditionals and input prompts without wanting to automate your own demise.
Oh, and let’s not forget the clean-up process—because deleting temporary files after you’ve hit every possible rate limit is definitely the best approach.
Overall, a valiant effort in overcomplication. 3/5 stars—functional but exhausting.