Created
February 8, 2026 05:57
-
-
Save 9000cats/87d929d10a65eb2d36167c5409948bf9 to your computer and use it in GitHub Desktop.
Displays CPU, Memory, and GPU utilization as three full-height vertical bar graphs on a Framework Laptop 16 LED Matrix spacer module. Dual GPU support. Powers off on lid close & sleep.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Framework LED Matrix System Monitor | |
| ==================================== | |
| Displays CPU, Memory, and GPU utilization as three full-height vertical | |
| bar graphs on a Framework Laptop 16 LED Matrix spacer module (9x34 LEDs). | |
| Column layout: CPU [0-2] | MEM [3-5] | GPU [6-8] | |
| Dual-GPU support (AMD Radeon 890M iGPU + NVIDIA RTX 5070 dGPU): | |
| - Uses Windows Performance Counters to read 3D engine utilization — | |
| the same data source Task Manager uses. | |
| - GPU LUID-to-name mapping is read from the DirectX registry, which | |
| is the definitive source Windows uses internally. | |
| - Whichever GPU has higher 3D utilization is displayed. | |
| - NPUs, virtual adapters, and other non-GPU devices are filtered out. | |
| Sleep/wake support (works in Session 0 / NSSM services): | |
| - Registers three independent power callbacks via powrprof.dll: | |
| 1. PowerRegisterSuspendResumeNotification (PBT_APMSUSPEND / RESUME) | |
| 2. PowerSettingRegisterNotification for GUID_CONSOLE_DISPLAY_STATE | |
| 3. PowerSettingRegisterNotification for GUID_LIDSWITCH_STATE_CHANGE | |
| - On Modern Standby (S0ix) systems like the Framework 16, USB | |
| periodically power-cycles during sleep, resetting the LED module | |
| to its default breathing animation. To combat this, the main loop | |
| enters a "sleep guard" mode that continuously re-sends clear+sleep | |
| commands every SLEEP_GUARD_SECS, catching each USB reset. | |
| - Debug log written to fw_led_power.log beside this script. | |
| Requirements (just two packages): | |
| pip install pyserial psutil | |
| Usage: | |
| python fw_led_monitor.py | |
| Press Ctrl+C to stop (the display is cleared on exit). | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import time | |
| import signal | |
| import logging | |
| import subprocess | |
| import threading | |
| import ctypes | |
| import serial | |
| import psutil | |
| # --- Configuration -------------------------------------------------------- | |
| SERIAL_PORT = "COM3" | |
| BAUD_RATE = 115200 | |
| UPDATE_SECS = 2 | |
| # How often to re-send clear+sleep while the system is in Modern Standby. | |
| # Shorter = less visible breathing animation between USB resets. | |
| SLEEP_GUARD_SECS = 1 | |
| COLS = 9 | |
| ROWS = 34 | |
| MAGIC = [0x32, 0xAC] | |
| CMD_BRIGHTNESS = 0x00 | |
| CMD_SLEEP = 0x03 | |
| CMD_STAGE_COL = 0x07 | |
| CMD_FLUSH_COLS = 0x08 | |
| BAR_BRIGHT = 255 | |
| BAR_DIM = 10 | |
| # How many seconds a sleep() must overshoot before we assume the PC slept | |
| TIME_JUMP_THRESHOLD = 30 | |
| _SW_FLAGS = 0 | |
| if sys.platform == "win32": | |
| _SW_FLAGS = subprocess.CREATE_NO_WINDOW | |
| _GPU_KEYWORDS = ["radeon", "geforce", "nvidia", "amd", "rtx", "rx ", | |
| "intel arc", "intel iris", "intel uhd", "intel hd", | |
| "quadro", "tesla"] | |
| _EXCLUDE_KEYWORDS = ["npu", "compute accelerator", "basic display", | |
| "basic render", "microsoft"] | |
| # --- Debug log (critical for diagnosing Session 0 / NSSM issues) --------- | |
| _log = logging.getLogger("fw_led_power") | |
| _log.setLevel(logging.DEBUG) | |
| _log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), | |
| "fw_led_power.log") | |
| _fh = logging.FileHandler(_log_path, encoding="utf-8") | |
| _fh.setFormatter(logging.Formatter("%(asctime)s %(message)s")) | |
| _log.addHandler(_fh) | |
| # ===================================================================== | |
| # Power Monitor — detect Windows sleep / wake (Session 0 safe) | |
| # ===================================================================== | |
| # | |
| # Three independent callback registrations, all via powrprof.dll with | |
| # DEVICE_NOTIFY_CALLBACK (no window, no message pump): | |
| # | |
| # 1. PowerRegisterSuspendResumeNotification | |
| # → PBT_APMSUSPEND / PBT_APMRESUMEAUTOMATIC | |
| # | |
| # 2. PowerSettingRegisterNotification (GUID_CONSOLE_DISPLAY_STATE) | |
| # → Display off (0) / on (1) / dimmed (2) | |
| # | |
| # 3. PowerSettingRegisterNotification (GUID_LIDSWITCH_STATE_CHANGE) | |
| # → Lid closed (0) / opened (1) | |
| # | |
| # Lid-close and display-off often fire BEFORE PBT_APMSUSPEND, which | |
| # gives us the best chance of clearing the LED matrix before the USB | |
| # bus is suspended — especially under Modern Standby (S0ix). | |
| # | |
| # All callbacks run on OS thread-pool threads; serial access is | |
| # protected by ser_lock. | |
| # ===================================================================== | |
| # --- Win32 constants and types --- | |
| PBT_APMSUSPEND = 0x0004 | |
| PBT_APMRESUMEAUTOMATIC = 0x0012 | |
| PBT_APMRESUMESUSPEND = 0x0007 | |
| PBT_POWERSETTINGCHANGE = 0x8013 | |
| DEVICE_NOTIFY_CALLBACK = 2 | |
| # Callback: ULONG CALLBACK fn(PVOID Context, ULONG Type, PVOID Setting) | |
| _POWER_CB_TYPE = ctypes.WINFUNCTYPE( | |
| ctypes.c_ulong, # return | |
| ctypes.c_void_p, # Context | |
| ctypes.c_ulong, # Type | |
| ctypes.c_void_p, # Setting (NULL or -> POWERBROADCAST_SETTING) | |
| ) | |
| class _DEVICE_NOTIFY_SUBSCRIBE_PARAMETERS(ctypes.Structure): | |
| _fields_ = [ | |
| ("Callback", _POWER_CB_TYPE), | |
| ("Context", ctypes.c_void_p), | |
| ] | |
| class _GUID(ctypes.Structure): | |
| _fields_ = [ | |
| ("Data1", ctypes.c_ulong), | |
| ("Data2", ctypes.c_ushort), | |
| ("Data3", ctypes.c_ushort), | |
| ("Data4", ctypes.c_ubyte * 8), | |
| ] | |
| # {6FE69556-704A-47A0-8F24-C28D936FDA47} | |
| _GUID_CONSOLE_DISPLAY_STATE = _GUID( | |
| 0x6FE69556, 0x704A, 0x47A0, | |
| (ctypes.c_ubyte * 8)(0x8F, 0x24, 0xC2, 0x8D, 0x93, 0x6F, 0xDA, 0x47), | |
| ) | |
| # {BA3E0F4D-B817-4094-A2D1-D56379E6A0F3} | |
| _GUID_LIDSWITCH_STATE_CHANGE = _GUID( | |
| 0xBA3E0F4D, 0xB817, 0x4094, | |
| (ctypes.c_ubyte * 8)(0xA2, 0xD1, 0xD5, 0x63, 0x79, 0xE6, 0xA0, 0xF3), | |
| ) | |
| class PowerMonitor: | |
| """ | |
| Registers multiple power-state callbacks via powrprof.dll. | |
| Works from Session 0 (Windows Services / NSSM) — no window required. | |
| Exposes a simple boolean `asleep` property that the main loop polls | |
| to decide whether to show system stats or guard the display. | |
| """ | |
| def __init__(self): | |
| self._asleep = False | |
| self._state_lock = threading.Lock() | |
| self._wake_event = threading.Event() # pulsed once on wake | |
| self._handles = [] | |
| self._prevent_gc = [] | |
| self._registrations = {} | |
| powrprof = ctypes.windll.powrprof | |
| self._register_suspend_resume(powrprof) | |
| self._register_power_setting( | |
| powrprof, "DisplayState", _GUID_CONSOLE_DISPLAY_STATE) | |
| self._register_power_setting( | |
| powrprof, "LidSwitch", _GUID_LIDSWITCH_STATE_CHANGE) | |
| for name, ok in self._registrations.items(): | |
| _log.info("Registration %-20s : %s", | |
| name, "OK" if ok else "FAILED") | |
| # -- registration helpers ---------------------------------------------- | |
| def _register_suspend_resume(self, powrprof): | |
| name = "SuspendResume" | |
| cb_func = _POWER_CB_TYPE(self._suspend_resume_cb) | |
| params = _DEVICE_NOTIFY_SUBSCRIBE_PARAMETERS() | |
| params.Callback = cb_func | |
| params.Context = None | |
| handle = ctypes.c_void_p() | |
| self._prevent_gc.extend([cb_func, params]) | |
| try: | |
| rc = powrprof.PowerRegisterSuspendResumeNotification( | |
| DEVICE_NOTIFY_CALLBACK, | |
| ctypes.byref(params), | |
| ctypes.byref(handle), | |
| ) | |
| if rc == 0: | |
| self._handles.append(("suspend_resume", handle)) | |
| self._registrations[name] = True | |
| _log.info("PowerRegisterSuspendResumeNotification -> OK") | |
| else: | |
| self._registrations[name] = False | |
| _log.warning("PowerRegisterSuspendResumeNotification -> rc=%d", rc) | |
| except Exception as e: | |
| self._registrations[name] = False | |
| _log.warning("PowerRegisterSuspendResumeNotification -> %s", e) | |
| def _register_power_setting(self, powrprof, name, guid): | |
| cb_func = _POWER_CB_TYPE(self._power_setting_cb) | |
| params = _DEVICE_NOTIFY_SUBSCRIBE_PARAMETERS() | |
| params.Callback = cb_func | |
| params.Context = None | |
| handle = ctypes.c_void_p() | |
| self._prevent_gc.extend([cb_func, params]) | |
| try: | |
| rc = powrprof.PowerSettingRegisterNotification( | |
| ctypes.byref(guid), | |
| DEVICE_NOTIFY_CALLBACK, | |
| ctypes.byref(params), | |
| ctypes.byref(handle), | |
| ) | |
| if rc == 0: | |
| self._handles.append(("power_setting", handle)) | |
| self._registrations[name] = True | |
| _log.info("PowerSettingRegisterNotification(%s) -> OK", name) | |
| else: | |
| self._registrations[name] = False | |
| _log.warning("PowerSettingRegisterNotification(%s) -> rc=%d", | |
| name, rc) | |
| except Exception as e: | |
| self._registrations[name] = False | |
| _log.warning("PowerSettingRegisterNotification(%s) -> %s", name, e) | |
| # -- public interface -------------------------------------------------- | |
| @property | |
| def active(self): | |
| return any(self._registrations.values()) | |
| @property | |
| def asleep(self): | |
| return self._asleep | |
| def status_line(self): | |
| parts = [] | |
| for name, ok in self._registrations.items(): | |
| parts.append(f"{name}={'OK' if ok else 'FAIL'}") | |
| return ", ".join(parts) | |
| def check_and_clear_wake(self): | |
| """Return True (once) if the system has woken since last check.""" | |
| if self._wake_event.is_set(): | |
| self._wake_event.clear() | |
| return True | |
| return False | |
| def force_sleep(self): | |
| """External trigger (e.g. time-jump fallback).""" | |
| self._handle_sleep("time-jump-fallback") | |
| def force_wake(self): | |
| """External trigger (e.g. time-jump fallback).""" | |
| self._handle_wake("time-jump-fallback") | |
| def unregister(self): | |
| powrprof = ctypes.windll.powrprof | |
| for kind, handle in self._handles: | |
| try: | |
| if kind == "suspend_resume": | |
| powrprof.PowerUnregisterSuspendResumeNotification(handle) | |
| else: | |
| powrprof.PowerSettingUnregisterNotification(handle) | |
| except Exception: | |
| pass | |
| self._handles.clear() | |
| self._registrations.clear() | |
| # -- internal state transitions (deduplicated) ------------------------- | |
| def _handle_sleep(self, source): | |
| with self._state_lock: | |
| if self._asleep: | |
| _log.debug("Sleep signal from %s — already asleep, ignoring.", | |
| source) | |
| return | |
| self._asleep = True | |
| _log.info(">>> SLEEP triggered by: %s", source) | |
| def _handle_wake(self, source): | |
| with self._state_lock: | |
| if not self._asleep: | |
| _log.debug("Wake signal from %s — already awake, ignoring.", | |
| source) | |
| return | |
| self._asleep = False | |
| _log.info(">>> WAKE triggered by: %s", source) | |
| self._wake_event.set() | |
| # -- OS callbacks (run on Windows thread-pool threads) ------------------ | |
| def _suspend_resume_cb(self, context, event_type, setting): | |
| _log.debug("_suspend_resume_cb event_type=0x%04X", event_type) | |
| if event_type == PBT_APMSUSPEND: | |
| self._handle_sleep("PBT_APMSUSPEND") | |
| elif event_type in (PBT_APMRESUMEAUTOMATIC, PBT_APMRESUMESUSPEND): | |
| self._handle_wake("PBT_APMRESUME (0x%04X)" % event_type) | |
| return 0 | |
| def _power_setting_cb(self, context, event_type, setting): | |
| _log.debug("_power_setting_cb event_type=0x%04X setting=%s", | |
| event_type, setting) | |
| if event_type != PBT_POWERSETTINGCHANGE or not setting: | |
| return 0 | |
| # POWERBROADCAST_SETTING layout: | |
| # offset 0: GUID PowerSetting (16 bytes) | |
| # offset 16: DWORD DataLength (4 bytes) | |
| # offset 20: BYTE Data[1] (we read as DWORD) | |
| try: | |
| data_len = ctypes.c_ulong.from_address(setting + 16).value | |
| if data_len < 4: | |
| return 0 | |
| value = ctypes.c_ulong.from_address(setting + 20).value | |
| except Exception as e: | |
| _log.warning("Failed to read POWERBROADCAST_SETTING: %s", e) | |
| return 0 | |
| try: | |
| guid_data1 = ctypes.c_ulong.from_address(setting).value | |
| except Exception: | |
| guid_data1 = 0 | |
| if guid_data1 == 0x6FE69556: | |
| label = {0: "off", 1: "on", 2: "dimmed"}.get(value, "?") | |
| _log.info("DisplayState changed -> %d (%s)", value, label) | |
| if value == 0: | |
| self._handle_sleep("DisplayState=off") | |
| elif value == 1: | |
| self._handle_wake("DisplayState=on") | |
| elif guid_data1 == 0xBA3E0F4D: | |
| label = {0: "closed", 1: "open"}.get(value, "?") | |
| _log.info("LidSwitch changed -> %d (%s)", value, label) | |
| if value == 0: | |
| self._handle_sleep("LidSwitch=closed") | |
| elif value == 1: | |
| self._handle_wake("LidSwitch=open") | |
| return 0 | |
| # ===================================================================== | |
| # GPU Discovery — DirectX Registry for LUID mapping | |
| # ===================================================================== | |
| def _is_real_gpu(name): | |
| lower = name.lower() | |
| for kw in _EXCLUDE_KEYWORDS: | |
| if kw in lower: | |
| return False | |
| for kw in _GPU_KEYWORDS: | |
| if kw in lower: | |
| return True | |
| return False | |
| def _discover_gpu_luids_from_registry(): | |
| """ | |
| Read HKLM\\SOFTWARE\\Microsoft\\DirectX\\{GUID} subkeys. | |
| Each adapter has a 'Description' (name) and 'AdapterLuid' (QWORD). | |
| The LUID QWORD's low 32 bits correspond to the 0x<LOW> part | |
| of the performance counter instance name format: | |
| pid_NNN_luid_0x00000000_0x<LOW>_phys_... | |
| Returns: dict { "0x<low_hex>": "Adapter Name" } for real GPUs only. | |
| """ | |
| ps = ( | |
| "$results = @();" | |
| "Get-ChildItem 'HKLM:\\SOFTWARE\\Microsoft\\DirectX' -ErrorAction SilentlyContinue | " | |
| "ForEach-Object {" | |
| " $desc = (Get-ItemProperty $_.PSPath -ErrorAction SilentlyContinue).Description;" | |
| " $luid = (Get-ItemProperty $_.PSPath -ErrorAction SilentlyContinue).AdapterLuid;" | |
| " if ($desc -and $luid) {" | |
| " $low = [uint32]($luid -band 0xFFFFFFFF);" | |
| " $hexLow = '0x' + $low.ToString('x8');" | |
| " Write-Output ($desc + '|' + $hexLow)" | |
| " }" | |
| "}" | |
| ) | |
| mapping = {} | |
| try: | |
| r = subprocess.run( | |
| ["powershell", "-NoProfile", "-Command", ps], | |
| capture_output=True, text=True, timeout=15, | |
| creationflags=_SW_FLAGS, | |
| ) | |
| if r.returncode != 0: | |
| return mapping | |
| for line in r.stdout.strip().splitlines(): | |
| parts = line.rsplit("|", 1) | |
| if len(parts) != 2: | |
| continue | |
| name, luid_hex = parts[0].strip(), parts[1].strip().lower() | |
| if name and luid_hex: | |
| mapping[luid_hex] = name | |
| except Exception: | |
| pass | |
| return mapping | |
| def _sample_gpu_counters(): | |
| """ | |
| Call Get-Counter once and return per-LUID 3D engine utilization. | |
| Only includes engtype_3D instances. | |
| Sums across all processes per LUID (matching Task Manager's 3D graph). | |
| Returns: dict { luid_suffix: utilization_percent } | |
| """ | |
| ps = ( | |
| "try {" | |
| " $s = (Get-Counter '\\GPU Engine(*)\\Utilization Percentage'" | |
| " -ErrorAction Stop).CounterSamples;" | |
| " $s | ForEach-Object { $_.InstanceName + '|' + $_.CookedValue }" | |
| "} catch { Write-Output 'ERROR' }" | |
| ) | |
| try: | |
| r = subprocess.run( | |
| ["powershell", "-NoProfile", "-Command", ps], | |
| capture_output=True, text=True, timeout=10, | |
| creationflags=_SW_FLAGS, | |
| ) | |
| if r.returncode != 0 or "ERROR" in r.stdout: | |
| return {} | |
| per_luid = {} | |
| for line in r.stdout.strip().splitlines(): | |
| parts = line.rsplit("|", 1) | |
| if len(parts) != 2: | |
| continue | |
| instance, val_str = parts | |
| if "engtype_3d" not in instance.lower(): | |
| continue | |
| try: | |
| val = float(val_str) | |
| except ValueError: | |
| continue | |
| m = re.search(r"luid_0x[0-9a-fA-F]+_(0x[0-9a-fA-F]+)", instance) | |
| if not m: | |
| continue | |
| luid = m.group(1).lower() | |
| per_luid[luid] = per_luid.get(luid, 0.0) + val | |
| return {k: min(v, 100.0) for k, v in per_luid.items()} | |
| except Exception: | |
| return {} | |
| class GPUManager: | |
| def __init__(self): | |
| self.luid_to_name = {} | |
| self.nvidia_luid = None | |
| self.amd_luid = None | |
| print(" Reading GPU LUID mapping from DirectX registry...") | |
| print() | |
| all_adapters = _discover_gpu_luids_from_registry() | |
| if not all_adapters: | |
| print(" [WARNING] Could not read DirectX registry.") | |
| print(" Falling back without GPU monitoring.") | |
| print() | |
| return | |
| print(" All DirectX adapters:") | |
| for luid, name in all_adapters.items(): | |
| is_gpu = _is_real_gpu(name) | |
| print(f" {luid} -> {name} {'[GPU]' if is_gpu else '[skip]'}") | |
| print() | |
| self.luid_to_name = {l: n for l, n in all_adapters.items() | |
| if _is_real_gpu(n)} | |
| for luid, name in self.luid_to_name.items(): | |
| lower = name.lower() | |
| if "nvidia" in lower or "geforce" in lower or "rtx" in lower: | |
| self.nvidia_luid = luid | |
| elif "amd" in lower or "radeon" in lower: | |
| self.amd_luid = luid | |
| print(" Tracked GPUs:") | |
| for luid, name in self.luid_to_name.items(): | |
| tag = "" | |
| if luid == self.nvidia_luid: | |
| tag = " [dGPU]" | |
| elif luid == self.amd_luid: | |
| tag = " [iGPU]" | |
| print(f" {luid} -> {name}{tag}") | |
| print() | |
| print(" Test read (3D engine only):") | |
| sample = _sample_gpu_counters() | |
| for luid, name in self.luid_to_name.items(): | |
| val = sample.get(luid, 0.0) | |
| print(f" {name}: {val:.1f}%") | |
| if not self.luid_to_name: | |
| print(" [WARNING] No GPUs identified. GPU bar will show 0%.") | |
| print() | |
| def get_gpu_info(self): | |
| sample = _sample_gpu_counters() | |
| nv_util = sample.get(self.nvidia_luid) if self.nvidia_luid else None | |
| amd_util = sample.get(self.amd_luid) if self.amd_luid else None | |
| have_nv = nv_util is not None | |
| have_amd = amd_util is not None | |
| if have_nv and have_amd: | |
| if nv_util >= amd_util: | |
| return (_short_name(self.luid_to_name.get( | |
| self.nvidia_luid, "dGPU")), nv_util) | |
| else: | |
| return (_short_name(self.luid_to_name.get( | |
| self.amd_luid, "iGPU")), amd_util) | |
| if have_nv: | |
| return (_short_name(self.luid_to_name.get( | |
| self.nvidia_luid, "dGPU")), nv_util) | |
| if have_amd: | |
| return (_short_name(self.luid_to_name.get( | |
| self.amd_luid, "iGPU")), amd_util) | |
| return ("N/A", 0.0) | |
| def _short_name(name): | |
| n = name.lower() | |
| if "5070" in n: | |
| return "RTX5070" | |
| if "890" in n: | |
| return "RD890M" | |
| if "radeon" in n: | |
| return "Radeon" | |
| if "geforce" in n or "nvidia" in n: | |
| return "NVIDIA" | |
| return name[:12] if len(name) > 12 else name | |
| # --- Serial Helpers ------------------------------------------------------- | |
| def send_command(ser, cmd_id, params=None): | |
| if params is None: | |
| params = [] | |
| ser.write(bytes(MAGIC + [cmd_id] + params)) | |
| def stage_column(ser, col, values): | |
| assert len(values) == ROWS | |
| send_command(ser, CMD_STAGE_COL, [col] + values) | |
| def flush_columns(ser): | |
| send_command(ser, CMD_FLUSH_COLS) | |
| def set_brightness(ser, brightness): | |
| send_command(ser, CMD_BRIGHTNESS, [max(0, min(255, brightness))]) | |
| def wake_display(ser): | |
| send_command(ser, CMD_SLEEP, [0]) | |
| def sleep_display(ser): | |
| send_command(ser, CMD_SLEEP, [1]) | |
| def clear_display(ser): | |
| for col in range(COLS): | |
| stage_column(ser, col, [0] * ROWS) | |
| flush_columns(ser) | |
| def safe_kill_display(ser): | |
| """Best-effort clear + sleep. Tolerates serial/USB errors.""" | |
| try: | |
| clear_display(ser) | |
| sleep_display(ser) | |
| except Exception as e: | |
| _log.debug("safe_kill_display serial error (expected during " | |
| "USB transition): %s", e) | |
| # --- Framebuffer Builder -------------------------------------------------- | |
| def build_framebuffer(cpu_pct, mem_pct, gpu_pct): | |
| fb = [[0] * ROWS for _ in range(COLS)] | |
| percentages = [cpu_pct, mem_pct, gpu_pct] | |
| for bar_idx in range(3): | |
| col_start = bar_idx * 3 | |
| pct = max(0.0, min(100.0, percentages[bar_idx])) | |
| filled_rows = round(pct / 100.0 * ROWS) | |
| for row in range(ROWS): | |
| dist_from_bottom = ROWS - 1 - row | |
| is_filled = dist_from_bottom < filled_rows | |
| for col_offset in range(3): | |
| col = col_start + col_offset | |
| if is_filled: | |
| if filled_rows <= 1: | |
| fb[col][row] = BAR_BRIGHT | |
| else: | |
| ratio = dist_from_bottom / (filled_rows - 1) | |
| fb[col][row] = max( | |
| 70, | |
| int(BAR_BRIGHT * (0.35 + 0.65 * ratio))) | |
| else: | |
| if col_offset == 0 or col_offset == 2: | |
| fb[col][row] = BAR_DIM | |
| else: | |
| fb[col][row] = 0 | |
| return fb | |
| # --- Main ----------------------------------------------------------------- | |
| def main(): | |
| if sys.platform != "win32": | |
| print("[ERROR] This script requires Windows (uses Performance Counters).") | |
| sys.exit(1) | |
| _log.info("=" * 50) | |
| _log.info("fw_led_monitor starting (PID %d)", os.getpid()) | |
| print("=" * 58) | |
| print(" Framework LED Matrix - System Monitor") | |
| print("=" * 58) | |
| print(f" Serial port : {SERIAL_PORT}") | |
| print(f" Update rate : every {UPDATE_SECS}s") | |
| print(f" Sleep guard rate: every {SLEEP_GUARD_SECS}s") | |
| print(f" Layout : CPU | MEM | GPU (3D engine)") | |
| print(f" Debug log : {_log_path}") | |
| print() | |
| gpu_mgr = GPUManager() | |
| psutil.cpu_percent(interval=None) | |
| try: | |
| ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) | |
| except serial.SerialException as e: | |
| print(f"[ERROR] Could not open {SERIAL_PORT}: {e}") | |
| print(" Make sure the LED Matrix module is connected " | |
| "and the port is correct.") | |
| sys.exit(1) | |
| ser_lock = threading.Lock() | |
| power_mon = PowerMonitor() | |
| print(f" Power monitor: {power_mon.status_line()}") | |
| print() | |
| def cleanup(signum=None, frame=None): | |
| print("\n Shutting down - clearing display...") | |
| _log.info("cleanup called (signal=%s)", signum) | |
| try: | |
| power_mon.unregister() | |
| except Exception: | |
| pass | |
| try: | |
| with ser_lock: | |
| clear_display(ser) | |
| except Exception: | |
| pass | |
| ser.close() | |
| sys.exit(0) | |
| signal.signal(signal.SIGINT, cleanup) | |
| signal.signal(signal.SIGTERM, cleanup) | |
| time.sleep(0.1) | |
| wake_display(ser) | |
| time.sleep(0.05) | |
| set_brightness(ser, 30) | |
| time.sleep(0.05) | |
| print(" Monitoring... (Ctrl+C to stop)\n") | |
| _log.info("Entering main loop") | |
| sleep_guard_count = 0 | |
| try: | |
| while True: | |
| loop_start = time.monotonic() | |
| # ========================================================== | |
| # SLEEPING — keep the display off despite USB power cycling | |
| # ========================================================== | |
| if power_mon.asleep: | |
| with ser_lock: | |
| safe_kill_display(ser) | |
| sleep_guard_count += 1 | |
| if sleep_guard_count % 10 == 1: | |
| _log.debug("Sleep guard active (iteration %d)", | |
| sleep_guard_count) | |
| print( | |
| f"\r [SLEEPING] guard cycle {sleep_guard_count}" | |
| f" — display held off ", | |
| end="", flush=True, | |
| ) | |
| time.sleep(SLEEP_GUARD_SECS) | |
| continue | |
| # ========================================================== | |
| # AWAKE — normal monitoring | |
| # ========================================================== | |
| # Did we just wake up? | |
| just_woke = power_mon.check_and_clear_wake() | |
| if just_woke: | |
| sleep_guard_count = 0 | |
| _log.info("Main loop: re-initialising display after wake " | |
| "(ran %d guard cycles)", sleep_guard_count) | |
| with ser_lock: | |
| time.sleep(0.5) # settle time for USB re-enumeration | |
| try: | |
| wake_display(ser) | |
| time.sleep(0.05) | |
| set_brightness(ser, 30) | |
| time.sleep(0.05) | |
| except Exception as e: | |
| _log.warning("Wake re-init serial error: %s", e) | |
| print("\r [POWER] Resumed — display re-initialised. ") | |
| cpu = psutil.cpu_percent(interval=None) | |
| mem = psutil.virtual_memory().percent | |
| gpu_label, gpu = gpu_mgr.get_gpu_info() | |
| fb = build_framebuffer(cpu, mem, gpu) | |
| with ser_lock: | |
| try: | |
| for col in range(COLS): | |
| stage_column(ser, col, fb[col]) | |
| time.sleep(0.005) | |
| flush_columns(ser) | |
| except Exception as e: | |
| _log.warning("Serial write error in main loop: %s", e) | |
| print( | |
| f"\r CPU: {cpu:5.1f}% | MEM: {mem:5.1f}% |" | |
| f" GPU ({gpu_label}): {gpu:5.1f}% ", | |
| end="", flush=True, | |
| ) | |
| time.sleep(UPDATE_SECS) | |
| # --- Fallback: detect sleep/wake via time jump --- | |
| elapsed = time.monotonic() - loop_start | |
| if elapsed > UPDATE_SECS + TIME_JUMP_THRESHOLD: | |
| _log.info("Time jump: %.0fs elapsed (expected ~%ds) — " | |
| "inferring sleep+wake cycle", elapsed, UPDATE_SECS) | |
| print(f"\n [POWER] Time jump detected ({elapsed:.0f}s)") | |
| # The system slept and woke while we were blocked in | |
| # time.sleep(); trigger a wake re-init on the next loop. | |
| if not power_mon.asleep: | |
| # We missed the sleep signal — just re-init display. | |
| power_mon._wake_event.set() | |
| except Exception as e: | |
| _log.error("Main loop exception: %s", e, exc_info=True) | |
| print(f"\n[ERROR] {e}") | |
| cleanup() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment