Face tracking with Insta360 Link on Linux
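The script below depends on OpenCV (cv2), NumPy and, optionally, MediaPipe for face detection, and it shells out to v4l2-ctl for PTZ control and arecord for audio capture; run it as python3 <script>.py monitor (monitor is also the default mode).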
#!/usr/bin/env python3
"""
Motion Detector with MobileNet-SSD Object Detection
Motion detection + object recognition + PTZ tracking
"""
import cv2
import subprocess
import time
import numpy as np
import sys
import json
from pathlib import Path
from datetime import datetime
import threading
import queue
# PTZ limits
PAN_MIN, PAN_MAX = -522000, 522000
TILT_MIN, TILT_MAX = -324000, 360000
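# NOTE: these limits should match the pan_absolute/tilt_absolute ranges your camera
# reports via `v4l2-ctl -d /dev/video0 --list-ctrls` (the values here are the ones
# used for the Insta360 Link).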
# Current position
current_pan = 0
current_tilt = 0
# PID controller state
pid_state = {
'pan': {'error_sum': 0, 'last_error': 0, 'last_time': 0},
'tilt': {'error_sum': 0, 'last_error': 0, 'last_time': 0}
}
# Sound detection
sound_queue = queue.Queue()
sound_thread = None
sound_monitoring = False
# MobileNet-SSD config
MODEL_DIR = Path(__file__).parent / "models"
PROTOTXT = MODEL_DIR / "deploy.prototxt"
MODEL = MODEL_DIR / "mobilenet_iter_73000.caffemodel"
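# NOTE: unlike the MediaPipe model below, the Caffe files are not downloaded automatically;
# place the standard MobileNet-SSD deploy.prototxt and mobilenet_iter_73000.caffemodel
# (e.g. from the chuanqi305/MobileNet-SSD project) into ./models.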
# Live stream output
STREAM_FRAME_PATH = Path(__file__).parent / "stream_frame.jpg"
# Config file
CONFIG_FILE = Path(__file__).parent / "detector-config.json"
# Global config (reloaded periodically)
config = {
"face_detection": {
"scaleFactor": 1.1,
"minNeighbors": 5,
"minSize": 50,
"aspectRatio_min": 0.8,
"aspectRatio_max": 1.5
},
"tracking": {
"max_duration": 20,
"track_fps": 4,
"save_interval": 2.0
},
"patrol": {
"enabled": True,
"interval_sec": 2
}
}
config_mtime = 0
def load_config():
"""Load config from JSON file if it has changed"""
global config, config_mtime, face_detector
if not CONFIG_FILE.exists():
return
current_mtime = CONFIG_FILE.stat().st_mtime
if current_mtime > config_mtime:
try:
with open(CONFIG_FILE, 'r') as f:
new_config = json.load(f)
# Check if face detection config changed
face_config_changed = (
config.get("face_detection") != new_config.get("face_detection")
)
config.update(new_config)
config_mtime = current_mtime
print(f"πŸ”§ Config reloaded: {config}", flush=True)
# Reload face detector if config changed
if face_config_changed:
face_detector = None
print("πŸ”„ Face detector will reload with new settings", flush=True)
except Exception as e:
print(f"⚠️ Config load error: {e}", flush=True)
# COCO class labels
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
"sofa", "train", "tvmonitor"]
# Load detector (global, load once)
net = None
face_detector = None
def load_detector():
"""Load MobileNet-SSD model"""
global net
if net is None:
print(f"πŸ”§ Loading MobileNet-SSD from {MODEL}...", flush=True)
net = cv2.dnn.readNetFromCaffe(str(PROTOTXT), str(MODEL))
print("βœ… Model loaded", flush=True)
return net
def load_face_detector():
"""Load MediaPipe face detector"""
global face_detector
if face_detector is None:
try:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
# Download model if needed
model_path = Path(__file__).parent / "models" / "blaze_face_short_range.tflite"
if not model_path.exists():
print("πŸ“₯ Downloading MediaPipe face detection model...", flush=True)
import urllib.request
url = "https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/1/blaze_face_short_range.tflite"
urllib.request.urlretrieve(url, str(model_path))
# Get confidence from config
cfg = config.get("face_detection", {})
min_confidence = cfg.get("min_detection_confidence", 0.5)
base_options = python.BaseOptions(model_asset_path=str(model_path))
options = vision.FaceDetectorOptions(
base_options=base_options,
min_detection_confidence=min_confidence
)
face_detector = vision.FaceDetector.create_from_options(options)
print(f"βœ… MediaPipe face detector loaded (confidence={min_confidence})", flush=True)
except Exception as e:
print(f"⚠️ MediaPipe error: {e}, using Haar Cascade fallback", flush=True)
face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
return face_detector
def detect_faces(frame):
"""
Detect faces in frame using MediaPipe (or fallback to Haar Cascade)
Returns: list of (x, y, w, h) bounding boxes
"""
detector = load_face_detector()
# Check if MediaPipe detector
try:
import mediapipe as mp
if hasattr(detector, 'detect'):
# MediaPipe new API
image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_rgb)
detection_result = detector.detect(mp_image)
faces = []
if detection_result.detections:
h, w = frame.shape[:2]
for detection in detection_result.detections:
bbox = detection.bounding_box
# Convert to (x, y, width, height)
x = bbox.origin_x
y = bbox.origin_y
width = bbox.width
height = bbox.height
# Ensure positive dimensions and within frame
if width > 0 and height > 0:
x = max(0, x)
y = max(0, y)
faces.append((x, y, width, height))
return faces
except Exception as e:
# Silently fall back on error
pass
# Fallback to Haar Cascade
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
cfg = config["face_detection"]
scale_factor = cfg.get("scaleFactor", 1.1)
min_neighbors = cfg.get("minNeighbors", 5)
min_size = cfg.get("minSize", 50)
aspect_min = cfg.get("aspectRatio_min", 0.8)
aspect_max = cfg.get("aspectRatio_max", 1.5)
raw_faces = detector.detectMultiScale(
gray,
scaleFactor=scale_factor,
minNeighbors=min_neighbors,
minSize=(min_size, min_size),
flags=cv2.CASCADE_SCALE_IMAGE
)
# Filter by aspect ratio
filtered_faces = []
for (x, y, w, h) in raw_faces:
aspect_ratio = h / w
if aspect_min <= aspect_ratio <= aspect_max:
filtered_faces.append((x, y, w, h))
return filtered_faces
def save_stream_frame(frame, faces=None, objects=None, status=""):
"""
Save current frame with annotations for live stream
Optimized for web display (lower resolution, higher compression)
"""
annotated = frame.copy()
# Reload config to get latest settings
load_config()
# Get stream settings from config
cam_cfg = config.get("camera", {})
stream_width = cam_cfg.get("stream_width", 320)
stream_quality = cam_cfg.get("stream_quality", 70) # JPEG quality 0-100
# Resize if needed
h, w = annotated.shape[:2]
if w > stream_width:
scale = stream_width / w
new_w = stream_width
new_h = int(h * scale)
annotated = cv2.resize(annotated, (new_w, new_h))
# Scale face rectangles
if faces and len(faces) > 0:
scaled_faces = []
for (x, y, fw, fh) in faces:
scaled_faces.append((
int(x * scale),
int(y * scale),
int(fw * scale),
int(fh * scale)
))
faces = scaled_faces
# Draw face rectangles
if faces and len(faces) > 0:
for (x, y, fw, fh) in faces:
cv2.rectangle(annotated, (x, y), (x+fw, y+fh), (0, 255, 0), 2)
cv2.putText(annotated, "Face", (x, y-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
# Draw status text
if status:
cv2.putText(annotated, status, (10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# Draw detected objects
if objects:
obj_text = ", ".join([f"{o['class']} ({o['confidence']:.0%})" for o in objects[:3]])
cv2.putText(annotated, obj_text, (10, 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 0), 1)
# Save with compression
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), stream_quality]
cv2.imwrite(str(STREAM_FRAME_PATH), annotated, encode_param)
def set_ptz(pan, tilt, zoom=100):
"""Set camera PTZ position"""
pan = max(PAN_MIN, min(PAN_MAX, pan))
tilt = max(TILT_MIN, min(TILT_MAX, tilt))
cmd = f"v4l2-ctl -d /dev/video0 --set-ctrl pan_absolute={pan} --set-ctrl tilt_absolute={tilt} --set-ctrl zoom_absolute={zoom}"
subprocess.run(cmd, shell=True, capture_output=True)
global current_pan, current_tilt
current_pan = pan
current_tilt = tilt
def capture_frame(width=None, height=None):
"""Capture a frame from camera"""
# Get resolution from config if not specified
if width is None or height is None:
cam_cfg = config.get("camera", {})
width = cam_cfg.get("width", 640)
height = cam_cfg.get("height", 480)
cap = cv2.VideoCapture('/dev/video0')
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Flush buffer
for _ in range(3):
cap.read()
ret, frame = cap.read()
cap.release()
if not ret:
return None
return frame
def detect_motion_center(prev_frame, curr_frame, threshold=None):
"""
Detect motion and return the center of motion
Returns: (x, y, motion_detected) where x,y are normalized 0-1
"""
if prev_frame is None:
return None, None, False
# Get motion detection settings from config
motion_cfg = config.get("motion_detection", {})
if threshold is None:
threshold = motion_cfg.get("threshold", 30)
min_area = motion_cfg.get("min_area", 1000)
# Convert to grayscale and blur
gray1 = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
gray1 = cv2.GaussianBlur(gray1, (21, 21), 0)
gray2 = cv2.GaussianBlur(gray2, (21, 21), 0)
# Frame difference
diff = cv2.absdiff(gray1, gray2)
thresh_img = cv2.threshold(diff, threshold, 255, cv2.THRESH_BINARY)[1]
# Dilate to fill holes
thresh_img = cv2.dilate(thresh_img, None, iterations=2)
# Find contours
contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if len(contours) == 0:
return None, None, False
# Find largest contour (main motion)
largest = max(contours, key=cv2.contourArea)
area = cv2.contourArea(largest)
# Ignore small movements
if area < min_area:
return None, None, False
# Calculate center of motion
M = cv2.moments(largest)
if M["m00"] == 0:
return None, None, False
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
# Normalize to 0-1 (center is 0.5, 0.5)
h, w = curr_frame.shape[:2]
norm_x = cx / w
norm_y = cy / h
return norm_x, norm_y, True
def detect_objects(frame, confidence_threshold=0.5):
"""
Detect objects in frame using MobileNet-SSD
Returns: list of (class_name, confidence, bbox)
"""
detector = load_detector()
h, w = frame.shape[:2]
# Prepare blob
blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 0.007843,
(300, 300), 127.5)
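# 0.007843 β‰ˆ 1/127.5: together with the 127.5 mean subtraction this maps pixel values
# to roughly [-1, 1], the standard preprocessing for the Caffe MobileNet-SSD model.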
detector.setInput(blob)
detections = detector.forward()
results = []
# Loop over detections
for i in range(detections.shape[2]):
confidence = detections[0, 0, i, 2]
if confidence > confidence_threshold:
idx = int(detections[0, 0, i, 1])
class_name = CLASSES[idx]
# Skip background
if class_name == "background":
continue
# Bounding box
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(startX, startY, endX, endY) = box.astype("int")
results.append({
"class": class_name,
"confidence": float(confidence),
"bbox": [startX, startY, endX, endY]
})
return results
def pid_control(error, axis, kp=1.0, ki=0.0, kd=0.0, max_output=100000):
"""
PID controller for smooth camera tracking
Args:
error: Current error (distance from target)
axis: 'pan' or 'tilt'
kp, ki, kd: PID coefficients
max_output: Maximum adjustment per step
Returns:
Control output (adjustment value)
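Implements the discrete PID law: output = kp*error + ki*sum(error*dt) + kd*d(error)/dt, clamped to +/-max_output.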
"""
global pid_state
current_time = time.time()
state = pid_state[axis]
# Time delta
dt = current_time - state['last_time'] if state['last_time'] > 0 else 0.1
dt = max(0.01, min(dt, 1.0)) # Clamp dt to reasonable range
# P term: proportional to current error
p_term = kp * error
# I term: integral of error over time (anti-windup: limit accumulation)
state['error_sum'] += error * dt
state['error_sum'] = max(-10.0, min(state['error_sum'], 10.0)) # Anti-windup
i_term = ki * state['error_sum']
# D term: derivative of error (rate of change)
d_term = kd * (error - state['last_error']) / dt if dt > 0 else 0
# Update state
state['last_error'] = error
state['last_time'] = current_time
# Combined output
output = p_term + i_term + d_term
# Clamp output
output = max(-max_output, min(output, max_output))
return output
def adjust_ptz_to_center(motion_x, motion_y, use_pid=True):
"""
Adjust PTZ to center the motion using PID control
motion_x, motion_y: normalized 0-1 coordinates
"""
# Calculate error from center (0.5, 0.5)
error_x = motion_x - 0.5
error_y = motion_y - 0.5
# Deadzone
deadzone = 0.05
if abs(error_x) < deadzone and abs(error_y) < deadzone:
return False
if use_pid:
# PID parameters tuned for camera tracking
# kp: Proportional gain (higher = more aggressive)
# ki: Integral gain (fixes steady-state error)
# kd: Derivative gain (dampens oscillation)
kp = 60000 # Increased for faster response
ki = 5000 # Small integral to eliminate steady-state error
kd = 15000 # Derivative to smooth movement
# Calculate PID adjustments
pan_adjust = int(pid_control(error_x, 'pan', kp, ki, kd))
tilt_adjust = int(pid_control(-error_y, 'tilt', kp, ki, kd)) # Inverted
else:
# Fallback: simple proportional
pan_adjust = int(error_x * 50000)
tilt_adjust = int(-error_y * 50000)
new_pan = current_pan + pan_adjust
new_tilt = current_tilt + tilt_adjust
set_ptz(new_pan, new_tilt)
return True
def reset_pid():
"""Reset PID controller state (call when tracking target changes)"""
global pid_state
for axis in ['pan', 'tilt']:
pid_state[axis] = {'error_sum': 0, 'last_error': 0, 'last_time': 0}
def capture_audio(duration=0.5, mic_device="plughw:2,0", sample_rate=16000, channels=2):
"""Capture audio chunk from microphone"""
cmd = [
'arecord',
'-D', mic_device,
'-f', 'S16_LE',
'-c', str(channels),
'-r', str(sample_rate),
'-d', str(duration),
'-t', 'raw'
]
try:
result = subprocess.run(cmd, capture_output=True, timeout=duration + 1)
audio_data = np.frombuffer(result.stdout, dtype=np.int16)
if len(audio_data) > 0 and channels == 2 and len(audio_data) % 2 == 0:
audio_data = audio_data.reshape(-1, 2)
return audio_data
except Exception as e:
return None
def analyze_sound_direction(audio_data):
"""
Analyze stereo audio to detect sound direction
Returns: (volume, direction_score)
direction_score: -1.0 (left) to 1.0 (right)
"""
if audio_data is None or len(audio_data) == 0:
return 0, 0.0
if audio_data.ndim == 2: # Stereo
left = audio_data[:, 0]
right = audio_data[:, 1]
# Calculate RMS for each channel
rms_left = np.sqrt(np.mean(left.astype(np.float32) ** 2))
rms_right = np.sqrt(np.mean(right.astype(np.float32) ** 2))
total_volume = (rms_left + rms_right) / 2
# Direction score based on L/R balance
if total_volume > 10:
direction = (rms_right - rms_left) / (rms_right + rms_left + 1e-6)
else:
direction = 0.0
return total_volume, direction
else:
rms = np.sqrt(np.mean(audio_data.astype(np.float32) ** 2))
return rms, 0.0
def sound_monitor_thread():
"""Background thread for sound monitoring"""
global sound_monitoring
sound_cfg = config.get("sound_detection", {})
threshold = sound_cfg.get("threshold", 800)
sensitivity = sound_cfg.get("sensitivity", 200000)
print(f"🎧 Sound monitoring started (threshold={threshold})", flush=True)
while sound_monitoring:
try:
audio = capture_audio(duration=0.3)
if audio is not None:
volume, direction = analyze_sound_direction(audio)
if volume > threshold:
print(f"πŸ”Š Sound detected! Volume={volume:.0f}, Direction={direction:.2f}", flush=True)
sound_queue.put({'volume': volume, 'direction': direction})
except Exception as e:
print(f"⚠️ Sound monitoring error: {e}", flush=True)
time.sleep(1)
time.sleep(0.1)
def start_sound_monitoring():
"""Start sound monitoring in background thread"""
global sound_thread, sound_monitoring
if sound_thread is not None and sound_thread.is_alive():
return
sound_monitoring = True
sound_thread = threading.Thread(target=sound_monitor_thread, daemon=True)
sound_thread.start()
def stop_sound_monitoring():
"""Stop sound monitoring"""
global sound_monitoring
sound_monitoring = False
def notify(message):
"""Send notification via clawdbot (if available)"""
try:
subprocess.run(
["clawdbot", "wake", "-m", message],
capture_output=True,
timeout=5
)
except Exception as e:
print(f"⚠️ Notification failed: {e}", flush=True)
def track_continuously(initial_frame, max_duration=300, track_fps=6, is_person=False):
"""
Enter tracking mode - continuously follow detected motion
If is_person=True, prioritize face tracking
Returns: duration tracked (seconds)
Note: max_duration is a safety timeout. Tracking continues as long as target is found.
"""
mode_str = "person (face priority)" if is_person else "object"
print(f"🎯 Entering tracking mode for {mode_str} (max {max_duration}s)", flush=True)
# Reset PID controller for new tracking target
reset_pid()
prev_frame = initial_frame
start_time = time.time()
last_found_time = time.time() # Track when we last saw the target
frame_delay = 1.0 / track_fps
lost_count = 0
max_lost = 5 # Give up after 5 consecutive lost frames (about 1 second)
lost_timeout = 3.0 # If not found for 3 seconds, give up
last_save_time = 0 # Track last save time
while True:
# Safety timeout
if time.time() - start_time > max_duration:
print(f"⏱️ Max duration ({max_duration}s) reached, ending tracking", flush=True)
break
# Lost target timeout
if time.time() - last_found_time > lost_timeout:
print(f"⏱️ Target not found for {lost_timeout}s, ending tracking", flush=True)
break
loop_start = time.time()
# Check for loud sounds (surprise/startle response)
if not sound_queue.empty():
try:
sound_event = sound_queue.get_nowait()
volume = sound_event['volume']
direction = sound_event['direction']
# Get sound config
sound_cfg = config.get("sound_detection", {})
threshold = sound_cfg.get("threshold", 800)
sensitivity = sound_cfg.get("sensitivity", 200000)
startle_multiplier = sound_cfg.get("startle_multiplier", 1.5)
# Only react to very loud sounds during tracking (startle threshold)
startle_threshold = threshold * startle_multiplier
if volume > startle_threshold:
print(f"😲 LOUD SOUND during tracking! Looking towards it...", flush=True)
# Calculate pan position
pan_adjust = int(direction * sensitivity)
new_pan = current_pan + pan_adjust
new_pan = max(-300000, min(300000, new_pan))
# Turn towards sound
set_ptz(new_pan, current_tilt)
time.sleep(0.5)
# Look for face at new position
sound_frame = capture_frame()
if sound_frame is not None:
sound_faces = detect_faces(sound_frame)
if len(sound_faces) > 0:
print(f"😊 Found face after turning to sound!", flush=True)
# Continue tracking from this new position
frame = sound_frame
faces = sound_faces
# Reset PID for new target
reset_pid()
last_found_time = time.time()
else:
print(f"πŸ‘‚ No face found, resuming previous tracking", flush=True)
# Clear queue after processing startle
while not sound_queue.empty():
sound_queue.get_nowait()
except queue.Empty:
pass
# Capture frame
frame = capture_frame()
if frame is None:
lost_count += 1
if lost_count >= max_lost:
print("πŸ“· Camera error, exiting tracking", flush=True)
break
time.sleep(frame_delay)
continue
# For person tracking, try face detection first
target_found = False
mx, my = None, None
if is_person:
faces = detect_faces(frame)
if len(faces) > 0:
# Save stream frame with face rectangles
save_stream_frame(frame, faces=faces, status="Tracking Person")
# Face crop saving disabled
# (Images are only shown in live stream, not saved to disk)
# Use largest face
largest_face = max(faces, key=lambda f: f[2] * f[3])
x, y, w, h = largest_face
# Calculate center of face (normalized)
frame_h, frame_w = frame.shape[:2]
mx = (x + w/2) / frame_w
my = (y + h/2) / frame_h
target_found = True
print(f"😊 Face at ({mx:.2f}, {my:.2f}) [detected {len(faces)} face(s)]", flush=True)
else:
# No face found, log it
save_stream_frame(frame, status="Tracking Motion")
print(f"πŸ” No face detected, fallback to motion", flush=True)
# Fallback to motion detection if no face found
if not target_found:
mx, my, detected = detect_motion_center(prev_frame, frame)
target_found = detected
if detected:
print(f"πŸ‘οΈ Motion at ({mx:.2f}, {my:.2f})", flush=True)
if target_found:
lost_count = 0
last_found_time = time.time() # Update last seen time
# Adjust PTZ using PID control for smooth tracking
adjusted = adjust_ptz_to_center(mx, my, use_pid=True)
if adjusted:
# Minimal delay - PID handles smoothing
time.sleep(0.05) # Just enough for camera response
else:
lost_count += 1
print(f"βšͺ Lost target ({lost_count}/{max_lost})", flush=True)
if lost_count >= max_lost:
print("πŸ”š Target lost (consecutive frames), ending tracking", flush=True)
break
prev_frame = frame
# Maintain fps
elapsed = time.time() - loop_start
if elapsed < frame_delay:
time.sleep(frame_delay - elapsed)
duration = time.time() - start_time
print(f"⏱️ Tracked for {duration:.1f}s", flush=True)
# Return to center
print("πŸ”„ Returning to center...", flush=True)
set_ptz(0, 0)
time.sleep(1)
return duration
def patrol_scan(prev_frame, scan_positions, pause_sec=1.0):
"""
Perform one patrol scan cycle
Returns: (face_detected, faces, frame) or (False, None, frame)
"""
patrol_cfg = config.get("patrol", {})
use_motion_detection = patrol_cfg.get("use_motion_detection", False)
direct_face_detection = patrol_cfg.get("direct_face_detection", True)
for pan, tilt in scan_positions:
set_ptz(pan, tilt)
time.sleep(pause_sec) # Wait for camera to settle
# Capture frame after camera stops
frame = capture_frame()
if frame is None:
continue
time.sleep(0.2) # Additional settling
frame = capture_frame() # Get stable frame
if frame is None:
continue
if direct_face_detection:
# Direct face detection (skip motion detection)
faces = detect_faces(frame)
if len(faces) > 0:
save_stream_frame(frame, faces=faces, status="Face Found!")
print(f"😊 Found {len(faces)} face(s) during patrol", flush=True)
return True, faces, frame
else:
save_stream_frame(frame, status="Patrol Mode")
elif use_motion_detection:
# Legacy: motion detection mode
save_stream_frame(frame, status="Patrol Mode")
time.sleep(0.5)
frame2 = capture_frame()
if frame2 is None:
prev_frame = frame
continue
mx, my, detected = detect_motion_center(frame, frame2)
if detected:
return True, None, frame2
prev_frame = frame
return False, None, prev_frame
def monitor(interval_sec=2, min_interval_between_detections=10, patrol_mode=True):
"""
Continuous monitoring mode with optional patrol and sound detection
"""
mode_str = "patrol" if patrol_mode else "static"
print(f"πŸ‘οΈ Monitoring started ({mode_str} mode, interval={interval_sec}s)", flush=True)
# Reset to center
set_ptz(0, 0)
time.sleep(2)
# Load config first
load_config()
# Start sound monitoring if enabled
sound_cfg = config.get("sound_detection", {})
if sound_cfg.get("enabled", False):
print(f"🎧 Starting sound monitoring...", flush=True)
start_sound_monitoring()
else:
print(f"πŸ”‡ Sound monitoring disabled", flush=True)
# Patrol positions (pan, tilt)
# Sweep from left to right and back
scan_positions = [
(0, 0), # Center
(-300000, 0), # Left
(-150000, 0), # Mid-left
(0, 0), # Center
(150000, 0), # Mid-right
(300000, 0), # Right
(0, 0), # Center
]
prev_frame = None
last_detection_time = 0
while True:
loop_start = time.time()
# Reload config if changed
load_config()
# Check for sound events
if not sound_queue.empty():
try:
sound_event = sound_queue.get_nowait()
volume = sound_event['volume']
direction = sound_event['direction']
# Calculate pan position based on direction
sound_cfg = config.get("sound_detection", {})
sensitivity = sound_cfg.get("sensitivity", 200000)
pan_adjust = int(direction * sensitivity)
new_pan = current_pan + pan_adjust
new_pan = max(-300000, min(300000, new_pan))
print(f"πŸ‘‚ Turning towards sound (direction={direction:.2f}, pan={new_pan})", flush=True)
set_ptz(new_pan, current_tilt)
time.sleep(1)
# Look for face at this position
frame = capture_frame()
if frame is not None:
faces = detect_faces(frame)
if len(faces) > 0:
print(f"😊 Found face after sound detection!", flush=True)
save_stream_frame(frame, faces=faces, status="Sound + Face!")
notify(f"πŸ”Š Sound β†’ Face detected!")
track_continuously(frame, is_person=True)
prev_frame = None
set_ptz(0, 0)
time.sleep(1)
continue
else:
save_stream_frame(frame, status="Sound detected (no face)")
except queue.Empty:
pass
# Patrol scan if enabled
if patrol_mode:
print("πŸ” Patrol scanning...", flush=True)
face_detected, faces, frame = patrol_scan(prev_frame, scan_positions)
prev_frame = frame
if face_detected and faces is not None:
# Direct face detection - skip motion/object detection
current_time = time.time()
# Throttle detections
if current_time - last_detection_time < min_interval_between_detections:
print(f"⏭️ Face detected but throttled (last: {int(current_time - last_detection_time)}s ago)", flush=True)
continue
last_detection_time = current_time
# Notify and start tracking
notify(f"😊 Face detected ({len(faces)} face(s))")
track_continuously(frame, is_person=True)
# Reset and return to center
prev_frame = None
set_ptz(0, 0)
time.sleep(1)
continue
else:
# Static monitoring (legacy motion detection mode)
frame = capture_frame()
if frame is None:
print("⚠️ Failed to capture frame", flush=True)
time.sleep(interval_sec)
continue
# Update stream
save_stream_frame(frame, status="Static Mode")
mx, my, detected = detect_motion_center(prev_frame, frame)
prev_frame = frame
if detected:
current_time = time.time()
# Throttle detections
if current_time - last_detection_time < min_interval_between_detections:
print(f"⏭️ Motion detected but throttled (last: {int(current_time - last_detection_time)}s ago)", flush=True)
time.sleep(interval_sec)
continue
last_detection_time = current_time
print(f"🎯 Motion at ({mx:.2f}, {my:.2f})", flush=True)
# Object detection (use config threshold)
threshold = config.get("tracking", {}).get("object_confidence_threshold", 0.5)
objects = detect_objects(frame, confidence_threshold=threshold)
if objects:
# Format detection message
obj_str = ", ".join([f"{o['class']} ({o['confidence']:.0%})" for o in objects])
msg = f"πŸŽ₯ Detected: {obj_str}"
print(msg, flush=True)
# Check if person detected
is_person = any(obj['class'] == 'person' for obj in objects)
if is_person:
# Send notification
notify(msg)
# Enter continuous tracking mode for person
track_continuously(frame, is_person=True)
# Reset prev_frame after tracking
prev_frame = None
# Return to center before resuming patrol
set_ptz(0, 0)
time.sleep(1)
continue
else:
print(f"⏭️ Detected {obj_str} but ignoring (not a person)", flush=True)
else:
print("πŸ” Motion but no objects recognized", flush=True)
# Maintain interval (only matters in static mode)
if not patrol_mode:
elapsed = time.time() - loop_start
if elapsed < interval_sec:
time.sleep(interval_sec - elapsed)
if __name__ == "__main__":
mode = sys.argv[1] if len(sys.argv) > 1 else "monitor"
try:
if mode == "monitor":
monitor()
else:
print(f"Unknown mode: {mode}", flush=True)
sys.exit(1)
except KeyboardInterrupt:
print("\n⏹️ Stopped by user", flush=True)
set_ptz(0, 0)
except Exception as e:
print(f"❌ Error: {e}", flush=True)
import traceback
traceback.print_exc()
set_ptz(0, 0)
sys.exit(1)
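The script hot-reloads detector-config.json from its own directory whenever the file's mtime changes (see load_config). Below is a minimal sketch of a config that covers the keys the code reads; the values simply repeat the script's in-code defaults, and the helper filename write_example_config.py is hypothetical; adjust the numbers for your setup.

#!/usr/bin/env python3
# write_example_config.py - emit an example detector-config.json next to the detector script.
# Keys mirror the config.get(...) lookups in the detector; values are the script's defaults.
import json
from pathlib import Path

example_config = {
    "face_detection": {
        "scaleFactor": 1.1,               # Haar cascade fallback parameters
        "minNeighbors": 5,
        "minSize": 50,
        "aspectRatio_min": 0.8,
        "aspectRatio_max": 1.5,
        "min_detection_confidence": 0.5,  # MediaPipe detector confidence
    },
    "tracking": {
        "max_duration": 20,
        "track_fps": 4,
        "save_interval": 2.0,
        "object_confidence_threshold": 0.5,
    },
    "patrol": {
        "enabled": True,
        "interval_sec": 2,
        "direct_face_detection": True,
        "use_motion_detection": False,
    },
    "camera": {
        "width": 640,
        "height": 480,
        "stream_width": 320,
        "stream_quality": 70,             # JPEG quality for the live-stream frame
    },
    "motion_detection": {
        "threshold": 30,
        "min_area": 1000,
    },
    "sound_detection": {
        "enabled": False,
        "threshold": 800,
        "sensitivity": 200000,
        "startle_multiplier": 1.5,
    },
}

out_path = Path(__file__).parent / "detector-config.json"
out_path.write_text(json.dumps(example_config, indent=2))
print(f"Wrote {out_path}")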