Face tracking with Insta360 Link on Linux
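The script below drives the camera's pan/tilt/zoom with v4l2-ctl, grabs frames with OpenCV, detects faces with MediaPipe (falling back to a Haar cascade), runs a Caffe MobileNet-SSD for object detection, and listens on a stereo microphone with arecord. Assuming Debian/Ubuntu-style package names (an assumption, not part of the gist), something like the following should pull in the dependencies:

    sudo apt install v4l-utils alsa-utils      # provides v4l2-ctl and arecord
    pip install opencv-python numpy mediapipe

It also expects deploy.prototxt and mobilenet_iter_73000.caffemodel in a models/ directory next to the script; the MediaPipe face model is downloaded automatically on first run.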
#!/usr/bin/env python3
"""
Motion Detector with MobileNet-SSD Object Detection
Motion detection + object recognition + PTZ tracking
"""
import cv2
import subprocess
import time
import numpy as np
import sys
import json
from pathlib import Path
from datetime import datetime
import threading
import queue

# PTZ limits
PAN_MIN, PAN_MAX = -522000, 522000
TILT_MIN, TILT_MAX = -324000, 360000

# Current position
current_pan = 0
current_tilt = 0

# PID controller state
pid_state = {
    'pan': {'error_sum': 0, 'last_error': 0, 'last_time': 0},
    'tilt': {'error_sum': 0, 'last_error': 0, 'last_time': 0}
}

# Sound detection
sound_queue = queue.Queue()
sound_thread = None
sound_monitoring = False

# MobileNet-SSD config
MODEL_DIR = Path(__file__).parent / "models"
PROTOTXT = MODEL_DIR / "deploy.prototxt"
MODEL = MODEL_DIR / "mobilenet_iter_73000.caffemodel"

# Live stream output
STREAM_FRAME_PATH = Path(__file__).parent / "stream_frame.jpg"

# Config file
CONFIG_FILE = Path(__file__).parent / "detector-config.json"

# Global config (reloaded periodically)
config = {
    "face_detection": {
        "scaleFactor": 1.1,
        "minNeighbors": 5,
        "minSize": 50,
        "aspectRatio_min": 0.8,
        "aspectRatio_max": 1.5
    },
    "tracking": {
        "max_duration": 20,
        "track_fps": 4,
        "save_interval": 2.0
    },
    "patrol": {
        "enabled": True,
        "interval_sec": 2
    }
}

config_mtime = 0


def load_config():
    """Load config from JSON file if it has changed"""
    global config, config_mtime, face_detector
    if not CONFIG_FILE.exists():
        return
    current_mtime = CONFIG_FILE.stat().st_mtime
    if current_mtime > config_mtime:
        try:
            with open(CONFIG_FILE, 'r') as f:
                new_config = json.load(f)
            # Check if face detection config changed
            face_config_changed = (
                config.get("face_detection") != new_config.get("face_detection")
            )
            config.update(new_config)
            config_mtime = current_mtime
            print(f"Config reloaded: {config}", flush=True)
            # Reload face detector if config changed
            if face_config_changed:
                face_detector = None
                print("Face detector will reload with new settings", flush=True)
        except Exception as e:
            print(f"Config load error: {e}", flush=True)


# PASCAL VOC class labels (the 20 classes this Caffe MobileNet-SSD was trained on)
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
           "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
           "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
           "sofa", "train", "tvmonitor"]

# Loaded detectors (global, load once)
net = None
face_detector = None


def load_detector():
    """Load MobileNet-SSD model"""
    global net
    if net is None:
        print(f"Loading MobileNet-SSD from {MODEL}...", flush=True)
        net = cv2.dnn.readNetFromCaffe(str(PROTOTXT), str(MODEL))
        print("Model loaded", flush=True)
    return net


def load_face_detector():
    """Load MediaPipe face detector"""
    global face_detector
    if face_detector is None:
        try:
            import mediapipe as mp
            from mediapipe.tasks import python
            from mediapipe.tasks.python import vision

            # Download model if needed
            model_path = Path(__file__).parent / "models" / "blaze_face_short_range.tflite"
            if not model_path.exists():
                print("Downloading MediaPipe face detection model...", flush=True)
                import urllib.request
                url = "https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/1/blaze_face_short_range.tflite"
                urllib.request.urlretrieve(url, str(model_path))
            # Get confidence from config
            cfg = config.get("face_detection", {})
            min_confidence = cfg.get("min_detection_confidence", 0.5)
            base_options = python.BaseOptions(model_asset_path=str(model_path))
            options = vision.FaceDetectorOptions(
                base_options=base_options,
                min_detection_confidence=min_confidence
            )
            face_detector = vision.FaceDetector.create_from_options(options)
            print(f"MediaPipe face detector loaded (confidence={min_confidence})", flush=True)
        except Exception as e:
            print(f"MediaPipe error: {e}, using Haar Cascade fallback", flush=True)
            face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    return face_detector


def detect_faces(frame):
    """
    Detect faces in frame using MediaPipe (or fallback to Haar Cascade)
    Returns: list of (x, y, w, h) bounding boxes
    """
    detector = load_face_detector()
    # Check if MediaPipe detector
    try:
        import mediapipe as mp
        if hasattr(detector, 'detect'):
            # MediaPipe new API
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_rgb)
            detection_result = detector.detect(mp_image)
            faces = []
            if detection_result.detections:
                h, w = frame.shape[:2]
                for detection in detection_result.detections:
                    bbox = detection.bounding_box
                    # Convert to (x, y, width, height)
                    x = bbox.origin_x
                    y = bbox.origin_y
                    width = bbox.width
                    height = bbox.height
                    # Ensure positive dimensions and within frame
                    if width > 0 and height > 0:
                        x = max(0, x)
                        y = max(0, y)
                        faces.append((x, y, width, height))
            return faces
    except Exception:
        # Silently fall back on error
        pass
    # Fallback to Haar Cascade
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    cfg = config["face_detection"]
    scale_factor = cfg.get("scaleFactor", 1.1)
    min_neighbors = cfg.get("minNeighbors", 5)
    min_size = cfg.get("minSize", 50)
    aspect_min = cfg.get("aspectRatio_min", 0.8)
    aspect_max = cfg.get("aspectRatio_max", 1.5)
    raw_faces = detector.detectMultiScale(
        gray,
        scaleFactor=scale_factor,
        minNeighbors=min_neighbors,
        minSize=(min_size, min_size),
        flags=cv2.CASCADE_SCALE_IMAGE
    )
    # Filter by aspect ratio
    filtered_faces = []
    for (x, y, w, h) in raw_faces:
        aspect_ratio = h / w
        if aspect_min <= aspect_ratio <= aspect_max:
            filtered_faces.append((x, y, w, h))
    return filtered_faces


def save_stream_frame(frame, faces=None, objects=None, status=""):
    """
    Save current frame with annotations for live stream
    Optimized for web display (lower resolution, higher compression)
    """
    annotated = frame.copy()
    # Reload config to get latest settings
    load_config()
    # Get stream settings from config
    cam_cfg = config.get("camera", {})
    stream_width = cam_cfg.get("stream_width", 320)
    stream_quality = cam_cfg.get("stream_quality", 70)  # JPEG quality 0-100
    # Resize if needed
    h, w = annotated.shape[:2]
    if w > stream_width:
        scale = stream_width / w
        new_w = stream_width
        new_h = int(h * scale)
        annotated = cv2.resize(annotated, (new_w, new_h))
        # Scale face rectangles to the resized frame
        if faces and len(faces) > 0:
            scaled_faces = []
            for (x, y, fw, fh) in faces:
                scaled_faces.append((
                    int(x * scale),
                    int(y * scale),
                    int(fw * scale),
                    int(fh * scale)
                ))
            faces = scaled_faces
    # Draw face rectangles
    if faces and len(faces) > 0:
        for (x, y, fw, fh) in faces:
            cv2.rectangle(annotated, (x, y), (x + fw, y + fh), (0, 255, 0), 2)
            cv2.putText(annotated, "Face", (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
    # Draw status text
    if status:
        cv2.putText(annotated, status, (10, 20),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    # Draw detected objects
    if objects:
        obj_text = ", ".join([f"{o['class']} ({o['confidence']:.0%})" for o in objects[:3]])
        cv2.putText(annotated, obj_text, (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 0), 1)
    # Save with compression
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), stream_quality]
    cv2.imwrite(str(STREAM_FRAME_PATH), annotated, encode_param)


def set_ptz(pan, tilt, zoom=100):
    """Set camera PTZ position"""
    global current_pan, current_tilt
    pan = max(PAN_MIN, min(PAN_MAX, pan))
    tilt = max(TILT_MIN, min(TILT_MAX, tilt))
    cmd = f"v4l2-ctl -d /dev/video0 --set-ctrl pan_absolute={pan} --set-ctrl tilt_absolute={tilt} --set-ctrl zoom_absolute={zoom}"
    subprocess.run(cmd, shell=True, capture_output=True)
    current_pan = pan
    current_tilt = tilt


def capture_frame(width=None, height=None):
    """Capture a frame from camera"""
    # Get resolution from config if not specified
    if width is None or height is None:
        cam_cfg = config.get("camera", {})
        width = cam_cfg.get("width", 640)
        height = cam_cfg.get("height", 480)
    cap = cv2.VideoCapture('/dev/video0')
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    # Flush buffer
    for _ in range(3):
        cap.read()
    ret, frame = cap.read()
    cap.release()
    if not ret:
        return None
    return frame


def detect_motion_center(prev_frame, curr_frame, threshold=None):
    """
    Detect motion and return the center of motion
    Returns: (x, y, motion_detected) where x,y are normalized 0-1
    """
    if prev_frame is None:
        return None, None, False
    # Get motion detection settings from config
    motion_cfg = config.get("motion_detection", {})
    if threshold is None:
        threshold = motion_cfg.get("threshold", 30)
    min_area = motion_cfg.get("min_area", 1000)
    # Convert to grayscale and blur
    gray1 = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
    gray1 = cv2.GaussianBlur(gray1, (21, 21), 0)
    gray2 = cv2.GaussianBlur(gray2, (21, 21), 0)
    # Frame difference
    diff = cv2.absdiff(gray1, gray2)
    thresh_img = cv2.threshold(diff, threshold, 255, cv2.THRESH_BINARY)[1]
    # Dilate to fill holes
    thresh_img = cv2.dilate(thresh_img, None, iterations=2)
    # Find contours
    contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 0:
        return None, None, False
    # Find largest contour (main motion)
    largest = max(contours, key=cv2.contourArea)
    area = cv2.contourArea(largest)
    # Ignore small movements
    if area < min_area:
        return None, None, False
    # Calculate center of motion
    M = cv2.moments(largest)
    if M["m00"] == 0:
        return None, None, False
    cx = int(M["m10"] / M["m00"])
    cy = int(M["m01"] / M["m00"])
    # Normalize to 0-1 (center is 0.5, 0.5)
    h, w = curr_frame.shape[:2]
    norm_x = cx / w
    norm_y = cy / h
    return norm_x, norm_y, True


def detect_objects(frame, confidence_threshold=0.5):
    """
    Detect objects in frame using MobileNet-SSD
    Returns: list of dicts with "class", "confidence", and "bbox" keys
    """
    detector = load_detector()
    h, w = frame.shape[:2]
    # Prepare blob
    blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 0.007843,
                                 (300, 300), 127.5)
    detector.setInput(blob)
    detections = detector.forward()
    results = []
    # Loop over detections
    for i in range(detections.shape[2]):
        confidence = detections[0, 0, i, 2]
        if confidence > confidence_threshold:
            idx = int(detections[0, 0, i, 1])
            class_name = CLASSES[idx]
            # Skip background
            if class_name == "background":
                continue
            # Bounding box
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")
            results.append({
                "class": class_name,
                "confidence": float(confidence),
                "bbox": [startX, startY, endX, endY]
            })
    return results


def pid_control(error, axis, kp=1.0, ki=0.0, kd=0.0, max_output=100000):
    """
    PID controller for smooth camera tracking
    Args:
        error: Current error (distance from target)
        axis: 'pan' or 'tilt'
        kp, ki, kd: PID coefficients
        max_output: Maximum adjustment per step
    Returns:
        Control output (adjustment value)
    """
    global pid_state
    current_time = time.time()
    state = pid_state[axis]
    # Time delta
    dt = current_time - state['last_time'] if state['last_time'] > 0 else 0.1
    dt = max(0.01, min(dt, 1.0))  # Clamp dt to a reasonable range
    # P term: proportional to current error
    p_term = kp * error
    # I term: integral of error over time (anti-windup: limit accumulation)
    state['error_sum'] += error * dt
    state['error_sum'] = max(-10.0, min(state['error_sum'], 10.0))  # Anti-windup
    i_term = ki * state['error_sum']
    # D term: derivative of error (rate of change)
    d_term = kd * (error - state['last_error']) / dt if dt > 0 else 0
    # Update state
    state['last_error'] = error
    state['last_time'] = current_time
    # Combined output
    output = p_term + i_term + d_term
    # Clamp output
    output = max(-max_output, min(output, max_output))
    return output


def adjust_ptz_to_center(motion_x, motion_y, use_pid=True):
    """
    Adjust PTZ to center the motion using PID control
    motion_x, motion_y: normalized 0-1 coordinates
    """
    # Calculate error from center (0.5, 0.5)
    error_x = motion_x - 0.5
    error_y = motion_y - 0.5
    # Deadzone
    deadzone = 0.05
    if abs(error_x) < deadzone and abs(error_y) < deadzone:
        return False
    if use_pid:
        # PID parameters tuned for camera tracking
        # kp: Proportional gain (higher = more aggressive)
        # ki: Integral gain (fixes steady-state error)
        # kd: Derivative gain (dampens oscillation)
        kp = 60000  # Increased for faster response
        ki = 5000   # Small integral to eliminate steady-state error
        kd = 15000  # Derivative to smooth movement
        # Calculate PID adjustments
        pan_adjust = int(pid_control(error_x, 'pan', kp, ki, kd))
        tilt_adjust = int(pid_control(-error_y, 'tilt', kp, ki, kd))  # Inverted
    else:
        # Fallback: simple proportional control
        pan_adjust = int(error_x * 50000)
        tilt_adjust = int(-error_y * 50000)
    new_pan = current_pan + pan_adjust
    new_tilt = current_tilt + tilt_adjust
    set_ptz(new_pan, new_tilt)
    return True


def reset_pid():
    """Reset PID controller state (call when tracking target changes)"""
    global pid_state
    for axis in ['pan', 'tilt']:
        pid_state[axis] = {'error_sum': 0, 'last_error': 0, 'last_time': 0}


def capture_audio(duration=0.5, mic_device="plughw:2,0", sample_rate=16000, channels=2):
    """Capture audio chunk from microphone"""
    cmd = [
        'arecord',
        '-D', mic_device,
        '-f', 'S16_LE',
        '-c', str(channels),
        '-r', str(sample_rate),
        '-d', str(duration),
        '-t', 'raw'
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, timeout=duration + 1)
        audio_data = np.frombuffer(result.stdout, dtype=np.int16)
        if len(audio_data) > 0 and channels == 2 and len(audio_data) % 2 == 0:
            audio_data = audio_data.reshape(-1, 2)
        return audio_data
    except Exception:
        return None


def analyze_sound_direction(audio_data):
    """
    Analyze stereo audio to detect sound direction
    Returns: (volume, direction_score)
    direction_score: -1.0 (left) to 1.0 (right)
    """
    if audio_data is None or len(audio_data) == 0:
        return 0, 0.0
    if audio_data.ndim == 2:  # Stereo
        left = audio_data[:, 0]
        right = audio_data[:, 1]
        # Calculate RMS for each channel
        rms_left = np.sqrt(np.mean(left.astype(np.float32) ** 2))
        rms_right = np.sqrt(np.mean(right.astype(np.float32) ** 2))
        total_volume = (rms_left + rms_right) / 2
        # Direction score based on L/R balance
        if total_volume > 10:
            direction = (rms_right - rms_left) / (rms_right + rms_left + 1e-6)
        else:
            direction = 0.0
        return total_volume, direction
    else:
        rms = np.sqrt(np.mean(audio_data.astype(np.float32) ** 2))
        return rms, 0.0


def sound_monitor_thread():
    """Background thread for sound monitoring"""
    global sound_monitoring
    sound_cfg = config.get("sound_detection", {})
    threshold = sound_cfg.get("threshold", 800)
    sensitivity = sound_cfg.get("sensitivity", 200000)
    print(f"Sound monitoring started (threshold={threshold})", flush=True)
    while sound_monitoring:
        try:
            audio = capture_audio(duration=0.3)
            if audio is not None:
                volume, direction = analyze_sound_direction(audio)
                if volume > threshold:
                    print(f"Sound detected! Volume={volume:.0f}, Direction={direction:.2f}", flush=True)
                    sound_queue.put({'volume': volume, 'direction': direction})
        except Exception as e:
            print(f"Sound monitoring error: {e}", flush=True)
            time.sleep(1)
        time.sleep(0.1)


def start_sound_monitoring():
    """Start sound monitoring in background thread"""
    global sound_thread, sound_monitoring
    if sound_thread is not None and sound_thread.is_alive():
        return
    sound_monitoring = True
    sound_thread = threading.Thread(target=sound_monitor_thread, daemon=True)
    sound_thread.start()


def stop_sound_monitoring():
    """Stop sound monitoring"""
    global sound_monitoring
    sound_monitoring = False


def notify(message):
    """Send notification via clawdbot (if available)"""
    try:
        subprocess.run(
            ["clawdbot", "wake", "-m", message],
            capture_output=True,
            timeout=5
        )
    except Exception as e:
        print(f"Notification failed: {e}", flush=True)


def track_continuously(initial_frame, max_duration=300, track_fps=6, is_person=False):
    """
    Enter tracking mode - continuously follow detected motion
    If is_person=True, prioritize face tracking
    Returns: duration tracked (seconds)
    Note: max_duration is a safety timeout. Tracking continues as long as the target is found.
    """
    mode_str = "person (face priority)" if is_person else "object"
    print(f"Entering tracking mode for {mode_str} (max {max_duration}s)", flush=True)
    # Reset PID controller for new tracking target
    reset_pid()
    prev_frame = initial_frame
    start_time = time.time()
    last_found_time = time.time()  # Track when we last saw the target
    frame_delay = 1.0 / track_fps
    lost_count = 0
    max_lost = 5        # Give up after 5 consecutive lost frames (about 1 second)
    lost_timeout = 3.0  # If not found for 3 seconds, give up
    last_save_time = 0  # Track last save time
    while True:
        # Safety timeout
        if time.time() - start_time > max_duration:
            print(f"Max duration ({max_duration}s) reached, ending tracking", flush=True)
            break
        # Lost target timeout
        if time.time() - last_found_time > lost_timeout:
            print(f"Target not found for {lost_timeout}s, ending tracking", flush=True)
            break
        loop_start = time.time()
        # Check for loud sounds (surprise/startle response)
        if not sound_queue.empty():
            try:
                sound_event = sound_queue.get_nowait()
                volume = sound_event['volume']
                direction = sound_event['direction']
                # Get sound config
                sound_cfg = config.get("sound_detection", {})
                threshold = sound_cfg.get("threshold", 800)
                sensitivity = sound_cfg.get("sensitivity", 200000)
                startle_multiplier = sound_cfg.get("startle_multiplier", 1.5)
                # Only react to very loud sounds during tracking (startle threshold)
                startle_threshold = threshold * startle_multiplier
                if volume > startle_threshold:
                    print("LOUD SOUND during tracking! Looking towards it...", flush=True)
                    # Calculate pan position
                    pan_adjust = int(direction * sensitivity)
                    new_pan = current_pan + pan_adjust
                    new_pan = max(-300000, min(300000, new_pan))
                    # Turn towards sound
                    set_ptz(new_pan, current_tilt)
                    time.sleep(0.5)
                    # Look for face at new position
                    sound_frame = capture_frame()
                    if sound_frame is not None:
                        sound_faces = detect_faces(sound_frame)
                        if len(sound_faces) > 0:
                            print("Found face after turning to sound!", flush=True)
                            # Continue tracking from this new position
                            frame = sound_frame
                            faces = sound_faces
                            # Reset PID for new target
                            reset_pid()
                            last_found_time = time.time()
                        else:
                            print("No face found, resuming previous tracking", flush=True)
                    # Clear queue after processing startle
                    while not sound_queue.empty():
                        sound_queue.get_nowait()
            except queue.Empty:
                pass
        # Capture frame
        frame = capture_frame()
        if frame is None:
            lost_count += 1
            if lost_count >= max_lost:
                print("Camera error, exiting tracking", flush=True)
                break
            time.sleep(frame_delay)
            continue
        # For person tracking, try face detection first
        target_found = False
        mx, my = None, None
        if is_person:
            faces = detect_faces(frame)
            if len(faces) > 0:
                # Save stream frame with face rectangles
                save_stream_frame(frame, faces=faces, status="Tracking Person")
                # Face crop saving disabled
                # (Images are only shown in the live stream, not saved to disk)
                # Use largest face
                largest_face = max(faces, key=lambda f: f[2] * f[3])
                x, y, w, h = largest_face
                # Calculate center of face (normalized)
                frame_h, frame_w = frame.shape[:2]
                mx = (x + w / 2) / frame_w
                my = (y + h / 2) / frame_h
                target_found = True
                print(f"Face at ({mx:.2f}, {my:.2f}) [detected {len(faces)} face(s)]", flush=True)
            else:
                # No face found, log it
                save_stream_frame(frame, status="Tracking Motion")
                print("No face detected, falling back to motion", flush=True)
        # Fallback to motion detection if no face found
        if not target_found:
            mx, my, detected = detect_motion_center(prev_frame, frame)
            target_found = detected
            if detected:
                print(f"Motion at ({mx:.2f}, {my:.2f})", flush=True)
        if target_found:
            lost_count = 0
            last_found_time = time.time()  # Update last seen time
            # Adjust PTZ using PID control for smooth tracking
            adjusted = adjust_ptz_to_center(mx, my, use_pid=True)
            if adjusted:
                # Minimal delay - PID handles smoothing
                time.sleep(0.05)  # Just enough for camera response
        else:
            lost_count += 1
            print(f"Lost target ({lost_count}/{max_lost})", flush=True)
            if lost_count >= max_lost:
                print("Target lost (consecutive frames), ending tracking", flush=True)
                break
        prev_frame = frame
        # Maintain fps
        elapsed = time.time() - loop_start
        if elapsed < frame_delay:
            time.sleep(frame_delay - elapsed)
    duration = time.time() - start_time
    print(f"Tracked for {duration:.1f}s", flush=True)
    # Return to center
    print("Returning to center...", flush=True)
    set_ptz(0, 0)
    time.sleep(1)
    return duration


def patrol_scan(prev_frame, scan_positions, pause_sec=1.0):
    """
    Perform one patrol scan cycle
    Returns: (face_detected, faces, frame) or (False, None, frame)
    """
    patrol_cfg = config.get("patrol", {})
    use_motion_detection = patrol_cfg.get("use_motion_detection", False)
    direct_face_detection = patrol_cfg.get("direct_face_detection", True)
    for pan, tilt in scan_positions:
        set_ptz(pan, tilt)
        time.sleep(pause_sec)  # Wait for camera to settle
        # Capture frame after camera stops
        frame = capture_frame()
        if frame is None:
            continue
        time.sleep(0.2)  # Additional settling
        frame = capture_frame()  # Get stable frame
        if frame is None:
            continue
        if direct_face_detection:
            # Direct face detection (skip motion detection)
            faces = detect_faces(frame)
            if len(faces) > 0:
                save_stream_frame(frame, faces=faces, status="Face Found!")
                print(f"Found {len(faces)} face(s) during patrol", flush=True)
                return True, faces, frame
            else:
                save_stream_frame(frame, status="Patrol Mode")
        elif use_motion_detection:
            # Legacy: motion detection mode
            save_stream_frame(frame, status="Patrol Mode")
            time.sleep(0.5)
            frame2 = capture_frame()
            if frame2 is None:
                prev_frame = frame
                continue
            mx, my, detected = detect_motion_center(frame, frame2)
            if detected:
                return True, None, frame2
        prev_frame = frame
    return False, None, prev_frame


def monitor(interval_sec=2, min_interval_between_detections=10, patrol_mode=True):
    """
    Continuous monitoring mode with optional patrol and sound detection
    """
    mode_str = "patrol" if patrol_mode else "static"
    print(f"Monitoring started ({mode_str} mode, interval={interval_sec}s)", flush=True)
    # Reset to center
    set_ptz(0, 0)
    time.sleep(2)
    # Load config first
    load_config()
    # Start sound monitoring if enabled
    sound_cfg = config.get("sound_detection", {})
    if sound_cfg.get("enabled", False):
        print("Starting sound monitoring...", flush=True)
        start_sound_monitoring()
    else:
        print("Sound monitoring disabled", flush=True)
    # Patrol positions (pan, tilt)
    # Sweep from left to right and back
    scan_positions = [
        (0, 0),        # Center
        (-300000, 0),  # Left
        (-150000, 0),  # Mid-left
        (0, 0),        # Center
        (150000, 0),   # Mid-right
        (300000, 0),   # Right
        (0, 0),        # Center
    ]
    prev_frame = None
    last_detection_time = 0
    while True:
        loop_start = time.time()
        # Reload config if changed
        load_config()
        # Check for sound events
        if not sound_queue.empty():
            try:
                sound_event = sound_queue.get_nowait()
                volume = sound_event['volume']
                direction = sound_event['direction']
                # Calculate pan position based on direction
                sound_cfg = config.get("sound_detection", {})
                sensitivity = sound_cfg.get("sensitivity", 200000)
                pan_adjust = int(direction * sensitivity)
                new_pan = current_pan + pan_adjust
                new_pan = max(-300000, min(300000, new_pan))
                print(f"Turning towards sound (direction={direction:.2f}, pan={new_pan})", flush=True)
                set_ptz(new_pan, current_tilt)
                time.sleep(1)
                # Look for a face at this position
                frame = capture_frame()
                if frame is not None:
                    faces = detect_faces(frame)
                    if len(faces) > 0:
                        print("Found face after sound detection!", flush=True)
                        save_stream_frame(frame, faces=faces, status="Sound + Face!")
                        notify("Sound → Face detected!")
                        track_continuously(frame, is_person=True)
                        prev_frame = None
                        set_ptz(0, 0)
                        time.sleep(1)
                        continue
                    else:
                        save_stream_frame(frame, status="Sound detected (no face)")
            except queue.Empty:
                pass
        # Patrol scan if enabled
        if patrol_mode:
            print("Patrol scanning...", flush=True)
            face_detected, faces, frame = patrol_scan(prev_frame, scan_positions)
            prev_frame = frame
            if face_detected and faces is not None:
                # Direct face detection - skip motion/object detection
                current_time = time.time()
                # Throttle detections
                if current_time - last_detection_time < min_interval_between_detections:
                    print(f"Face detected but throttled (last: {int(current_time - last_detection_time)}s ago)", flush=True)
                    continue
                last_detection_time = current_time
                # Notify and start tracking
                notify(f"Face detected ({len(faces)} face(s))")
                track_continuously(frame, is_person=True)
                # Reset and return to center
                prev_frame = None
                set_ptz(0, 0)
                time.sleep(1)
                continue
        else:
            # Static monitoring (legacy motion detection mode)
            frame = capture_frame()
            if frame is None:
                print("Failed to capture frame", flush=True)
                time.sleep(interval_sec)
                continue
            # Update stream
            save_stream_frame(frame, status="Static Mode")
            mx, my, detected = detect_motion_center(prev_frame, frame)
            prev_frame = frame
            if detected:
                current_time = time.time()
                # Throttle detections
                if current_time - last_detection_time < min_interval_between_detections:
                    print(f"Motion detected but throttled (last: {int(current_time - last_detection_time)}s ago)", flush=True)
                    time.sleep(interval_sec)
                    continue
                last_detection_time = current_time
                print(f"Motion at ({mx:.2f}, {my:.2f})", flush=True)
                # Object detection (use config threshold)
                threshold = config.get("tracking", {}).get("object_confidence_threshold", 0.5)
                objects = detect_objects(frame, confidence_threshold=threshold)
                if objects:
                    # Format detection message
                    obj_str = ", ".join([f"{o['class']} ({o['confidence']:.0%})" for o in objects])
                    msg = f"Detected: {obj_str}"
                    print(msg, flush=True)
                    # Check if a person was detected
                    is_person = any(obj['class'] == 'person' for obj in objects)
                    if is_person:
                        # Send notification
                        notify(msg)
                        # Enter continuous tracking mode for the person
                        track_continuously(frame, is_person=True)
                        # Reset prev_frame after tracking
                        prev_frame = None
                        # Return to center before resuming
                        set_ptz(0, 0)
                        time.sleep(1)
                        continue
                    else:
                        print(f"Detected {obj_str} but ignoring (not a person)", flush=True)
                else:
                    print("Motion but no objects recognized", flush=True)
        # Maintain interval (only matters in static mode)
        if not patrol_mode:
            elapsed = time.time() - loop_start
            if elapsed < interval_sec:
                time.sleep(interval_sec - elapsed)


if __name__ == "__main__":
    mode = sys.argv[1] if len(sys.argv) > 1 else "monitor"
    try:
        if mode == "monitor":
            monitor()
        else:
            print(f"Unknown mode: {mode}", flush=True)
            sys.exit(1)
    except KeyboardInterrupt:
        print("\nStopped by user", flush=True)
        set_ptz(0, 0)
    except Exception as e:
        print(f"Error: {e}", flush=True)
        import traceback
        traceback.print_exc()
        set_ptz(0, 0)
        sys.exit(1)
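
For reference, here is an illustrative detector-config.json that mirrors the script's built-in defaults plus the extra keys it reads via config.get(...); the values are simply the in-code defaults, not tuned settings:

    {
      "face_detection": {
        "scaleFactor": 1.1,
        "minNeighbors": 5,
        "minSize": 50,
        "aspectRatio_min": 0.8,
        "aspectRatio_max": 1.5,
        "min_detection_confidence": 0.5
      },
      "tracking": {
        "max_duration": 20,
        "track_fps": 4,
        "save_interval": 2.0,
        "object_confidence_threshold": 0.5
      },
      "patrol": {
        "enabled": true,
        "interval_sec": 2,
        "direct_face_detection": true,
        "use_motion_detection": false
      },
      "camera": {
        "width": 640,
        "height": 480,
        "stream_width": 320,
        "stream_quality": 70
      },
      "motion_detection": {
        "threshold": 30,
        "min_area": 1000
      },
      "sound_detection": {
        "enabled": false,
        "threshold": 800,
        "sensitivity": 200000,
        "startle_multiplier": 1.5
      }
    }

The file is optional: if it is missing the script falls back to its built-in defaults, and it re-reads the file whenever the modification time changes, so settings can be adjusted while the monitor is running.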