Created
December 21, 2025 19:54
-
-
Save alexlib/2ffe78bea8e6f73d343090c663408a94 to your computer and use it in GitHub Desktop.
marimo notebook that uses OpenCV to detect large sphere and flow tracers and present using matplotlib. Also creates a batch run.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.13" | |
| # dependencies = [ | |
| # "google-genai==1.56.0", | |
| # "matplotlib==3.10.8", | |
| # "nbformat==5.10.4", | |
| # "numpy==2.2.6", | |
| # "opencv-python==4.12.0.88", | |
| # "pandas==2.3.3", | |
| # ] | |
| # /// | |
| import marimo | |
| __generated_with = "0.18.4" | |
| app = marimo.App(width="medium", auto_download=["ipynb"]) | |
| @app.cell | |
| def _(): | |
| import marimo as mo | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| import os | |
| import glob | |
| from concurrent.futures import ProcessPoolExecutor | |
| return ProcessPoolExecutor, cv2, glob, np, os, pd | |
| @app.cell(hide_code=True) | |
| def _(cv2, np, os): | |
| def process_single_image(image_path, threshold_value=30, min_star_area=2): | |
| """ | |
| Process a single image to find the Moon and Speckles. | |
| Returns a dictionary of data and the filename. | |
| """ | |
| # 1. Load Image in Grayscale | |
| # flagging 0 reads directly as grayscale, faster than converting later | |
| img = cv2.imread(image_path, 0) | |
| if img is None: | |
| return None | |
| # 2. Thresholding | |
| # We use a low threshold to catch faint speckles. | |
| # Everything above 'threshold_value' becomes white, else black. | |
| _, binary = cv2.threshold(img, threshold_value, 255, cv2.THRESH_BINARY) | |
| # 3. Connected Components (The fastest way to handle thousands of blobs) | |
| # This function returns the number of labels, the label matrix, | |
| # statistics (bounding box, area), and the centroids. | |
| # Note: These initial centroids are purely geometric (binary). | |
| num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats( | |
| binary, connectivity=8 | |
| ) | |
| # Data structure to hold results | |
| detected_objects = [] | |
| # 'stats' includes the background at index 0, so we skip it. | |
| # stats columns: [x, y, width, height, area] | |
| if num_labels > 1: | |
| # Slice out the background | |
| valid_stats = stats[1:] | |
| valid_centroids = centroids[1:] | |
| # 4. Identify the "Moon" (Largest Area) | |
| # We assume the moon is significantly larger than the speckles. | |
| areas = valid_stats[:, cv2.CC_STAT_AREA] | |
| max_area_idx = np.argmax(areas) | |
| # Iterate through all found components | |
| for i in range(len(valid_stats)): | |
| area = areas[i] | |
| # Skip noise (single pixel artifacts if desired) | |
| if area < min_star_area: | |
| continue | |
| is_moon = (i == max_area_idx) and ( | |
| area > 100 | |
| ) # Arbitrary size filter for moon | |
| # base coordinates (binary centroid) | |
| cx, cy = valid_centroids[i] | |
| # 5. SUBPIXEL ACCURACY REFINEMENT (Intensity Weighted) | |
| # For the moon (or all, if speed permits), we calculate moments on the gray image | |
| # heavily optimized using slicing. | |
| if is_moon: | |
| x, y, w, h = valid_stats[i, :4] | |
| # Crop the original grayscale image to the object's bounding box | |
| # to avoid processing the whole 4k image for moments | |
| roi = img[y : y + h, x : x + w] | |
| mask_roi = (labels[y : y + h, x : x + w] == (i + 1)).astype( | |
| np.uint8 | |
| ) | |
| # Calculate moments on the grayscale ROI | |
| M = cv2.moments(roi, binaryImage=False) | |
| # If moments are valid, calculate precise center | |
| if M["m00"] != 0: | |
| # Adjust ROI coordinates back to global image coordinates | |
| cx = (M["m10"] / M["m00"]) + x | |
| cy = (M["m01"] / M["m00"]) + y | |
| detected_objects.append( | |
| { | |
| "filename": os.path.basename(image_path), | |
| "type": "moon" if is_moon else "speckle", | |
| "x": round(cx, 4), | |
| "y": round(cy, 4), | |
| "area": area, | |
| "brightness_sum": np.sum( | |
| img[labels == (i + 1)] | |
| ), # Optional: total brightness | |
| } | |
| ) | |
| return detected_objects | |
| return (process_single_image,) | |
| @app.cell | |
| def _(process_single_image): | |
| image_path = "/media/user/ExtremePro/Chen_Mortenfeld_Data/experiment_2024-02-22/C001H001S0001/C001H001S0001000001.tif" | |
| detected_objects = process_single_image(image_path) | |
| return detected_objects, image_path | |
| @app.cell | |
| def _(cv2, image_path): | |
| import matplotlib.pyplot as plt | |
| # Load the image again for plotting | |
| # image_path_for_plot = '/media/user/ExtremePro/Chen_Mortenfeld_Data/experiment_2024-02-22/C001H001S0001/C001H001S0001000001.tif' | |
| img_to_plot = cv2.imread(image_path, 0) | |
| return img_to_plot, plt | |
| @app.cell | |
| def _(detected_objects, image_path, img_to_plot, os, plt): | |
| fig, ax = plt.subplots(figsize=(10, 10)) | |
| ax.imshow(img_to_plot, cmap="gray") | |
| ax.set_title(f"Detections on {os.path.basename(image_path)}") | |
| ax.set_xlabel("X-coordinate") | |
| ax.set_ylabel("Y-coordinate") | |
| # Overlay detections | |
| if detected_objects: | |
| moon_x = [obj["x"] for obj in detected_objects if obj["type"] == "moon"] | |
| moon_y = [obj["y"] for obj in detected_objects if obj["type"] == "moon"] | |
| speckle_x = [ | |
| obj["x"] for obj in detected_objects if obj["type"] == "speckle" | |
| ] | |
| speckle_y = [ | |
| obj["y"] for obj in detected_objects if obj["type"] == "speckle" | |
| ] | |
| if moon_x: | |
| ax.scatter( | |
| moon_x, | |
| moon_y, | |
| color="red", | |
| marker="o", | |
| s=100, | |
| label="Moon", | |
| edgecolors="yellow", | |
| linewidths=2, | |
| ) | |
| if speckle_x: | |
| ax.scatter( | |
| speckle_x, | |
| speckle_y, | |
| color="cyan", | |
| marker="x", | |
| s=50, | |
| label="Speckle", | |
| ) | |
| ax.legend() | |
| else: | |
| ax.text( | |
| 0.5, | |
| 0.5, | |
| "No objects detected", | |
| transform=ax.transAxes, | |
| ha="center", | |
| va="center", | |
| color="red", | |
| fontsize=16, | |
| ) | |
| plt.tight_layout() | |
| plt.gca() | |
| return | |
| @app.cell(hide_code=True) | |
| def _(ProcessPoolExecutor, glob, os, pd, process_single_image): | |
| def batch_process(image_folder, output_csv): | |
| # Get list of images | |
| image_files = glob.glob( | |
| os.path.join(image_folder, "C001H001S000100000*.tif") | |
| ) + glob.glob(os.path.join(image_folder, "*.png")) | |
| print(f"Found {len(image_files)} images. Starting processing...") | |
| all_data = [] | |
| # Use parallel processing for speed (CPU bound task) | |
| # Adjust max_workers based on your CPU cores | |
| with ProcessPoolExecutor() as executor: | |
| results = executor.map(process_single_image, image_files) | |
| for res in results: | |
| if res: | |
| all_data.extend(res) | |
| # Save to CSV | |
| df = pd.DataFrame(all_data) | |
| df.to_csv(output_csv, index=False) | |
| print(f"Processing complete. Data saved to {output_csv}") | |
| return | |
| @app.cell | |
| def _(): | |
| # if __name__ == "__main__": | |
| # CONFIGURATION | |
| FOLDER_PATH = "/media/user/ExtremePro/Chen_Mortenfeld_Data/experiment_2024-02-22/C001H001S0001/" # Change this to your folder | |
| OUTPUT_FILE = "./detected_lights.csv" | |
| # batch_process(FOLDER_PATH, OUTPUT_FILE) | |
| return | |
| if __name__ == "__main__": | |
| app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment