Tictac interpreter 1.02
| # tt_dom_inpr_1_02.py / TT-DOM Plex Interpreter 1.02 with high-fidelity complex FLIR sequence imaging / Python 3.13 | |
| from PIL import Image, ImageFilter, ImageDraw, ImageFont | |
| from collections import defaultdict, namedtuple, deque | |
| from torch.utils.data import Dataset, DataLoader | |
| from scipy.linalg import cho_factor, cho_solve | |
| from xml.etree import ElementTree as ET | |
| from sklearn.cluster import KMeans | |
| import matplotlib.pyplot as plt | |
| import torch.nn.functional as F | |
| from numba import njit, prange | |
| from numpy.linalg import svd | |
| from xml.dom import minidom | |
| from scipy import ndimage | |
| import concurrent.futures | |
| from pathlib import Path | |
| import torch.nn as nn | |
| import numpy as np | |
| import threading | |
| import imageio | |
| import hashlib | |
| import random | |
| import torch | |
| import cmath | |
| import math | |
| import time | |
| import os | |
| import re | |
| epsilon = 1e-12 # Used by AR3z and other classes with AI integrations | |
| # Unitary logarithmic complex cycle constants(*must be diagonally paired) | |
| U1_LCX1 = [2+9j, 5+2j, 6+8j, 7+5j, 8+1j, 9+3j, 4+2j, 1+5j, 3+8j] | |
| U2_LCX2 = [9+2j, 2+5j, 8+6j, 5+7j, 1+8j, 3+9j, 2+4j, 5+1j, 8+3j] | |
| # AR3z Configurable Globals | |
| MAX_LAYER = 3 | |
| DEQUE_LEN = 512 | |
| CYCLE_MAPPS = 1.0 | |
| FEATURE_DIM = 6 | |
| CORE_DIM = 4 # tiny param vector length @ CognitiveCore | |
| AGE_TAU = 200.0 # timescale for length pinning/penalty | |
| ESCAPE_THRESH = 0.8 # residual threshold for escape candidates | |
| ESCAPE_K = 3.0 # sharpness for tanh escape | |
| ANCHOR_BLEND = 0.03 # anchor refresh blending | |
| # Used by the DIANE RL-NN for spawned hybrid gate controllers of the Shell B-Spline NN | |
| # and for other AI propagation dynamics related to this module | |
| State = namedtuple('State', ['id', 'features']) | |
| Action = namedtuple('Action', ['id', 'type']) | |
| Reward = namedtuple('Reward', ['value', 'type']) | |
| Decision = namedtuple('Decision', ['state', 'action', 'reward']) | |
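| # Usage sketch (hypothetical values): one RL step packaged the way DIANE.learn() expects it | |
| #   s = State(id=3, features=[0.42]) | |
| #   a = Action(id=1, type='exploration') | |
| #   d = Decision(state=s, action=a, reward=Reward(value=0.8, type='observed')) | |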
| # Thread pooling for async B-Spline controller calls | |
| _bspline_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) | |
| # Default B-Spline handler(plex interpreter-integrator struct/symmetry instruction sets replace this altogether) | |
| def default_bspline_handler(h, w, complex_array): | |
| # complex_array: numpy ndarray, dtype np.complex64 (complex mode) or np.float32 (real mode) | |
| return {'action': 'none'} | |
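| # Example of a custom handler (illustrative sketch; my_bspline_handler is hypothetical). It returns | |
| # the optional keys that DIANE's _on_done() callback reads ('action', 'new_h', 'new_w'): | |
| #   def my_bspline_handler(h, w, complex_array): | |
| #       if np.abs(complex_array).mean() < 1e-3: | |
| #           return {'action': 'restart'} | |
| #       return {'action': 'resize', 'new_h': h, 'new_w': w} | |
| #   diane.set_bspline_handler(my_bspline_handler)  # 'diane' is a hypothetical DIANE instance | |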
| # Current module(tt_dom_inpr_1_02.py) working directory | |
| TT_DOM_PTH = os.path.dirname(os.path.abspath(__file__)) | |
| #_______________________________________________________________________________________________________ | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| # TICTAC DOM - FLIR IMAGE GENERATOR/TQ-SHELL<RESPAWN> | |
| #_______________________________________________________________________________________________________ | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| @njit(parallel=True) | |
| def blur_h(img, temp_buf, kernel): | |
| H, W = img.shape | |
| K = kernel.shape[0] | |
| radius = K//2 | |
| for y in prange(H): | |
| for x in range(W): | |
| s = 0.0 | |
| for k in range(K): | |
| ix = x+(k-radius) | |
| if ix < 0: ix = 0 | |
| elif ix >= W: ix = W-1 | |
| s+=img[y, ix]*kernel[k] | |
| temp_buf[y, x] = s | |
| @njit(parallel=True) | |
| def blur_v(temp_buf, out, kernel): | |
| H, W = out.shape | |
| K = kernel.shape[0] | |
| radius = K//2 | |
| for y in prange(H): | |
| for x in range(W): | |
| s = 0.0 | |
| for k in range(K): | |
| iy = y+(k-radius) | |
| if iy < 0: iy = 0 | |
| elif iy >= H: iy = H - 1 | |
| s+=temp_buf[iy, x]*kernel[k] | |
| out[y, x] = s | |
| def separable_gaussian_blur(img, temp_buf, kernel): | |
| if img.dtype != np.float32: | |
| img[:] = img.astype(np.float32) | |
| if temp_buf.dtype != np.float32: | |
| temp_buf[:] = temp_buf.astype(np.float32) | |
| if kernel.dtype != np.float32: | |
| kernel = kernel.astype(np.float32) | |
| blur_h(img, temp_buf, kernel) | |
| blur_v(temp_buf, img, kernel) | |
| @njit() | |
| def rasterize_rotated_ellipse(temp_map, cy, cx, angle_rad, sigma_x, sigma_y, peak): | |
| H, W = temp_map.shape | |
| r_x = int(math.ceil(3.0*sigma_x)) | |
| r_y = int(math.ceil(3.0*sigma_y)) | |
| y0 = max(0, int(math.floor(cy))-r_y) | |
| y1 = min(H-1, int(math.ceil(cy))+r_y) | |
| x0 = max(0, int(math.floor(cx))-r_x) | |
| x1 = min(W-1, int(math.ceil(cx))+r_x) | |
| cos_t = math.cos(angle_rad) | |
| sin_t = math.sin(angle_rad) | |
| sx2 = 2.0*(sigma_x**2) | |
| sy2 = 2.0*(sigma_y**2) | |
| for yi in range(y0, y1+1): | |
| for xi in range(x0, x1+1): | |
| xr = cos_t*(xi-cx)+sin_t*(yi-cy) | |
| yr = -sin_t*(xi-cx)+cos_t*(yi-cy) | |
| val = math.exp(-(xr*xr)/sx2-(yr*yr)/sy2) | |
| temp_map[yi, xi]+=peak*val | |
| def make_gaussian_kernel(sigma, truncate=3.0): | |
| radius = int(max(1, math.ceil(truncate*sigma))) | |
| x = np.arange(-radius, radius+1, dtype=np.float32) | |
| k = np.exp(-0.5*(x/(sigma+1e-12))**2).astype(np.float32) | |
| k/=k.sum() | |
| return k | |
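| # Usage sketch (hypothetical buffers): blur a float32 image in place with a sigma=2.0 kernel | |
| #   img = np.random.rand(64, 64).astype(np.float32) | |
| #   buf = np.empty_like(img) | |
| #   separable_gaussian_blur(img, buf, make_gaussian_kernel(2.0)) | |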
| def make_sea_background(H, W, base_temp=290.0, wave_amp=0.6, rng=None): | |
| if rng is None: rng = np.random.default_rng(0) | |
| y = np.linspace(0, 4*math.pi, H, dtype=np.float32) | |
| x = np.linspace(0, 4*math.pi, W, dtype=np.float32) | |
| X, Y = np.meshgrid(x, y) | |
| waves = wave_amp*(np.sin(2.5*X+0.6*Y)+0.6*np.sin(1.1*X-0.4*Y)) | |
| noise = 0.03*rng.standard_normal((H, W)).astype(np.float32) | |
| return (base_temp+waves+noise).astype(np.float32) | |
| def radiance_from_temp(T, k=1.0): | |
| return ((T-273.15)*k).astype(np.float32) | |
| def add_noise(img, rng, gain=10.0, read_noise_std=2.0): | |
| electrons = np.clip(img*gain, 0, None) | |
| flat = rng.poisson(electrons.ravel()).astype(np.float32) | |
| shot = flat.reshape(img.shape) | |
| read = rng.normal(0.0, read_noise_std, img.shape).astype(np.float32) | |
| return shot+read | |
| def normalize_to_uint8(img, vmin=None, vmax=None): | |
| if vmin is None: vmin = float(img.min()) | |
| if vmax is None: vmax = float(img.max()) | |
| out = 255.0*(img-vmin)/max(1e-8, (vmax-vmin)) | |
| return np.clip(out, 0, 255).astype(np.uint8) | |
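| # Usage sketch (hypothetical sizes): compose the helpers above into one synthetic FLIR-style frame | |
| #   rng = np.random.default_rng(0) | |
| #   T = make_sea_background(128, 192, rng=rng) | |
| #   frame = add_noise(radiance_from_temp(T, k=2.5), rng, gain=12.0) | |
| #   u8 = normalize_to_uint8(frame) | |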
| def draw_caption_on_uint8(img_u8, center, frame_idx, start_idx, end_idx): | |
| pil = Image.fromarray(img_u8).convert('L') | |
| draw = ImageDraw.Draw(pil) | |
| W, H = pil.size | |
| cx, cy = int(center[1]), int(center[0]) | |
| l = 15 | |
| draw.line([(cx-l, cy), (cx+l, cy)], fill=255) | |
| draw.line([(cx, cy-l), (cx, cy+l)], fill=255) | |
| draw.ellipse((cx-4, cy-4, cx+4, cy+4), outline=255) | |
| # !WARNING! Change the caption below at your own risk. | |
| # It is highly advised that you do not, considering the highly specialized | |
| # mathematics elsewhere in this module that conforms to it, and the other | |
| # comments that specifically use Hebraic speech | |
| caption = f'sha-LE-det TQ SHELL: {frame_idx} [{start_idx}->{end_idx}]' | |
| try: | |
| font = ImageFont.load_default() | |
| except Exception: | |
| font = None | |
| try: | |
| if font is not None: | |
| try: | |
| tw, th = font.getsize(caption) | |
| except Exception: | |
| bbox = draw.textbbox((0, 0), caption, font=font) | |
| tw, th = bbox[2]-bbox[0], bbox[3]-bbox[1] | |
| else: | |
| bbox = draw.textbbox((0, 0), caption) | |
| tw, th = bbox[2]-bbox[0], bbox[3]-bbox[1] | |
| except Exception: | |
| tw, th = len(caption)*6, 10 | |
| draw.rectangle(((W-tw)//2-4, 2, (W+tw)//2+4, 2+th+4), fill=0) | |
| draw.text(((W-tw)//2, 4), caption, fill=255, font=font) | |
| return np.asarray(pil, dtype=np.uint8) | |
| class FlirSimulator: | |
| def __init__(self, complex_grid, H=256, W=384, rng_seed=1): | |
| self.grid = complex_grid | |
| self.H, self.W = int(H), int(W) | |
| self.rng_seed = int(rng_seed) | |
| amp = np.abs(self.grid).astype(np.float32) | |
| a_min, a_max = float(amp.min()), float(amp.max()) | |
| self.amp_norm = (amp-a_min)/(a_max-a_min+1e-12) | |
| self.phase = np.angle(self.grid).astype(np.float32) | |
| gh, gw = self.amp_norm.shape | |
| yy, xx = np.indices((gh, gw)) | |
| total = float(self.amp_norm.sum())+1e-12 | |
| self.cy_g = float((self.amp_norm*yy).sum()/total) | |
| self.cx_g = float((self.amp_norm*xx).sum()/total) | |
| self.span = float(self.amp_norm.std()) | |
| self.gh, self.gw = gh, gw | |
| def simulate(self, frames=60, start_idx=0, end_idx=999, save_dir=None, save_pngs=False): | |
| rng = np.random.default_rng(self.rng_seed) | |
| H, W = self.H, self.W | |
| gh, gw = self.gh, self.gw | |
| cy_img = (self.cy_g/max(1, gh-1))*(H-1) | |
| cx_img = (self.cx_g/max(1, gw-1))*(W-1) | |
| mean_phase = float(self.phase.mean()) | |
| amp_factor = float(self.span*min(H, W)*0.4) | |
| base_temp = make_sea_background(H, W, base_temp=290.5, wave_amp=0.6, rng=rng).astype(np.float32) | |
| optical = np.empty((H, W), dtype=np.float32) | |
| temp_buf = np.empty_like(optical) | |
| frames_out, kernel_cache = [], {} | |
| ktest = make_gaussian_kernel(1.5) | |
| tmp_img = np.zeros((8, 8), dtype=np.float32) | |
| tmp_buf = np.zeros_like(tmp_img) | |
| separable_gaussian_blur(tmp_img, tmp_buf, ktest) | |
| if save_pngs: | |
| if save_dir is None: | |
| raise RuntimeError('<TT-DOM 1.02> TQ Shell flir image sequence output directory is not specified') | |
| os.makedirs(save_dir, exist_ok=True) | |
| t0 = time.time() | |
| for t in range(frames): | |
| temp = base_temp.copy() | |
| frac = t/max(1, frames-1) | |
| gx = int((frac*(gw-1))%gw) | |
| gy = int(((frac*1.3)*(gh-1))%gh) | |
| local_amp = float(self.amp_norm[gy, gx]) | |
| local_phase = float(self.phase[gy, gx]) | |
| dx = amp_factor*(0.8*math.cos(2*math.pi*frac+local_phase)+0.2*(local_amp-0.5)) | |
| dy = amp_factor*(0.6*math.sin(2*math.pi*frac*1.2+local_phase*0.7)+0.2*(local_amp-0.5)) | |
| cy = cy_img+dy | |
| cx = cx_img+dx | |
| base_length = max(8.0, amp_factor*0.8) | |
| base_width = max(3.0, amp_factor*0.25) | |
| length = base_length*(1.0+1.5*local_amp) | |
| width = base_width*(1.0+1.2*local_amp) | |
| angle = (mean_phase)+math.radians(180.0*math.sin(2*math.pi*frac*0.7+mean_phase)) | |
| sigma_x = max(1.0, length/2.5) | |
| sigma_y = max(1.0, width/2.5) | |
| rasterize_rotated_ellipse(temp, cy, cx, angle, sigma_x, sigma_y, peak=7.0*(1.0+local_amp)) | |
| rad = radiance_from_temp(temp, k=2.5) | |
| optical[:, :] = rad | |
| psf_sigma = 1.2+0.8*abs(math.sin(frac*2.0+mean_phase)) | |
| sigma_key = float(round(psf_sigma, 3)) | |
| if sigma_key not in kernel_cache: | |
| kernel_cache[sigma_key] = make_gaussian_kernel(psf_sigma) | |
| kernel = kernel_cache[sigma_key] | |
| separable_gaussian_blur(optical, temp_buf, kernel) | |
| small_k = make_gaussian_kernel(0.6) | |
| separable_gaussian_blur(optical, temp_buf, small_k) | |
| noisy = add_noise(optical, rng, gain=12.0, read_noise_std=3.0) | |
| frame_f32 = noisy.astype(np.float32) | |
| frames_out.append((frame_f32, (cy, cx))) | |
| if save_pngs: | |
| u8 = normalize_to_uint8(frame_f32) | |
| u8c = draw_caption_on_uint8(u8, (cy, cx), frame_idx=t+start_idx, start_idx=start_idx, end_idx=end_idx) | |
| Image.fromarray(u8c).save(os.path.join(save_dir, f'tq_shell_flir_seq_{t:03d}.png')) | |
| if t%10 == 0: print(f'TQ Shell flir frame generated: {t}/{frames-1}') | |
| dt = time.time() - t0 | |
| print(f'Simulated TQ Shell flir {frames} frames in {dt:.3f}s ({dt/frames:.4f}s/frame)') | |
| return frames_out | |
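| # Usage sketch (hypothetical complex grid and output directory): simulate a short TQ Shell sequence | |
| #   grid = np.exp(1j*np.random.rand(9, 9)).astype(np.complex64) | |
| #   sim = FlirSimulator(grid, H=256, W=384, rng_seed=1) | |
| #   frames = sim.simulate(frames=12, save_dir=os.path.join(TT_DOM_PTH, 'flir_out'), save_pngs=True) | |
| #   # frames is a list of (float32 frame, (cy, cx)) tuples | |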
| #_______________________________________________________________________________________________________ | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| # TICTAC DOM - FLIR INITIAL KINETIC VERTICAL ALPHA nK-RESONANCE(PNG/GIF)/TQ-SHELL<SCAN> | |
| VNK_OUT_W, VNK_OUT_H = 267, 122 | |
| VNK_EVAL_SCALE = 0.5 | |
| VNK_GRID_EXTENT = (-1.5, 1.5, -0.5, 1.5) | |
| VNK_PATCH_RADIUS_FACTOR = 0.09 | |
| VNK_EPS_SCALE = 0.4 | |
| VNK_LAMBDA_SCALE = 1e-6 | |
| VNK_FRAMES = 32 | |
| VNK_DURATION = 0.07 | |
| VNK_FLIR_COLORMAP = plt.get_cmap('inferno') | |
| VNK_SEA_COLOR = (32, 8, 23) | |
| VNK_MOTION_BLUR_STEPS = 1 | |
| VNK_FONT_PATH = None | |
| VNK_LBL_TXT = 'nK Tenacity:' | |
| VNK_TMP_DIR = "tic_tac_frames_tmp_singlethread" | |
| def vnk_complex_to_patch_params(C): | |
| Z = C.ravel() | |
| x, y = np.real(Z), np.imag(Z) | |
| amp, ph = np.abs(Z), np.angle(Z) | |
| xmin, xmax, ymin, ymax = VNK_GRID_EXTENT | |
| x_norm = xmin+(x-x.min())/(x.max()-x.min()+1e-12)*(xmax-xmin) | |
| y_norm = ymin+(y-y.min())/(y.max()-y.min()+1e-12)*(ymax-ymin) | |
| return np.vstack([x_norm, y_norm, amp, ph]).T | |
| def vnk_make_sea_background(w, h): | |
| arr, sc = np.zeros((h, w, 3), dtype=np.uint8), VNK_SEA_COLOR | |
| for j in range(h): | |
| t = j/(h-1) | |
| col = np.array(sc)*(0.6+0.4*(1-t)) | |
| noise = np.random.normal(0, 2, (w, 3)) | |
| arr[j, :, :] = np.clip(col+noise, 0, 255) | |
| return Image.fromarray(arr) | |
| def vnk_gaussian_kernel_matrix(X, eps): | |
| d2 = np.sum(X**2, 1)[:, None]+np.sum(X**2, 1)[None, :]-2*X.dot(X.T) | |
| return np.exp(-d2/(2*eps*eps)) | |
| def vnk_solve_patch_rbf_cholesky(Xp, yp, eps, lam): | |
| K = vnk_gaussian_kernel_matrix(Xp, eps) | |
| K[np.diag_indices_from(K)]+=lam | |
| c, low = cho_factor(K, overwrite_a=True, check_finite=False) | |
| alpha = cho_solve((c, low), yp, check_finite=False) | |
| return alpha | |
| def vnk_build_patches_from_complex(C, rng=None): | |
| if rng is None: rng = np.random.RandomState(1) | |
| params = vnk_complex_to_patch_params(C) | |
| xmin, xmax, ymin, ymax = VNK_GRID_EXTENT | |
| span = max(xmax-xmin, ymax-ymin) | |
| base_radius = VNK_PATCH_RADIUS_FACTOR*span | |
| patches = [] | |
| for p in params: | |
| m = 28 | |
| cx, cy, amp, ph = p | |
| vy = 0.25+0.7*amp | |
| vx = 0.6*(np.sign(np.sin(ph)))*(np.abs(np.sin(ph))**2) | |
| theta = rng.uniform(0, 2*np.pi, m) | |
| r = rng.normal(0, base_radius*0.32, m)+base_radius*0.12 | |
| px, py = cx+r*np.cos(theta), cy+r*np.sin(theta) | |
| angles = np.arctan2(vy, vx+1e-12) | |
| sigma_along = base_radius*(0.5+0.6*amp) | |
| sigma_cross = base_radius*(0.18+0.2*(1-amp)) | |
| dx, dy = px-cx, py-cy | |
| ca, sa = np.cos(angles), np.sin(angles) | |
| ux, uy = ca*dx+sa*dy, -sa*dx+ca*dy | |
| vals = amp*np.exp(-0.5*((ux/sigma_along)**2+(uy/sigma_cross)**2)) | |
| vals+=amp*np.exp(-((dx*dx+dy*dy)/(2*(0.4*base_radius)**2))) | |
| patches.append({'center': np.array([cx, cy]), 'Xp': np.vstack([px, py]).T, | |
| 'yp': vals, 'vel': np.array([vx, vy]), 'amp': amp}) | |
| return patches | |
| def vnk_make_eval_grid(eval_w, eval_h): | |
| xmin, xmax, ymin, ymax = VNK_GRID_EXTENT | |
| xs = np.linspace(xmin, xmax, eval_w) | |
| ys = np.linspace(ymin, ymax, eval_h) | |
| gx, gy = np.meshgrid(xs, ys) | |
| return np.vstack([gx.ravel(), gy.ravel()]).T.astype(np.float64) | |
| @njit(parallel=True) | |
| def vnk_eval_grid(patch_centers, patch_centers_r, patches_X_flat, patches_X_offsets, | |
| patches_sizes, patches_alpha_flat, patches_alpha_offsets, patches_eps, grid_pts): | |
| M = grid_pts.shape[0] | |
| out = np.zeros(M, dtype=np.float64) | |
| W = np.zeros(M, dtype=np.float64) | |
| P = patch_centers.shape[0] | |
| for p in prange(P): | |
| cx, cy, r = patch_centers[p, 0], patch_centers[p, 1], patch_centers_r[p] | |
| off, sz, aoff, eps = patches_X_offsets[p], patches_sizes[p], patches_alpha_offsets[p], patches_eps[p] | |
| for i in range(M): | |
| gx, gy = grid_pts[i, 0], grid_pts[i, 1] | |
| dx, dy = gx-cx, gy-cy | |
| w, s_val = np.exp(-(dx*dx+dy*dy)/(2.0*(r*r))), 0.0 | |
| for j in range(sz): | |
| xj, yj = patches_X_flat[off+2*j], patches_X_flat[off+2*j+1] | |
| dxj, dyj = gx-xj, gy-yj | |
| s_val+=patches_alpha_flat[aoff+j]*np.exp(-(dxj*dxj+dyj*dyj)/(2.0*eps*eps)) | |
| out[i]+=w*s_val | |
| W[i]+=w | |
| for i in range(M): | |
| if W[i] > 0.0: out[i] = out[i]/W[i] | |
| return out | |
| def vnk_synthesize_complex_matrix(Hc=18, Wc=36): | |
| C = np.zeros((Hc,Wc), dtype=np.complex128) | |
| xs = np.linspace(-1.0, 1.0, Wc) | |
| ys = np.linspace(0.0, 1.0, Hc) | |
| for i in range(Hc): | |
| for j in range(Wc): | |
| amp = 0.35+0.65*np.exp(-((ys[i]-0.6)**2)/0.02)*(1-0.3*abs(xs[j])) | |
| C[i,j] = amp*np.exp(1j*(2.0*(xs[j]**2)*np.pi*(0.5+0.5*i/Hc)))*(0.85+0.4*np.random.rand()) | |
| return C | |
| class FlirRendererVNK: | |
| def __init__(self, out_w=VNK_OUT_W, out_h=VNK_OUT_H, eval_scale=VNK_EVAL_SCALE, tmp_dir=VNK_TMP_DIR): | |
| self.out_w, self.out_h = out_w, out_h | |
| self.eval_w = max(8, int(out_w*eval_scale)) | |
| self.eval_h = max(8, int(out_h*eval_scale)) | |
| self.tmp_dir = tmp_dir | |
| os.makedirs(self.tmp_dir, exist_ok=True) | |
| self.font = self._load_font() | |
| self._precomputed = {} | |
| def _load_font(self): | |
| try: | |
| return ImageFont.truetype(VNK_FONT_PATH, 22) if VNK_FONT_PATH else ImageFont.load_default() | |
| except Exception: | |
| return ImageFont.load_default() | |
| def prepare_patches(self, patches): | |
| P = len(patches) | |
| patch_centers = np.zeros((P, 2), dtype=np.float64) | |
| patch_centers_r_template = np.zeros(P, dtype=np.float64) | |
| patches_X_list, patches_alpha_list = [], [] | |
| sizes = np.zeros(P, dtype=np.int64) | |
| X_offsets = np.zeros(P, dtype=np.int64) | |
| a_offsets = np.zeros(P, dtype=np.int64) | |
| off, aoff = 0, 0 | |
| for i,p in enumerate(patches): | |
| patch_centers[i,:] = p['center'].astype(np.float64) | |
| Xp, yp = p['Xp'].astype(np.float64), p['yp'].astype(np.float64) | |
| avg_r = np.mean(np.linalg.norm(Xp-p['center'], axis=1)) | |
| eps = VNK_EPS_SCALE*max(avg_r, 1e-4) | |
| lam = VNK_LAMBDA_SCALE*(eps**2) | |
| alpha = vnk_solve_patch_rbf_cholesky(Xp, yp, eps, lam) | |
| p['alpha'] = alpha | |
| p['eps'] = eps | |
| p['avg_r'] = avg_r | |
| sizes[i] = Xp.shape[0] | |
| X_offsets[i] = off | |
| a_offsets[i] = aoff | |
| patches_X_list.append(Xp.ravel()) | |
| patches_alpha_list.append(alpha.ravel()) | |
| off+=Xp.size | |
| aoff+=alpha.size | |
| patch_centers_r_template[i] = 2.5*avg_r | |
| patches_X_flat = np.concatenate(patches_X_list) if len(patches_X_list) > 0 else np.array([],dtype=np.float64) | |
| patches_alpha_flat = np.concatenate(patches_alpha_list) if len(patches_alpha_list) > 0 else np.array([],dtype=np.float64) | |
| self._precomputed.update({'patches_X_flat':patches_X_flat, | |
| 'patches_alpha_flat':patches_alpha_flat, | |
| 'sizes':sizes, | |
| 'X_offsets':X_offsets, | |
| 'a_offsets':a_offsets, | |
| 'patch_centers_r_template':patch_centers_r_template, | |
| 'patch_eps_arr':np.array([p['eps'] for p in patches], dtype=np.float64), | |
| 'patch_vels':np.array([p['vel'] for p in patches], dtype=np.float64), | |
| 'orig_centers':np.array([p['center'] for p in patches], dtype=np.float64), | |
| '_patches':patches}) | |
| grid_pts = vnk_make_eval_grid(self.eval_w, self.eval_h) | |
| self._precomputed['grid_pts'] = grid_pts | |
| dummy_centers = np.zeros_like(self._precomputed['orig_centers']) | |
| dummy_r = self._precomputed['patch_centers_r_template'] | |
| vnk_eval_grid(dummy_centers, dummy_r, self._precomputed['patches_X_flat'], self._precomputed['X_offsets'], | |
| self._precomputed['sizes'], self._precomputed['patches_alpha_flat'], | |
| self._precomputed['a_offsets'], self._precomputed['patch_eps_arr'], | |
| grid_pts) | |
| def _render_frame_eval(self, t_center, blur_width): | |
| acc = np.zeros((self.eval_h*self.eval_w,), dtype=np.float64) | |
| weights = np.linspace(-0.5, 0.5, VNK_MOTION_BLUR_STEPS) | |
| orig_centers = self._precomputed['orig_centers'] | |
| vel_arr = self._precomputed['patch_vels'] | |
| for w_frac in weights: | |
| t = t_center+w_frac*blur_width | |
| centers = orig_centers+vel_arr*t | |
| out = vnk_eval_grid(centers.astype(np.float64), self._precomputed['patch_centers_r_template'], | |
| self._precomputed['patches_X_flat'], | |
| self._precomputed['X_offsets'], | |
| self._precomputed['sizes'], | |
| self._precomputed['patches_alpha_flat'], | |
| self._precomputed['a_offsets'], | |
| self._precomputed['patch_eps_arr'], | |
| self._precomputed['grid_pts']) | |
| acc+=out | |
| acc/=float(len(weights)) | |
| out_img = acc.reshape((self.eval_h, self.eval_w)) | |
| # Normalize with a standard min-max; safe under NumPy 2.0 | |
| mn, mx = float(out_img.min()), float(out_img.max()) | |
| norm = (out_img-mn)/((mx-mn)+1e-12) | |
| flir_rgb = (VNK_FLIR_COLORMAP(norm)[:,:,:3]*255).astype(np.uint8) | |
| img = Image.fromarray(flir_rgb) | |
| img = img.resize((self.out_w, self.out_h), resample=Image.BICUBIC) | |
| bloom = img.filter(ImageFilter.GaussianBlur(radius=6)) | |
| img = Image.blend(img, bloom, alpha=0.28) | |
| img = img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=180, threshold=2)) | |
| norm_up = Image.fromarray((np.clip((norm*255).astype(np.uint8),0,255))).resize((self.out_w,self.out_h), resample=Image.BILINEAR) | |
| sea = vnk_make_sea_background(self.out_w, self.out_h).convert('RGB') | |
| sea_gray = sea.convert('L') | |
| sea_rgb = Image.merge('RGB', (sea_gray, sea_gray, sea_gray)) | |
| mask = Image.fromarray(np.array(norm_up) > int(0.10*255)).convert('L') | |
| composite = Image.composite(img, sea_rgb, mask) | |
| return composite, out_img, (centers, vel_arr) | |
| def overlay_crosshair(self, frame_img, centers, vel_arr): | |
| draw = ImageDraw.Draw(frame_img) | |
| w, h = frame_img.size | |
| strengths = [np.sum(np.abs(p['alpha'])) for p in self._precomputed['_patches']] | |
| if len(strengths) == 0: | |
| return frame_img, None | |
| idx = int(np.argmax(strengths)) | |
| cx, cy = centers[idx] | |
| xmin, xmax, ymin, ymax = VNK_GRID_EXTENT | |
| px, py = int((cx-xmin)/(xmax-xmin)*w), int((cy-ymin)/(ymax-ymin)*h) | |
| size, cross_color = int(0.03*max(w,h)), (30, 144, 255) | |
| draw.line([(px-size, py), (px+size, py)], fill=cross_color, width=2) | |
| draw.line([(px, py-size), (px, py+size)], fill=cross_color, width=2) | |
| draw.ellipse([px-6, py-6, px+6, py+6], outline=cross_color, width=2) | |
| return frame_img, (px, py, vel_arr[idx], centers[idx]) | |
| def overlay_top_label_and_scale(self, frame_img, frame_idx, total_frames, kinetics_info=None): | |
| draw = ImageDraw.Draw(frame_img, 'RGBA') | |
| w, h = frame_img.size | |
| box_w, box_h = int(w*0.5), 44 | |
| box_x0, box_y0 = (w-box_w)//2, 8 | |
| box_x1, box_y1 = box_x0+box_w, box_y0+box_h | |
| draw.rectangle([box_x0, box_y0, box_x1, box_y1], fill=(0, 0, 0, 160)) | |
| ymin, ymax = VNK_GRID_EXTENT[2], VNK_GRID_EXTENT[3] | |
| #for i in range(...): sval = f"{ymax-(i/(ny-1))*(ymax-ymin):.2f}" | |
| fc_txt = f'nK Resonance Frame {frame_idx+1}/{total_frames}' | |
| draw.text((box_x0+2, box_y0+8), fc_txt, fill=(200, 220, 255), font=self.font) | |
| if kinetics_info is not None: | |
| px, py, vel, center = kinetics_info | |
| vx, vy = vel | |
| arrow_len = 45 | |
| ax, ay = int(px+np.sign(vx)*arrow_len), int(py-int(vy*arrow_len)) | |
| draw.line([(px, py),(ax, ay)], fill=(200, 220, 255), width=2) | |
| draw.polygon([(ax, ay), (ax-6, ay-6), (ax+6, ay-6)], fill=(200, 220, 255)) | |
| draw.text((w-290, box_y0+98), f"Height: {center[1]:.3f}", fill=(200, 220, 255), font=self.font) | |
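| # Usage sketch (illustrative): fit patches from a synthesized complex matrix and render one eval frame | |
| #   C = vnk_synthesize_complex_matrix() | |
| #   renderer = FlirRendererVNK() | |
| #   renderer.prepare_patches(vnk_build_patches_from_complex(C)) | |
| #   frame, field, (centers, vels) = renderer._render_frame_eval(t_center=0.0, blur_width=0.1) | |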
| #_______________________________________________________________________________________________________ | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| # TICTAC DOM - AR3z - ADAPTIVE RADIAL EVENT DRIVEN RESPONSIVE AI WITH FULL INLINE ENV XML ML SNAPSHOTS | |
| XML_OUT = os.environ.get('AR3z_XML_OUT', 'AR3z_snapshot.xml') | |
| XML_MODE = os.environ.get('AR3z_XML_MODE', 'on_demand') | |
| XML_INTERVAL = float(os.environ.get('AR3z_XML_INTERVAL', '10.0')) | |
| def complex_seg(z): | |
| return {'real': float(np.real(z)), 'imag': float(np.imag(z)), 'mag': float(abs(z))} | |
| def softmax_logits(xs): | |
| xs = np.array(xs, dtype=float) | |
| xs = xs-xs.max() | |
| ex = np.exp(xs) | |
| return ex/(ex.sum()+1e-16) | |
| def median_baseline(state, col, dim_type): | |
| rows, vals = state['chips'].shape[0], [] | |
| for r in range(rows): | |
| z = state['chips'][r, col] | |
| vals.append(float(np.real(z)) if dim_type=='real' else float(np.imag(z))) | |
| return float(np.median(vals)) | |
| def apply_seg_and_remove(chips, row, col, renown='zero'): | |
| z = chips[row,col] | |
| seg, new = complex_seg(z), chips.copy() | |
| if renown=='zero': new[row,col] = 0+0j | |
| elif renown=='sentinel': new[row,col] = 1e6+1e6j | |
| elif renown=='interp': | |
| row_vals = np.delete(new[row,:], col) | |
| new[row, col] = row_vals.mean() if row_vals.size > 0 else 0+0j | |
| return seg, new | |
| class AR3zTransform: | |
| def __init__(self, name=None, is_compression=False): | |
| self.name = name or self.__class__.__name__ | |
| self.is_compression = is_compression | |
| def transform(self, state, row, col, layer, dim_type, modulation=1+0j): | |
| raise NotImplementedError | |
| class ScaleOverTrTransform(AR3zTransform): | |
| def __init__(self, name=None, is_compression=False): | |
| super().__init__(name, is_compression) | |
| def transform(self, state, row, col, layer, dim_type, modulation=1+0j): | |
| a = float(state['a'][col]) | |
| BX = float(state['BX'][col]) | |
| if 'Tr_layers' in state: Tr = float(state['Tr_layers'][layer][col]) | |
| else: Tr = float(state.get('Tr_scalar', np.ones(state['D']))[col]) | |
| val = (a+BX/(Tr+epsilon))*modulation | |
| score = float(1.0/(1.0+Tr)) | |
| if dim_type=='real': | |
| return float(np.real(val)), score | |
| else: | |
| return float(np.imag(val)), score | |
| class RadialUnitTransform(AR3zTransform): | |
| def __init__(self, name=None, is_compression=True): | |
| super().__init__(name, is_compression) | |
| def transform(self, state, row, col, layer, dim_type, modulation=1+0j): | |
| z = state['chips'][row, col]*modulation | |
| r = abs(z) | |
| strength = float(state.get('compression_strength', 0.5)) | |
| new_r = r*(1.0-strength) | |
| new_z = 0+0j if r < epsilon else z*(new_r/r) | |
| score = float(1.0-strength) | |
| return (float(np.real(new_z)) if dim_type=='real' else float(np.imag(new_z)), score) | |
| class IdentityTransform(AR3zTransform): | |
| def __init__(self, name=None, is_compression=False): | |
| super().__init__(name, is_compression) | |
| def transform(self, state, row, col, layer, dim_type, modulation=1+0j): | |
| z = state['chips'][row, col]*modulation | |
| return (float(np.real(z)) if dim_type=='real' else float(np.imag(z)), 0.5) | |
| class CognitiveCore: | |
| def __init__(self, core_dim=CORE_DIM, max_layer=MAX_LAYER): | |
| self.core_dim = core_dim | |
| self.params = (np.random.randn(core_dim)+1j*np.random.randn(core_dim))*0.01 | |
| self.lr_layers = np.full(max_layer, 1e-2) | |
| self.confidence = 0.5 | |
| self.l2 = 1e-4 | |
| def feat_to_complexvec(self, feat_real): | |
| fv = np.zeros(self.core_dim, dtype=complex) | |
| flat = np.asarray(feat_real).astype(float) | |
| for i in range(self.core_dim): | |
| a = flat[i%flat.size] if flat.size > 0 else 0.0 | |
| b = flat[(i+1)%flat.size] if flat.size > 1 else 0.0 | |
| fv[i] = a+1j*b | |
| return fv | |
| def modulate(self, value_complex, baseline_complex, layer_norm, age_norm): | |
| feat = np.array([float(np.real(value_complex)), float(np.imag(value_complex)), | |
| float(np.real(baseline_complex)), float(np.imag(baseline_complex)), | |
| float(layer_norm), float(age_norm)]) | |
| cvec = self.feat_to_complexvec(feat) | |
| dot = np.vdot(self.params, cvec) | |
| alpha = 0.15 | |
| s = 1.0+alpha*dot | |
| mag = abs(s) | |
| if mag > 5.0: s = s/mag*5.0 | |
| conf = self.confidence*(1.0/(1.0+np.linalg.norm(self.params))) | |
| return s, float(conf) | |
| def adapt(self, chosen_complex, target_complex, layer, residual, age_norm): | |
| feat = np.array([float(np.real(chosen_complex)), float(np.imag(chosen_complex)), | |
| float(np.real(target_complex)), float(np.imag(target_complex)), | |
| float(layer)/max(1,MAX_LAYER), float(age_norm)]) | |
| cvec = self.feat_to_complexvec(feat) | |
| pred = np.vdot(self.params, cvec) | |
| alpha = 0.15 | |
| err = chosen_complex-target_complex | |
| grad = alpha*np.conjugate(err)*np.conjugate(cvec) | |
| lr = self.lr_layers[layer] if layer < len(self.lr_layers) else self.lr_layers[-1] | |
| self.params-=lr*(grad+self.l2*self.params) | |
| self.confidence = 0.9*self.confidence+0.1*(1.0/(1.0+residual)) | |
| param_norm = np.linalg.norm(self.params) | |
| if param_norm > 10.0: self.params*=(10.0/param_norm) | |
| class OnlinePredictor: | |
| def __init__(self, dim=FEATURE_DIM, lr=0.02, l2=1e-4): | |
| self.w, self.b, self.lr, self.l2, self.count, self.m, self.s2 = np.zeros(dim), 0.0, lr, l2, 0, 0.0, 1e-6 | |
| def features(self, item): | |
| mag = abs(item['value']) | |
| layer_norm = item.get('layer', 0)/max(1, item.get('max_layer', 1)) | |
| dim_type_flag = 0.0 if item.get('dim_type','real') == 'real' else 1.0 | |
| prior = item.get('residual', 0.0) | |
| age_norm = item.get('age', 0)/max(1, item.get('total_age', 1)) | |
| col_norm = item.get('col', 0)/max(1, item.get('D', 1)) | |
| core_norm = item.get('core_norm', 0.0) | |
| feats = np.array([mag, layer_norm, dim_type_flag, prior, age_norm, col_norm]) | |
| if len(feats) < len(self.w): feats = np.concatenate([feats, np.zeros(len(self.w)-len(feats))]) | |
| return feats[:len(self.w)] | |
| def predict(self, item): | |
| x = self.features(item) | |
| return float(x.dot(self.w)+self.b) | |
| def update(self, item, target): | |
| x = self.features(item) | |
| pred = x.dot(self.w)+self.b | |
| err = pred-target | |
| self.w-=self.lr*(err*x+self.l2*self.w) | |
| self.b-=self.lr*err | |
| self.count+=1 | |
| old_m = self.m | |
| self.m+=(target-self.m)/self.count | |
| self.s2+=(target-old_m)*(target-self.m) | |
| def uncertainty(self): | |
| return float(np.sqrt(self.s2/(self.count+1))) if self.count > 0 else 1.0 | |
| class PerChipDeque: | |
| def __init__(self, rows, cols, maxlen=64): | |
| self.rows, self.cols, self.maxlen = rows, cols, maxlen | |
| self.buffers = [[deque(maxlen=maxlen) for _ in range(cols)] for _ in range(rows)] | |
| def push(self, row, col, entry): | |
| buf = self.buffers[row][col] | |
| scrapped = buf[0] if len(buf) == buf.maxlen else None | |
| if len(buf) == buf.maxlen: buf.popleft() | |
| buf.append(entry) | |
| return scrapped | |
| def sample(self, row, col): | |
| return list(self.buffers[row][col]) | |
| class RunningDeque: | |
| def __init__(self, maxlen=DEQUE_LEN): | |
| self.d, self.maxlen, self.age_counter = deque(), maxlen, 0 | |
| def push(self, item): | |
| item = dict(item) | |
| item['age'] = self.age_counter | |
| self.age_counter+=1 | |
| if len(self.d) >= self.maxlen: ev = self.d.popleft() | |
| else: ev = None | |
| if 'priority_score' in item: | |
| age_pen = np.exp(-item['age']/AGE_TAU) | |
| item['priority_score'] = item['priority_score']*age_pen | |
| self.d.append(item) | |
| return ev | |
| def pop_policy(self, policy='priority'): | |
| if not self.d: | |
| return None | |
| if policy=='fifo': | |
| return self.d.popleft() | |
| idx = None | |
| if policy == 'max_residual': idx = max(range(len(self.d)), key=lambda i: self.d[i].get('residual', 0.0)) | |
| if idx is None: idx = max(range(len(self.d)), key=lambda i: self.d[i].get('priority_score', 1.0)) | |
| lst = list(self.d) | |
| item = lst.pop(idx) | |
| self.d = deque(lst) | |
| return item | |
| def items(self): | |
| return list(self.d) | |
| def __len__(self): | |
| return len(self.d) | |
| class AR3zNode: | |
| def __init__(self, name, transforms, ki=None, residual_limit=1.0, layer_mask=None): | |
| self.name = name | |
| self.transforms = transforms | |
| self.ki = ki or [] | |
| self.residual_limit = residual_limit | |
| self.layer_mask = layer_mask if layer_mask is not None else [True]*MAX_LAYER | |
| self.core = CognitiveCore() | |
| self.max_depth = 8 | |
| def evaluate(self, state, row, col, layer, dim_type, baseline, mapps, depth=0, ordering='priority'): | |
| if depth > self.max_depth: | |
| return {'name':self.name, 'skipped':True} | |
| baseline_local = baseline if baseline is not None else median_baseline(state, col, dim_type) | |
| candidates, age_norm = [], 0.0 | |
| if 'controller' in state and state['controller'] is not None: age_norm = 0.0 | |
| for t in self.transforms: | |
| if not self.layer_mask[layer]: | |
| continue | |
| chip_z = state['chips'][row, col] | |
| baseline_z = complex(baseline_local, 0.0) if dim_type=='real' else complex(0.0, baseline_local) | |
| s, conf = self.core.modulate(chip_z, baseline_z, layer/MAX_LAYER, age_norm) | |
| val, score = t.transform(state, row, col, layer, dim_type, modulation=s) | |
| res = abs(val - baseline_local) | |
| candidates.append({'transform':t, 'value':val, 'score':score, 'residual':res, 'layer':layer, | |
| 'core_conf':conf, 'is_compression':t.is_compression, 'chosen_mod':s}) | |
| anchor = state.get('pretrained_anchor') | |
| if anchor is not None: | |
| z_anchor = anchor[row, col] | |
| chip_val = state['chips'][row, col] | |
| chip_scalar = float(np.real(chip_val)) if dim_type=='real' else float(np.imag(chip_val)) | |
| anchor_scalar = float(np.real(z_anchor)) if dim_type=='real' else float(np.imag(z_anchor)) | |
| resid_to_anchor = abs(chip_scalar - anchor_scalar) | |
| if resid_to_anchor > ESCAPE_THRESH: | |
| f = np.tanh(ESCAPE_K*(resid_to_anchor-ESCAPE_THRESH)) | |
| escape_val = chip_scalar*(1.0-f)+anchor_scalar*f | |
| escape_score = 0.6+0.4*(1.0-np.exp(-resid_to_anchor)) | |
| escape_res = abs(escape_val-baseline_local) | |
| candidates.append({'transform':None, 'value':escape_val, 'score':escape_score, 'residual':escape_res, | |
| 'layer':layer, 'core_conf':self.core.confidence, 'is_compression':False, | |
| 'chosen_mod':1+0j, 'escape':True}) | |
| if not candidates: | |
| return {'name':self.name, 'no_candidates':True} | |
| logits = [] | |
| for c in candidates: | |
| score_term = c.get('score', 0.0) | |
| residual_term = -1.0*c.get('residual', 0.0) | |
| core_term, age_term = 0.8*c.get('core_conf', 0.0), 0.0 | |
| comp_term = -0.2*float(c.get('is_compression', False)) | |
| logit = 3.0*score_term+2.0*residual_term+core_term+age_term+comp_term | |
| logits.append(logit) | |
| probs = softmax_logits(logits) | |
| idx = int(np.argmax(probs)) | |
| chosen = candidates[idx] | |
| node_out = {'name':self.name, 'row':row, 'col':col, 'layer':layer, 'dim_type':dim_type, | |
| 'baseline':baseline_local, 'candidates':candidates, 'chosen':None, 'ki':[], 'mapps':mapps} | |
| reduction = min(mapps, min(1.0, chosen['residual'])) | |
| mapps = max(0.0, mapps-reduction) | |
| node_out['mapps'] = mapps | |
| node_out['chosen'] = {'value':chosen['value'], 'score':chosen.get('score'), 'residual':chosen['residual'], | |
| 'prob':probs[idx], 'escape': chosen.get('escape', False)} | |
| if 'controller' in state and state['controller'] is not None: | |
| ev = {'row':row, 'col':col, 'layer':layer, 'dim_type':dim_type, 'value':chosen['value'], | |
| 'residual':chosen['residual'], 'D':state['D']} | |
| ev['score'] = chosen.get('score', 0.0) | |
| ev['core_norm'] = np.linalg.norm(self.core.params) | |
| ev['priority_score'] = (1.0/(1.0+ev['residual']))*(1.0+ev['score'])*(1.0+0.5*ev['core_norm']) | |
| state['controller'].enqueue_event(ev) | |
| if chosen is not None: | |
| baseline_complex = complex(baseline_local, 0.0) if dim_type=='real' else complex(0.0, baseline_local) | |
| chosen_complex = complex(chosen['value'], 0.0) if dim_type=='real' else complex(0.0, chosen['value']) | |
| target_complex = baseline_complex | |
| self.core.adapt(chosen_complex, target_complex, layer, chosen['residual'], age_norm) | |
| for ch in self.ki: | |
| logic_out = ch.evaluate(state, row, col, layer, dim_type, baseline_local, mapps, depth+1, ordering) | |
| node_out['ki'].append(logic_out) | |
| mapps = logic_out.get('mapps', mapps) | |
| node_out['mapps'] = mapps | |
| return node_out | |
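| # Usage sketch (hypothetical state dict): the keys read by AR3zNode.evaluate() and its transforms | |
| #   chips = np.random.randn(4, 6)+1j*np.random.randn(4, 6) | |
| #   state = {'chips': chips, 'D': 6, 'a': np.ones(6), 'BX': np.ones(6), 'Tr_scalar': np.ones(6), | |
| #            'controller': None, 'pretrained_anchor': chips.copy(), 'compression_strength': 0.5} | |
| #   node = AR3zNode('root', [ScaleOverTrTransform(), RadialUnitTransform(), IdentityTransform()]) | |
| #   out = node.evaluate(state, row=0, col=0, layer=0, dim_type='real', baseline=None, mapps=CYCLE_MAPPS) | |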
| class HybridController: | |
| def __init__(self, chips, deque_len=DEQUE_LEN, perchip_len=64): | |
| self.chips = chips | |
| self.rows, self.D = chips.shape | |
| self.global_deque = RunningDeque(maxlen=deque_len) | |
| self.predictor = OnlinePredictor(dim=FEATURE_DIM) | |
| self.perchip = PerChipDeque(self.rows, self.D, maxlen=perchip_len) | |
| self.pretrained_anchor = chips.copy() | |
| self.age = 0 | |
| def enqueue_event(self, ev): | |
| ev = dict(ev) | |
| ev['total_age'] = max(1, self.global_deque.age_counter) | |
| ev['D'] = self.D | |
| ev['predicted_residual'] = self.predict_residual(ev['row'], ev['col'], ev['dim_type'], ev['value'], ev.get('residual', 0.0)) | |
| ev['priority_score'] = (1.0/(1.0+ev['predicted_residual']))*(1.0+ev.get('score',0.0))*(1.0+0.5*ev.get('core_norm', 0.0)) | |
| scrapped = self.global_deque.push(ev) | |
| self.perchip.push(ev['row'], ev['col'], ev) | |
| if scrapped: self.learn_from_scrapped(scrapped) | |
| return scrapped | |
| def predict_residual(self, row, col, dim_type, value, prior_res): | |
| item = {'row':row, 'col':col, 'dim_type':dim_type, 'value':value, 'residual':prior_res, 'age':0, | |
| 'total_age':max(1,self.global_deque.age_counter), 'D':self.D, 'layer':0, 'max_layer':MAX_LAYER} | |
| item['core_norm'] = 0.0 | |
| return self.predictor.predict(item) | |
| def learn_from_scrapped(self, ev): | |
| target = ev.get('residual', 0.0) | |
| self.predictor.update(ev, target) | |
| def scrapp_policy(self, policy='priority'): | |
| item = self.global_deque.pop_policy(policy) | |
| if item is None: | |
| return None | |
| seg, new_chips = apply_seg_and_remove(self.chips, item['row'], item['col'], renown='interp') | |
| self.chips = new_chips | |
| self.refresh_anchor(blend=ANCHOR_BLEND) | |
| self.predictor.update(item, item.get('residual', 0.0)) | |
| return {'scrapped':item, 'seg':seg} | |
| def refresh_anchor(self, blend=ANCHOR_BLEND): | |
| self.pretrained_anchor = (1.0-blend)*self.pretrained_anchor+blend*self.chips | |
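| # Usage sketch (illustrative): attach a controller so evaluate() can enqueue/scrap events | |
| #   controller = HybridController(chips)             # 'chips' is a complex 2-D ndarray | |
| #   state['controller'] = controller | |
| #   scrapped = controller.scrapp_policy('priority')  # None until events have been enqueued | |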
| def stable_id(name, cfg_str): | |
| h = hashlib.sha1((name+'|'+ cfg_str).encode('utf-8')).hexdigest() | |
| return h[:12] | |
| class DecisionAggregator: | |
| def __init__(self): | |
| self.data, self.node_cfg = {}, {} | |
| self.lock = threading.Lock() | |
| def bin_key(self, residual, core_conf, is_compression, escape): | |
| if residual < 0.2: rbin = 'r0' | |
| elif residual < 0.5: rbin = 'r1' | |
| elif residual < 1.0: rbin = 'r2' | |
| else: rbin = 'r3' | |
| if core_conf < 0.2: cbin = 'c0' | |
| elif core_conf < 0.5: cbin = 'c1' | |
| else: cbin = 'c2' | |
| return f"{rbin}|{cbin}|comp={1 if is_compression else 0}|esc={1 if escape else 0}" | |
| def project(self, node_name, transform_name, residual, prob, core_norm, is_compression, escape): | |
| key = self.bin_key(residual, core_norm, is_compression, escape) | |
| with self.lock: | |
| nd = self.data.setdefault(node_name, {}) | |
| b = nd.setdefault(key, {'count':0, 'sum_residual':0.0, 'sum_prob':0.0, 'rep_transform':transform_name}) | |
| b['count']+=1 | |
| b['sum_residual']+=float(residual) | |
| b['sum_prob']+=float(prob) | |
| def snapshot_and_clear(self): | |
| with self.lock: | |
| snap = self.data | |
| self.data = {} | |
| return snap | |
| _abe_thread, _abe_stop = None, False | |
| aggregator = DecisionAggregator() | |
| def export_AR3z_xml(root_node, all_nodes, controller, out_path=XML_OUT): | |
| ts = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) | |
| root_el = ET.Element('AR3z_snapshot', {'ts':ts, 'version':'1'}) | |
| static_el = ET.SubElement(root_el, 'static_graph') | |
| nodes_el = ET.SubElement(static_el, 'nodes') | |
| for n in all_nodes: | |
| cfg = f"reslim={n.residual_limit}|mask={' '.join(['1' if m else '0' for m in n.layer_mask])}|core_dim={n.core.core_dim}" | |
| nid = stable_id(n.name, cfg) | |
| ET.SubElement(nodes_el, 'node', {'id':nid, 'name':n.name, 'cfg':cfg}) | |
| transforms_el = ET.SubElement(static_el, 'transforms') | |
| seen_t = {} | |
| for n in all_nodes: | |
| for t in n.transforms: | |
| if t.name in seen_t: continue | |
| tcfg = f"is_compression={1 if getattr(t, 'is_compression', False) else 0}" | |
| tid = stable_id(t.name, tcfg) | |
| seen_t[t.name] = tid | |
| ET.SubElement(transforms_el, 'transform', {'id':tid, 'name':t.name, 'cfg':tcfg}) | |
| runtime_el = ET.SubElement(root_el, 'runtime_snapshot') | |
| global_el = ET.SubElement(runtime_el, 'global') | |
| ET.SubElement(global_el, 'metric', {'name':'deque_len', 'value':str(len(controller.global_deque))}) | |
| ET.SubElement(global_el, 'metric', {'name':'predictor_uncertainty', 'value':str(controller.predictor.uncertainty())}) | |
| snap = aggregator.snapshot_and_clear() | |
| nodes_rt_el = ET.SubElement(runtime_el, 'nodes') | |
| for node_name, bins in snap.items(): | |
| node_el = ET.SubElement(nodes_rt_el, 'node', {'name':node_name}) | |
| for bkey, stats in bins.items(): | |
| count = stats['count'] | |
| avg_res = stats['sum_residual']/count if count > 0 else 0.0 | |
| avg_prob = stats['sum_prob']/count if count > 0 else 0.0 | |
| be = ET.SubElement(node_el, 'branch', {'bin':bkey, 'count':str(count), 'rep_transform':str(stats.get('rep_transform',''))}) | |
| ET.SubElement(be, 'metric', {'name':'avg_residual', 'value':str(avg_res)}) | |
| ET.SubElement(be, 'metric', {'name':'avg_prob', 'value':str(avg_prob)}) | |
| tmp = out_path+'.tmp' | |
| tree = ET.ElementTree(root_el) | |
| tree.write(tmp, encoding='utf-8', xml_declaration=True) | |
| os.replace(tmp, out_path) | |
| def _abe_writer(root_node, all_nodes, controller, interval): | |
| global _abe_stop | |
| while not _abe_stop: | |
| try: | |
| export_AR3z_xml(root_node, all_nodes, controller) | |
| except Exception: | |
| pass | |
| time.sleep(interval) | |
| def start_abe_export(root_node, all_nodes, controller, interval=XML_INTERVAL): | |
| global _abe_thread, _abe_stop | |
| if _abe_thread is not None: | |
| return | |
| _abe_stop = False | |
| _abe_thread = threading.Thread(target=_abe_writer, args=(root_node, all_nodes, controller, interval), daemon=True) | |
| _abe_thread.start() | |
| def stop_abe_export(): | |
| global _abe_thread, _abe_stop | |
| if _abe_thread is None: | |
| return | |
| _abe_stop = True | |
| _abe_thread.join(timeout=1.0) | |
| _abe_thread = None | |
| def load_AR3z_xml(path): | |
| if not os.path.exists(path): | |
| return None | |
| tree = ET.parse(path) | |
| root = tree.getroot() | |
| out = {'ts': root.attrib.get('ts'), 'version': root.attrib.get('version')} | |
| static = {} | |
| sg = root.find('static_graph') | |
| if sg is not None: | |
| for n in sg.findall('.//node'): | |
| static[n.attrib.get('id')] = {'name': n.attrib.get('name'), 'cfg': n.attrib.get('cfg')} | |
| out['static'] = static | |
| runtime = {'nodes':{}} | |
| rt = root.find('runtime_snapshot') | |
| if rt is not None: | |
| nodes_rt = rt.find('nodes') | |
| if nodes_rt is not None: | |
| for n in nodes_rt.findall('node'): | |
| nm = n.attrib.get('name') | |
| branches = [] | |
| for b in n.findall('branch'): | |
| br = {'bin': b.attrib.get('bin'), 'count': int(b.attrib.get('count', '0')), 'rep_transform': b.attrib.get('rep_transform')} | |
| metrics = {} | |
| for m in b.findall('metric'): | |
| metrics[m.attrib.get('name')] = float(m.attrib.get('value')) | |
| br['metrics'] = metrics | |
| branches.append(br) | |
| runtime['nodes'][nm] = branches | |
| out['runtime'] = runtime | |
| return out | |
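| # Usage sketch (hypothetical node list): periodic background XML snapshots plus reload | |
| #   start_abe_export(node, [node], controller, interval=XML_INTERVAL) | |
| #   # ... run evaluations ... | |
| #   stop_abe_export() | |
| #   snapshot = load_AR3z_xml(XML_OUT) | |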
| #_______________________________________________________________________________________________________ | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| # TICTAC DOM - DIANE - DISTRIBUTED INTERNAL ASYMMETRIC NEURAL EMERGENCE | |
| class DIANE: | |
| # Small but powerful NN. Uses asymmetric hybrid gate spawning to get the B-Spline Shell AI out | |
| # of any ruts that can occur when evaluating the best complex-matrix projections --RL prob basis | |
| def __init__(self, num_states, num_actions, feature_len=1, device=None, complex_mode=False, init_scale=0.1, similarity_threshold=0.5): | |
| self.device = device or torch.device('cpu') | |
| self.complex_mode = complex_mode | |
| self.dtype = torch.complex64 if complex_mode else torch.float32 | |
| self.num_states = num_states | |
| self.num_actions = num_actions | |
| self.feature_len = feature_len | |
| self.similarity_threshold = float(similarity_threshold) | |
| # Top asym propagation matrix(states x -> actions) | |
| real_part = torch.randn((num_states, num_actions), device=self.device, dtype=torch.float32)*init_scale | |
| imag_part = torch.randn((num_states, num_actions), device=self.device, dtype=torch.float32)*init_scale if complex_mode else None | |
| if complex_mode: self.memory_matrix = (real_part + 1j * imag_part).to(self.dtype) | |
| else: self.memory_matrix = real_part.to(self.dtype) | |
| # Secondary modulated asymm matrix(actions <- x states) | |
| real_mod = torch.randn((num_actions, num_states), device=self.device, dtype=torch.float32)*(init_scale*0.5) | |
| imag_mod = torch.randn((num_actions, num_states), device=self.device, dtype=torch.float32)*(init_scale*0.5) if complex_mode else None | |
| if complex_mode: self.modulation_matrix = (real_mod+1j*imag_mod).to(self.dtype) | |
| else: self.modulation_matrix = real_mod.to(self.dtype) | |
| # Caches & History: | |
| self.cached_decisions = {} # keyed by (state.id, action.id) | |
| self.recent_actions = [] # recent action ids | |
| self.propagation_history = [] # snapshots of memory_matrix | |
| self.decay, self.eps = 0.995, 1e-8 | |
| # The hybrid gate components(training *float32) | |
| self.gate_W = nn.Parameter(torch.randn((self.num_actions, max(1, self.feature_len)), dtype=torch.float32, device=self.device)*0.01) | |
| self.gate_b = nn.Parameter(torch.zeros((self.num_actions,), dtype=torch.float32, device=self.device)) | |
| self.gate_alpha, self.rule_override_strength = 0.1, 2.0 | |
| self.bspline_handler = default_bspline_handler | |
| self._bspline_futures = [] | |
| def learn(self, decision, lr=0.1): | |
| s = int(decision.state.id)%self.num_states | |
| a = int(decision.action.id)%self.num_actions | |
| r_val = torch.tensor(decision.reward.value, device=self.device, dtype=self.dtype) | |
| self.memory_matrix = self.memory_matrix * self.decay | |
| # Prime asym update state -> action | |
| current = self.memory_matrix[s, a] | |
| delta = lr*(r_val-current) | |
| self.memory_matrix[s, a] = current+delta | |
| # Secondary asym update action -> state | |
| mod_current = self.modulation_matrix[a, s] | |
| mod_delta = (lr*0.5)*(r_val-mod_current) | |
| self.modulation_matrix[a, s] = mod_current+mod_delta | |
| self._cache_decision(decision) | |
| self.propagation_history.append(self.memory_matrix.clone().detach()) | |
| # Possible request, B-Spline Shell assistance: | |
| if self._should_request_bspline(): | |
| self.request_bspline_resize_async(self.num_states, self.num_actions, self.memory_matrix) | |
| def decide(self, state, exploration_noise=0.01): | |
| s = int(state.id)%self.num_states | |
| mem_row = self.memory_matrix[s, :] | |
| mod_col = self.modulation_matrix[:, s] | |
| if self.complex_mode: | |
| scores_c = (mem_row*0.7)+(mod_col.conj()*0.3) | |
| scores = scores_c.abs() | |
| else: scores = (mem_row*0.7)+(mod_col*0.3) | |
| # Possible hybrid gates action: | |
| gates = self._compute_gates(state) | |
| alpha = self.gate_alpha | |
| if self._should_request_bspline(): alpha*=float(self.rule_override_strength) | |
| gates_t = gates.to(scores.device) | |
| scores = scores*(1.0+alpha*gates_t) | |
| noise = torch.randn_like(scores)*exploration_noise | |
| scores = scores+noise | |
| action_id = int(torch.argmax(scores).item()) | |
| self.recent_actions.append(action_id) | |
| action_type = 'exploration' if torch.std(scores).item() > 1e-4 else 'exploitation' | |
| return Action(id=action_id, type=action_type) | |
| def apply_reward_limits(self, state, action): | |
| key = (int(state.id)%self.num_states, int(action.id)%self.num_actions) | |
| if key not in self.cached_decisions or len(self.cached_decisions[key]) == 0: | |
| return | |
| rewards = [torch.tensor(d.reward.value, dtype=self.dtype, device=self.device) for d in self.cached_decisions[key]] | |
| stacked = torch.stack(rewards) | |
| avg = stacked.mean() | |
| variance = stacked.var(unbiased=False) if stacked.numel() > 1 else torch.tensor(0.0, dtype=self.dtype, device=self.device) | |
| similarity = 1.0/(1.0+variance.abs()) | |
| if (avg.real if self.complex_mode else avg) < self.similarity_threshold or (similarity.real if self.complex_mode else similarity) < self.similarity_threshold: | |
| s, a = key | |
| if self.complex_mode: self.memory_matrix[s, a] = self.memory_matrix[s, a]*0.9 | |
| else: self.memory_matrix[s, a] = max(0.0, self.memory_matrix[s, a].item()*0.9) | |
| def cascade_decisions(self, state, depth): | |
| if depth <= 0: | |
| return [] | |
| results, frontier = [], [(state, 0)] | |
| while frontier: | |
| cur_state, cur_depth = frontier.pop(0) | |
| if cur_depth >= depth: | |
| continue | |
| s = int(cur_state.id)%self.num_states | |
| for a in range(self.num_actions): | |
| next_state = State(id=(cur_state.id+1)%self.num_states, features=[(cur_state.features[0]+1)]) | |
| raw = self.memory_matrix[s, a] | |
| r_value = float(raw.abs().item()) if self.complex_mode else float(raw.item()) | |
| reward = Reward(value=r_value, type='hypothetical') | |
| dec = Decision(state=next_state, action=Action(id=a, type='hypothetical'), reward=reward) | |
| results.append(dec) | |
| frontier.append((next_state, cur_depth+1)) | |
| return results | |
| def gate_parameters(self): | |
| return [self.gate_W, self.gate_b] | |
| def _compute_gates(self, state): | |
| feats = torch.tensor(state.features, dtype=torch.float32, device=self.device).view(-1) | |
| L = self.gate_W.shape[1] | |
| if feats.numel() != L: | |
| f = torch.zeros((L,), dtype=torch.float32, device=self.device) | |
| f[:min(feats.numel(), L)] = feats[:min(feats.numel(), L)] | |
| feats = f | |
| logits = torch.matmul(self.gate_W, feats)+self.gate_b | |
| gates = torch.sigmoid(logits) | |
| return gates | |
| def gate_loss_reg(self, recent_actions, entropy_coef=0.1): | |
| if len(recent_actions) < 2: | |
| return torch.tensor(0.0, device=self.device, dtype=torch.float32) | |
| counts = torch.zeros((self.num_actions,), device=self.device, dtype=torch.float32) | |
| for a in recent_actions: counts[a%self.num_actions]+=1.0 | |
| probs = counts/counts.sum() | |
| probs = probs+1e-6 | |
| entropy = -torch.sum(probs*torch.log(probs)) | |
| return -entropy*entropy_coef | |
| def _cache_decision(self, decision): | |
| key = (int(decision.state.id)%self.num_states, int(decision.action.id)%self.num_actions) | |
| if key not in self.cached_decisions: self.cached_decisions[key] = [] | |
| self.cached_decisions[key].append(decision) | |
| if len(self.cached_decisions[key]) > 128: self.cached_decisions[key].pop(0) | |
| def _should_request_bspline(self): | |
| if len(self.recent_actions) >= 8: | |
| last = self.recent_actions[-8:] | |
| if all(x == last[0] for x in last): | |
| return True | |
| # Stagnation detect from propagation history: | |
| if len(self.propagation_history) >= 5: | |
| diffs = [] | |
| for i in range(1, len(self.propagation_history)): | |
| a = self.propagation_history[i-1] | |
| b = self.propagation_history[i] | |
| diffs.append(torch.norm((b-a).real.float()).item()) | |
| mean_diff = sum(diffs)/len(diffs) if diffs else 0.0 | |
| if mean_diff < (1e-3*max(1.0, float(self.num_states*self.num_actions))): | |
| return True | |
| return False | |
| def set_bspline_handler(self, handler_callable): | |
| # *****Dict with the optional key sets: action, new_h, new_w, details***** | |
| self.bspline_handler = handler_callable | |
| def request_bspline_resize_async(self, h, w, complex_tensor): | |
| # Temporary, more options needed for multi-processing sub-actions | |
| t = complex_tensor.detach().to('cpu') | |
| if self.complex_mode: np_arr = t.numpy().astype(np.complex64) | |
| else: np_arr = t.numpy().astype(np.float32) | |
| future = _bspline_executor.submit(self.bspline_handler, int(h), int(w), np_arr) | |
| self._bspline_futures.append(future) | |
| def _on_done(fut): | |
| try: | |
| res = fut.result() | |
| if isinstance(res, dict): | |
| action = res.get('action') | |
| if action == 'restart': | |
| self.recent_actions.clear() | |
| self.notify_bspline_epoch_restart() | |
| elif action == 'resize': | |
| new_h = res.get('new_h', self.num_states) | |
| new_w = res.get('new_w', self.num_actions) | |
| print(f' /:./D.I.A.N.E./.:/ b-spline shell requested resize: {new_h}x{new_w}') | |
| except Exception as e: | |
| print(f' /:./D.I.A.N.E./.:/ b-spline shell handler raised: {e}') | |
| future.add_done_callback(lambda fut: _on_done(fut)) | |
| return future | |
| def notify_bspline_epoch_restart(self): | |
| print(' /:./D.I.A.N.E./.:/ notify b-spline epoch restart called') | |
| # _________________________________UTILS____________________________________ | |
| def to(self, device): | |
| self.device = device | |
| self.memory_matrix = self.memory_matrix.to(device) | |
| self.modulation_matrix = self.modulation_matrix.to(device) | |
| self.gate_W = nn.Parameter(self.gate_W.to(device)) | |
| self.gate_b = nn.Parameter(self.gate_b.to(device)) | |
| return self | |
| def set_complex_mode(self, complex_mode): | |
| if complex_mode == self.complex_mode: | |
| return | |
| self.complex_mode = complex_mode | |
| if complex_mode: | |
| self.memory_matrix = self.memory_matrix.to(torch.float32).to(self.device).to(torch.complex64)+0j | |
| self.modulation_matrix = self.modulation_matrix.to(torch.float32).to(self.device).to(torch.complex64)+0j | |
| self.dtype = torch.complex64 | |
| else: | |
| self.memory_matrix = self.memory_matrix.real.to(torch.float32) | |
| self.modulation_matrix = self.modulation_matrix.real.to(torch.float32) | |
| self.dtype = torch.float32 | |
| def summary(self): | |
| return {'num_states': self.num_states, | |
| 'num_actions': self.num_actions, | |
| 'complex_mode': self.complex_mode, | |
| 'memory_matrix_dtype': str(self.memory_matrix.dtype), | |
| 'recent_actions_len': len(self.recent_actions), | |
| 'cached_keys': len(self.cached_decisions)} | |
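| # Usage sketch (hypothetical sizes): one decide/learn cycle with DIANE | |
| #   diane = DIANE(num_states=16, num_actions=4, feature_len=1) | |
| #   s = State(id=0, features=[0.0]) | |
| #   a = diane.decide(s) | |
| #   diane.learn(Decision(state=s, action=a, reward=Reward(value=1.0, type='env'))) | |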
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| #_______________________________________________________________________________________________________ | |
| # TICTAC DOM - COMPLEX REGRESSIVE SHELL B-SPLINE AI | |
| #_______________________________________________________________________________________________________ | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| def collate_keep_meta(batch): | |
| inps, targs, metas = zip(*batch) | |
| return np.stack(inps, axis=0), np.stack(targs, axis=0), list(metas) | |
| def is_torch(x): | |
| return isinstance(x, torch.Tensor) | |
| def as_numpy(x): | |
| if is_torch(x): | |
| return x.detach().cpu().numpy() | |
| return np.asarray(x) | |
| def as_torch(x, device=None, dtype=None): | |
| if is_torch(x): | |
| if device is not None: x = x.to(device) | |
| if dtype is not None: x = x.to(dtype) | |
| return x | |
| t = torch.from_numpy(np.asarray(x)) | |
| if dtype is not None: t = t.to(dtype) | |
| if device is not None: t = t.to(device) | |
| return t | |
| def complex_to_magphase(mat): | |
| if is_torch(mat): | |
| mag = torch.abs(mat).to(torch.float32) | |
| phase = torch.atan2(mat.imag, mat.real).to(torch.float32) | |
| return mag, phase | |
| else: | |
| mag = np.abs(mat).astype(np.float32) | |
| phase = np.angle(mat).astype(np.float32) | |
| return mag, phase | |
| def magphase_to_complex(mag, phase): | |
| if is_torch(mag) or is_torch(phase): | |
| mag_t = as_torch(mag, dtype=torch.float32) | |
| ph_t = as_torch(phase, dtype=torch.float32) | |
| real = mag_t*torch.cos(ph_t) | |
| imag = mag_t*torch.sin(ph_t) | |
| return torch.complex(real, imag).to(torch.complex64) | |
| else: | |
| return (mag*np.exp(1j*phase)).astype(np.complex64) | |
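| # Usage sketch (illustrative values): magnitude/phase round trip on a small complex matrix | |
| #   Z = np.array([[1+1j, 2-1j]], dtype=np.complex64) | |
| #   mag, ph = complex_to_magphase(Z) | |
| #   Z2 = magphase_to_complex(mag, ph)  # ~= Z up to float32 rounding | |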
| def unwrap_phase(phase): | |
| if is_torch(phase): | |
| p_np = phase.detach().cpu().numpy() | |
| p_un = np.unwrap(np.unwrap(p_np, axis=0), axis=1).astype(np.float32) | |
| return as_torch(p_un, dtype=torch.float32, device=phase.device) | |
| else: | |
| p = np.unwrap(np.unwrap(phase, axis=0), axis=1) | |
| return p.astype(np.float32) | |
| def cubic_b_spline_weights(t): | |
| if is_torch(t): | |
| t = t.to(torch.float32) | |
| t2 = t*t | |
| t3 = t2*t | |
| w0 = (1.0-3.0*t+3.0*t2-t3)/6.0 | |
| w1 = (4.0-6.0*t2+3.0*t3)/6.0 | |
| w2 = (1.0+3.0*t+3.0*t2-3.0*t3)/6.0 | |
| w3 = t3/6.0 | |
| return torch.stack([w0, w1, w2, w3], dim=-1) | |
| else: | |
| t = np.asarray(t, dtype=np.float32) | |
| t2 = t*t | |
| t3 = t2*t | |
| w0 = (1.0-3.0*t+3.0*t2-t3)/6.0 | |
| w1 = (4.0-6.0*t2+3.0*t3)/6.0 | |
| w2 = (1.0+3.0*t+3.0*t2-3.0*t3)/6.0 | |
| w3 = t3/6.0 | |
| return np.stack([w0, w1, w2, w3], axis=-1) | |
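| # Usage sketch (hypothetical helper, not called anywhere): the four cubic B-spline basis | |
| # weights form a partition of unity, so w0+w1+w2+w3 == 1 for every t in [0, 1). | |
| def _demo_bspline_weight_sum(): | |
| t = np.linspace(0.0, 1.0, 11, dtype=np.float32) | |
| w = cubic_b_spline_weights(t)  # shape (11, 4) | |
| assert np.allclose(w.sum(axis=-1), 1.0, atol=1e-6) | |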
| def eval_bspline_grid(control, out_h, out_w): | |
| input_is_torch = is_torch(control) | |
| if input_is_torch: ctrl = control.detach().cpu().numpy() | |
| else: ctrl = np.asarray(control) | |
| single_channel = (ctrl.ndim == 2) | |
| if single_channel: ctrl = ctrl[None, ...] | |
| C, ch, cw = ctrl.shape | |
| ys = np.linspace(0, ch-1, out_h).astype(np.float32) | |
| xs = np.linspace(0, cw-1, out_w).astype(np.float32) | |
| Y, X = np.meshgrid(ys, xs, indexing='ij') | |
| ix = np.floor(X).astype(np.int32) | |
| iy = np.floor(Y).astype(np.int32) | |
| tx = (X-ix).astype(np.float32) | |
| ty = (Y-iy).astype(np.float32) | |
| wx = cubic_b_spline_weights(tx) | |
| wy = cubic_b_spline_weights(ty) | |
| out = np.zeros((C, out_h, out_w), dtype=np.float32) | |
| for m in range(-1, 3): | |
| for n in range(-1, 3): | |
| ixn = np.clip(ix+n, 0, cw-1) | |
| iym = np.clip(iy+m, 0, ch-1) | |
| w = (wy[..., m+1]*wx[..., n+1])[None, ...] | |
| vals = ctrl[:, iym, ixn] | |
| out+=vals*w | |
| if single_channel: out = out[0] | |
| if input_is_torch: | |
| return torch.from_numpy(out.astype(np.float32)) | |
| return out | |
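| # Usage sketch (hypothetical helper, not called anywhere): densify a coarse control grid to | |
| # full resolution; a constant control grid evaluates to the same constant everywhere. | |
| def _demo_eval_bspline_grid(): | |
| ctrl = np.full((6, 6), 2.5, dtype=np.float32) | |
| dense = eval_bspline_grid(ctrl, out_h=48, out_w=64) | |
| assert dense.shape == (48, 64) and np.allclose(dense, 2.5, atol=1e-5) | |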
| def extract_features_for_kmeans(mat): | |
| mat_np = as_numpy(mat) | |
| H, W = mat_np.shape | |
| X, Y = np.meshgrid(np.arange(W), np.arange(H)) | |
| mag = np.abs(mat_np).ravel() | |
| ph = np.angle(mat_np).ravel() | |
| re = mat_np.real.ravel() | |
| im = mat_np.imag.ravel() | |
| feats = np.stack([X.ravel(), Y.ravel(), mag, ph, re, im], axis=1).astype(np.float32) | |
| return feats | |
| def kmeans_on_matrix(mat, n_clusters=8, random_state=0): | |
| input_is_torch = is_torch(mat) | |
| feats = extract_features_for_kmeans(mat) | |
| kmeans = KMeans(n_clusters=n_clusters, random_state=random_state, n_init=10).fit(feats) | |
| labels = kmeans.labels_.reshape(as_numpy(mat).shape).astype(np.int32) | |
| centers = kmeans.cluster_centers_.astype(np.float32) | |
| if input_is_torch: | |
| return torch.from_numpy(labels), torch.from_numpy(centers) | |
| return labels, centers | |
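| # Usage sketch (hypothetical helper, not called anywhere): cluster a complex field on the | |
| # (x, y, mag, phase, real, imag) features; labels come back at the matrix shape. | |
| def _demo_kmeans_on_matrix(): | |
| rng = np.random.RandomState(0) | |
| m = (rng.randn(32, 32)+1j*rng.randn(32, 32)).astype(np.complex64) | |
| labels, centers = kmeans_on_matrix(m, n_clusters=4) | |
| assert labels.shape == (32, 32) and centers.shape == (4, 6) | |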
| def compute_cluster_anchor_targets(mat_cur, mat_next, labels, centers): | |
| input_is_torch = is_torch(mat_cur) | |
| mat_cur_np = as_numpy(mat_cur) | |
| mat_next_np = as_numpy(mat_next) | |
| labels_np = as_numpy(labels) | |
| centers_np = as_numpy(centers) | |
| mag_cur = np.abs(mat_cur_np).astype(np.float32) | |
| ph_cur = np.angle(mat_cur_np).astype(np.float32) | |
| mag_next = np.abs(mat_next_np).astype(np.float32) | |
| ph_next = np.angle(mat_next_np).astype(np.float32) | |
| ph_cur_u = np.unwrap(np.unwrap(ph_cur, axis=0), axis=1) | |
| ph_next_u = np.unwrap(np.unwrap(ph_next, axis=0), axis=1) | |
| mag_res = (mag_next-mag_cur).astype(np.float32) | |
| ph_res = (ph_next_u-ph_cur_u).astype(np.float32) | |
| K = centers_np.shape[0] | |
| anchor_coords = centers_np[:, :2].astype(np.float32) | |
| anchor_mag_res = np.zeros(K, dtype=np.float32) | |
| anchor_ph_res = np.zeros(K, dtype=np.float32) | |
| counts = np.zeros(K, dtype=np.int32) | |
| H, W = labels_np.shape | |
| for k in range(K): | |
| mask = (labels_np == k) | |
| cnt = int(mask.sum()) | |
| counts[k] = cnt | |
| if cnt == 0: | |
| cx, cy = int(round(centers_np[k,0])), int(round(centers_np[k,1])) | |
| if 0 <= cx < W and 0 <= cy < H: | |
| anchor_mag_res[k] = mag_res[cy, cx] | |
| anchor_ph_res[k] = ph_res[cy, cx] | |
| else: | |
| anchor_mag_res[k] = 0.0 | |
| anchor_ph_res[k] = 0.0 | |
| else: | |
| anchor_mag_res[k] = float(mag_res[mask].mean()) | |
| anchor_ph_res[k] = float(ph_res[mask].mean()) | |
| if input_is_torch: | |
| return (as_torch(anchor_coords), as_torch(anchor_mag_res), as_torch(anchor_ph_res), as_torch(counts)) | |
| return anchor_coords, anchor_mag_res, anchor_ph_res, counts | |
| class SmallRegressor(nn.Module): | |
| def __init__(self, in_ch=4, out_ch=2, base=16): | |
| super().__init__() | |
| self.net = nn.Sequential( | |
| nn.Conv2d(in_ch, base, 3, padding=1), | |
| nn.ReLU(), | |
| nn.Conv2d(base, base, 3, padding=1), | |
| nn.ReLU(), | |
| nn.Conv2d(base, base*2, 3, padding=1), | |
| nn.ReLU(), | |
| nn.Conv2d(base*2, base, 3, padding=1), | |
| nn.ReLU(), | |
| nn.Conv2d(base, out_ch, 1)) | |
| def forward(self, x): | |
| return self.net(x) | |
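| # Usage sketch (hypothetical helper, not called anywhere): the regressor maps a 4-channel | |
| # (mag, unwrapped phase, cluster mag map, cluster phase map) stack to a 2-channel residual. | |
| def _demo_small_regressor_shapes(): | |
| model = SmallRegressor(in_ch=4, out_ch=2, base=16) | |
| with torch.no_grad(): | |
| y = model(torch.randn(1, 4, 32, 48)) | |
| assert y.shape == (1, 2, 32, 48) | |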
| class ShellDataset(Dataset): | |
| def __init__(self, shell_list, n_clusters=8): | |
| self.shell = [as_numpy(s) for s in shell_list] | |
| self.n_clusters = n_clusters | |
| def __len__(self): | |
| return max(0, len(self.shell)-1) | |
| def __getitem__(self, idx): | |
| a_np = self.shell[idx] | |
| b_np = self.shell[idx+1] | |
| H, W = a_np.shape | |
| mag_a = np.abs(a_np).astype(np.float32) | |
| ph_a = np.angle(a_np).astype(np.float32) | |
| ph_a_u = np.unwrap(np.unwrap(ph_a, axis=0), axis=1) | |
| mag_b = np.abs(b_np).astype(np.float32) | |
| ph_b = np.angle(b_np).astype(np.float32) | |
| ph_b_u = np.unwrap(np.unwrap(ph_b, axis=0), axis=1) | |
| mag_res = (mag_b-mag_a).astype(np.float32) | |
| ph_res = (ph_b_u-ph_a_u).astype(np.float32) | |
| labels, centers = kmeans_on_matrix(a_np, n_clusters=self.n_clusters) | |
| labels_np = as_numpy(labels) | |
| centers_np = as_numpy(centers) | |
| H, W = mag_a.shape | |
| center_mag_map = np.zeros((H,W), dtype=np.float32) | |
| center_ph_map = np.zeros((H,W), dtype=np.float32) | |
| for k in range(centers_np.shape[0]): | |
| cx_i, cy_i = int(round(centers_np[k,0])), int(round(centers_np[k,1])) | |
| if 0 <= cx_i < W and 0 <= cy_i < H: | |
| center_mag_map[labels_np==k] = centers_np[k,2] | |
| center_ph_map[labels_np==k] = centers_np[k,3] | |
| inp = np.stack([mag_a, ph_a_u, center_mag_map, center_ph_map], axis=0).astype(np.float32) | |
| targ = np.stack([mag_res, ph_res], axis=0).astype(np.float32) | |
| anchor_coords, anchor_mag_res, anchor_ph_res, counts = compute_cluster_anchor_targets(a_np, b_np, labels_np, centers_np) | |
| meta = {'labels': labels_np.astype(np.int32), | |
| 'centers': centers_np.astype(np.float32), | |
| 'anchor_coords': anchor_coords.astype(np.float32) if not is_torch(anchor_coords) else as_numpy(anchor_coords), | |
| 'anchor_mag_res': anchor_mag_res.astype(np.float32) if not is_torch(anchor_mag_res) else as_numpy(anchor_mag_res), | |
| 'anchor_ph_res': anchor_ph_res.astype(np.float32) if not is_torch(anchor_ph_res) else as_numpy(anchor_ph_res), | |
| 'counts': counts.astype(np.int32) if not is_torch(counts) else as_numpy(counts)} | |
| return inp, targ, meta | |
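| # Usage sketch (hypothetical helper, not called anywhere): pair consecutive shell frames into | |
| # (input, residual target, meta) samples and batch them with collate_keep_meta. | |
| def _demo_shell_dataset_loader(): | |
| h, w = 24, 24 | |
| frames = [(np.random.randn(h, w)+1j*np.random.randn(h, w)).astype(np.complex64) for _ in range(3)] | |
| ds = ShellDataset(frames, n_clusters=3) | |
| dl = DataLoader(ds, batch_size=1, shuffle=False, collate_fn=collate_keep_meta) | |
| inp, targ, meta = next(iter(dl)) | |
| assert inp.shape == (1, 4, h, w) and targ.shape == (1, 2, h, w) and len(meta) == 1 | |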
| def sample_at_subpixel(pred_tensor, coords): | |
| input_is_torch = is_torch(pred_tensor) | |
| coords_np = as_numpy(coords) | |
| if not input_is_torch: | |
| B, C, H, W = pred_tensor.shape | |
| K = coords_np.shape[0] | |
| out = np.zeros((B, C, K), dtype=np.float32) | |
| for b in range(B): | |
| for k, (cx, cy) in enumerate(coords_np): | |
| cx, cy = float(cx), float(cy) | |
| cx = max(0.0, min(cx, W-1.0)) | |
| cy = max(0.0, min(cy, H-1.0)) | |
| x0 = int(np.floor(cx)) | |
| x1 = min(x0+1, W-1) | |
| y0 = int(np.floor(cy)) | |
| y1 = min(y0+1, H-1) | |
| wx, wy = cx-x0, cy-y0 | |
| for c in range(C): | |
| v = (1-wx)*(1-wy)*pred_tensor[b, c, y0, x0]+\ | |
| wx*(1-wy)*pred_tensor[b, c, y0, x1]+\ | |
| (1-wx)*wy*pred_tensor[b, c, y1, x0]+\ | |
| wx*wy*pred_tensor[b, c, y1, x1] | |
| out[b,c,k] = v | |
| return out | |
| else: | |
| B, C, H, W = pred_tensor.shape | |
| K = coords_np.shape[0] | |
| xs = (coords_np[:,0]/max(1, W-1))*2-1 | |
| ys = (coords_np[:,1]/max(1, H-1))*2-1 | |
| grid = np.stack([xs, ys], axis=1).astype(np.float32) | |
| grid_t = torch.from_numpy(grid)[None, :, None, :].to(pred_tensor.device) | |
| sampled = F.grid_sample(pred_tensor, grid_t, mode='bilinear', padding_mode='border', align_corners=True) | |
| # grid_sample output has shape (B, C, K, 1); drop the trailing singleton dimension | |
| sampled = sampled.contiguous().view(B, C, K) | |
| return sampled | |
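| # Usage sketch (hypothetical helper, not called anywhere): bilinearly sample a (B, C, H, W) | |
| # prediction at fractional (x, y) anchor coordinates; the numpy and torch paths agree. | |
| def _demo_sample_at_subpixel(): | |
| pred = np.random.randn(1, 2, 16, 16).astype(np.float32) | |
| coords = np.array([[3.5, 7.25], [0.0, 15.0]], dtype=np.float32) | |
| s_np = sample_at_subpixel(pred, coords) | |
| s_t = sample_at_subpixel(torch.from_numpy(pred), coords) | |
| assert s_np.shape == (1, 2, 2) and np.allclose(s_np, s_t.numpy(), atol=1e-4) | |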
| def train_regressor(shell_list, epochs=10, batch_size=1, n_clusters=4, lr=5e-4, device=torch.device('cpu')): | |
| ds = ShellDataset(shell_list, n_clusters=n_clusters) | |
| dl = DataLoader(ds, batch_size=batch_size, shuffle=True, collate_fn=collate_keep_meta) | |
| model = SmallRegressor(in_ch=4, out_ch=2, base=16).to(device) | |
| opt = torch.optim.Adam(model.parameters(), lr=lr) | |
| for ep in range(epochs): | |
| total_loss, n_batches = 0.0, 0 | |
| for inp, targ, meta in dl: | |
| inp_t = torch.from_numpy(inp).to(device) | |
| targ_t = torch.from_numpy(targ).to(device) | |
| pred = model(inp_t) | |
| mse = F.mse_loss(pred, targ_t) | |
| batch_loss_anchor = 0.0 | |
| B = inp_t.shape[0] | |
| for b in range(B): | |
| sample_meta = meta[b] | |
| if sample_meta is None: | |
| continue | |
| coords = sample_meta.get('anchor_coords') | |
| if coords is None: | |
| continue | |
| coords = np.asarray(coords, dtype=np.float32) | |
| if coords.size == 0: | |
| continue | |
| counts = np.asarray(sample_meta.get('counts', []), dtype=np.float32) | |
| anchor_mag = np.asarray(sample_meta.get('anchor_mag_res', []), dtype=np.float32) | |
| anchor_ph = np.asarray(sample_meta.get('anchor_ph_res', []), dtype=np.float32) | |
| sampled = sample_at_subpixel(pred[b:b+1].detach().cpu().numpy(), coords) | |
| sampled_np = sampled[0] | |
| weights = counts+1.0 | |
| w = weights/(weights.sum()+1e-8) | |
| mag_err = ((sampled_np[0]-anchor_mag)**2*w).sum()  # channel 0 is the magnitude residual | |
| ph_err = ((sampled_np[1]-anchor_ph)**2*w).sum() | |
| batch_loss_anchor+=(mag_err+ph_err) | |
| # per-batch optimizer step: only the dense residual MSE is backpropagated; the anchor | |
| # terms above are diagnostics computed on detached numpy output | |
| opt.zero_grad() | |
| mse.backward() | |
| opt.step() | |
| total_loss+=mse.item() | |
| n_batches+=1 | |
| return model | |
| def refine_residual_with_bspline(pred_res, labels, centers, anchor_mag_res, anchor_ph_res, control_grid_size=(16,16)): | |
| input_is_torch = is_torch(pred_res) | |
| pred_np = as_numpy(pred_res) | |
| H, W = pred_np.shape[1], pred_np.shape[2] | |
| ch, cw = control_grid_size | |
| grid_mag = ndimage.zoom(pred_np[0], (ch/float(H), cw/float(W)), order=1) | |
| grid_ph = ndimage.zoom(pred_np[1], (ch/float(H), cw/float(W)), order=1) | |
| centers_np = as_numpy(centers) | |
| K = centers_np.shape[0] | |
| for k in range(K): | |
| cx, cy = centers_np[k,0], centers_np[k,1] | |
| gx = (cx/max(1.0, W-1.0))*(cw-1) | |
| gy = (cy/max(1.0, H-1.0))*(ch-1) | |
| ix, iy = int(round(gx)), int(round(gy)) | |
| if 0 <= ix < cw and 0 <= iy < ch: | |
| alpha = 0.6 | |
| grid_mag[iy, ix] = alpha*anchor_mag_res[k]+(1-alpha)*grid_mag[iy, ix] | |
| grid_ph[iy, ix] = alpha*anchor_ph_res[k]+(1-alpha)*grid_ph[iy, ix] | |
| mag_dense = eval_bspline_grid(grid_mag, H, W) | |
| ph_dense = eval_bspline_grid(grid_ph, H, W) | |
| out = np.stack([mag_dense.astype(np.float32), ph_dense.astype(np.float32)], axis=0) | |
| if input_is_torch: | |
| return torch.from_numpy(out) | |
| return out | |
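| # Usage sketch (hypothetical helper, not called anywhere): push per-cluster anchor residuals | |
| # into the coarse control grid and densify them with the cubic B-spline evaluator. | |
| def _demo_refine_residual(): | |
| pred_res = np.zeros((2, 32, 32), dtype=np.float32) | |
| centers = np.array([[8.0, 8.0, 0, 0, 0, 0], [24.0, 24.0, 0, 0, 0, 0]], dtype=np.float32) | |
| anchor_mag = np.array([0.5, -0.5], dtype=np.float32) | |
| anchor_ph = np.array([0.1, -0.1], dtype=np.float32) | |
| refined = refine_residual_with_bspline(pred_res, None, centers, anchor_mag, anchor_ph, control_grid_size=(8, 8)) | |
| assert refined.shape == (2, 32, 32) and np.abs(refined[0]).max() > 0 | |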
| def predict_next_cpu(model, mat_current, n_clusters=8, control_grid_size=(16, 16), device=torch.device('cpu')): | |
| input_is_torch = is_torch(mat_current) | |
| mat_np = as_numpy(mat_current) | |
| labels, centers = kmeans_on_matrix(mat_np, n_clusters=n_clusters) | |
| labels_np = as_numpy(labels) | |
| centers_np = as_numpy(centers) | |
| mag_cur = np.abs(mat_np).astype(np.float32) | |
| ph_cur = np.angle(mat_np).astype(np.float32) | |
| ph_cur_u = np.unwrap(np.unwrap(ph_cur, axis=0), axis=1) | |
| H, W = mag_cur.shape | |
| center_mag_map = np.zeros((H,W), dtype=np.float32) | |
| center_ph_map = np.zeros((H,W), dtype=np.float32) | |
| for k in range(centers_np.shape[0]): | |
| cx_i, cy_i = int(round(centers_np[k, 0])), int(round(centers_np[k,1])) | |
| if 0 <= cx_i < W and 0 <= cy_i < H: | |
| center_mag_map[labels_np==k] = centers_np[k, 2] | |
| center_ph_map[labels_np==k] = centers_np[k, 3] | |
| inp = np.stack([mag_cur, ph_cur_u, center_mag_map, center_ph_map], axis=0)[None] | |
| inp_t = torch.from_numpy(inp).to(device) | |
| model = model.to(device) | |
| model.eval() | |
| with torch.no_grad(): | |
| pred_res_t = model(inp_t) | |
| pred_res = pred_res_t.cpu().numpy()[0] | |
| anchor_mag_res = np.zeros(centers_np.shape[0], dtype=np.float32) | |
| anchor_ph_res = np.zeros(centers_np.shape[0], dtype=np.float32) | |
| refined = refine_residual_with_bspline(pred_res, labels_np, centers_np, anchor_mag_res, anchor_ph_res, control_grid_size=control_grid_size) | |
| pred_mag = mag_cur+as_numpy(refined[0]) | |
| pred_ph = ph_cur_u+as_numpy(refined[1]) | |
| pred_complex = magphase_to_complex(pred_mag, pred_ph) | |
| if input_is_torch: | |
| return as_torch(pred_complex, dtype=torch.complex64, device=device), {'labels': labels, 'centers': centers, 'pred_res': torch.from_numpy(pred_res)} | |
| else: | |
| return pred_complex, {'labels': labels_np, 'centers': centers_np, 'pred_res': pred_res} | |
| def k_shell_frame_learn(H: int, W: int): | |
| x, y = np.linspace(0, 4*np.pi, W), np.linspace(0, 4*np.pi, H) | |
| X, Y = np.meshgrid(x, y) | |
| frameA = (np.sin(X)+1j*np.cos(Y))*np.exp(1j*0.5*X) | |
| frameB = np.roll(frameA, shift=2, axis=1)+0.02*(np.random.randn(H, W)+1j*np.random.randn(H, W)) | |
| shell = [frameA.astype(np.complex64), frameB.astype(np.complex64)] | |
| model = train_regressor(shell, epochs=10, batch_size=1, n_clusters=4, lr=5e-4, device=torch.device('cpu')) | |
| pred, meta = predict_next_cpu(model, shell[0], n_clusters=4, control_grid_size=(16, 16)) | |
| rel_err = np.linalg.norm(pred-shell[1])/(np.linalg.norm(shell[1])+1e-12) | |
| print('Relative TT-DOM Shell Error (CPU regressor + bspline refine):', rel_err) | |
| #/////////////////////////////////////////////////////////////////////////////////////////////////////// | |
| #_______________________________________________________________________________________________________ | |
| def _map_xml(pth: str, elem: ET.Element): | |
| src = ET.tostring(elem, 'utf-8') | |
| prs = minidom.parseString(src) | |
| p_xml = prs.toprettyxml(indent=' ') | |
| with open(pth, 'w', encoding='utf-8') as f: | |
| f.write('\n'.join(ln for ln in p_xml.splitlines() if ln.strip())) | |
| class TICTAC_UXCM1(): | |
| __slots__ = ('_clcgX', '_clcgY') | |
| def __init__(self, isSxlz: bool, _cd_lx_cwgrlX: float, _cd_lx_cwgrlY: float): | |
| if isSxlz: | |
| self._clcgX, self._clcgY = self.mmp_tqf_xyc1(_cd_lx_cwgrlX, _cd_lx_cwgrlY) | |
| else: | |
| xp_r1 = [abs(((cmath.sin(U2_LCX2[x]))-_cd_lx_cwgrlY)/math.pi) for x in range(9)] | |
| xp_r2 = [(abs(cmath.sin(xp_r1[y])), abs(cmath.cos(U1_LCX1[y]))+_cd_lx_cwgrlX/(y+1)) for y in range(9)] | |
| xp_r3 = [t[0]+t[1] for t in xp_r2] | |
| self._clcgX = math.log(sum(xp_r1))*math.pi | |
| self._clcgY = math.log(sum(xp_r3))*math.pi | |
| def mmp_tqf_xyc1(self, clcg_x: float, clcg_y: float) -> tuple: | |
| # Map the coordinate range of the paired multiples: <x to y> | |
| ctd_lstA = [] | |
| ctd_lstB = [] | |
| for i in range(8): | |
| tp_m = cmath.sin(U1_LCX1[i+1]-clcg_x)+math.sqrt(clcg_y)/clcg_x | |
| if tp_m.real > 0: | |
| ctd_lstB.append(tp_m+math.pi+1) | |
| else: | |
| tp_m = math.log(-tp_m.real+1) | |
| if tp_m > math.pi+i+1: | |
| ctd_lstA.append(tp_m-math.pi) | |
| else: | |
| ctd_lstA.append(tp_m+math.pi+i+1) | |
| cla_sum = sum(ctd_lstA) | |
| clb_sum = sum(ctd_lstB) | |
| return math.ceil(abs(cmath.sqrt(cla_sum+clb_sum))), math.ceil(abs(cmath.sqrt(clb_sum-cla_sum))) | |
| def adj_bw_exp(self, r_vz: float, l_r1: list, l_r2: list) -> tuple: | |
| # Adjusted bandwidth expansion derived from the joint divergence | |
| xd = [(cmath.exp(v1-v2*v1))/r_vz for v1, v2 in zip(l_r1, [l_r2[i]**(r_vz/(cmath.sin(l_r1[i])+math.pi)) for i in range(len(l_r1))])] | |
| xd_len = len(xd)-1 | |
| eq_set = set() | |
| l_c = -1 | |
| while 1: | |
| l_c+=1 | |
| if l_c == xd_len: | |
| l_c = 0 | |
| if abs(math.atan(random.randint(0, l_c))/(complex(abs(xd[l_c]), l_c))) > abs(xd[l_c+1]): | |
| eq_set.add(cmath.sqrt(xd[l_c])) | |
| else: | |
| eq_set.add(abs(cmath.sqrt(xd[l_c+1])/(l_c+1))) | |
| if len(eq_set) > xd_len: | |
| break | |
| eq_set, xd, c_nl, l_c = list(eq_set), [], None, 0 | |
| for c_n1 in eq_set: | |
| if isinstance(c_n1, complex): | |
| c_nl = abs(cmath.tan(c_n1)*c_n1.imag) | |
| else: | |
| if l_c is not None: | |
| c_nl = l_c+c_n1/(c_n1*r_vz) | |
| else: | |
| c_nl = math.log(c_n1*c_n1.real) | |
| if l_c < c_nl: | |
| xd.append(c_nl/math.pi) | |
| else: | |
| xd.append((c_nl+l_c)*math.pi) | |
| l_c = c_nl | |
| return min(xd), max(xd) | |
| class TICTAC_SHELL_REDOCK: | |
| __slots__ = ('_dif_x', '_dif_z') | |
| def __init__(self): | |
| pass | |
| def row_ratio_segments(mat, tol=1e-6, min_len=2, zero_threshold=1e-12): | |
| m, n = mat.shape | |
| if m < 2: | |
| return [(0, m, mat[0].copy(), 1+0j)] if m == 1 else [] | |
| # estimate the scalar ratio between consecutive rows | |
| ratios = [] | |
| for i in range(m-1): | |
| a, b = mat[i], mat[i+1] | |
| mask = np.abs(a) > zero_threshold | |
| # require enough valid (non-negligible) entries | |
| if mask.sum() >= max(1, n//20): | |
| r_elems = b[mask] / a[mask] | |
| # use the median for robustness | |
| r_med = np.median(r_elems) | |
| ratios.append(r_med) | |
| else: | |
| # least-squares scalar fit: the r that minimizes the squared error over all components | |
| # r = (a^H b)/(a^H a), if the denominator is not too small | |
| denom = np.vdot(a, a) | |
| if np.abs(denom) > zero_threshold: | |
| r_ls = np.vdot(a, b)/denom | |
| ratios.append(r_ls) | |
| else: ratios.append(None) | |
| # group consecutive runs where the ratio stays stable and predictable | |
| segs, i = [], 0 | |
| while i < m: | |
| if i == m-1: | |
| segs.append((i, i+1, mat[i].copy(), 1+0j)) | |
| break | |
| r0 = ratios[i] | |
| if r0 is None: | |
| segs.append((i, i+1, mat[i].copy(), 1+0j)) | |
| i+=1 | |
| continue | |
| j = i+1 | |
| while j < m-1 and ratios[j] is not None and abs(ratios[j]-r0) <= tol: j+=1 | |
| length = j-i+1 | |
| if length >= min_len: | |
| segs.append((i, j+1, mat[i].copy(), r0)) | |
| i = j+1 | |
| else: | |
| # emit isolated rows as single-row fallback segments | |
| segs.append((i, i+1, mat[i].copy(), 1+0j)) | |
| i+=1 | |
| return segs | |
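| # Usage sketch (hypothetical helper, not called anywhere): rows forming a single geometric | |
| # progression collapse into one segment, and the common row-to-row ratio is recovered. | |
| def _demo_row_ratio_segments(): | |
| base = np.array([1+1j, 2-1j, 3+2j, 0.5+0.25j], dtype=np.complex128) | |
| ratio = 1.1+0.2j | |
| mat = np.stack([base*(ratio**k) for k in range(6)], axis=0) | |
| segs = row_ratio_segments(mat, tol=1e-6, min_len=2) | |
| assert len(segs) == 1 and np.isclose(segs[0][3], ratio) | |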
| def reconstruct_segment(base, ratio, length): | |
| # return an array of shape (length, n): the base row scaled by ratio**k per row | |
| # broadcasting: base[None, :]*(ratio**ks) -> (L, n) | |
| ks = np.arange(length, dtype=complex)[:, None] | |
| return (base[None, :]*(ratio**ks)).astype(base.dtype) | |
| def truncated_svd_updates(block, rank): | |
| # returns a truncated SVD and rank-one outer-product updates | |
| U, s, Vh = svd(block, full_matrices=False) | |
| return U[:, :rank], s[:rank], Vh[:rank, :] | |
| def resize_rows(mat, target_rows, tol=1e-4, svd_rank_max=4, apply_row_update=None, apply_rank_update=None): | |
| m, n = mat.shape | |
| assert target_rows%2 == 0, '<TT-DOM 1.02> tq- shell: svd target row count must be even' | |
| # build the target rows by linear interpolation, on separate indices for the real and imaginary parts | |
| old_idx = np.linspace(0.0, 1.0, m) | |
| new_idx = np.linspace(0.0, 1.0, target_rows) | |
| real_interp = np.vstack([np.interp(new_idx, old_idx, mat[:, j].real) for j in range(n)]).T | |
| imag_interp = np.vstack([np.interp(new_idx, old_idx, mat[:, j].imag) for j in range(n)]).T | |
| desired = real_interp+1j*imag_interp | |
| # the current matrix is adapted to the target row count for comparison | |
| if target_rows == m: current = mat.copy() | |
| elif target_rows > m: | |
| repeats = int(np.ceil(target_rows/m)) | |
| tiled = np.tile(mat, (repeats, 1))[:target_rows] | |
| current = tiled | |
| else: current = mat[:target_rows].copy() | |
| segs = row_ratio_segments(desired, tol=tol, min_len=2) | |
| recon = np.zeros_like(desired) | |
| for (s, e, base, ratio) in segs: | |
| length = e-s | |
| if not np.isclose(ratio, 1+0j): | |
| # reconstruct the geometric-series rows from the fitted base and ratio | |
| rows = reconstruct_segment(base, ratio, length) | |
| err = np.linalg.norm(rows-desired[s:e], ord='fro') | |
| if err <= tol*np.sqrt(length*n): | |
| recon[s:e] = rows | |
| if apply_row_update: | |
| apply_row_update(s, rows) | |
| continue | |
| # perform a low-rank SVD on the current block; sigma drives the rank cut | |
| block = desired[s:e] | |
| U, sigma, Vh = svd(block, full_matrices=False) | |
| total_energy = np.sum(sigma**2) | |
| if total_energy == 0: rank = 1 | |
| else: | |
| energy = np.cumsum(sigma**2)/total_energy | |
| rank = int(np.searchsorted(energy, 0.999)+1) | |
| rank = min(max(1, rank), min(svd_rank_max, block.shape[0], block.shape[1])) | |
| U_r, s_r, Vh_r = U[:, :rank], sigma[:rank], Vh[:rank, :] | |
| recon[s:e] = (U_r*s_r) @ Vh_r | |
| if apply_rank_update: | |
| for k in range(rank): apply_rank_update(s, U_r[:, k], s_r[k], Vh_r[k, :]) | |
| # apply a per-row correction wherever the residual error is large | |
| residual = desired-recon | |
| mask = np.abs(residual) > tol | |
| if apply_row_update: | |
| rows_to_fix = np.where(mask.any(axis=1))[0] | |
| for r in rows_to_fix: | |
| apply_row_update(r, desired[r:r+1]) | |
| recon[r:r+1] = desired[r:r+1] | |
| return recon | |
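| # Usage sketch (hypothetical helper, not called anywhere): resample a complex matrix onto an | |
| # even target row count; segments are rebuilt geometrically or via low-rank SVD as needed. | |
| def _demo_resize_rows(): | |
| rng = np.random.RandomState(1) | |
| mat = (rng.randn(6, 8)+1j*rng.randn(6, 8)).astype(np.complex128) | |
| out = resize_rows(mat, target_rows=10, tol=1e-4, svd_rank_max=4) | |
| assert out.shape == (10, 8) and out.dtype == np.complex128 | |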
| class TICTAC_JXML_IO: | |
| @staticmethod | |
| def _write(pth: str, cd: list): | |
| r = len(cd) | |
| c = len(cd[0]) if r > 0 else 0 | |
| arr = ET.Element('tictac2d', r=str(r), c=str(c)) | |
| for r_i, r_s in enumerate(cd): | |
| r_e = ET.SubElement(arr, 'row', idx=str(r_i)) | |
| for c_i, c_s in enumerate(r_s): | |
| ET.SubElement(r_e, 'val', col=str(c_i), real=str(c_s.real), imag=str(c_s.imag)) | |
| _map_xml(pth, arr) | |
| @staticmethod | |
| def _read(pth: str) -> list: | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| rws = [] | |
| for r_e in root.findall('row'): | |
| r = [] | |
| for v_t in r_e.findall('val'): | |
| real = float(v_t.get('real', '0')) | |
| imag = float(v_t.get('imag', '0')) | |
| r.append(complex(real, imag)) | |
| rws.append(r) | |
| return rws | |
| @staticmethod | |
| def _read_complex(pth: str, r: int, c: int) -> complex: | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| r_e = root.find(f"./row[@idx='{r}']") | |
| v_t = r_e.find(f"./val[@col='{c}']") | |
| return complex(float(v_t.get('real', '0')), float(v_t.get('imag', '0'))) | |
| @staticmethod | |
| def _edit_complex(pth: str, r: int, c: int, v: complex): | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| r_e = root.find(f"./row[@idx='{r}']") | |
| v_t = r_e.find(f"./val[@col='{c}']") | |
| v_t.set('real', str(v.real)) | |
| v_t.set('imag', str(v.imag)) | |
| _map_xml(pth, root) | |
| @staticmethod | |
| def _insert_row(pth: str, r_i: int, r_v: list): | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| rws = root.findall('row') | |
| rws_len = len(rws) | |
| c_cnt = int(root.get('c', '0')) | |
| for r in rws[r_i:]: | |
| r.set('idx', str(int(r.get('idx'))+1)) | |
| n_r = ET.Element('row', idx=str(r_i)) | |
| for c_i, c in enumerate(r_v): | |
| ET.SubElement(n_r, 'val', col=str(c_i), real=str(c.real), imag=str(c.imag)) | |
| root.insert(r_i, n_r) | |
| root.set('r', str(rws_len)) | |
| if c_cnt == 0: | |
| root.set('c', str(len(r_v))) | |
| _map_xml(pth, root) | |
| @staticmethod | |
| def _remove_row(pth: str, r_i: int): | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| trgt = root.find(f"./row[@idx='{r_i}']") | |
| root.remove(trgt) | |
| for i, r in enumerate(root.findall('row')): | |
| r.set('idx', str(i)) | |
| root.set('r', str(len(root.findall('row')))) | |
| _map_xml(pth, root) | |
| @staticmethod | |
| def _insert_complex(pth: str, r_i: int, c_i: int, v: complex): | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| r_e = root.find(f"./row[@idx='{r_i}']") | |
| c_v = r_e.findall('val') | |
| c_len = len(c_v) | |
| for c_c in c_v[c_i:]: | |
| c_c.set('col', str(int(c_c.get('col'))+1)) | |
| n_v = ET.Element('val', col=str(c_i), real=str(v.real), imag=str(v.imag)) | |
| r_e.insert(c_i, n_v) | |
| root.set('c', str(max(int(root.get('c', '0')), c_len+1))) | |
| _map_xml(pth, root) | |
| @staticmethod | |
| def _remove_complex(pth: str, r_i: int, c_i: int): | |
| tree = ET.parse(pth) | |
| root = tree.getroot() | |
| r_e = root.find(f"./row[@idx='{r_i}']") | |
| trgt = r_e.find(f"./val[@col='{c_i}']") | |
| r_e.remove(trgt) | |
| for i, c_v in enumerate(r_e.findall('val')): | |
| c_v.set('col', str(i)) | |
| max_c = max((len(r.findall('val')) for r in root.findall('row')), default=0) | |
| root.set('c', str(max_c)) | |
| _map_xml(pth, root) | |
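| # Usage sketch (hypothetical helper and demo path, not called anywhere): write a small complex | |
| # matrix to the pretty-printed XML layout, read it back, then edit one cell in place. | |
| def _demo_jxml_roundtrip(pth='_ttd_demo_plex.xml'): | |
| rows = [[1+2j, 3-4j], [5+6j, 7-8j]] | |
| TICTAC_JXML_IO._write(pth, rows) | |
| assert TICTAC_JXML_IO._read(pth) == rows | |
| TICTAC_JXML_IO._edit_complex(pth, 0, 1, 9+9j) | |
| assert TICTAC_JXML_IO._read_complex(pth, 0, 1) == 9+9j | |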
| class TICTAC_DOM_KRNL(): | |
| __slots__ = ('_dom_csfs', '_shl_ry1', '_shl_ry2', '_shl_ry3', '_shl_ry4', '_last_shl', | |
| '_cur_jxml') | |
| def __init__(self): | |
| self._last_shl = -1 | |
| self.lang_shell() | |
| def lang_dirs(self, dir: str) -> str: | |
| if dir == 'MAIN': | |
| return Path(TT_DOM_PTH) / 'TICTAC_DOM_1_02' | |
| elif dir == 'EWWW': | |
| return Path(TT_DOM_PTH) / 'TICTAC_DOM_1_02' / 'EWWW_T_D' | |
| elif dir == 'JXML': | |
| return Path(TT_DOM_PTH) / 'TICTAC_DOM_1_02' / 'EWWW_T_D' / 'JXML_T_D' | |
| def lang_shell(self): | |
| if self._last_shl > -1: | |
| pass | |
| else: | |
| new_dirs = False | |
| fldr_pth = self.lang_dirs('MAIN') | |
| if not os.path.isdir(fldr_pth): | |
| os.makedirs(fldr_pth) | |
| new_dirs = True | |
| fldr_pth = self.lang_dirs('JXML') | |
| if not os.path.isdir(fldr_pth): | |
| os.makedirs(fldr_pth) | |
| new_dirs = True | |
| self.tt_domain_tpk(new_dirs) | |
| def tt_domain_tpk(self, isNewDirs: bool): | |
| JXML = TICTAC_JXML_IO() | |
| if not isNewDirs: | |
| pass | |
| else: | |
| self._cur_jxml = '_ttd_plex_1.xml' | |
| pth = self.lang_dirs('JXML') / self._cur_jxml | |
| JXML._write(pth, [[1+2j, 2-3j, 4+5j, 6-7j], [8-9j, 7+6j, 5-4j, 3+2j]]) | |
| tpk = ['LAST-JXML=_ttd_plex_1.xml', | |
| 'GRAPH-MEDIA=default_flir', | |
| 'DOMAIN-PAIRING-ORDER=null', | |
| 'DOMAIN-PAIRING-SEQ=null', | |
| 'DOMAIN-PAIRING-LMT=null', | |
| 'DOMAIN-PAIRING-PLUG-IN-OUTER=null', | |
| 'DOMAIN-PAIRING-PLUG-IN-INNER=null', | |
| 'DOMAIN-PAIRING-PLUG-IN-OUTER-PATH=null', | |
| 'DOMAIN-PAIRING-PLUG-IN-INNER-PATH=null', | |
| 'DOMAIN-CYCLE-RSZ1-OUTER=null', | |
| 'DOMAIN-CYCLE-RSZ2-OUTER=null', | |
| 'DOMAIN-CYCLE-RSZ1-INNER=null', | |
| 'DOMAIN-CYCLE-RSZ2-INNER=null', | |
| 'DOMAIN-SHELL-LAST=null', | |
| 'DOMAIN-SHELL-DIM=null'] | |
| pth = self.lang_dirs('EWWW') / 'ttd_dom_cfg.tpk' | |
| self.tt_dom_write(pth, '\n'.join(tpk)) | |
| def tt_dom_print(self, txt: str, indent=0): | |
| if indent == 2: print(f' .:{txt}') | |
| else: print(f'-:. {txt}') | |
| def tt_dom_write(self, pth: str, src: str): | |
| with open(pth, mode='w', encoding='utf-8') as f: f.write(src) | |
| def pre_lang_updt(self, src: str | list) -> list: | |
| if isinstance(src, (str, list)): | |
| if isinstance(src, str): src = src.split('\n') | |
| else: | |
| raise RuntimeError('<TT_DOM 1.02> tt-dom source code must be a str or list type') | |
| src, rtn = [ln.lower().replace('~', '').replace(' ', '') for ln in src], [] | |
| cmmnt_pos_lst = [ln.find('||') for ln in src] | |
| for i, ln in enumerate(src): | |
| cmmnt_pos = cmmnt_pos_lst[i] | |
| if ln: | |
| if cmmnt_pos > -1: | |
| if cmmnt_pos > 0: rtn.append(ln[:cmmnt_pos]) | |
| else: rtn.append('~') | |
| else: rtn.append(ln) | |
| else: rtn.append('~') | |
| return rtn | |
| def lang_updt(self, src: str | list): | |
| print(f'TT-DOM Plex Interpreter is running, current plex-signal source code to process: {len(src)} line(s)') | |
| src = self.pre_lang_updt(src) | |
| if len(src) > 0: | |
| cmd_re_lst = [r'tq.*\(.*\)'] | |
| self._dom_csfs = [] | |
| ln_n = 1 | |
| for ln in src: | |
| ln_len = len(ln) | |
| if ln_len > 1: | |
| if ln_len > 6: | |
| if ln[ln_len-1] == ':': | |
| if ln[0] == 't' and ln[1] == 'q': self.cmd_lang_tq(ln[:ln_len-1], ln_n, cmd_re_lst[0]) | |
| else: raise RuntimeError(f'<TT_DOM 1.02> line({ln_n}) has no recognizable cmd, struct, fork or sym domain') | |
| else: raise RuntimeError(f'<TT_DOM 1.02> line({ln_n}) missing a end line ":" terminator') | |
| else: raise RuntimeError(f'<TT_DOM 1.02> line({ln_n}) non-recognizable syntax line length, less than 7 chars') | |
| ln_n+=1 | |
| else: raise RuntimeError('<TT-DOM 1.02> no visible source code lines found') | |
| def cmd_lang_tq(self, ln: str, ln_n: int, cel: str): | |
| # TEMPLATE TQ: | |
| # tq-|+1-4(shell<> | scan<> | respawn<> | wait<>) 3+2j, 5-2j, 1+9j: | |
| # tq-|+1-4(") 3+2j, 5-2j, 1+9j; tq-2(") 3-2j, 5+2j, 1+9j: | |
| # self._dom_csfs.append(('cmd', 'tq', ln_n-1, ln)) | |
| tic = re.compile(r'^[+-]?\d[+-]\d[ijIJ]$') | |
| try: | |
| if ln.find(';') > -1: ln = ln.split(';') | |
| else: ln = [ln] | |
| for tq in ln: | |
| tq_cmd = re.findall(cel, tq) | |
| if len(tq_cmd) == 1: | |
| tq_cmd = tq_cmd[0].split('(') | |
| if len(tq_cmd[0]) == 4: | |
| convr_smd, shell_cmd = tq_cmd[0].replace('tq', ''), tq_cmd[1].replace(')', '') | |
| if len(convr_smd) == 2: | |
| if convr_smd.startswith(('-', '+')) and convr_smd[1].isdigit(): | |
| tq_cmd = tq.split(')') | |
| if tq_cmd[1].find(',') > -1: | |
| tq_cmd = tq_cmd[1].split(',') | |
| if all(bool(tic.fullmatch(T)) for T in tq_cmd): | |
| if shell_cmd.find('shell<') > -1: | |
| self.cmd_lang_tq_shell(shell_cmd.replace('shell<', '').replace('>', ''), convr_smd, tq_cmd) | |
| elif shell_cmd.find('scan<') > -1: | |
| self.cmd_lang_tq_scan(shell_cmd.replace('scan<', '').replace('>', ''), convr_smd, tq_cmd) | |
| elif shell_cmd.find('respawn<') > -1: | |
| pass | |
| elif shell_cmd.find('wait<') > -1: | |
| pass | |
| else: raise RuntimeError(f'valid tq shell director not found; valid tqsd: shell, scan, respawn or wait') | |
| else: raise RuntimeError(f'improper complex number notation found') | |
| else: raise RuntimeError(f'no comma(s) found for tq complex list') | |
| else: raise RuntimeError(f'improper tq shell matrix director syntax') | |
| else: raise RuntimeError(f'improper tq shell director syntax') | |
| else: raise RuntimeError(f'improper tq shell director syntax') | |
| else: raise RuntimeError(f'improper syntax for tq cmd or missing') | |
| except Exception as err_lang_tq: raise RuntimeError(f'<TT-DOM 1.02> tq- code err line({ln_n}), {err_lang_tq}') | |
| def cmd_lang_tq_shell(self, shell_cmd: str, convr_smd: str, cn_lst: list): | |
| # shell<exponent_ratio_start=int 1-3, svd_rank_max=int 3-5, target_rows_length=int 5-25> | |
| self.tt_dom_print(f'mapping tq- shell director //SHELL /smd priority: {convr_smd} /total plex: {len(cn_lst)}') | |
| self.parse_tq_complex_list(cn_lst) | |
| if shell_cmd.find(',') > -1: | |
| shell_cmd = shell_cmd.split(',') | |
| if len(shell_cmd) == 3: | |
| shell_cmd = [int(i) for i in shell_cmd if i.isdigit()] | |
| if len(shell_cmd) == 3: | |
| if shell_cmd[0] > 0 and shell_cmd[0] < 4: | |
| if shell_cmd[1] > 2 and shell_cmd[1] < 6: | |
| if shell_cmd[2] > 4 and shell_cmd[2] < 26: | |
| self.tt_dom_print(f'exponent_ratio_start={shell_cmd[0]}', 2) | |
| self.tt_dom_print(f'svd_rank_max={shell_cmd[1]}', 2) | |
| self.tt_dom_print(f'target_rows_length={shell_cmd[2]}', 2) | |
| # ><)))'> | |
| else: raise RuntimeError(f'tq direct shell director arg "target_rows_length" err; valid int range: 5-25') | |
| else: raise RuntimeError(f'tq direct shell director arg "svd_rank_max" err: valid int range: 3-5') | |
| else: raise RuntimeError(f'tq direct shell director arg "exponent_ratio_start" err; valid int range: 1-3') | |
| else: raise RuntimeError(f'tq shell director(direct "shell<>" call) missing int vals') | |
| else: raise RuntimeError(f'tq shell director(direct "shell<>" call) missing params') | |
| else: raise RuntimeError(f'tq shell director(direct "shell<>" call) params invalid') | |
| def cmd_lang_tq_scan(self, shell_cmd: str, convr_smd: str, cn_lst: list): | |
| # scan<nsx=int 360-720, xmax=float 1.0-9.9, hbar=float 1.0-4.9, mbar=float 1.0-4.9> | |
| self.tt_dom_print(f'mapping tq- shell director //SCAN /smd priority: {convr_smd} /total plex: {len(cn_lst)}') | |
| self.parse_tq_complex_list(cn_lst) | |
| if shell_cmd.find(',') > -1: | |
| shell_cmd = shell_cmd.split(',') | |
| if len(shell_cmd) == 4: | |
| if shell_cmd[0].isdigit(): | |
| nsx = int(shell_cmd[0]) | |
| if nsx > 359 and nsx < 721: | |
| shell_cmd[0] = nsx | |
| scp = self.parse_float_params(False, shell_cmd[1:]) | |
| if not isinstance(scp[0], str): | |
| scp = tuple(scp) | |
| xmax, hbar, mbar = scp | |
| if xmax > 0.9 and xmax < 10.0: | |
| if hbar > 0.9 and hbar < 5.0: | |
| if mbar > 0.9 and mbar < 5.0: | |
| self.tt_dom_print(f'nsx={nsx}', 2) | |
| self.tt_dom_print(f'xmax={xmax}', 2) | |
| self.tt_dom_print(f'hbar={hbar}', 2) | |
| self.tt_dom_print(f'mbar={mbar}', 2) | |
| else: raise RuntimeError(f'tq shell director("scan<> call") invalid param range @ "mbar"; valid: float 1.0-4.9') | |
| else: raise RuntimeError(f'tq shell director("scan<> call") invalid param range @ "hbar"; valid: float 1.0-4.9') | |
| else: raise RuntimeError(f'tq shell director("scan<> call") invalid param range @ "xmax"; valid: float 1.0-9.9') | |
| else: | |
| scp, p_nm = int(scp[0]), None | |
| if scp == 0: p_nm = 'xmax' | |
| elif scp == 1: p_nm = 'hbar' | |
| else: p_nm = 'mbar' | |
| raise RuntimeError(f'tq shell director("scan<> call") invalid param "{p_nm}" negative float or non-float error') | |
| else: raise RuntimeError(f'tq shell director("scan<> call) invalid param range @ "nsx"; valid: int 360-720') | |
| else: raise RuntimeError(f'tq shell director("scan<>" call) nsx param invalid type; valid: int 360-720') | |
| else: raise RuntimeError(f'tq shell director("scan<>" call) missing params') | |
| else: raise RuntimeError(f'tq shell director("scan<>" call) params invalid') | |
| def parse_tq_complex_list(self, cn_lst: list) -> tuple: | |
| cnvrt_n_check = lambda s: (cil := list(map(complex, s)), any(c.real < 0 or c.imag < 0 for c in cil)) | |
| cn_t = cnvrt_n_check(cn_lst) | |
| return cn_t | |
| def parse_float_params(self, allowsNegative: bool, f: str | list) -> list: | |
| rtn, idx = [], -1 | |
| if isinstance(f, str): f = [f] | |
| for dn in f: | |
| idx+=1 | |
| if dn.find('.') < 0: | |
| return [str(idx)] | |
| pn = dn.replace('.', '') | |
| if not allowsNegative and pn.find('-') > -1: | |
| return [str(idx)] | |
| if pn.isdigit(): rtn.append(float(dn)) | |
| else: | |
| return [str(idx)] | |
| return rtn | |
| def run_tt_dom_inpr_testing(): | |
| #l_r1 = [5+2j, 3+2j, 1+6j, 4+6j, 6+2j, 8+4j, 3+9j, 4+2j] | |
| #l_r2 = [7+1j, 8+3j, 3+7j, 2+9j, 9+6j, 8+1j, 7+4j, 3+8j] | |
| #l_r3 = [[4+2j, 1+4j], [6+2j, 3+8j], [complex(2, 4), complex(5, 1)]] | |
| #r_vz = 2.6229108 | |
| #cls = TICTAC_UXCM1(True, 124.13, 8.08) | |
| #print(cls.adj_bw_exp(r_vz, l_r1, l_r2)) | |
| tt_dom_src = ['TQ-2(SHELL<2, 4, 22>) 7+4j, 3+2j, 5+8j, 6+1j, 2+5j, 4+9j, 3+6j, 1+4j: || comment1', | |
| 'TQ+3(SCAN<361, 3.6, 2.4, 2.9>) 4+2j, 7+8j, 3+2j, 8+2j, 4+7j, 3+2j, 9+1j, 3+4j: || comment2', | |
| '|| comment3 ..............................................................', | |
| ' ', | |
| ''] | |
| cls = TICTAC_DOM_KRNL() | |
| cls.lang_updt(tt_dom_src) | |
| #k_shell_frame_learn(123, 123) | |
| # FLIR IMAGING SEQUENCE TEST(tq- respawn cmd) | |
| #___________________________________________________________________________________________________ | |
| #gh, gw = 30, 40 | |
| #y = np.linspace(-1, 1, gh) | |
| #x = np.linspace(-1, 1, gw) | |
| #XX, YY = np.meshgrid(x, y) | |
| #radius = np.hypot(XX, YY) | |
| #amp = np.exp(-((radius-0.3)**2)/(2*0.08**2))+0.05*np.random.RandomState(2).randn(gh, gw) | |
| #phase = np.arctan2(YY, XX)+2.0*(XX**2-YY**2) | |
| #complex_grid = (amp*np.exp(1j*phase)).astype(np.complex128) | |
| #sim = FlirSimulator(complex_grid, H=256, W=384, rng_seed=7) | |
| #frames = sim.simulate(frames=90, start_idx=100, end_idx=999, save_dir='tt_dom_flir', save_pngs=True) | |
| run_tt_dom_inpr_testing() |
If you can fully explain the shell redock, together with the section above it (exactly how it is used with the real Ares AI of the B-Spline integration), that would pretty much make you God.
Couldn't have stated it better.