Skip to content

Instantly share code, notes, and snippets.

@lastforkbender
Last active October 11, 2025 12:30
Show Gist options
  • Select an option

  • Save lastforkbender/41369e650a6a6d17ddca75195c022567 to your computer and use it in GitHub Desktop.

Select an option

Save lastforkbender/41369e650a6a6d17ddca75195c022567 to your computer and use it in GitHub Desktop.
Tictac interpreter 1.02
# tt_dom_inpr_1_02.py /TT-DOM Plex Interpreter 1.02 with high-fidelity complex flir sequence imagings /Python 3.13
from PIL import Image, ImageFilter, ImageDraw, ImageFont
from collections import defaultdict, namedtuple, deque
from torch.utils.data import Dataset, DataLoader
from scipy.linalg import cho_factor, cho_solve
from xml.etree import ElementTree as ET
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import torch.nn.functional as F
from numba import njit, prange
from numpy.linalg import svd
from xml.dom import minidom
from scipy import ndimage
import concurrent.futures
from pathlib import Path
import torch.nn as nn
import numpy as np
import threading
import imageio
import hashlib
import random
import torch
import cmath
import math
import time
import os
import re
# Small positive constant: added to a denominator in ScaleOverTrTransform and used as a
# near-zero magnitude threshold in RadialUnitTransform.
epsilon = 1e-12 # Used by AR3z and other classes with AI integrations
# Unitary logarithmic complex cycle constants(*must be diagonally paired)
# As written here, U2_LCX2[i] is U1_LCX1[i] with its real and imaginary parts swapped.
U1_LCX1 = [2+9j, 5+2j, 6+8j, 7+5j, 8+1j, 9+3j, 4+2j, 1+5j, 3+8j]
U2_LCX2 = [9+2j, 2+5j, 8+6j, 5+7j, 1+8j, 3+9j, 2+4j, 5+1j, 8+3j]
# AR3z Configurable Globals
# MAX_LAYER: default number of layers (CognitiveCore learning rates, AR3zNode layer mask).
MAX_LAYER = 3
# DEQUE_LEN: default capacity of RunningDeque / the HybridController global deque.
DEQUE_LEN = 512
# NOTE(review): CYCLE_MAPPS is not referenced in the visible part of this file;
# presumably the initial `mapps` budget handed to AR3zNode.evaluate - confirm.
CYCLE_MAPPS = 1.0
# FEATURE_DIM: default weight-vector length of OnlinePredictor.
FEATURE_DIM = 6
CORE_DIM = 4 # tiny param vector length @ CognitiveCore
AGE_TAU = 200.0 # timescale for length pinning/penalty
ESCAPE_THRESH = 0.8 # residual threshold for escape candidates
ESCAPE_K = 3.0 # sharpness for tanh escape
ANCHOR_BLEND = 0.03 # anchor refresh blending
# Used by DIANE RL-NN for spawned hybrid gate controllers of the Shell B-Spline NN
# and other important circumstances AI propagation dynamics related of this module
State = namedtuple('State', ['id', 'features'])
Action = namedtuple('Action', ['id', 'type'])
Reward = namedtuple('Reward', ['value', 'type'])
Decision = namedtuple('Decision', ['state', 'action', 'reward'])
# Thread pooling for async B-Spline controller calls
# The executor is created as an import-time side effect with two worker threads.
_bspline_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
# Default B-Spline handler(plex interpreter-integrator struct/symmetry instruction sets replace this altogether)
def default_bspline_handler(h, w, complex_array):
    """Placeholder B-Spline controller callback that always requests no action.

    The three arguments are accepted for signature compatibility and are not
    inspected. Per the original note, ``complex_array`` is a NumPy ndarray
    (np.complex64 <-> np.float32).

    Returns:
        dict: ``{'action': 'none'}``.
    """
    response = {'action': 'none'}
    return response
# Current module(tt_dom_inpr_1_02.py) working directory
# Absolute path of the directory containing this file, resolved once at import time.
TT_DOM_PTH = os.path.dirname(os.path.abspath(__file__))
#_______________________________________________________________________________________________________
#///////////////////////////////////////////////////////////////////////////////////////////////////////
# TICTAC DOM - FLIR IMAGE GENERATOR/TQ-SHELL<RESPAWN>
#_______________________________________________________________________________________________________
#///////////////////////////////////////////////////////////////////////////////////////////////////////
@njit()
def blur_h(img, temp_buf, kernel):
    """Convolve every row of ``img`` with the 1-D ``kernel`` into ``temp_buf``.

    Columns that fall outside the image are clamped to the nearest edge
    column. ``img`` is only read; ``temp_buf`` (same shape) receives the result.
    """
    rows, cols = img.shape
    taps = kernel.shape[0]
    half = taps//2
    last_col = cols-1
    # prange is kept from the original; the decorator does not pass parallel=True.
    for r in prange(rows):
        for c in range(cols):
            acc = 0.0
            for tap in range(taps):
                src = c+(tap-half)
                # Replicate the border pixel instead of reading out of range.
                if src < 0:
                    src = 0
                elif src > last_col:
                    src = last_col
                acc += img[r, src]*kernel[tap]
            temp_buf[r, c] = acc
@njit()
def blur_v(temp_buf, out, kernel):
    """Convolve every column of ``temp_buf`` with the 1-D ``kernel`` into ``out``.

    Rows that fall outside the image are clamped to the nearest edge row.
    The iteration extent is taken from ``out.shape``.
    """
    rows, cols = out.shape
    taps = kernel.shape[0]
    half = taps//2
    last_row = rows-1
    # prange is kept from the original; the decorator does not pass parallel=True.
    for r in prange(rows):
        for c in range(cols):
            acc = 0.0
            for tap in range(taps):
                src = r+(tap-half)
                # Replicate the border pixel instead of reading out of range.
                if src < 0:
                    src = 0
                elif src > last_row:
                    src = last_row
                acc += temp_buf[src, c]*kernel[tap]
            out[r, c] = acc
def separable_gaussian_blur(img, temp_buf, kernel):
    """Blur ``img`` in place with a separable 1-D kernel (rows, then columns).

    Args:
        img: 2-D array, overwritten with the blurred result.
        temp_buf: scratch array for the intermediate horizontal pass. It is
            used directly when it is float32 and has the shape of ``img``;
            otherwise a private float32 scratch buffer is allocated.
        kernel: 1-D kernel; converted to float32 if necessary.

    Returns:
        None. The result is written back into ``img``.
    """
    if kernel.dtype != np.float32:
        kernel = kernel.astype(np.float32)
    # An array's dtype cannot be changed by assigning into it, so the former
    # ``x[:] = x.astype(np.float32)`` lines never produced float32 buffers.
    if temp_buf.dtype != np.float32 or temp_buf.shape != img.shape:
        temp_buf = np.empty(img.shape, dtype=np.float32)
    if img.dtype == np.float32:
        blur_h(img, temp_buf, kernel)
        blur_v(temp_buf, img, kernel)
        return
    # Non-float32 input: blur a float32 copy, then store it back (cast to the
    # caller's dtype) so the in-place contract is preserved.
    work = img.astype(np.float32)
    blur_h(work, temp_buf, kernel)
    blur_v(temp_buf, work, kernel)
    img[...] = work
@njit()
def rasterize_rotated_ellipse(temp_map, cy, cx, angle_rad, sigma_x, sigma_y, peak):
    """Add a rotated anisotropic Gaussian blob to ``temp_map`` in place.

    The blob is centred at (cy, cx), rotated by ``angle_rad`` and scaled by
    ``peak``. Only pixels inside a 3-sigma axis-aligned box, clipped to the
    map bounds, are touched.
    """
    rows, cols = temp_map.shape
    reach_x = int(math.ceil(3.0*sigma_x))
    reach_y = int(math.ceil(3.0*sigma_y))
    row_lo = max(0, int(math.floor(cy))-reach_y)
    row_hi = min(rows-1, int(math.ceil(cy))+reach_y)
    col_lo = max(0, int(math.floor(cx))-reach_x)
    col_hi = min(cols-1, int(math.ceil(cx))+reach_x)
    cos_t = math.cos(angle_rad)
    sin_t = math.sin(angle_rad)
    two_var_x = 2.0*(sigma_x**2)
    two_var_y = 2.0*(sigma_y**2)
    for row in range(row_lo, row_hi+1):
        for col in range(col_lo, col_hi+1):
            # Rotate the pixel offset into the ellipse's own axes.
            along = cos_t*(col-cx)+sin_t*(row-cy)
            across = -sin_t*(col-cx)+cos_t*(row-cy)
            weight = math.exp(-(along*along)/two_var_x-(across*across)/two_var_y)
            temp_map[row, col] += peak*weight
def make_gaussian_kernel(sigma, truncate=3.0):
    """Return a normalised 1-D Gaussian kernel as a float32 array.

    The kernel has ``2*radius + 1`` taps, where ``radius`` is
    ``ceil(truncate*sigma)`` but never less than 1. The taps sum to 1.
    """
    half_width = int(max(1, math.ceil(truncate*sigma)))
    offsets = np.arange(-half_width, half_width+1, dtype=np.float32)
    # The 1e-12 keeps the division finite for sigma == 0.
    weights = np.exp(-0.5*(offsets/(sigma+1e-12))**2).astype(np.float32)
    weights /= weights.sum()
    return weights
def make_sea_background(H, W, base_temp=290.0, wave_amp=0.6, rng=None):
    """Build an (H, W) float32 background: base level + two sine waves + noise.

    Args:
        H, W: output height and width.
        base_temp: constant offset added to every pixel.
        wave_amp: amplitude applied to the summed sine pattern.
        rng: NumPy Generator for the noise; ``default_rng(0)`` when omitted.
    """
    generator = np.random.default_rng(0) if rng is None else rng
    row_axis = np.linspace(0, 4*math.pi, H, dtype=np.float32)
    col_axis = np.linspace(0, 4*math.pi, W, dtype=np.float32)
    X, Y = np.meshgrid(col_axis, row_axis)
    primary = np.sin(2.5*X+0.6*Y)
    secondary = 0.6*np.sin(1.1*X-0.4*Y)
    waves = wave_amp*(primary+secondary)
    noise = 0.03*generator.standard_normal((H, W)).astype(np.float32)
    return (base_temp+waves+noise).astype(np.float32)
def radiance_from_temp(T, k=1.0):
    """Return ``(T - 273.15) * k`` as a float32 array (``T`` must be an ndarray)."""
    offset_from_zero_c = T-273.15
    scaled = offset_from_zero_c*k
    return scaled.astype(np.float32)
def add_noise(img, rng, gain=10.0, read_noise_std=2.0):
    """Return Poisson shot noise plus Gaussian read noise for ``img``.

    ``img*gain`` (negative values clipped to 0) is the Poisson rate; zero-mean
    normal noise with ``read_noise_std`` is added. The result is float32 with
    the shape of ``img`` and stays in the gain-scaled domain.
    """
    rates = np.clip(img*gain, 0, None)
    # The Poisson draw precedes the normal draw so the RNG stream order is kept.
    shot = rng.poisson(rates.ravel()).astype(np.float32).reshape(img.shape)
    read = rng.normal(0.0, read_noise_std, img.shape).astype(np.float32)
    return shot+read
def normalize_to_uint8(img, vmin=None, vmax=None):
    """Linearly map ``img`` from [vmin, vmax] to [0, 255] and cast to uint8.

    Missing bounds default to the image minimum / maximum. Values outside the
    range are clipped, and a zero-width range is treated as width 1e-8.
    """
    lo = float(img.min()) if vmin is None else vmin
    hi = float(img.max()) if vmax is None else vmax
    width = max(1e-8, (hi-lo))
    scaled = 255.0*(img-lo)/width
    return np.clip(scaled, 0, 255).astype(np.uint8)
def draw_caption_on_uint8(img_u8, center, frame_idx, start_idx, end_idx):
    """Draw a crosshair at ``center`` and a centred caption on a grayscale frame.

    Args:
        img_u8: 2-D uint8 image.
        center: (row, col) position of the crosshair.
        frame_idx, start_idx, end_idx: numbers interpolated into the caption.

    Returns:
        A uint8 array of the annotated image.
    """
    canvas = Image.fromarray(img_u8).convert('L')
    pen = ImageDraw.Draw(canvas)
    width, _ = canvas.size
    mark_x = int(center[1])
    mark_y = int(center[0])
    arm = 15
    pen.line([(mark_x-arm, mark_y), (mark_x+arm, mark_y)], fill=255)
    pen.line([(mark_x, mark_y-arm), (mark_x, mark_y+arm)], fill=255)
    pen.ellipse((mark_x-4, mark_y-4, mark_x+4, mark_y+4), outline=255)
    # WARNING (from the original author): do not change the caption text
    # below; it is stated to be tied to the mathematics used elsewhere in
    # this module. Change it at your own risk.
    caption = f'sha-LE-det TQ SHELL: {frame_idx} [{start_idx}->{end_idx}]'
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None
    # Measure the caption: font.getsize first, then textbbox, and finally a
    # rough 6x10-pixel-per-character estimate if both fail.
    try:
        if font is None:
            box = pen.textbbox((0, 0), caption)
            tw, th = box[2]-box[0], box[3]-box[1]
        else:
            try:
                tw, th = font.getsize(caption)
            except Exception:
                box = pen.textbbox((0, 0), caption, font=font)
                tw, th = box[2]-box[0], box[3]-box[1]
    except Exception:
        tw, th = len(caption)*6, 10
    text_left = (width-tw)//2
    # Solid black plate behind the text keeps it readable on bright frames.
    pen.rectangle((text_left-4, 2, (width+tw)//2+4, 2+th+4), fill=0)
    pen.text((text_left, 4), caption, fill=255, font=font)
    return np.asarray(canvas, dtype=np.uint8)
class FlirSimulator:
    """Synthetic FLIR frame generator driven by a 2-D complex grid.

    The grid's normalised amplitude and phase steer the position, size and
    orientation of a warm elliptical target drawn over a sea background.
    """

    def __init__(self, complex_grid, H=256, W=384, rng_seed=1):
        """Precompute amplitude/phase statistics of ``complex_grid``.

        Args:
            complex_grid: 2-D complex array.
            H, W: output frame height and width in pixels.
            rng_seed: seed for the per-simulation random generator.
        """
        self.grid = complex_grid
        self.H, self.W = int(H), int(W)
        self.rng_seed = int(rng_seed)
        amp = np.abs(self.grid).astype(np.float32)
        a_min, a_max = float(amp.min()), float(amp.max())
        # Min-max normalised amplitude; 1e-12 guards a constant-amplitude grid.
        self.amp_norm = (amp-a_min)/(a_max-a_min+1e-12)
        self.phase = np.angle(self.grid).astype(np.float32)
        gh, gw = self.amp_norm.shape
        yy, xx = np.indices((gh, gw))
        total = float(self.amp_norm.sum())+1e-12
        # Amplitude-weighted centroid of the grid, in grid coordinates.
        self.cy_g = float((self.amp_norm*yy).sum()/total)
        self.cx_g = float((self.amp_norm*xx).sum()/total)
        self.span = float(self.amp_norm.std())
        self.gh, self.gw = gh, gw

    def simulate(self, frames=60, start_idx=0, end_idx=999, save_dir=None, save_pngs=False):
        """Render ``frames`` noisy float32 frames and optionally save PNGs.

        Args:
            frames: number of frames to generate.
            start_idx, end_idx: indices shown in the PNG caption only.
            save_dir: output directory; required when ``save_pngs`` is true.
            save_pngs: write each frame as a captioned 8-bit PNG.

        Returns:
            list of ``(frame_float32, (cy, cx))`` tuples, one per frame.

        Raises:
            RuntimeError: if ``save_pngs`` is true and ``save_dir`` is None.
        """
        rng = np.random.default_rng(self.rng_seed)
        H, W = self.H, self.W
        gh, gw = self.gh, self.gw
        # Map the grid centroid into image pixel coordinates.
        cy_img = (self.cy_g/max(1, gh-1))*(H-1)
        cx_img = (self.cx_g/max(1, gw-1))*(W-1)
        mean_phase = float(self.phase.mean())
        amp_factor = float(self.span*min(H, W)*0.4)
        base_temp = make_sea_background(H, W, base_temp=290.5, wave_amp=0.6, rng=rng).astype(np.float32)
        optical = np.empty((H, W), dtype=np.float32)
        temp_buf = np.empty_like(optical)
        frames_out, kernel_cache = [], {}
        # Tiny warm-up call so the compiled blur kernels are built before timing starts.
        ktest = make_gaussian_kernel(1.5)
        tmp_img = np.zeros((8, 8), dtype=np.float32)
        tmp_buf = np.zeros_like(tmp_img)
        separable_gaussian_blur(tmp_img, tmp_buf, ktest)
        if save_pngs:
            if save_dir is None:
                raise RuntimeError('<TT-DOM 1.02> TQ Shell flir image sequence output directory is not specified')
            os.makedirs(save_dir, exist_ok=True)
        # The second-pass blur kernel does not depend on the frame; build it once.
        small_k = make_gaussian_kernel(0.6)
        t0 = time.time()
        for t in range(frames):
            temp = base_temp.copy()
            frac = t/max(1, frames-1)
            # Sample the grid along a wrapping path to get local amplitude/phase.
            gx = int((frac*(gw-1)) % gw)
            gy = int(((frac*1.3)*(gh-1)) % gh)
            local_amp = float(self.amp_norm[gy, gx])
            local_phase = float(self.phase[gy, gx])
            dx = amp_factor*(0.8*math.cos(2*math.pi*frac+local_phase)+0.2*(local_amp-0.5))
            dy = amp_factor*(0.6*math.sin(2*math.pi*frac*1.2+local_phase*0.7)+0.2*(local_amp-0.5))
            cy = cy_img+dy
            cx = cx_img+dx
            base_length = max(8.0, amp_factor*0.8)
            base_width = max(3.0, amp_factor*0.25)
            length = base_length*(1.0+1.5*local_amp)
            width = base_width*(1.0+1.2*local_amp)
            angle = mean_phase+math.radians(180.0*math.sin(2*math.pi*frac*0.7+mean_phase))
            sigma_x = max(1.0, length/2.5)
            sigma_y = max(1.0, width/2.5)
            rasterize_rotated_ellipse(temp, cy, cx, angle, sigma_x, sigma_y, peak=7.0*(1.0+local_amp))
            rad = radiance_from_temp(temp, k=2.5)
            optical[:, :] = rad
            # Frame-dependent PSF blur; kernels are cached by rounded sigma.
            psf_sigma = 1.2+0.8*abs(math.sin(frac*2.0+mean_phase))
            sigma_key = float(round(psf_sigma, 3))
            if sigma_key not in kernel_cache:
                kernel_cache[sigma_key] = make_gaussian_kernel(psf_sigma)
            kernel = kernel_cache[sigma_key]
            separable_gaussian_blur(optical, temp_buf, kernel)
            separable_gaussian_blur(optical, temp_buf, small_k)
            noisy = add_noise(optical, rng, gain=12.0, read_noise_std=3.0)
            frame_f32 = noisy.astype(np.float32)
            frames_out.append((frame_f32, (cy, cx)))
            if save_pngs:
                u8 = normalize_to_uint8(frame_f32)
                u8c = draw_caption_on_uint8(u8, (cy, cx), frame_idx=t+start_idx, start_idx=start_idx, end_idx=end_idx)
                Image.fromarray(u8c).save(os.path.join(save_dir, f'tq_shell_flir_seq_{t:03d}.png'))
            # NOTE(review): indentation was lost in the source copy; the progress
            # print is assumed to run for every 10th frame regardless of save_pngs.
            if t % 10 == 0:
                print(f'TQ Shell flir frame generated: {t}/{frames-1}')
        dt = time.time()-t0
        # Avoid ZeroDivisionError when no frames were requested.
        per_frame = dt/frames if frames > 0 else 0.0
        print(f'Simulated TQ Shell flir {frames} frames in {dt:.3f}s ({per_frame:.4f}s/frame)')
        return frames_out
#_______________________________________________________________________________________________________
#///////////////////////////////////////////////////////////////////////////////////////////////////////
# TICTAC DOM - FLIR INITIAL KINETIC VERTICAL ALPHA nK-RESONANCE(PNG/GIF)/TQ-SHELL<SCAN>
# Default output frame size (width, height) for FlirRendererVNK.
VNK_OUT_W, VNK_OUT_H = 267, 122
# Default factor applied to the output size to get the evaluation-grid size.
VNK_EVAL_SCALE = 0.5
# World-coordinate window, unpacked throughout as (xmin, xmax, ymin, ymax).
VNK_GRID_EXTENT = (-1.5, 1.5, -0.5, 1.5)
# Patch base radius as a fraction of the larger grid-extent span.
VNK_PATCH_RADIUS_FACTOR = 0.09
# RBF shape parameter: eps = VNK_EPS_SCALE * mean sample distance from the patch center.
VNK_EPS_SCALE = 0.4
# Diagonal regularisation: lam = VNK_LAMBDA_SCALE * eps**2.
VNK_LAMBDA_SCALE = 1e-6
# NOTE(review): VNK_FRAMES, VNK_DURATION and VNK_LBL_TXT are not referenced in the
# visible part of this file; presumably GIF frame count, per-frame duration and
# a label prefix - confirm against the caller.
VNK_FRAMES = 32
VNK_DURATION = 0.07
# Colormap applied to the normalised field in FlirRendererVNK._render_frame_eval.
VNK_FLIR_COLORMAP = plt.get_cmap('inferno')
# Base RGB colour used by vnk_make_sea_background.
VNK_SEA_COLOR = (32, 8, 23)
# Number of time samples averaged per frame in FlirRendererVNK._render_frame_eval.
VNK_MOTION_BLUR_STEPS = 1
# Optional TrueType font path; when None the PIL default font is used.
VNK_FONT_PATH = None
VNK_LBL_TXT = 'nK Tenacity:'
# Default temp directory; FlirRendererVNK.__init__ creates it.
VNK_TMP_DIR = "tic_tac_frames_tmp_singlethread"
def vnk_complex_to_patch_params(C):
    """Convert a complex array into per-element patch parameters.

    Real and imaginary parts are min-max rescaled into the x and y ranges of
    VNK_GRID_EXTENT respectively.

    Returns:
        ndarray of shape (C.size, 4) with columns
        ``[x_norm, y_norm, amplitude, phase]``.
    """
    flat = C.ravel()
    re_part = np.real(flat)
    im_part = np.imag(flat)
    xmin, xmax, ymin, ymax = VNK_GRID_EXTENT
    # The 1e-12 keeps the rescale finite when all values are equal.
    x_norm = xmin+(re_part-re_part.min())/(re_part.max()-re_part.min()+1e-12)*(xmax-xmin)
    y_norm = ymin+(im_part-im_part.min())/(im_part.max()-im_part.min()+1e-12)*(ymax-ymin)
    columns = [x_norm, y_norm, np.abs(flat), np.angle(flat)]
    return np.vstack(columns).T
def vnk_make_sea_background(w, h):
    """Return an RGB PIL image of a noisy vertical gradient of VNK_SEA_COLOR.

    The colour is scaled from 1.0 (top row) down to 0.6 (bottom row), with
    per-pixel Gaussian noise (sigma 2) from the global NumPy random state.

    Args:
        w: image width in pixels.
        h: image height in pixels; ``h == 1`` is handled (formerly a
           ZeroDivisionError).
    """
    # Row position in [0, 1]; max(1, ...) avoids dividing by zero for h == 1.
    t = np.arange(h)/max(1, h-1)
    shade = 0.6+0.4*(1-t)
    row_colors = np.array(VNK_SEA_COLOR)[None, :]*shade[:, None]
    noise = np.random.normal(0, 2, (h, w, 3))
    arr = np.clip(row_colors[:, None, :]+noise, 0, 255).astype(np.uint8)
    return Image.fromarray(arr)
def vnk_gaussian_kernel_matrix(X, eps):
    """Return the Gaussian RBF Gram matrix ``exp(-|xi - xj|^2 / (2*eps^2))``.

    Args:
        X: (n, d) array of sample points.
        eps: kernel width.
    """
    sq_norms = np.sum(X**2, 1)
    # |xi - xj|^2 expanded as |xi|^2 + |xj|^2 - 2*xi.xj
    sq_dist = sq_norms[:, None]+sq_norms[None, :]-2*X.dot(X.T)
    return np.exp(-sq_dist/(2*eps*eps))
def vnk_solve_patch_rbf_cholesky(Xp, yp, eps, lam):
    """Solve ``(K + lam*I) alpha = yp`` for the RBF weights via Cholesky.

    Args:
        Xp: (n, 2) sample points of one patch.
        yp: (n,) target values at those points.
        eps: Gaussian kernel width.
        lam: value added to the diagonal of the Gram matrix.

    Returns:
        The weight vector ``alpha``.
    """
    gram = vnk_gaussian_kernel_matrix(Xp, eps)
    diag = np.diag_indices_from(gram)
    gram[diag] = gram[diag]+lam
    factorisation = cho_factor(gram, overwrite_a=True, check_finite=False)
    return cho_solve(factorisation, yp, check_finite=False)
def vnk_build_patches_from_complex(C, rng=None):
    """Build one scattered-sample RBF patch per element of the complex array C.

    Args:
        C: complex array; each element yields one patch.
        rng: ``np.random.RandomState``-like source; ``RandomState(1)`` if omitted.

    Returns:
        list of dicts with keys ``'center'`` (2,), ``'Xp'`` (28, 2) sample
        points, ``'yp'`` (28,) sample values, ``'vel'`` (2,) and ``'amp'``.
    """
    if rng is None:
        rng = np.random.RandomState(1)
    # Fixed: the helper defined in this module is vnk_complex_to_patch_params;
    # the unprefixed name raised NameError.
    params = vnk_complex_to_patch_params(C)
    xmin, xmax, ymin, ymax = VNK_GRID_EXTENT
    span = max(xmax-xmin, ymax-ymin)
    base_radius = VNK_PATCH_RADIUS_FACTOR*span
    m = 28  # samples per patch
    patches = []
    for p in params:
        cx, cy, amp, ph = p
        # Velocity: upward speed grows with amplitude; horizontal from phase.
        sin_ph = np.sin(ph)
        vy = 0.25+0.7*amp
        vx = 0.6*(np.sign(sin_ph))*(np.abs(sin_ph)**2)
        # Scatter sample points around the center in polar coordinates.
        theta = rng.uniform(0, 2*np.pi, m)
        r = rng.normal(0, base_radius*0.32, m)+base_radius*0.12
        px, py = cx+r*np.cos(theta), cy+r*np.sin(theta)
        angles = np.arctan2(vy, vx+1e-12)
        sigma_along = base_radius*(0.5+0.6*amp)
        sigma_cross = base_radius*(0.18+0.2*(1-amp))
        dx, dy = px-cx, py-cy
        ca, sa = np.cos(angles), np.sin(angles)
        # Offsets rotated into the velocity-aligned frame.
        ux, uy = ca*dx+sa*dy, -sa*dx+ca*dy
        vals = amp*np.exp(-0.5*((ux/sigma_along)**2+(uy/sigma_cross)**2))
        vals += amp*np.exp(-((dx*dx+dy*dy)/(2*(0.4*base_radius)**2)))
        patches.append({'center': np.array([cx, cy]), 'Xp': np.vstack([px, py]).T,
                        'yp': vals, 'vel': np.array([vx, vy]), 'amp': amp})
    return patches
def vnk_make_eval_grid(eval_w, eval_h):
    """Return an (eval_w*eval_h, 2) float64 array of (x, y) grid points.

    Points span VNK_GRID_EXTENT and are ordered row by row (x varies fastest).
    """
    xmin, xmax, ymin, ymax = VNK_GRID_EXTENT
    grid_x, grid_y = np.meshgrid(np.linspace(xmin, xmax, eval_w),
                                 np.linspace(ymin, ymax, eval_h))
    points = np.vstack([grid_x.ravel(), grid_y.ravel()]).T
    return points.astype(np.float64)
@njit(parallel=True)
def vnk_eval_grid(patch_centers, patch_centers_r, patches_X_flat, patches_X_offsets,
                  patches_sizes, patches_alpha_flat, patches_alpha_offsets, patches_eps, grid_pts):
    """Evaluate the partition-of-unity blend of all RBF patches on a grid.

    Args:
        patch_centers: (P, 2) patch centers used for the blending weights.
        patch_centers_r: (P,) blending radii.
        patches_X_flat: flattened (x, y) sample points of all patches.
        patches_X_offsets: (P,) start offset of each patch in patches_X_flat.
        patches_sizes: (P,) number of sample points per patch.
        patches_alpha_flat: concatenated RBF weights of all patches.
        patches_alpha_offsets: (P,) start offset of each patch's weights.
        patches_eps: (P,) Gaussian width of each patch.
        grid_pts: (M, 2) evaluation points.

    Returns:
        (M,) float64 array: weighted patch sum divided by the weight sum
        (left undivided where the weight sum is not positive).
    """
    M = grid_pts.shape[0]
    P = patch_centers.shape[0]
    out = np.zeros(M, dtype=np.float64)
    # Parallelise over grid points so each thread owns out[i]; the former
    # prange over patches updated shared out[i]/W[i] entries concurrently.
    for i in prange(M):
        gx = grid_pts[i, 0]
        gy = grid_pts[i, 1]
        acc = 0.0
        wsum = 0.0
        for p in range(P):
            r = patch_centers_r[p]
            dx = gx-patch_centers[p, 0]
            dy = gy-patch_centers[p, 1]
            # Radial blending weight (fixed: squared distance is dx*dx + dy*dy).
            w = np.exp(-(dx*dx+dy*dy)/(2.0*(r*r)))
            off = patches_X_offsets[p]
            aoff = patches_alpha_offsets[p]
            eps = patches_eps[p]
            s_val = 0.0
            for j in range(patches_sizes[p]):
                dxj = gx-patches_X_flat[off+2*j]
                dyj = gy-patches_X_flat[off+2*j+1]
                s_val += patches_alpha_flat[aoff+j]*np.exp(-(dxj*dxj+dyj*dyj)/(2.0*eps*eps))
            acc += w*s_val
            wsum += w
        if wsum > 0.0:
            out[i] = acc/wsum
        else:
            out[i] = acc
    return out
def vnk_synthesize_complex_matrix(Hc=18, Wc=36):
    """Synthesise an (Hc, Wc) complex128 test matrix.

    The amplitude peaks around the row coordinate 0.6 and falls off with
    |x|; the phase grows with x**2 and the row index. Each element is scaled
    by a random factor in [0.85, 1.25) from the global NumPy random state,
    drawn in row-major order.
    """
    C = np.zeros((Hc, Wc), dtype=np.complex128)
    xs = np.linspace(-1.0, 1.0, Wc)
    ys = np.linspace(0.0, 1.0, Hc)
    for i, y in enumerate(ys):
        # Terms that depend only on the row are computed once per row.
        row_gain = 0.65*np.exp(-((y-0.6)**2)/0.02)
        row_phase = 0.5+0.5*i/Hc
        for j, x in enumerate(xs):
            amp = 0.35+row_gain*(1-0.3*abs(x))
            rotation = np.exp(1j*(2.0*(x**2)*np.pi*row_phase))
            C[i, j] = amp*rotation*(0.85+0.4*np.random.rand())
    return C
class FlirRendererVNK:
    """Renders FLIR-styled frames from moving RBF patches (nK resonance view)."""

    def __init__(self, out_w=VNK_OUT_W, out_h=VNK_OUT_H, eval_scale=VNK_EVAL_SCALE, tmp_dir=VNK_TMP_DIR):
        """Store sizes, create ``tmp_dir`` and load the overlay font.

        The evaluation grid is the output size times ``eval_scale``, with a
        minimum of 8 samples per axis.
        """
        self.out_w, self.out_h = out_w, out_h
        self.eval_w = max(8, int(out_w*eval_scale))
        self.eval_h = max(8, int(out_h*eval_scale))
        self.tmp_dir = tmp_dir
        os.makedirs(self.tmp_dir, exist_ok=True)
        self.font = self._load_font()
        self._precomputed = {}

    def _load_font(self):
        """Return the TrueType font at VNK_FONT_PATH, or the PIL default font."""
        try:
            return ImageFont.truetype(VNK_FONT_PATH, 22) if VNK_FONT_PATH else ImageFont.load_default()
        except Exception:
            # Best effort: an unreadable font file falls back to the default.
            return ImageFont.load_default()

    def prepare_patches(self, patches):
        """Fit RBF weights for every patch and cache flat arrays for evaluation.

        Each patch dict (keys ``'center'``, ``'Xp'``, ``'yp'``, ``'vel'``) is
        mutated in place: ``'alpha'``, ``'eps'`` and ``'avg_r'`` are added.
        The flattened arrays, the evaluation grid and the patch list are
        stored in ``self._precomputed``; one evaluation call is made at the
        end to trigger compilation of ``vnk_eval_grid``.
        """
        P = len(patches)
        patch_centers = np.zeros((P, 2), dtype=np.float64)
        patch_vels = np.zeros((P, 2), dtype=np.float64)
        patch_eps = np.zeros(P, dtype=np.float64)
        patch_centers_r_template = np.zeros(P, dtype=np.float64)
        patches_X_list, patches_alpha_list = [], []
        sizes = np.zeros(P, dtype=np.int64)
        X_offsets = np.zeros(P, dtype=np.int64)
        a_offsets = np.zeros(P, dtype=np.int64)
        off, aoff = 0, 0
        for i, p in enumerate(patches):
            patch_centers[i, :] = p['center'].astype(np.float64)
            patch_vels[i, :] = p['vel']
            Xp, yp = p['Xp'].astype(np.float64), p['yp'].astype(np.float64)
            # Kernel width and regularisation scale with the patch's spread.
            avg_r = np.mean(np.linalg.norm(Xp-p['center'], axis=1))
            eps = VNK_EPS_SCALE*max(avg_r, 1e-4)
            lam = VNK_LAMBDA_SCALE*(eps**2)
            alpha = vnk_solve_patch_rbf_cholesky(Xp, yp, eps, lam)
            p['alpha'] = alpha
            p['eps'] = eps
            p['avg_r'] = avg_r
            patch_eps[i] = eps
            sizes[i] = Xp.shape[0]
            X_offsets[i] = off
            a_offsets[i] = aoff
            patches_X_list.append(Xp.ravel())
            patches_alpha_list.append(alpha.ravel())
            off += Xp.size
            aoff += alpha.size
            patch_centers_r_template[i] = 2.5*avg_r
        empty = np.array([], dtype=np.float64)
        patches_X_flat = np.concatenate(patches_X_list) if patches_X_list else empty
        patches_alpha_flat = np.concatenate(patches_alpha_list) if patches_alpha_list else empty
        # The center/velocity arrays are always (P, 2), also for P == 0, so the
        # compiled evaluator sees a consistent 2-D layout.
        self._precomputed.update({'patches_X_flat': patches_X_flat,
                                  'patches_alpha_flat': patches_alpha_flat,
                                  'sizes': sizes,
                                  'X_offsets': X_offsets,
                                  'a_offsets': a_offsets,
                                  'patch_centers_r_template': patch_centers_r_template,
                                  'patch_eps_arr': patch_eps,
                                  'patch_vels': patch_vels,
                                  'orig_centers': patch_centers,
                                  '_patches': patches})
        grid_pts = vnk_make_eval_grid(self.eval_w, self.eval_h)
        self._precomputed['grid_pts'] = grid_pts
        # Warm-up call; the result is discarded.
        dummy_centers = np.zeros_like(self._precomputed['orig_centers'])
        dummy_r = self._precomputed['patch_centers_r_template']
        vnk_eval_grid(dummy_centers, dummy_r, self._precomputed['patches_X_flat'], self._precomputed['X_offsets'],
                      self._precomputed['sizes'], self._precomputed['patches_alpha_flat'],
                      self._precomputed['a_offsets'], self._precomputed['patch_eps_arr'],
                      grid_pts)

    def _render_frame_eval(self, t_center, blur_width):
        """Render one composited frame at time ``t_center``.

        The field is averaged over the motion-blur time samples, colour-mapped,
        resized, bloomed/sharpened and composited over a grayscale sea.

        Returns:
            ``(composite_image, field, (centers, velocities))`` where
            ``centers`` are the patch centers at the last time sample.
        """
        acc = np.zeros((self.eval_h*self.eval_w,), dtype=np.float64)
        # At least one sample, so `centers` is always bound and the average
        # never divides by zero.
        steps = max(1, int(VNK_MOTION_BLUR_STEPS))
        # NOTE(review): with a single step linspace yields [-0.5], so the frame
        # is evaluated at t_center - 0.5*blur_width; kept as in the original.
        weights = np.linspace(-0.5, 0.5, steps)
        orig_centers = self._precomputed['orig_centers']
        vel_arr = self._precomputed['patch_vels']
        for w_frac in weights:
            t = t_center+w_frac*blur_width
            centers = orig_centers+vel_arr*t
            out = vnk_eval_grid(centers.astype(np.float64), self._precomputed['patch_centers_r_template'],
                                self._precomputed['patches_X_flat'],
                                self._precomputed['X_offsets'],
                                self._precomputed['sizes'],
                                self._precomputed['patches_alpha_flat'],
                                self._precomputed['a_offsets'],
                                self._precomputed['patch_eps_arr'],
                                self._precomputed['grid_pts'])
            acc += out
        acc /= float(len(weights))
        out_img = acc.reshape((self.eval_h, self.eval_w))
        # Re-edit; normalize with an standard max-min yes, safe @ NumPy2.0
        mn, mx = float(out_img.min()), float(out_img.max())
        norm = (out_img-mn)/((mx-mn)+1e-12)
        flir_rgb = (VNK_FLIR_COLORMAP(norm)[:, :, :3]*255).astype(np.uint8)
        img = Image.fromarray(flir_rgb)
        img = img.resize((self.out_w, self.out_h), resample=Image.BICUBIC)
        bloom = img.filter(ImageFilter.GaussianBlur(radius=6))
        img = Image.blend(img, bloom, alpha=0.28)
        img = img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=180, threshold=2))
        norm_up = Image.fromarray((np.clip((norm*255).astype(np.uint8), 0, 255))).resize(
            (self.out_w, self.out_h), resample=Image.BILINEAR)
        sea = vnk_make_sea_background(self.out_w, self.out_h).convert('RGB')
        sea_gray = sea.convert('L')
        sea_rgb = Image.merge('RGB', (sea_gray, sea_gray, sea_gray))
        # Field pixels above 10% intensity show the FLIR image, the rest the sea.
        mask = Image.fromarray(np.array(norm_up) > int(0.10*255)).convert('L')
        composite = Image.composite(img, sea_rgb, mask)
        return composite, out_img, (centers, vel_arr)

    def overlay_crosshair(self, frame_img, centers, vel_arr):
        """Draw a crosshair on the patch with the largest summed |alpha|.

        Returns:
            ``(frame_img, info)`` where ``info`` is
            ``(px, py, velocity, center)`` of that patch, or ``None`` when no
            patches were prepared. ``frame_img`` is drawn on in place.
        """
        draw = ImageDraw.Draw(frame_img)
        w, h = frame_img.size
        strengths = [np.sum(np.abs(p['alpha'])) for p in self._precomputed['_patches']]
        if len(strengths) == 0:
            return frame_img, None
        # Fixed: the selected index is now named and used for the returned
        # velocity/center (the former code referenced an undefined `i`).
        best = int(np.argmax(strengths))
        cx, cy = centers[best]
        xmin, xmax, ymin, ymax = VNK_GRID_EXTENT
        px, py = int((cx-xmin)/(xmax-xmin)*w), int((cy-ymin)/(ymax-ymin)*h)
        size, cross_color = int(0.03*max(w, h)), (30, 144, 255)
        draw.line([(px-size, py), (px+size, py)], fill=cross_color, width=2)
        draw.line([(px, py-size), (px, py+size)], fill=cross_color, width=2)
        draw.ellipse([px-6, py-6, px+6, py+6], outline=cross_color, width=2)
        return frame_img, (px, py, vel_arr[best], centers[best])

    def overlay_top_label_and_scale(self, frame_img, frame_idx, total_frames, kinetics_info=None):
        """Draw the frame-counter box and, optionally, a velocity arrow and height.

        Args:
            frame_img: PIL image, drawn on in place.
            frame_idx: zero-based frame index (displayed one-based).
            total_frames: total frame count shown in the label.
            kinetics_info: optional ``(px, py, velocity, center)`` tuple as
                returned by ``overlay_crosshair``.
        """
        draw = ImageDraw.Draw(frame_img, 'RGBA')
        w, h = frame_img.size
        box_w, box_h = int(w*0.5), 44
        box_x0, box_y0 = (w-box_w)//2, 8
        box_x1, box_y1 = box_x0+box_w, box_y0+box_h
        draw.rectangle([box_x0, box_y0, box_x1, box_y1], fill=(0, 0, 0, 160))
        # Fixed: removed the lookup of the undefined name GRID_EXTENT; its
        # values were only used by a commented-out scale-tick loop.
        fc_txt = f'nK Resonance Frame {frame_idx+1}/{total_frames}'
        draw.text((box_x0+2, box_y0+8), fc_txt, fill=(200, 220, 255), font=self.font)
        if kinetics_info is not None:
            px, py, vel, center = kinetics_info
            vx, vy = vel
            arrow_len = 45
            # Image y grows downward, so positive vy points the arrow up.
            ax, ay = int(px+np.sign(vx)*arrow_len), int(py-int(vy*arrow_len))
            draw.line([(px, py), (ax, ay)], fill=(200, 220, 255), width=2)
            draw.polygon([(ax, ay), (ax-6, ay-6), (ax+6, ay-6)], fill=(200, 220, 255))
            draw.text((w-290, box_y0+98), f"Height: {center[1]:.3f}", fill=(200, 220, 255), font=self.font)
#_______________________________________________________________________________________________________
#///////////////////////////////////////////////////////////////////////////////////////////////////////
# TICTAC DOM - AR3z - ADAPTIVE RADIAL EVENT DRIVEN RESPONSIVE AI WITH FULL INLINE ENV XML ML SNAPSHOTS
# Snapshot settings, read once at import from environment variables with the defaults shown.
# NOTE(review): none of these is used in the visible part of this file; the meaning of
# XML_MODE values and the unit of XML_INTERVAL (presumably seconds) should be confirmed.
XML_OUT = os.environ.get('AR3z_XML_OUT', 'AR3z_snapshot.xml')
XML_MODE = os.environ.get('AR3z_XML_MODE', 'on_demand')
# Raises ValueError at import if AR3z_XML_INTERVAL is set to a non-numeric string.
XML_INTERVAL = float(os.environ.get('AR3z_XML_INTERVAL', '10.0'))
def complex_seg(z):
    """Return the real part, imaginary part and magnitude of ``z`` as floats."""
    segment = {}
    segment['real'] = float(np.real(z))
    segment['imag'] = float(np.imag(z))
    segment['mag'] = float(abs(z))
    return segment
def softmax_logits(xs):
    """Return the softmax of ``xs`` as a float array.

    The maximum is subtracted before exponentiation for numerical stability,
    and 1e-16 is added to the normaliser. ``xs`` must be non-empty.
    """
    logits = np.array(xs, dtype=float)
    shifted = logits-logits.max()
    weights = np.exp(shifted)
    normaliser = weights.sum()+1e-16
    return weights/normaliser
def median_baseline(state, col, dim_type):
    """Return the median over all rows of one column of ``state['chips']``.

    The real parts are used when ``dim_type == 'real'``; any other value
    selects the imaginary parts.
    """
    column = state['chips'][:, col]
    if dim_type == 'real':
        components = np.real(column)
    else:
        components = np.imag(column)
    return float(np.median(components))
def apply_seg_and_remove(chips, row, col, renown='zero'):
    """Describe ``chips[row, col]`` and return a copy with that cell replaced.

    Args:
        chips: 2-D complex array; not modified.
        row, col: cell to extract.
        renown: replacement mode. ``'zero'`` writes 0, ``'sentinel'`` writes
            1e6+1e6j, ``'interp'`` writes the mean of the other cells in the
            row (0 if there are none). Any other value leaves the copy as is.

    Returns:
        ``(seg, new_chips)``: the ``complex_seg`` of the original value and
        the modified copy.
    """
    seg = complex_seg(chips[row, col])
    updated = chips.copy()
    if renown == 'zero':
        updated[row, col] = 0+0j
    elif renown == 'sentinel':
        updated[row, col] = 1e6+1e6j
    elif renown == 'interp':
        others = np.delete(updated[row, :], col)
        if others.size > 0:
            updated[row, col] = others.mean()
        else:
            updated[row, col] = 0+0j
    return seg, updated
class AR3zTransform:
    """Base class for AR3z chip transforms.

    Attributes:
        name: display name; defaults to the concrete class name.
        is_compression: flag read by AR3zNode when scoring candidates.
    """

    def __init__(self, name=None, is_compression=False):
        self.name = name if name else self.__class__.__name__
        self.is_compression = is_compression

    def transform(self, state, row, col, layer, dim_type, modulation=1+0j):
        """Return ``(value, score)`` for one chip; subclasses must override."""
        raise NotImplementedError
class ScaleOverTrTransform(AR3zTransform):
    """Transform computing ``(a + BX/(Tr + epsilon)) * modulation`` per column."""

    def __init__(self, name=None, is_compression=False):
        super().__init__(name, is_compression)

    def transform(self, state, row, col, layer, dim_type, modulation=1+0j):
        """Return the requested component of the scaled value and a score.

        ``Tr`` comes from ``state['Tr_layers'][layer][col]`` when present,
        otherwise from ``state['Tr_scalar']`` (default: ones of length
        ``state['D']``). The score is ``1/(1+Tr)``. ``row`` is unused.
        """
        a = float(state['a'][col])
        BX = float(state['BX'][col])
        if 'Tr_layers' in state:
            Tr = float(state['Tr_layers'][layer][col])
        else:
            Tr = float(state.get('Tr_scalar', np.ones(state['D']))[col])
        val = (a+BX/(Tr+epsilon))*modulation
        score = float(1.0/(1.0+Tr))
        component = np.real(val) if dim_type == 'real' else np.imag(val)
        return float(component), score
class RadialUnitTransform(AR3zTransform):
    """Compression transform that shrinks a chip's magnitude, keeping its angle."""

    def __init__(self, name=None, is_compression=True):
        super().__init__(name, is_compression)

    def transform(self, state, row, col, layer, dim_type, modulation=1+0j):
        """Return the requested component of the radially shrunk chip and a score.

        The modulated chip's magnitude is multiplied by ``1 - strength``,
        where ``strength`` is ``state['compression_strength']`` (default 0.5).
        Magnitudes below ``epsilon`` map to 0. The score is ``1 - strength``.
        """
        z = state['chips'][row, col]*modulation
        radius = abs(z)
        strength = float(state.get('compression_strength', 0.5))
        shrunk_radius = radius*(1.0-strength)
        if radius < epsilon:
            shrunk = 0+0j
        else:
            shrunk = z*(shrunk_radius/radius)
        score = float(1.0-strength)
        component = np.real(shrunk) if dim_type == 'real' else np.imag(shrunk)
        return (float(component), score)
class IdentityTransform(AR3zTransform):
    """Transform that returns the modulated chip unchanged, with score 0.5."""

    def __init__(self, name=None, is_compression=False):
        super().__init__(name, is_compression)

    def transform(self, state, row, col, layer, dim_type, modulation=1+0j):
        """Return the real or imaginary part of ``chips[row, col] * modulation``."""
        z = state['chips'][row, col]*modulation
        if dim_type == 'real':
            component = float(np.real(z))
        else:
            component = float(np.imag(z))
        return (component, 0.5)
class CognitiveCore:
    """Small complex parameter vector that produces a modulation factor.

    Attributes:
        params: complex vector of length ``core_dim``, randomly initialised.
        lr_layers: per-layer learning rates (all 1e-2).
        confidence: running confidence value, starts at 0.5.
        l2: weight-decay coefficient used in ``adapt``.
    """

    def __init__(self, core_dim=CORE_DIM, max_layer=MAX_LAYER):
        self.core_dim = core_dim
        real_part = np.random.randn(core_dim)
        imag_part = np.random.randn(core_dim)
        self.params = (real_part+1j*imag_part)*0.01
        self.lr_layers = np.full(max_layer, 1e-2)
        self.confidence = 0.5
        self.l2 = 1e-4

    def feat_to_complexvec(self, feat_real):
        """Pack a real feature vector into ``core_dim`` complex numbers.

        Element ``i`` is ``feat[i] + 1j*feat[i+1]`` with cyclic indexing; an
        empty input gives zeros, and a single value gives a zero imaginary part.
        """
        packed = np.zeros(self.core_dim, dtype=complex)
        flat = np.asarray(feat_real).astype(float)
        count = flat.size
        for i in range(self.core_dim):
            re_val = flat[i % count] if count > 0 else 0.0
            im_val = flat[(i+1) % count] if count > 1 else 0.0
            packed[i] = re_val+1j*im_val
        return packed

    def modulate(self, value_complex, baseline_complex, layer_norm, age_norm):
        """Return ``(s, confidence)`` for the given chip context.

        ``s = 1 + 0.15 * vdot(params, features)``, with magnitude capped at 5.
        The confidence is ``self.confidence / (1 + ||params||)``.
        """
        feat = np.array([float(np.real(value_complex)), float(np.imag(value_complex)),
                         float(np.real(baseline_complex)), float(np.imag(baseline_complex)),
                         float(layer_norm), float(age_norm)])
        cvec = self.feat_to_complexvec(feat)
        alpha = 0.15
        s = 1.0+alpha*np.vdot(self.params, cvec)
        magnitude = abs(s)
        if magnitude > 5.0:
            s = s/magnitude*5.0
        conf = self.confidence*(1.0/(1.0+np.linalg.norm(self.params)))
        return s, float(conf)

    def adapt(self, chosen_complex, target_complex, layer, residual, age_norm):
        """Take one gradient-style step on ``params`` and update the confidence.

        The step uses the error ``chosen - target``, the layer's learning rate
        (the last one if ``layer`` is out of range) and L2 decay. ``params``
        is rescaled to norm 10 if it grows beyond that.
        """
        feat = np.array([float(np.real(chosen_complex)), float(np.imag(chosen_complex)),
                         float(np.real(target_complex)), float(np.imag(target_complex)),
                         float(layer)/max(1, MAX_LAYER), float(age_norm)])
        cvec = self.feat_to_complexvec(feat)
        alpha = 0.15
        err = chosen_complex-target_complex
        grad = alpha*np.conjugate(err)*np.conjugate(cvec)
        if layer < len(self.lr_layers):
            lr = self.lr_layers[layer]
        else:
            lr = self.lr_layers[-1]
        self.params -= lr*(grad+self.l2*self.params)
        # Exponential moving average: lower residuals raise the confidence.
        self.confidence = 0.9*self.confidence+0.1*(1.0/(1.0+residual))
        param_norm = np.linalg.norm(self.params)
        if param_norm > 10.0:
            self.params *= (10.0/param_norm)
class OnlinePredictor:
    """Online linear regressor with SGD updates and running target statistics."""

    def __init__(self, dim=FEATURE_DIM, lr=0.02, l2=1e-4):
        self.w = np.zeros(dim)
        self.b = 0.0
        self.lr = lr
        self.l2 = l2
        # Running count, mean and accumulated squared deviation of the targets.
        self.count = 0
        self.m = 0.0
        self.s2 = 1e-6

    def features(self, item):
        """Build the feature vector for an event dict.

        Features, in order: |value|, layer/max_layer, imag flag, residual,
        age/total_age, col/D. The vector is zero-padded or truncated to the
        length of ``self.w``.
        """
        mag = abs(item['value'])
        layer_norm = item.get('layer', 0)/max(1, item.get('max_layer', 1))
        dim_type_flag = 0.0 if item.get('dim_type', 'real') == 'real' else 1.0
        prior = item.get('residual', 0.0)
        age_norm = item.get('age', 0)/max(1, item.get('total_age', 1))
        col_norm = item.get('col', 0)/max(1, item.get('D', 1))
        feats = np.array([mag, layer_norm, dim_type_flag, prior, age_norm, col_norm])
        width = len(self.w)
        if len(feats) < width:
            feats = np.concatenate([feats, np.zeros(width-len(feats))])
        return feats[:width]

    def predict(self, item):
        """Return the linear prediction ``w.x + b`` for ``item`` as a float."""
        return float(self.features(item).dot(self.w)+self.b)

    def update(self, item, target):
        """Take one SGD step toward ``target`` and update the running statistics."""
        x = self.features(item)
        err = (x.dot(self.w)+self.b)-target
        self.w -= self.lr*(err*x+self.l2*self.w)
        self.b -= self.lr*err
        self.count += 1
        previous_mean = self.m
        self.m += (target-self.m)/self.count
        self.s2 += (target-previous_mean)*(target-self.m)

    def uncertainty(self):
        """Return ``sqrt(s2/(count+1))``, or 1.0 before any update."""
        if self.count > 0:
            return float(np.sqrt(self.s2/(self.count+1)))
        return 1.0
class PerChipDeque:
    """Grid of bounded FIFO buffers, one per (row, col) chip."""

    def __init__(self, rows, cols, maxlen=64):
        self.rows, self.cols, self.maxlen = rows, cols, maxlen
        self.buffers = []
        for _ in range(rows):
            self.buffers.append([deque(maxlen=maxlen) for _ in range(cols)])

    def push(self, row, col, entry):
        """Append ``entry`` to a chip's buffer.

        Returns:
            The oldest entry if the buffer was full (it is evicted),
            otherwise ``None``.
        """
        ring = self.buffers[row][col]
        evicted = None
        if len(ring) == ring.maxlen:
            evicted = ring.popleft()
        ring.append(entry)
        return evicted

    def sample(self, row, col):
        """Return the chip's buffered entries as a list, oldest first."""
        return list(self.buffers[row][col])
class RunningDeque:
    """Bounded event queue with age stamping and policy-based removal."""

    def __init__(self, maxlen=DEQUE_LEN):
        # The bound is enforced manually in push() so the evicted item can be
        # handed back to the caller.
        self.d, self.maxlen, self.age_counter = deque(), maxlen, 0

    def push(self, item):
        """Append a copy of ``item`` stamped with the current age counter.

        If ``item`` carries a ``'priority_score'`` it is multiplied by
        ``exp(-age/AGE_TAU)``.

        Returns:
            The evicted oldest item if the queue was full, else ``None``.
        """
        item = dict(item)
        item['age'] = self.age_counter
        self.age_counter += 1
        ev = self.d.popleft() if len(self.d) >= self.maxlen else None
        if 'priority_score' in item:
            age_pen = np.exp(-item['age']/AGE_TAU)
            item['priority_score'] = item['priority_score']*age_pen
        self.d.append(item)
        return ev

    def pop_policy(self, policy='priority'):
        """Remove and return one item chosen by ``policy``.

        Args:
            policy: ``'fifo'`` removes the oldest item; ``'max_residual'``
                the item with the largest ``'residual'`` (default 0.0); any
                other value the item with the largest ``'priority_score'``
                (default 1.0).

        Returns:
            The removed item, or ``None`` if the queue is empty.
        """
        if not self.d:
            return None
        if policy == 'fifo':
            return self.d.popleft()
        if policy == 'max_residual':
            idx = max(range(len(self.d)), key=lambda i: self.d[i].get('residual', 0.0))
        else:
            idx = max(range(len(self.d)), key=lambda i: self.d[i].get('priority_score', 1.0))
        # Fixed: return the removed item itself; the former code returned the
        # bound method ``self.d.pop``.
        chosen = self.d[idx]
        del self.d[idx]
        return chosen

    def items(self):
        """Return a list snapshot of the queued items, oldest first."""
        return list(self.d)

    def __len__(self):
        return len(self.d)
class AR3zNode:
# Node of an evaluation tree: proposes candidate values for one chip via its
# transforms, selects one, reports it to an optional controller, adapts its
# CognitiveCore and then evaluates its child nodes (`ki`).
# NOTE(review): indentation was lost in this copy of the file, so the nesting of
# the anchor/escape section and of the `if chosen is not None` section cannot be
# confirmed from here; the code is left untouched.
def __init__(self, name, transforms, ki=None, residual_limit=1.0, layer_mask=None):
# transforms: objects exposing transform(...) and is_compression.
# ki: child nodes evaluated after this node (defaults to an empty list).
# layer_mask: per-layer enable flags, defaults to all True for MAX_LAYER layers.
self.name = name
self.transforms = transforms
self.ki = ki or []
self.residual_limit = residual_limit
self.layer_mask = layer_mask if layer_mask is not None else [True]*MAX_LAYER
self.core = CognitiveCore()
# Recursion guard used by evaluate().
self.max_depth = 8
def evaluate(self, state, row, col, layer, dim_type, baseline, mapps, depth=0, ordering='priority'):
# Returns a dict describing this node's decision for chip (row, col), including the
# remaining `mapps` budget and the outputs of child nodes under 'ki'.
# Early exits return {'name', 'skipped'} past max_depth or {'name', 'no_candidates'}.
# `ordering` is only forwarded to child nodes.
if depth > self.max_depth:
return {'name':self.name, 'skipped':True}
# Fall back to the column median when no baseline is supplied.
baseline_local = baseline if baseline is not None else median_baseline(state, col, dim_type)
candidates, age_norm = [], 0.0
if 'controller' in state and state['controller'] is not None: age_norm = 0.0
for t in self.transforms:
if not self.layer_mask[layer]:
continue
chip_z = state['chips'][row, col]
baseline_z = complex(baseline_local, 0.0) if dim_type=='real' else complex(0.0, baseline_local)
# The core supplies a complex modulation factor and a confidence for this chip.
s, conf = self.core.modulate(chip_z, baseline_z, layer/MAX_LAYER, age_norm)
val, score = t.transform(state, row, col, layer, dim_type, modulation=s)
res = abs(val - baseline_local)
candidates.append({'transform':t, 'value':val, 'score':score, 'residual':res, 'layer':layer,
'core_conf':conf, 'is_compression':t.is_compression, 'chosen_mod':s})
# Optional "escape" candidate: blends the chip toward the pretrained anchor when the
# chip deviates from it by more than ESCAPE_THRESH.
anchor = state.get('pretrained_anchor')
if anchor is not None:
z_anchor = anchor[row, col]
chip_val = state['chips'][row, col]
# The loop below has an empty body and no effect.
for dim in ('real','imag'):
pass
chip_scalar = float(np.real(chip_val)) if dim_type=='real' else float(np.imag(chip_val))
anchor_scalar = float(np.real(z_anchor)) if dim_type=='real' else float(np.imag(z_anchor))
resid_to_anchor = abs(chip_scalar - anchor_scalar)
if resid_to_anchor > ESCAPE_THRESH:
# Blend factor in (0, 1) that grows with the excess over the threshold.
f = np.tanh(ESCAPE_K*(resid_to_anchor-ESCAPE_THRESH))
escape_val = chip_scalar*(1.0-f)+anchor_scalar*f
escape_score = 0.6+0.4*(1.0-np.exp(-resid_to_anchor))
escape_res = abs(escape_val-baseline_local)
candidates.append({'transform':None, 'value':escape_val, 'score':escape_score, 'residual':escape_res,
'layer':layer, 'core_conf':self.core.confidence, 'is_compression':False,
'chosen_mod':1+0j, 'escape':True})
if not candidates:
return {'name':self.name, 'no_candidates':True}
# Score every candidate: reward score and core confidence, penalise residual and compression.
logits = []
for c in candidates:
score_term = c.get('score', 0.0)
residual_term = -1.0*c.get('residual', 0.0)
core_term, age_term = 0.8*c.get('core_conf', 0.0), 0.0
comp_term = -0.2*float(c.get('is_compression', False))
logit = 3.0*score_term+2.0*residual_term+core_term+age_term+comp_term
logits.append(logit)
# The selection is deterministic: the candidate with the highest softmax probability.
probs = softmax_logits(logits)
idx = int(np.argmax(probs))
chosen = candidates[idx]
node_out = {'name':self.name, 'row':row, 'col':col, 'layer':layer, 'dim_type':dim_type,
'baseline':baseline_local, 'candidates':candidates, 'chosen':None, 'ki':[], 'mapps':mapps}
# Spend budget: at most 1.0 and never more than what remains.
reduction = min(mapps, min(1.0, chosen['residual']))
mapps = max(0.0, mapps-reduction)
node_out['mapps'] = mapps
node_out['chosen'] = {'value':chosen['value'], 'score':chosen.get('score'), 'residual':chosen['residual'],
'prob':probs[idx], 'escape': chosen.get('escape', False)}
# Report the decision to the controller, if one is attached to the state.
if 'controller' in state and state['controller'] is not None:
ev = {'row':row, 'col':col, 'layer':layer, 'dim_type':dim_type, 'value':chosen['value'],
'residual':chosen['residual'], 'D':state['D']}
ev['score'] = chosen.get('score', 0.0)
ev['core_norm'] = np.linalg.norm(self.core.params)
ev['priority_score'] = (1.0/(1.0+ev['residual']))*(1.0+ev['score'])*(1.0+0.5*ev['core_norm'])
state['controller'].enqueue_event(ev)
# Adapt the core toward the baseline, encoded on the axis selected by dim_type.
if chosen is not None:
baseline_complex = complex(baseline_local, 0.0) if dim_type=='real' else complex(0.0, baseline_local)
chosen_complex = complex(chosen['value'], 0.0) if dim_type=='real' else complex(0.0, chosen['value'])
target_complex = baseline_complex
self.core.adapt(chosen_complex, target_complex, layer, chosen['residual'], age_norm)
# Children are evaluated in order; each receives the budget left by the previous one.
for ch in self.ki:
logic_out = ch.evaluate(state, row, col, layer, dim_type, baseline_local, mapps, depth+1, ordering)
node_out['ki'].append(logic_out)
mapps = logic_out.get('mapps', mapps)
node_out['mapps'] = mapps
return node_out
class HybridController:
    """Collects node events, predicts residuals and scraps chips by policy.

    Attributes:
        chips: current 2-D complex chip array (replaced by ``scrapp_policy``).
        global_deque: RunningDeque of all events.
        perchip: PerChipDeque of events per (row, col).
        predictor: OnlinePredictor trained on scrapped events.
        pretrained_anchor: copy of the chips, slowly blended toward them.
    """

    def __init__(self, chips, deque_len=DEQUE_LEN, perchip_len=64):
        self.chips = chips
        self.rows, self.D = chips.shape
        self.global_deque = RunningDeque(maxlen=deque_len)
        self.predictor = OnlinePredictor(dim=FEATURE_DIM)
        self.perchip = PerChipDeque(self.rows, self.D, maxlen=perchip_len)
        self.pretrained_anchor = chips.copy()
        self.age = 0

    def enqueue_event(self, ev):
        """Score and queue a copy of event ``ev``.

        The priority is recomputed from the predicted residual. If the global
        deque evicts an item, the predictor learns from it.

        Returns:
            The evicted item, or ``None``.
        """
        event = dict(ev)
        event['total_age'] = max(1, self.global_deque.age_counter)
        event['D'] = self.D
        event['predicted_residual'] = self.predict_residual(
            event['row'], event['col'], event['dim_type'], event['value'], event.get('residual', 0.0))
        residual_factor = 1.0/(1.0+event['predicted_residual'])
        score_factor = 1.0+event.get('score', 0.0)
        core_factor = 1.0+0.5*event.get('core_norm', 0.0)
        event['priority_score'] = (residual_factor)*(score_factor)*(core_factor)
        scrapped = self.global_deque.push(event)
        self.perchip.push(event['row'], event['col'], event)
        if scrapped:
            self.learn_from_scrapped(scrapped)
        return scrapped

    def predict_residual(self, row, col, dim_type, value, prior_res):
        """Return the predictor's residual estimate for a hypothetical event.

        The query uses age 0, layer 0 and a core norm of 0.0.
        """
        query = {
            'row': row,
            'col': col,
            'dim_type': dim_type,
            'value': value,
            'residual': prior_res,
            'age': 0,
            'total_age': max(1, self.global_deque.age_counter),
            'D': self.D,
            'layer': 0,
            'max_layer': MAX_LAYER,
            'core_norm': 0.0,
        }
        return self.predictor.predict(query)

    def learn_from_scrapped(self, ev):
        """Train the predictor on an evicted event, using its residual as target."""
        self.predictor.update(ev, ev.get('residual', 0.0))

    def scrapp_policy(self, policy='priority'):
        """Pop one event by ``policy`` and replace its chip by row interpolation.

        Returns:
            ``{'scrapped': item, 'seg': seg}``, or ``None`` if the queue is empty.
        """
        item = self.global_deque.pop_policy(policy)
        if item is None:
            return None
        seg, self.chips = apply_seg_and_remove(self.chips, item['row'], item['col'], renown='interp')
        self.refresh_anchor(blend=ANCHOR_BLEND)
        self.predictor.update(item, item.get('residual', 0.0))
        return {'scrapped': item, 'seg': seg}

    def refresh_anchor(self, blend=ANCHOR_BLEND):
        """Move the anchor toward the current chips by the fraction ``blend``."""
        keep = 1.0-blend
        self.pretrained_anchor = keep*self.pretrained_anchor+blend*self.chips
def stable_id(name, cfg_str):
    """Return the first 12 hex characters of the SHA-1 of ``name + '|' + cfg_str``."""
    payload = '|'.join((name, cfg_str)).encode('utf-8')
    digest = hashlib.sha1(payload).hexdigest()
    return digest[:12]
class DecisionAggregator:
    """Lock-protected accumulator of per-node decision statistics, grouped into coarse bins."""

    def __init__(self):
        self.data, self.node_cfg = {}, {}
        self.lock = threading.Lock()

    def bin_key(self, residual, core_conf, is_compression, escape):
        """Build the bucket key ``r?|c?|comp=?|esc=?`` from thresholds on residual and confidence."""
        residual_bounds = ((0.2, 'r0'), (0.5, 'r1'), (1.0, 'r2'))
        conf_bounds = ((0.2, 'c0'), (0.5, 'c1'))
        # First bound the value falls strictly below wins; otherwise the open-ended top bin
        rbin = next((tag for bound, tag in residual_bounds if residual < bound), 'r3')
        cbin = next((tag for bound, tag in conf_bounds if core_conf < bound), 'c2')
        comp_flag = 1 if is_compression else 0
        esc_flag = 1 if escape else 0
        return f"{rbin}|{cbin}|comp={comp_flag}|esc={esc_flag}"

    def project(self, node_name, transform_name, residual, prob, core_norm, is_compression, escape):
        """Add one observation to the bucket selected by ``bin_key`` (``core_norm`` is binned as confidence).

        The first transform seen for a bucket is kept as its ``rep_transform``.
        """
        key = self.bin_key(residual, core_norm, is_compression, escape)
        with self.lock:
            node_bins = self.data.setdefault(node_name, {})
            if key not in node_bins:
                node_bins[key] = {'count':0, 'sum_residual':0.0, 'sum_prob':0.0, 'rep_transform':transform_name}
            bucket = node_bins[key]
            bucket['count'] += 1
            bucket['sum_residual'] += float(residual)
            bucket['sum_prob'] += float(prob)

    def snapshot_and_clear(self):
        """Return everything accumulated so far and start over with an empty store."""
        with self.lock:
            snap, self.data = self.data, {}
        return snap
# Background XML exporter state: the worker thread handle and the flag its loop polls to stop
_abe_thread = None
_abe_stop = False
# Module-wide statistics sink, drained by export_AR3z_xml on every export
aggregator = DecisionAggregator()
def export_AR3z_xml(root_node, all_nodes, controller, out_path=XML_OUT):
    """Write the static node/transform graph plus a runtime snapshot as XML to ``out_path``.

    ``root_node`` is accepted but not read here. The module-level aggregator is drained
    (``snapshot_and_clear``) as part of the export. The document is first written to
    ``out_path + '.tmp'`` and then moved into place with ``os.replace``.
    """
    stamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    snapshot_el = ET.Element('AR3z_snapshot', {'ts':stamp, 'version':'1'})

    # ---- static graph: one <node> per entry of all_nodes
    static_el = ET.SubElement(snapshot_el, 'static_graph')
    nodes_el = ET.SubElement(static_el, 'nodes')
    for node in all_nodes:
        reslim = node.residual_limit
        mask_bits = ' '.join(['1' if flag else '0' for flag in node.layer_mask])
        core_dim = node.core.core_dim
        cfg = f"reslim={reslim}|mask={mask_bits}|core_dim={core_dim}"
        node_attrs = {'id':stable_id(node.name, cfg), 'name':node.name, 'cfg':cfg}
        ET.SubElement(nodes_el, 'node', node_attrs)

    # ---- static graph: each transform name is emitted once, first occurrence wins
    transforms_el = ET.SubElement(static_el, 'transforms')
    emitted_names = set()
    for node in all_nodes:
        for transform in node.transforms:
            if transform.name in emitted_names:
                continue
            compression_flag = 1 if getattr(transform, 'is_compression', False) else 0
            tcfg = f"is_compression={compression_flag}"
            tid = stable_id(transform.name, tcfg)
            emitted_names.add(transform.name)
            ET.SubElement(transforms_el, 'transform', {'id':tid, 'name':transform.name, 'cfg':tcfg})

    # ---- runtime snapshot: global metrics, then per-node bucket averages
    runtime_el = ET.SubElement(snapshot_el, 'runtime_snapshot')
    global_el = ET.SubElement(runtime_el, 'global')
    ET.SubElement(global_el, 'metric', {'name':'deque_len', 'value':str(len(controller.global_deque))})
    ET.SubElement(global_el, 'metric', {'name':'predictor_uncertainty', 'value':str(controller.predictor.uncertainty())})
    drained = aggregator.snapshot_and_clear()
    nodes_rt_el = ET.SubElement(runtime_el, 'nodes')
    for node_name, buckets in drained.items():
        node_el = ET.SubElement(nodes_rt_el, 'node', {'name':node_name})
        for bucket_key, stats in buckets.items():
            count = stats['count']
            if count > 0:
                avg_res = stats['sum_residual']/count
                avg_prob = stats['sum_prob']/count
            else:
                avg_res = 0.0
                avg_prob = 0.0
            branch_el = ET.SubElement(node_el, 'branch', {'bin':bucket_key, 'count':str(count), 'rep_transform':str(stats.get('rep_transform',''))})
            ET.SubElement(branch_el, 'metric', {'name':'avg_residual', 'value':str(avg_res)})
            ET.SubElement(branch_el, 'metric', {'name':'avg_prob', 'value':str(avg_prob)})

    # Write-then-rename so readers never observe a half-written file
    tmp_path = out_path+'.tmp'
    ET.ElementTree(snapshot_el).write(tmp_path, encoding='utf-8', xml_declaration=True)
    os.replace(tmp_path, out_path)
def _abe_writer(root_node, all_nodes, controller, interval):
    """Worker loop: export the XML snapshot every ``interval`` seconds until ``_abe_stop`` is set.

    The export stays best-effort (a failure never ends the loop), but failures are no
    longer swallowed silently: each distinct error is reported once, and reported again
    only if the error changes or an export succeeded in between.
    """
    global _abe_stop
    last_reported = None
    while not _abe_stop:
        try:
            export_AR3z_xml(root_node, all_nodes, controller)
            last_reported = None
        except Exception as exc:
            description = f'{type(exc).__name__}: {exc}'
            # De-duplicate so a persistent failure does not flood the console every interval
            if description != last_reported:
                print(f' /:./AR3z/.:/ xml export failed: {description}')
                last_reported = description
        time.sleep(interval)
def start_abe_export(root_node, all_nodes, controller, interval=XML_INTERVAL):
    """Launch the daemon exporter thread; does nothing when a thread handle is already stored."""
    global _abe_thread, _abe_stop
    if _abe_thread is None:
        _abe_stop = False
        worker_args = (root_node, all_nodes, controller, interval)
        _abe_thread = threading.Thread(target=_abe_writer, args=worker_args, daemon=True)
        _abe_thread.start()
def stop_abe_export():
    """Raise the stop flag, wait up to one second for the exporter thread, then drop the handle.

    The handle is cleared whether or not the thread finished within the join timeout.
    """
    global _abe_thread, _abe_stop
    if _abe_thread is not None:
        _abe_stop = True
        _abe_thread.join(timeout=1.0)
        _abe_thread = None
def load_AR3z_xml(path):
    """Parse a snapshot XML file into a plain dict; returns ``None`` when ``path`` does not exist.

    Result keys: ``ts``, ``version``, ``static`` (node id -> name/cfg, taken from every
    ``node`` element below ``static_graph``) and ``runtime`` (``{'nodes': {name: [branch, ...]}}``
    where each branch carries ``bin``, ``count``, ``rep_transform`` and a ``metrics`` dict).
    """
    if not os.path.exists(path):
        return None
    root = ET.parse(path).getroot()
    out = {'ts': root.attrib.get('ts'), 'version': root.attrib.get('version')}

    static_nodes = {}
    static_graph = root.find('static_graph')
    if static_graph is not None:
        static_nodes = {
            el.attrib.get('id'): {'name': el.attrib.get('name'), 'cfg': el.attrib.get('cfg')}
            for el in static_graph.findall('.//node')
        }
    out['static'] = static_nodes

    runtime_nodes = {}
    runtime_el = root.find('runtime_snapshot')
    nodes_el = runtime_el.find('nodes') if runtime_el is not None else None
    if nodes_el is not None:
        for node_el in nodes_el.findall('node'):
            branches = []
            for branch_el in node_el.findall('branch'):
                attrs = branch_el.attrib
                branch = {'bin': attrs.get('bin'), 'count': int(attrs.get('count', '0')), 'rep_transform': attrs.get('rep_transform')}
                branch['metrics'] = {
                    metric_el.attrib.get('name'): float(metric_el.attrib.get('value'))
                    for metric_el in branch_el.findall('metric')
                }
                branches.append(branch)
            runtime_nodes[node_el.attrib.get('name')] = branches
    out['runtime'] = {'nodes': runtime_nodes}
    return out
#_______________________________________________________________________________________________________
#///////////////////////////////////////////////////////////////////////////////////////////////////////
# TICTAC DOM - DIANE - DISTRIBUTED INTERNAL ASYMMETRIC NEURAL EMERGENCE
class DIANE:
    """Tabular state/action learner with a secondary modulation matrix and sigmoid action gates.

    Two matrices are maintained: ``memory_matrix`` (states x actions) and
    ``modulation_matrix`` (actions x states). ``decide`` mixes a row of the first with a
    column of the second, scales the result by per-action gates and adds noise. When the
    same action repeats 8 times or the memory matrix barely changes between snapshots,
    the b-spline handler is invoked asynchronously.

    ``history_limit`` caps how many memory-matrix snapshots ``propagation_history`` keeps,
    so memory use and the stagnation scan stay bounded over long runs.
    """
    # Small but powerful nn. Uses asymmetric hybrid gate spawning to get the B-Spline Shell AI out
    # of any ruts that could occur in evaluating best complex matrices projections --RL prob basis

    # Maximum number of snapshots retained in propagation_history (never fewer than the 5
    # that stagnation detection needs)
    history_limit = 64

    def __init__(self, num_states, num_actions, feature_len=1, device=None, complex_mode=False, init_scale=0.1, similarity_threshold=0.5):
        """Randomly initialise both matrices (complex64 in complex mode, float32 otherwise) and the gates."""
        self.device = device or torch.device('cpu')
        self.complex_mode = complex_mode
        self.dtype = torch.complex64 if complex_mode else torch.float32
        self.num_states = num_states
        self.num_actions = num_actions
        self.feature_len = feature_len
        self.similarity_threshold = float(similarity_threshold)
        # Top asym propagation matrix(states x -> actions)
        real_part = torch.randn((num_states, num_actions), device=self.device, dtype=torch.float32)*init_scale
        imag_part = torch.randn((num_states, num_actions), device=self.device, dtype=torch.float32)*init_scale if complex_mode else None
        if complex_mode:
            self.memory_matrix = (real_part + 1j * imag_part).to(self.dtype)
        else:
            self.memory_matrix = real_part.to(self.dtype)
        # Secondary modulated asymm matrix(actions <- x states)
        real_mod = torch.randn((num_actions, num_states), device=self.device, dtype=torch.float32)*(init_scale*0.5)
        imag_mod = torch.randn((num_actions, num_states), device=self.device, dtype=torch.float32)*(init_scale*0.5) if complex_mode else None
        if complex_mode:
            self.modulation_matrix = (real_mod+1j*imag_mod).to(self.dtype)
        else:
            self.modulation_matrix = real_mod.to(self.dtype)
        # Caches & History:
        self.cached_decisions = {}     # keyed(state.id, action.id)
        self.recent_actions = []       # recent ids actions
        self.propagation_history = []  # snapshots - memory_matrix (bounded by history_limit)
        self.decay, self.eps = 0.995, 1e-8
        # The hybrid gate components(training *float32)
        self.gate_W = nn.Parameter(torch.randn((self.num_actions, max(1, self.feature_len)), dtype=torch.float32, device=self.device)*0.01)
        self.gate_b = nn.Parameter(torch.zeros((self.num_actions,), dtype=torch.float32, device=self.device))
        self.gate_alpha, self.rule_override_strength = 0.1, 2.0
        self.bspline_handler = default_bspline_handler
        self._bspline_futures = []

    def learn(self, decision, lr=0.1):
        """Decay the memory matrix, move both matrix entries toward the reward and record a snapshot.

        ``decision`` must expose ``state.id``, ``action.id`` and ``reward.value``. Ids are
        wrapped modulo the matrix dimensions.
        """
        s = int(decision.state.id)%self.num_states
        a = int(decision.action.id)%self.num_actions
        r_val = torch.tensor(decision.reward.value, device=self.device, dtype=self.dtype)
        self.memory_matrix = self.memory_matrix * self.decay
        # Prime asym update state -> action
        current = self.memory_matrix[s, a]
        delta = lr*(r_val-current)
        self.memory_matrix[s, a] = current+delta
        # Secondary asym update action -> state
        mod_current = self.modulation_matrix[a, s]
        mod_delta = (lr*0.5)*(r_val-mod_current)
        self.modulation_matrix[a, s] = mod_current+mod_delta
        self._cache_decision(decision)
        self.propagation_history.append(self.memory_matrix.clone().detach())
        # Drop the oldest snapshots: without this every learn() call leaks one full matrix
        # copy and makes each stagnation scan longer than the last
        overflow = len(self.propagation_history)-max(5, int(self.history_limit))
        if overflow > 0:
            del self.propagation_history[:overflow]
        # Possible request, B-Spline Shell assistance:
        if self._should_request_bspline():
            self.request_bspline_resize_async(self.num_states, self.num_actions, self.memory_matrix)

    def decide(self, state, exploration_noise=0.01):
        """Score every action for ``state``, add Gaussian noise and return the arg-max as an ``Action``."""
        s = int(state.id)%self.num_states
        mem_row = self.memory_matrix[s, :]
        mod_col = self.modulation_matrix[:, s]
        if self.complex_mode:
            scores_c = (mem_row*0.7)+(mod_col.conj()*0.3)
            scores = scores_c.abs()
        else:
            scores = (mem_row*0.7)+(mod_col*0.3)
        # Possible hybrid gates action:
        gates = self._compute_gates(state)
        alpha = self.gate_alpha
        if self._should_request_bspline():
            alpha *= float(self.rule_override_strength)
        gates_t = gates.to(scores.device)
        scores = scores*(1.0+alpha*gates_t)
        # Scores are real-valued in both modes at this point, so one noise path suffices
        scores = scores+torch.randn_like(scores)*exploration_noise
        action_id = int(torch.argmax(scores).item())
        self.recent_actions.append(action_id)
        action_type = 'exploration' if torch.std(scores).item() > 1e-4 else 'exploitation'
        return Action(id=action_id, type=action_type)

    def apply_reward_limits(self, state, action):
        """Shrink the memory entry for (state, action) by 10% when cached rewards are low or inconsistent.

        No-op when nothing is cached for the pair. In real mode the entry is also clamped at 0.
        """
        key = (int(state.id)%self.num_states, int(action.id)%self.num_actions)
        if key not in self.cached_decisions or len(self.cached_decisions[key]) == 0:
            return
        rewards = [torch.tensor(d.reward.value, dtype=self.dtype, device=self.device) for d in self.cached_decisions[key]]
        stacked = torch.stack(rewards)
        avg = stacked.mean()
        variance = stacked.var(unbiased=False) if stacked.numel() > 1 else torch.tensor(0.0, dtype=self.dtype, device=self.device)
        similarity = 1.0/(1.0+variance.abs())
        if (avg.real if self.complex_mode else avg) < self.similarity_threshold or (similarity.real if self.complex_mode else similarity) < self.similarity_threshold:
            s, a = key
            if self.complex_mode:
                self.memory_matrix[s, a] = self.memory_matrix[s, a]*0.9
            else:
                self.memory_matrix[s, a] = max(0.0, self.memory_matrix[s, a].item()*0.9)

    def cascade_decisions(self, state, depth):
        """Breadth-first enumerate hypothetical decisions up to ``depth`` levels below ``state``.

        Returns a list of ``Decision`` tuples; empty when ``depth <= 0``. The number of
        results grows as ``num_actions ** depth``.
        """
        if depth <= 0:
            return []
        results = []
        # deque gives O(1) pops from the front of the BFS frontier
        frontier = deque([(state, 0)])
        while frontier:
            cur_state, cur_depth = frontier.popleft()
            if cur_depth >= depth:
                continue
            s = int(cur_state.id)%self.num_states
            for a in range(self.num_actions):
                next_state = State(id=(cur_state.id+1)%self.num_states, features=[(cur_state.features[0]+1)])
                raw = self.memory_matrix[s, a]
                r_value = float(raw.abs().item()) if self.complex_mode else float(raw.item())
                reward = Reward(value=r_value, type='hypothetical')
                dec = Decision(state=next_state, action=Action(id=a, type='hypothetical'), reward=reward)
                results.append(dec)
                frontier.append((next_state, cur_depth+1))
        return results

    def gate_parameters(self):
        """Return the trainable gate tensors ``[gate_W, gate_b]``."""
        return [self.gate_W, self.gate_b]

    def _compute_gates(self, state):
        """Return one sigmoid gate per action from ``state.features`` (zero-padded/truncated to fit)."""
        feats = torch.tensor(state.features, dtype=torch.float32, device=self.device).view(-1)
        L = self.gate_W.shape[1]
        if feats.numel() != L:
            f = torch.zeros((L,), dtype=torch.float32, device=self.device)
            n_copy = min(feats.numel(), L)
            f[:n_copy] = feats[:n_copy]
            feats = f
        logits = torch.matmul(self.gate_W, feats)+self.gate_b
        return torch.sigmoid(logits)

    def gate_loss_reg(self, recent_actions, entropy_coef=0.1):
        """Return ``-entropy_coef * entropy`` of the action histogram (0 for fewer than 2 actions)."""
        if len(recent_actions) < 2:
            return torch.tensor(0.0, device=self.device, dtype=torch.float32)
        counts = torch.zeros((self.num_actions,), device=self.device, dtype=torch.float32)
        for a in recent_actions:
            counts[a%self.num_actions] += 1.0
        probs = counts/counts.sum()
        probs = probs+1e-6
        entropy = -torch.sum(probs*torch.log(probs))
        return -entropy*entropy_coef

    def _cache_decision(self, decision):
        """Remember ``decision`` under its wrapped (state, action) key, keeping the newest 128."""
        key = (int(decision.state.id)%self.num_states, int(decision.action.id)%self.num_actions)
        if key not in self.cached_decisions:
            self.cached_decisions[key] = []
        self.cached_decisions[key].append(decision)
        if len(self.cached_decisions[key]) > 128:
            self.cached_decisions[key].pop(0)

    def _should_request_bspline(self):
        """Return True when the last 8 actions are identical or the memory matrix has stagnated."""
        if len(self.recent_actions) >= 8:
            last = self.recent_actions[-8:]
            if all(x == last[0] for x in last):
                return True
        # Stagnation detect from propagation history:
        history = self.propagation_history
        if len(history) >= 5:
            # Use the complex magnitude of each change so imaginary-only updates are not
            # mistaken for stagnation; for real tensors this equals the plain norm
            diffs = [torch.norm((cur-prev).abs().float()).item() for prev, cur in zip(history, history[1:])]
            mean_diff = sum(diffs)/len(diffs) if diffs else 0.0
            if mean_diff < (1e-3*max(1.0, float(self.num_states*self.num_actions))):
                return True
        return False

    def set_bspline_handler(self, handler_callable):
        """Install the callable run by ``request_bspline_resize_async``."""
        # *****Dict with the optional key sets: action, new_h, new_w, details*****
        self.bspline_handler = handler_callable

    def request_bspline_resize_async(self, h, w, complex_tensor):
        """Submit ``bspline_handler(h, w, array)`` to the executor and return the future.

        A dict result with ``action == 'restart'`` clears ``recent_actions``;
        ``action == 'resize'`` is only reported via ``print``.
        """
        # Temporary, more options needed for multi-processing sub-actions
        t = complex_tensor.detach().to('cpu')
        if self.complex_mode:
            np_arr = t.numpy().astype(np.complex64)
        else:
            np_arr = t.numpy().astype(np.float32)
        future = _bspline_executor.submit(self.bspline_handler, int(h), int(w), np_arr)
        self._bspline_futures.append(future)

        def _on_done(fut):
            try:
                res = fut.result()
                if isinstance(res, dict):
                    action = res.get('action')
                    if action == 'restart':
                        self.recent_actions.clear()
                        self.notify_bspline_epoch_restart()
                    elif action == 'resize':
                        new_h = res.get('new_h', self.num_states)
                        new_w = res.get('new_w', self.num_actions)
                        print(f' /:./D.I.A.N.E./.:/ b-spline shell requested resize: {new_h}x{new_w}')
            except Exception as e:
                print(f' /:./D.I.A.N.E./.:/ b-spline shell handler raised: {e}')

        future.add_done_callback(_on_done)
        return future

    def notify_bspline_epoch_restart(self):
        """Report that the b-spline shell asked for an epoch restart."""
        print(' /:./D.I.A.N.E./.:/ notify b-spline epoch restart called')

    # _________________________________UTILS____________________________________
    def to(self, device):
        """Move both matrices and the gate parameters to ``device``; returns ``self``."""
        self.device = device
        self.memory_matrix = self.memory_matrix.to(device)
        self.modulation_matrix = self.modulation_matrix.to(device)
        self.gate_W = nn.Parameter(self.gate_W.to(device))
        self.gate_b = nn.Parameter(self.gate_b.to(device))
        return self

    def set_complex_mode(self, complex_mode):
        """Switch between complex64 and float32 storage (the imaginary part is dropped going real)."""
        if complex_mode == self.complex_mode:
            return
        self.complex_mode = complex_mode
        if complex_mode:
            self.memory_matrix = self.memory_matrix.to(torch.float32).to(self.device).to(torch.complex64)+0j
            self.modulation_matrix = self.modulation_matrix.to(torch.float32).to(self.device).to(torch.complex64)+0j
            self.dtype = torch.complex64
        else:
            self.memory_matrix = self.memory_matrix.real.to(torch.float32)
            self.modulation_matrix = self.modulation_matrix.real.to(torch.float32)
            self.dtype = torch.float32

    def summary(self):
        """Return a small dict describing sizes, mode and cache occupancy."""
        return {'num_states': self.num_states,
                'num_actions': self.num_actions,
                'complex_mode': self.complex_mode,
                'memory_matrix_dtype': str(self.memory_matrix.dtype),
                'recent_actions_len': len(self.recent_actions),
                'cached_keys': len(self.cached_decisions)}
#///////////////////////////////////////////////////////////////////////////////////////////////////////
#_______________________________________________________________________________________________________
# TICTAC DOM - COMPLEX REGRESSIVE SHELL B-SPLINE AI
#_______________________________________________________________________________________________________
#///////////////////////////////////////////////////////////////////////////////////////////////////////
def collate_keep_meta(batch):
    """Collate ``(input, target, meta)`` samples: stack inputs and targets, keep metas as a list."""
    inputs, targets, metas = zip(*batch)
    stacked_inputs = np.stack(inputs, axis=0)
    stacked_targets = np.stack(targets, axis=0)
    return stacked_inputs, stacked_targets, list(metas)
def is_torch(x) -> bool:
    """Tell whether ``x`` is a ``torch.Tensor`` instance."""
    tensor_type = torch.Tensor
    return isinstance(x, tensor_type)
def as_numpy(x):
    """Convert ``x`` to a NumPy array; tensors are detached and moved to CPU first."""
    if not is_torch(x):
        return np.asarray(x)
    return x.detach().cpu().numpy()
def as_torch(x, device=None, dtype=None):
    """Return ``x`` as a tensor, optionally converted to ``dtype`` and moved to ``device``.

    Existing tensors are moved first and cast second; array-likes are wrapped with
    ``torch.from_numpy``, cast first and moved second.
    """
    if is_torch(x):
        tensor = x
        if device is not None:
            tensor = tensor.to(device)
        if dtype is not None:
            tensor = tensor.to(dtype)
        return tensor
    tensor = torch.from_numpy(np.asarray(x))
    if dtype is not None:
        tensor = tensor.to(dtype)
    if device is not None:
        tensor = tensor.to(device)
    return tensor
def complex_to_magphase(mat):
    """Split ``mat`` into float32 magnitude and phase, staying in the input's framework."""
    if not is_torch(mat):
        magnitude = np.abs(mat).astype(np.float32)
        phase = np.angle(mat).astype(np.float32)
        return magnitude, phase
    magnitude = torch.abs(mat).to(torch.float32)
    phase = torch.atan2(mat.imag, mat.real).to(torch.float32)
    return magnitude, phase
def magphase_to_complex(mag, phase):
    """Rebuild complex64 values from magnitude and phase; torch output if either input is a tensor."""
    if not (is_torch(mag) or is_torch(phase)):
        return (mag*np.exp(1j*phase)).astype(np.complex64)
    mag_t = as_torch(mag, dtype=torch.float32)
    ph_t = as_torch(phase, dtype=torch.float32)
    real_part = mag_t*torch.cos(ph_t)
    imag_part = mag_t*torch.sin(ph_t)
    return torch.complex(real_part, imag_part).to(torch.complex64)
def unwrap_phase(phase):
    """Unwrap a phase map along axis 0 and then axis 1 with ``np.unwrap``; result is float32.

    Tensor inputs are processed on the CPU via NumPy and returned as a tensor on
    the original device.
    """
    if is_torch(phase):
        along_rows = np.unwrap(phase.detach().cpu().numpy(), axis=0)
        unwrapped = np.unwrap(along_rows, axis=1).astype(np.float32)
        return as_torch(unwrapped, dtype=torch.float32, device=phase.device)
    along_rows = np.unwrap(phase, axis=0)
    return np.unwrap(along_rows, axis=1).astype(np.float32)
def cubic_b_spline_weights(t):
    """Evaluate the four uniform cubic B-spline basis weights at fractional offset ``t``.

    The weights are stacked along a new last axis; the output type (tensor or
    ndarray) follows the input, computed in float32.
    """
    torch_input = is_torch(t)
    t = t.to(torch.float32) if torch_input else np.asarray(t, dtype=np.float32)
    t_sq = t*t
    t_cu = t_sq*t
    basis = [
        (1.0-3.0*t+3.0*t_sq-t_cu)/6.0,
        (4.0-6.0*t_sq+3.0*t_cu)/6.0,
        (1.0+3.0*t+3.0*t_sq-3.0*t_cu)/6.0,
        t_cu/6.0,
    ]
    if torch_input:
        return torch.stack(basis, dim=-1)
    return np.stack(basis, axis=-1)
def eval_bspline_grid(control, out_h, out_w):
    """Evaluate a cubic B-spline surface defined by ``control`` on an ``out_h`` x ``out_w`` grid.

    ``control`` is either a single 2-D grid or a stack of grids (channels first).
    Neighbour indices outside the grid are clamped to the border. Computation runs
    in NumPy; a tensor input yields a float32 CPU tensor.
    """
    torch_input = is_torch(control)
    ctrl = control.detach().cpu().numpy() if torch_input else np.asarray(control)
    squeeze_channel = (ctrl.ndim == 2)
    if squeeze_channel:
        ctrl = ctrl[None, ...]
    n_ch, grid_h, grid_w = ctrl.shape
    # Sample positions expressed in control-grid coordinates
    row_pos = np.linspace(0, grid_h-1, out_h).astype(np.float32)
    col_pos = np.linspace(0, grid_w-1, out_w).astype(np.float32)
    Y, X = np.meshgrid(row_pos, col_pos, indexing='ij')
    base_col = np.floor(X).astype(np.int32)
    base_row = np.floor(Y).astype(np.int32)
    w_col = cubic_b_spline_weights((X-base_col).astype(np.float32))
    w_row = cubic_b_spline_weights((Y-base_row).astype(np.float32))
    out = np.zeros((n_ch, out_h, out_w), dtype=np.float32)
    # Accumulate the 4x4 neighbourhood, offsets -1..2 around the base cell
    for row_slot, row_off in enumerate(range(-1, 3)):
        rows = np.clip(base_row+row_off, 0, grid_h-1)
        for col_slot, col_off in enumerate(range(-1, 3)):
            cols = np.clip(base_col+col_off, 0, grid_w-1)
            weight = (w_row[..., row_slot]*w_col[..., col_slot])[None, ...]
            out += ctrl[:, rows, cols]*weight
    if squeeze_channel:
        out = out[0]
    if torch_input:
        return torch.from_numpy(out.astype(np.float32))
    return out
def extract_features_for_kmeans(mat):
    """Build one float32 feature row per pixel: ``[x, y, |z|, angle(z), Re z, Im z]``."""
    values = as_numpy(mat)
    height, width = values.shape
    col_idx, row_idx = np.meshgrid(np.arange(width), np.arange(height))
    columns = [
        col_idx.ravel(),
        row_idx.ravel(),
        np.abs(values).ravel(),
        np.angle(values).ravel(),
        values.real.ravel(),
        values.imag.ravel(),
    ]
    return np.stack(columns, axis=1).astype(np.float32)
def kmeans_on_matrix(mat, n_clusters=8, random_state=0):
    """Cluster the per-pixel features of ``mat`` with KMeans.

    Returns ``(labels, centers)``: int32 labels shaped like ``mat`` and float32
    cluster centres, as tensors when ``mat`` is a tensor and ndarrays otherwise.
    """
    wants_torch = is_torch(mat)
    features = extract_features_for_kmeans(mat)
    fitted = KMeans(n_clusters=n_clusters, random_state=random_state, n_init=10).fit(features)
    target_shape = as_numpy(mat).shape
    labels = fitted.labels_.reshape(target_shape).astype(np.int32)
    centers = fitted.cluster_centers_.astype(np.float32)
    if not wants_torch:
        return labels, centers
    return torch.from_numpy(labels), torch.from_numpy(centers)
def compute_cluster_anchor_targets(mat_cur, mat_next, labels, centers):
    """Compute per-cluster mean magnitude/phase residuals between two frames.

    Returns ``(anchor_coords, anchor_mag_res, anchor_ph_res, counts)`` where
    ``anchor_coords`` are the first two centre columns. An empty cluster falls back
    to the residual at its rounded centre, or 0 if that lies outside the image.
    Outputs are tensors when ``mat_cur`` is a tensor.
    """
    wants_torch = is_torch(mat_cur)
    cur = as_numpy(mat_cur)
    nxt = as_numpy(mat_next)
    labels_np = as_numpy(labels)
    centers_np = as_numpy(centers)

    def _mag_and_unwrapped_phase(frame):
        # Magnitude plus phase unwrapped along axis 0 then axis 1
        magnitude = np.abs(frame).astype(np.float32)
        phase = np.angle(frame).astype(np.float32)
        return magnitude, np.unwrap(np.unwrap(phase, axis=0), axis=1)

    mag_cur, ph_cur_u = _mag_and_unwrapped_phase(cur)
    mag_next, ph_next_u = _mag_and_unwrapped_phase(nxt)
    mag_res = (mag_next-mag_cur).astype(np.float32)
    ph_res = (ph_next_u-ph_cur_u).astype(np.float32)

    n_clusters = centers_np.shape[0]
    anchor_coords = centers_np[:, :2].astype(np.float32)
    anchor_mag_res = np.zeros(n_clusters, dtype=np.float32)
    anchor_ph_res = np.zeros(n_clusters, dtype=np.float32)
    counts = np.zeros(n_clusters, dtype=np.int32)
    height, width = labels_np.shape
    for k in range(n_clusters):
        members = (labels_np == k)
        size = int(members.sum())
        counts[k] = size
        if size > 0:
            anchor_mag_res[k] = float(mag_res[members].mean())
            anchor_ph_res[k] = float(ph_res[members].mean())
            continue
        cx, cy = int(round(centers_np[k,0])), int(round(centers_np[k,1]))
        inside = 0 <= cx < width and 0 <= cy < height
        anchor_mag_res[k] = mag_res[cy, cx] if inside else 0.0
        anchor_ph_res[k] = ph_res[cy, cx] if inside else 0.0
    if wants_torch:
        return (as_torch(anchor_coords), as_torch(anchor_mag_res), as_torch(anchor_ph_res), as_torch(counts))
    return anchor_coords, anchor_mag_res, anchor_ph_res, counts
class SmallRegressor(nn.Module):
    """Five-layer convolutional regressor: four 3x3 conv+ReLU stages and a 1x1 output conv.

    Spatial size is preserved (padding 1 on the 3x3 layers).
    """

    def __init__(self, in_ch=4, out_ch=2, base=16):
        super().__init__()
        # (in, out) channel pairs of the 3x3 stages, in order
        stage_channels = [(in_ch, base), (base, base), (base, base*2), (base*2, base)]
        layers = []
        for c_in, c_out in stage_channels:
            layers.append(nn.Conv2d(c_in, c_out, 3, padding=1))
            layers.append(nn.ReLU())
        layers.append(nn.Conv2d(base, out_ch, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the stack."""
        return self.net(x)
class ShellDataset(Dataset):
    """Dataset of consecutive frame pairs ``(shell[i], shell[i+1])`` from a list of complex matrices.

    Each item is ``(inp, targ, meta)``: ``inp`` stacks magnitude, unwrapped phase and
    two per-cluster centre maps of the first frame; ``targ`` stacks the magnitude and
    unwrapped-phase residuals to the second frame; ``meta`` carries the clustering and
    per-cluster anchor targets. KMeans is run on every access.
    """

    def __init__(self, shell_list, n_clusters=8):
        self.shell = [as_numpy(frame) for frame in shell_list]
        self.n_clusters = n_clusters

    def __len__(self):
        return max(0, len(self.shell)-1)

    def __getitem__(self, idx):
        frame_a = self.shell[idx]
        frame_b = self.shell[idx+1]
        H, W = frame_a.shape

        mag_a = np.abs(frame_a).astype(np.float32)
        ph_a_u = np.unwrap(np.unwrap(np.angle(frame_a).astype(np.float32), axis=0), axis=1)
        mag_b = np.abs(frame_b).astype(np.float32)
        ph_b_u = np.unwrap(np.unwrap(np.angle(frame_b).astype(np.float32), axis=0), axis=1)
        mag_res = (mag_b-mag_a).astype(np.float32)
        ph_res = (ph_b_u-ph_a_u).astype(np.float32)

        labels, centers = kmeans_on_matrix(frame_a, n_clusters=self.n_clusters)
        labels_np = as_numpy(labels)
        centers_np = as_numpy(centers)

        # Paint each cluster's centre magnitude/phase over its member pixels,
        # skipping clusters whose rounded centre falls outside the frame
        center_mag_map = np.zeros((H,W), dtype=np.float32)
        center_ph_map = np.zeros((H,W), dtype=np.float32)
        for k in range(centers_np.shape[0]):
            cx_i = int(round(centers_np[k,0]))
            cy_i = int(round(centers_np[k,1]))
            if not (0 <= cx_i < W and 0 <= cy_i < H):
                continue
            members = labels_np==k
            center_mag_map[members] = centers_np[k,2]
            center_ph_map[members] = centers_np[k,3]

        inp = np.stack([mag_a, ph_a_u, center_mag_map, center_ph_map], axis=0).astype(np.float32)
        targ = np.stack([mag_res, ph_res], axis=0).astype(np.float32)
        anchor_coords, anchor_mag_res, anchor_ph_res, counts = compute_cluster_anchor_targets(frame_a, frame_b, labels_np, centers_np)

        def _to_ndarray(value, dtype):
            # Tensors are converted as-is; arrays are cast to the requested dtype
            return as_numpy(value) if is_torch(value) else value.astype(dtype)

        meta = {'labels': labels_np.astype(np.int32),
                'centers': centers_np.astype(np.float32),
                'anchor_coords': _to_ndarray(anchor_coords, np.float32),
                'anchor_mag_res': _to_ndarray(anchor_mag_res, np.float32),
                'anchor_ph_res': _to_ndarray(anchor_ph_res, np.float32),
                'counts': _to_ndarray(counts, np.int32)}
        return inp, targ, meta
def sample_at_subpixel(pred_tensor, coords):
    """Bilinearly sample a ``(B, C, H, W)`` map at ``K`` sub-pixel ``(x, y)`` coordinates.

    Returns a ``(B, C, K)`` result: a float32 ndarray for ndarray input, a tensor
    (differentiable w.r.t. ``pred_tensor``) for tensor input. Coordinates outside the
    image are clamped to the border in both paths.

    The tensor path now works for any batch size and input float dtype: the sampling
    grid is repeated per batch element and cast to the input's dtype, as
    ``F.grid_sample`` requires matching batch dimension and dtype.
    """
    coords_np = as_numpy(coords)
    B, C, H, W = pred_tensor.shape
    K = coords_np.shape[0]
    if not is_torch(pred_tensor):
        out = np.zeros((B, C, K), dtype=np.float32)
        for k, (cx, cy) in enumerate(coords_np):
            # Corner indices and weights depend only on the coordinate, not on b/c
            cx = max(0.0, min(float(cx), W-1.0))
            cy = max(0.0, min(float(cy), H-1.0))
            x0 = int(np.floor(cx))
            x1 = min(x0+1, W-1)
            y0 = int(np.floor(cy))
            y1 = min(y0+1, H-1)
            wx, wy = cx-x0, cy-y0
            for b in range(B):
                for c in range(C):
                    v = (1-wx)*(1-wy)*pred_tensor[b, c, y0, x0]+\
                        wx*(1-wy)*pred_tensor[b, c, y0, x1]+\
                        (1-wx)*wy*pred_tensor[b, c, y1, x0]+\
                        wx*wy*pred_tensor[b, c, y1, x1]
                    out[b,c,k] = v
        return out
    # Pixel coordinates -> [-1, 1] as expected by grid_sample with align_corners=True
    xs = (coords_np[:,0]/max(1, W-1))*2-1
    ys = (coords_np[:,1]/max(1, H-1))*2-1
    grid = np.stack([xs, ys], axis=1).astype(np.float32)
    grid_t = torch.from_numpy(grid).to(device=pred_tensor.device, dtype=pred_tensor.dtype)
    # One (K, 1, 2) grid per batch element
    grid_t = grid_t[None, :, None, :].repeat(B, 1, 1, 1)
    sampled = F.grid_sample(pred_tensor, grid_t, mode='bilinear', padding_mode='border', align_corners=True)
    return sampled.reshape(B, C, K)
def _regressor_anchor_loss(pred_single, sample_meta):
    """Return the count-weighted squared error at cluster anchors for one sample, or None.

    ``pred_single`` is the ``(1, 2, H, W)`` prediction tensor (channel 0 magnitude
    residual, channel 1 phase residual). None is returned when the sample has no
    usable anchor coordinates.
    """
    if sample_meta is None:
        return None
    coords = sample_meta.get('anchor_coords')
    if coords is None:
        return None
    coords = np.asarray(coords, dtype=np.float32)
    if coords.size == 0:
        return None
    counts = np.asarray(sample_meta.get('counts', []), dtype=np.float32)
    anchor_mag = np.asarray(sample_meta.get('anchor_mag_res', []), dtype=np.float32)
    anchor_ph = np.asarray(sample_meta.get('anchor_ph_res', []), dtype=np.float32)
    # Sample on the tensor itself (not a detached copy) so gradients reach the model
    sampled = sample_at_subpixel(pred_single, coords)[0]
    weights = counts+1.0
    weights = weights/(weights.sum()+1e-8)
    w_t = torch.from_numpy(weights).to(sampled.device)
    mag_t = torch.from_numpy(anchor_mag).to(sampled.device)
    ph_t = torch.from_numpy(anchor_ph).to(sampled.device)
    mag_err = ((sampled[0]-mag_t)**2*w_t).sum()
    ph_err = ((sampled[1]-ph_t)**2*w_t).sum()
    return mag_err+ph_err


def train_regressor(shell_list, epochs=10, batch_size=1, n_clusters=4, lr=5e-4, device=torch.device('cpu'), anchor_weight=1.0):
    """Train a ``SmallRegressor`` to predict magnitude/phase residuals between consecutive frames.

    Args:
        shell_list: sequence of complex 2-D frames; consecutive pairs form the samples.
        epochs, batch_size, n_clusters, lr, device: training configuration.
        anchor_weight: multiplier of the per-cluster anchor loss added to the dense
            MSE (the anchor term is averaged over the batch).

    Returns:
        The trained model.

    Each batch now performs an optimisation step (``zero_grad`` / ``backward`` /
    ``step``); the anchor term is differentiable, contributes to the loss, and compares
    the magnitude channel against the magnitude anchors.
    """
    ds = ShellDataset(shell_list, n_clusters=n_clusters)
    dl = DataLoader(ds, batch_size=batch_size, shuffle=True, collate_fn=collate_keep_meta)
    model = SmallRegressor(in_ch=4, out_ch=2, base=16).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    model.train()
    for _ in range(epochs):
        for inp, targ, meta in dl:
            inp_t = torch.from_numpy(inp).to(device)
            targ_t = torch.from_numpy(targ).to(device)
            pred = model(inp_t)
            loss = F.mse_loss(pred, targ_t)
            batch_n = inp_t.shape[0]
            anchor_terms = []
            for b in range(batch_n):
                term = _regressor_anchor_loss(pred[b:b+1], meta[b])
                if term is not None:
                    anchor_terms.append(term)
            if anchor_terms:
                loss = loss+anchor_weight*torch.stack(anchor_terms).sum()/batch_n
            opt.zero_grad()
            loss.backward()
            opt.step()
    return model
def refine_residual_with_bspline(pred_res, labels, centers, anchor_mag_res, anchor_ph_res, control_grid_size=(16,16)):
    """Smooth a predicted residual through a coarse B-spline control grid pinned toward anchors.

    Channels 0 and 1 of ``pred_res`` are resampled to ``control_grid_size``, the
    control point nearest each cluster centre is blended 60/40 with that cluster's
    anchor value, and the grids are evaluated back at full resolution. ``labels`` is
    not read. Returns a ``(2, H, W)`` float32 array (tensor for tensor input).
    """
    wants_torch = is_torch(pred_res)
    pred_np = as_numpy(pred_res)
    H, W = pred_np.shape[1], pred_np.shape[2]
    ch, cw = control_grid_size
    zoom_factors = (ch/float(H), cw/float(W))
    grid_mag = ndimage.zoom(pred_np[0], zoom_factors, order=1)
    grid_ph = ndimage.zoom(pred_np[1], zoom_factors, order=1)
    centers_np = as_numpy(centers)
    alpha = 0.6
    for k in range(centers_np.shape[0]):
        # Map the centre from image coordinates to control-grid coordinates
        gx = (centers_np[k,0]/max(1.0, W-1.0))*(cw-1)
        gy = (centers_np[k,1]/max(1.0, H-1.0))*(ch-1)
        ix, iy = int(round(gx)), int(round(gy))
        if not (0 <= ix < cw and 0 <= iy < ch):
            continue
        grid_mag[iy, ix] = alpha*anchor_mag_res[k]+(1-alpha)*grid_mag[iy, ix]
        grid_ph[iy, ix] = alpha*anchor_ph_res[k]+(1-alpha)*grid_ph[iy, ix]
    mag_dense = eval_bspline_grid(grid_mag, H, W)
    ph_dense = eval_bspline_grid(grid_ph, H, W)
    out = np.stack([mag_dense.astype(np.float32), ph_dense.astype(np.float32)], axis=0)
    return torch.from_numpy(out) if wants_torch else out
def predict_next_cpu(model, mat_current, n_clusters=8, control_grid_size=(16, 16), device=torch.device('cpu')):
    """Predict the next complex frame from ``mat_current`` using ``model`` plus B-spline refinement.

    Returns ``(pred_complex, info)`` where ``info`` holds ``labels``, ``centers`` and the
    raw ``pred_res``. Anchor residuals passed to the refinement are all zeros here.
    For tensor input the prediction is returned as a complex64 tensor on ``device``.
    """
    wants_torch = is_torch(mat_current)
    mat_np = as_numpy(mat_current)
    labels, centers = kmeans_on_matrix(mat_np, n_clusters=n_clusters)
    labels_np = as_numpy(labels)
    centers_np = as_numpy(centers)
    n_centers = centers_np.shape[0]

    mag_cur = np.abs(mat_np).astype(np.float32)
    ph_cur_u = np.unwrap(np.unwrap(np.angle(mat_np).astype(np.float32), axis=0), axis=1)
    H, W = mag_cur.shape

    # Same four input channels the regressor was trained on
    center_mag_map = np.zeros((H,W), dtype=np.float32)
    center_ph_map = np.zeros((H,W), dtype=np.float32)
    for k in range(n_centers):
        cx_i = int(round(centers_np[k, 0]))
        cy_i = int(round(centers_np[k,1]))
        if not (0 <= cx_i < W and 0 <= cy_i < H):
            continue
        members = labels_np==k
        center_mag_map[members] = centers_np[k, 2]
        center_ph_map[members] = centers_np[k, 3]
    inp = np.stack([mag_cur, ph_cur_u, center_mag_map, center_ph_map], axis=0)[None]
    inp_t = torch.from_numpy(inp).to(device)

    model = model.to(device)
    model.eval()
    with torch.no_grad():
        pred_res_t = model(inp_t)
    pred_res = pred_res_t.cpu().numpy()[0]

    anchor_mag_res = np.zeros(n_centers, dtype=np.float32)
    anchor_ph_res = np.zeros(n_centers, dtype=np.float32)
    refined = refine_residual_with_bspline(pred_res, labels_np, centers_np, anchor_mag_res, anchor_ph_res, control_grid_size=control_grid_size)
    pred_mag = mag_cur+as_numpy(refined[0])
    pred_ph = ph_cur_u+as_numpy(refined[1])
    pred_complex = magphase_to_complex(pred_mag, pred_ph)
    if not wants_torch:
        return pred_complex, {'labels': labels_np, 'centers': centers_np, 'pred_res': pred_res}
    return as_torch(pred_complex, dtype=torch.complex64, device=device), {'labels': labels, 'centers': centers, 'pred_res': torch.from_numpy(pred_res)}
def k_shell_frame_learn(H: int, W: int):
    """Demo run: build two synthetic frames, train the regressor on them and print the relative error.

    The second frame is the first rolled by 2 columns plus small complex Gaussian noise.
    """
    xs = np.linspace(0, 4*np.pi, W)
    ys = np.linspace(0, 4*np.pi, H)
    X, Y = np.meshgrid(xs, ys)
    frame_a = (np.sin(X)+1j*np.cos(Y))*np.exp(1j*0.5*X)
    noise_re = np.random.randn(H, W)
    noise_im = np.random.randn(H, W)
    frame_b = np.roll(frame_a, shift=2, axis=1)+0.02*(noise_re+1j*noise_im)
    shell = [frame_a.astype(np.complex64), frame_b.astype(np.complex64)]
    model = train_regressor(shell, epochs=10, batch_size=1, n_clusters=4, lr=5e-4, device=torch.device('cpu'))
    pred, meta = predict_next_cpu(model, shell[0], n_clusters=4, control_grid_size=(16, 16))
    truth_norm = np.linalg.norm(shell[1])+1e-12
    rel_err = np.linalg.norm(pred-shell[1])/truth_norm
    print('Relative TT-DOM Shell Error (CPU regressor + bspline refine):', rel_err)
#///////////////////////////////////////////////////////////////////////////////////////////////////////
#_______________________________________________________________________________________________________
def _map_xml(pth: str, elem: ET.Element):
src = ET.tostring(elem, 'utf-8')
prs = minidom.parseString(src)
p_xml = prs.toprettyxml(indent=' ')
with open(pth, 'w', encoding='utf-8') as f:
f.write('\n'.join(ln for ln in p_xml.splitlines() if ln.strip()))
# TICTAC_UXCM1: derives a pair of values (_clcgX, _clcgY) from two float inputs and the
# module-level complex constant lists U1_LCX1 / U2_LCX2.
# NOTE(review): this source was extracted without indentation; the comments below describe
# only what the individual statements do and do not assert any particular nesting.
class TICTAC_UXCM1():
# NOTE: `slots` is an ordinary class attribute (it is not named `__slots__`), so it does
# not restrict which attributes instances may carry.
slots = ('_clcgX', '_clcgY')
# isSxlz selects the computation: the mmp_tqf_xyc1 method, or the inline formula
# built from the sines/cosines of the 9 entries of U2_LCX2 / U1_LCX1.
def __init__(self, isSxlz: bool, _cd_lx_cwgrlX: float, _cd_lx_cwgrlY: float):
if isSxlz:
self._clcgX, self._clcgY = self.mmp_tqf_xyc1(_cd_lx_cwgrlX, _cd_lx_cwgrlY)
else:
xp_r1 = [abs(((cmath.sin(U2_LCX2[x]))-_cd_lx_cwgrlY)/math.pi) for x in range(9)]
xp_r2 = [(abs(cmath.sin(xp_r1[y])), abs(cmath.cos(U1_LCX1[y]))+_cd_lx_cwgrlX/(y+1)) for y in range(9)]
xp_r3 = [t[0]+t[1] for t in xp_r2]
# Both results are pi times the natural log of a sum of non-negative terms
self._clcgX = math.log(sum(xp_r1))*math.pi
self._clcgY = math.log(sum(xp_r3))*math.pi
# Returns a 2-tuple of integers: ceil(|sqrt(A+B)|) and ceil(|sqrt(B-A)|), where A and B
# are the sums of the two lists filled in the loop over U1_LCX1[1..8].
# math.sqrt(clcg_y) raises for negative clcg_y and the division raises for clcg_x == 0.
def mmp_tqf_xyc1(self, clcg_x: float, clcg_y: float) -> tuple:
# (approximate translation of the original transliterated-Hebrew note)
# Designated width of the named groups: <x to y>
ctd_lstA = []
ctd_lstB = []
for i in range(8):
tp_m = cmath.sin(U1_LCX1[i+1]-clcg_x)+math.sqrt(clcg_y)/clcg_x
if tp_m.real > 0:
ctd_lstB.append(tp_m+math.pi+1)
else:
# tp_m is rebound from a complex value to a real logarithm here
tp_m = math.log(-tp_m.real+1)
if tp_m > math.pi+i+1:
ctd_lstA.append(tp_m-math.pi)
else:
ctd_lstA.append(tp_m+math.pi+i+1)
cla_sum = sum(ctd_lstA)
clb_sum = sum(ctd_lstB)
return math.ceil(abs(cmath.sqrt(cla_sum+clb_sum))), math.ceil(abs(cmath.sqrt(clb_sum-cla_sum)))
# Returns (min, max) of a list derived from l_r1 / l_r2 and the scalar r_vz.
# Uses random.randint, so results are not deterministic unless the RNG is seeded.
# NOTE(review): the `while 1` loop only ends once eq_set holds more than len(xd)-1
# distinct members - presumably always reached in practice; confirm it cannot spin forever.
def adj_bw_exp(self, r_vz: float, l_r1: list, l_r2: list) -> tuple:
# (approximate translation of the original transliterated-Hebrew note)
# Adjusted expansion from burst connectivity
xd = [(cmath.exp(v1-v2*v1))/r_vz for v1, v2 in zip(l_r1, [l_r2[i]**(r_vz/(cmath.sin(l_r1[i])+math.pi)) for i in range(len(l_r1))])]
xd_len = len(xd)-1
eq_set = set()
l_c = -1
while 1:
l_c+=1
# Wrap the cursor so that xd[l_c+1] below stays in range
if l_c == xd_len:
l_c = 0
if abs(math.atan(random.randint(0, l_c))/(complex(abs(xd[l_c]), l_c))) > abs(xd[l_c+1]):
eq_set.add(cmath.sqrt(xd[l_c]))
else:
eq_set.add(abs(cmath.sqrt(xd[l_c+1])/(l_c+1)))
if len(eq_set) > xd_len:
break
# From here on l_c is reused as a running numeric value (starts at 0) and xd as the output list
eq_set, xd, c_nl, l_c = list(eq_set), [], None, 0
for c_n1 in eq_set:
if isinstance(c_n1, complex):
c_nl = abs(cmath.tan(c_n1)*c_n1.imag)
else:
# NOTE(review): l_c is only ever assigned numbers, so this test looks always true
# and the math.log branch below appears unreachable - confirm intent
if l_c != None:
c_nl = l_c+c_n1/(c_n1*r_vz)
else:
c_nl = math.log(c_n1*c_n1.real)
if l_c < c_nl:
xd.append(c_nl/math.pi)
else:
xd.append((c_nl+l_c)*math.pi)
l_c = c_nl
return min(xd), max(xd)
class TICTAC_SHELL_REDOCK:
    """Class that currently defines only a ``slots`` name tuple and a no-op constructor."""

    # Plain class attribute holding two attribute names (this is not `__slots__`).
    slots = ('_dif_x', '_dif_z')

    def __init__(self):
        """Create an instance without initialising any attribute."""
        return None
def row_ratio_segments(mat, tol=1e-6, min_len=2, zero_threshold=1e-12):
    """Group consecutive rows of ``mat`` that share one row-to-row scalar ratio.

    For every pair of neighbouring rows a scalar ratio ``next / current`` is
    estimated; runs whose ratios stay within ``tol`` of the run's first ratio
    are emitted as one segment.

    Args:
        mat: 2-D array (rows x columns).
        tol: Maximum absolute deviation of a ratio from the run's first ratio.
        min_len: Minimum number of rows for a run to be kept as a segment.
        zero_threshold: Magnitudes at or below this value count as zero.

    Returns:
        A list of ``(start, stop, base_row, ratio)`` tuples with ``stop``
        exclusive and ``base_row`` a copy of row ``start``. Rows that do not
        belong to a run are emitted as single-row segments with ratio ``1+0j``.
    """
    m, n = mat.shape
    if m == 0:
        return []
    if m == 1:
        return [(0, 1, mat[0].copy(), 1+0j)]

    def _pair_ratio(upper, lower):
        # Estimate the scalar r with lower ~= r * upper, or None when upper is ~zero.
        usable = np.abs(upper) > zero_threshold
        # Enough non-zero entries: the median of the element-wise ratios is used.
        if usable.sum() >= max(1, n//20):
            return np.median(lower[usable] / upper[usable])
        # Otherwise fall back to the least-squares fit r = (a^H b)/(a^H a),
        # provided the denominator is not too small.
        energy = np.vdot(upper, upper)
        if np.abs(energy) > zero_threshold:
            return np.vdot(upper, lower)/energy
        return None

    ratios = [_pair_ratio(mat[k], mat[k+1]) for k in range(m-1)]

    # Collect contiguous runs in which the ratio stays stable.
    segs = []
    last = m-1
    start = 0
    while start < m:
        lead = ratios[start] if start < last else None
        if lead is None:
            # Final row, or no usable ratio: emit the row on its own.
            segs.append((start, start+1, mat[start].copy(), 1+0j))
            start += 1
            continue
        stop = start+1
        while stop < last and ratios[stop] is not None and abs(ratios[stop]-lead) <= tol:
            stop += 1
        if stop-start+1 >= min_len:
            segs.append((start, stop+1, mat[start].copy(), lead))
            start = stop+1
        else:
            # Run too short: emit a single row as the fallback.
            segs.append((start, start+1, mat[start].copy(), 1+0j))
            start += 1
    return segs
def reconstruct_segment(base, ratio, length):
    """Rebuild ``length`` rows of a geometric row sequence.

    Row ``k`` of the result is ``base * ratio**k`` for ``k = 0 .. length-1``
    (broadcast of ``base[None, :]`` against a column of powers), cast back to
    ``base.dtype``.

    Args:
        base: 1-D array holding the first row.
        ratio: Scalar multiplier between successive rows.
        length: Number of rows to generate.

    Returns:
        Array of shape ``(length, len(base))``.
    """
    exponents = np.arange(length, dtype=complex)[:, np.newaxis]
    powers = np.power(ratio, exponents)
    rows = base[np.newaxis, :] * powers
    return rows.astype(base.dtype)
def truncated_svd_updates(block, rank):
    """Return the leading ``rank`` SVD factors of ``block``.

    Args:
        block: 2-D array to decompose (economy-size SVD).
        rank: Number of singular triplets to keep.

    Returns:
        ``(U, s, Vh)`` truncated to the first ``rank`` columns of U, values of
        s and rows of Vh.
    """
    left, sing, right_h = svd(block, full_matrices=False)
    keep = slice(None, rank)
    return left[:, keep], sing[keep], right_h[keep, :]
def resize_rows(mat, target_rows, tol=1e-4, svd_rank_max=4, apply_row_update=None, apply_rank_update=None):
    """Resample ``mat`` to ``target_rows`` rows and return a compressed reconstruction.

    The target rows are obtained by linear interpolation of the real and
    imaginary parts along a normalised row index. The interpolated matrix is
    split into row segments with ``row_ratio_segments``; each segment is
    rebuilt either as a geometric row sequence (when that fits within ``tol``)
    or from a truncated SVD keeping 99.9% of the energy, capped at
    ``svd_rank_max``.

    Args:
        mat: 2-D array (rows x columns).
        target_rows: Number of rows of the result; must be even.
        tol: Tolerance for the geometric fit and for the residual check.
        svd_rank_max: Upper bound on the SVD rank used per segment.
        apply_row_update: Optional callback ``(row_index, rows)`` invoked for
            accepted geometric segments and for rows whose residual exceeds
            ``tol``.
        apply_rank_update: Optional callback ``(row_index, u, sigma, vh)``
            invoked once per kept singular triplet of an SVD segment.

    Returns:
        The reconstructed ``(target_rows, n)`` complex array. Rows with a
        residual above ``tol`` are overwritten with the interpolated rows only
        when ``apply_row_update`` is supplied.

    Raises:
        AssertionError: If ``target_rows`` is odd.
    """
    m, n = mat.shape
    if target_rows % 2 != 0:
        # Raised explicitly (same type and message as the former `assert`) so the
        # check is not stripped when Python runs with -O.
        raise AssertionError('<TT-DOM 1.02> tq- shell, svd target row(s) are not even')
    # Build the rows by linear interpolation over separate indices for the real and
    # the imaginary component.
    old_idx = np.linspace(0.0, 1.0, m)
    new_idx = np.linspace(0.0, 1.0, target_rows)
    real_interp = np.vstack([np.interp(new_idx, old_idx, mat[:, j].real) for j in range(n)]).T
    imag_interp = np.vstack([np.interp(new_idx, old_idx, mat[:, j].imag) for j in range(n)]).T
    desired = real_interp+1j*imag_interp
    # The former tiled/truncated "current" copy of `mat` was never read and has been removed.
    segs = row_ratio_segments(desired, tol=tol, min_len=2)
    recon = np.zeros_like(desired)
    for (s, e, base, ratio) in segs:
        length = e-s
        if not np.isclose(ratio, 1+0j):
            # Rebuild the geometric series from the fitted base row and ratio.
            rows = reconstruct_segment(base, ratio, length)
            err = np.linalg.norm(rows-desired[s:e], ord='fro')
            if err <= tol*np.sqrt(length*n):
                recon[s:e] = rows
                if apply_row_update:
                    apply_row_update(s, rows)
                continue
        # Low-rank SVD of the current block; rank chosen from the cumulative energy.
        block = desired[s:e]
        U, sigma, Vh = svd(block, full_matrices=False)
        total_energy = np.sum(sigma**2)
        if total_energy == 0:
            rank = 1
        else:
            energy = np.cumsum(sigma**2)/total_energy
            rank = int(np.searchsorted(energy, 0.999)+1)
        rank = min(max(1, rank), min(svd_rank_max, block.shape[0], block.shape[1]))
        U_r, s_r, Vh_r = U[:, :rank], sigma[:rank], Vh[:rank, :]
        recon[s:e] = (U_r*s_r) @ Vh_r
        if apply_rank_update:
            for k in range(rank):
                apply_rank_update(s, U_r[:, k], s_r[k], Vh_r[k, :])
    # Row-level correction for rows that still carry a large residual.
    residual = desired-recon
    mask = np.abs(residual) > tol
    if apply_row_update:
        rows_to_fix = np.where(mask.any(axis=1))[0]
        for r in rows_to_fix:
            apply_row_update(r, desired[r:r+1])
            recon[r:r+1] = desired[r:r+1]
    return recon
class TICTAC_JXML_IO:
    """Static helpers that keep a 2-D list of complex numbers in a ``tictac2d`` XML file.

    File layout: a ``<tictac2d r=".." c="..">`` root holding ``<row idx="..">``
    children, each with ``<val col=".." real=".." imag="..">`` children. Every
    mutating helper re-reads the file, edits the tree and rewrites the file
    through ``_map_xml``.

    Lookups by ``idx`` / ``col`` use ``Element.find``; when nothing matches,
    ``find`` returns ``None`` and the following attribute access or removal
    raises (no dedicated error is produced).
    """

    @staticmethod
    def _write(pth: str, cd: list):
        """Write the 2-D list ``cd`` to ``pth``; ``c`` is taken from the first row."""
        r = len(cd)
        c = len(cd[0]) if r > 0 else 0
        arr = ET.Element('tictac2d', r=str(r), c=str(c))
        for r_i, r_s in enumerate(cd):
            r_e = ET.SubElement(arr, 'row', idx=str(r_i))
            for c_i, c_s in enumerate(r_s):
                ET.SubElement(r_e, 'val', col=str(c_i), real=str(c_s.real), imag=str(c_s.imag))
        _map_xml(pth, arr)

    @staticmethod
    def _read(pth: str) -> list:
        """Return all rows of ``pth`` as a list of lists of ``complex`` (document order)."""
        root = ET.parse(pth).getroot()
        return [
            [complex(float(v_t.get('real', '0')), float(v_t.get('imag', '0')))
             for v_t in r_e.findall('val')]
            for r_e in root.findall('row')
        ]

    @staticmethod
    def _read_complex(pth: str, r: int, c: int) -> complex:
        """Return the value stored at row ``idx == r``, column ``col == c``."""
        root = ET.parse(pth).getroot()
        r_e = root.find(f"./row[@idx='{r}']")
        v_t = r_e.find(f"./val[@col='{c}']")
        return complex(float(v_t.get('real', '0')), float(v_t.get('imag', '0')))

    @staticmethod
    def _edit_complex(pth: str, r: int, c: int, v: complex):
        """Overwrite the value at row ``r``, column ``c`` with ``v`` and rewrite the file."""
        root = ET.parse(pth).getroot()
        r_e = root.find(f"./row[@idx='{r}']")
        v_t = r_e.find(f"./val[@col='{c}']")
        v_t.set('real', str(v.real))
        v_t.set('imag', str(v.imag))
        _map_xml(pth, root)

    @staticmethod
    def _insert_row(pth: str, r_i: int, r_v: list):
        """Insert the row ``r_v`` at position ``r_i``, shifting later rows down by one.

        The root ``r`` attribute is set to the row count after the insertion,
        and ``c`` is widened when the new row is longer than the stored width.
        """
        root = ET.parse(pth).getroot()
        rws = root.findall('row')
        rws_len = len(rws)
        c_cnt = int(root.get('c', '0'))
        # Renumber the rows that will sit after the new one.
        for r in rws[r_i:]:
            r.set('idx', str(int(r.get('idx'))+1))
        n_r = ET.Element('row', idx=str(r_i))
        for c_i, c in enumerate(r_v):
            ET.SubElement(n_r, 'val', col=str(c_i), real=str(c.real), imag=str(c.imag))
        root.insert(r_i, n_r)
        # Fix: the count must include the inserted row (it used to store the old count).
        root.set('r', str(rws_len+1))
        # Keep `c` as the widest row, consistent with _insert_complex / _remove_complex.
        root.set('c', str(max(c_cnt, len(r_v))))
        _map_xml(pth, root)

    @staticmethod
    def _remove_row(pth: str, r_i: int):
        """Remove the row with ``idx == r_i``, renumber the rest and update ``r``."""
        root = ET.parse(pth).getroot()
        trgt = root.find(f"./row[@idx='{r_i}']")
        root.remove(trgt)
        remaining = root.findall('row')
        for i, r in enumerate(remaining):
            r.set('idx', str(i))
        root.set('r', str(len(remaining)))
        _map_xml(pth, root)

    @staticmethod
    def _insert_complex(pth: str, r_i: int, c_i: int, v: complex):
        """Insert ``v`` at column ``c_i`` of row ``r_i``, shifting later values right."""
        root = ET.parse(pth).getroot()
        r_e = root.find(f"./row[@idx='{r_i}']")
        c_v = r_e.findall('val')
        c_len = len(c_v)
        for c_c in c_v[c_i:]:
            c_c.set('col', str(int(c_c.get('col'))+1))
        n_v = ET.Element('val', col=str(c_i), real=str(v.real), imag=str(v.imag))
        r_e.insert(c_i, n_v)
        root.set('c', str(max(int(root.get('c', '0')), c_len+1)))
        _map_xml(pth, root)

    @staticmethod
    def _remove_complex(pth: str, r_i: int, c_i: int):
        """Remove the value at row ``r_i``, column ``c_i`` and recompute ``c`` as the widest row."""
        root = ET.parse(pth).getroot()
        r_e = root.find(f"./row[@idx='{r_i}']")
        trgt = r_e.find(f"./val[@col='{c_i}']")
        r_e.remove(trgt)
        for i, c_v in enumerate(r_e.findall('val')):
            c_v.set('col', str(i))
        max_c = max((len(r.findall('val')) for r in root.findall('row')), default=0)
        root.set('c', str(max_c))
        _map_xml(pth, root)
class TICTAC_DOM_KRNL():
slots = ('_dom_csfs', '_shl_ry1', '_shl_ry2', '_shl_ry3', '_shl_ry4', '_last_shl',
'_cur_jxml')
def __init__(self):
self._last_shl = -1
self.lang_shell()
def lang_dirs(self, dir: str) -> str:
if dir == 'MAIN':
return Path(TT_DOM_PTH) / 'TICTAC_DOM_1_02'
elif dir == 'EWWW':
return Path(TT_DOM_PTH) / 'TICTAC_DOM_1_02' / 'EWWW_T_D'
elif dir == 'JXML':
return Path(TT_DOM_PTH) / 'TICTAC_DOM_1_02' / 'EWWW_T_D' / 'JXML_T_D'
def lang_shell(self):
if self._last_shl > -1:
pass
else:
new_dirs = False
fldr_pth = self.lang_dirs('MAIN')
if not os.path.isdir(fldr_pth):
os.makedirs(fldr_pth)
new_dirs = True
fldr_pth = self.lang_dirs('JXML')
if not os.path.isdir(fldr_pth):
os.makedirs(fldr_pth)
new_dirs = True
self.tt_domain_tpk(new_dirs)
def tt_domain_tpk(self, isNewDirs: bool):
JXML = TICTAC_JXML_IO()
if not isNewDirs:
pass
else:
self._cur_jxml = '_ttd_plex_1.xml'
pth = self.lang_dirs('JXML') / self._cur_jxml
JXML._write(pth, [[1+2j, 2-3j, 4+5j, 6-7j], [8-9j, 7+6j, 5-4j, 3+2j]])
tpk = ['LAST-JXML=_ttd_plex_1.xml',
'GRAPH-MEDIA=default_flir',
'DOMAIN-PAIRING-ORDER=null',
'DOMAIN-PAIRING-SEQ=null',
'DOMAIN-PAIRING-LMT=null',
'DOMAIN-PAIRING-PLUG-IN-OUTER=null',
'DOMAIN-PAIRING-PLUG-IN-INNER=null',
'DOMAIN-PAIRING-PLUG-IN-OUTER-PATH=null',
'DOMAIN-PAIRING-PLUG-IN-INNER-PATH=null',
'DOMAIN-CYCLE-RSZ1-OUTER=null',
'DOMAIN-CYCLE-RSZ2-OUTER=null',
'DOMAIN-CYCLE-RSZ1-INNER=null',
'DOMAIN-CYCLE-RSZ2-INNER=null',
'DOMAIN-SHELL-LAST=null',
'DOMAIN-SHELL-DIM=null']
pth = self.lang_dirs('EWWW') / 'ttd_dom_cfg.tpk'
self.tt_dom_write(pth, '\n'.join(tpk))
def tt_dom_print(self, txt: str, indent=0):
if indent == 2: print(f' .:{txt}')
else: print(f'-:. {txt}')
def tt_dom_write(self, pth: str, src: str):
with open(pth, mode='w', encoding='utf-8') as f: f.write(src)
def pre_lang_updt(self, src: (str, list)) -> list:
if isinstance(src, str) or isinstance(src, list):
if isinstance(src, str): src = src.split('\n')
else:
raise RuntimeError('<TT_DOM 1.02> tt-dom source code must be a str or list type')
src, rtn = [ln.lower().replace('~', '').replace(' ', '') for ln in src], []
cmmnt_pos_lst = [ln.find('||') for ln in src]
for i, ln in enumerate(src):
cmmnt_pos = cmmnt_pos_lst[i]
if ln:
if cmmnt_pos > -1:
if cmmnt_pos > 0: rtn.append(ln[:cmmnt_pos-len(ln)])
else: rtn.append('~')
else: rtn.append(ln)
else: rtn.append('~')
return rtn
def lang_updt(self, src: (str, list)):
print(f'TT-DOM Plex Interpreter is running, current plex-signal source code to process: {len(src)} line(s)')
src = self.pre_lang_updt(src)
if len(src) > 0:
cmd_re_lst = [r'tq.*\(.*\)']
self._dom_csfs = []
ln_n = 1
for ln in src:
ln_len = len(ln)
if ln_len > 1:
if ln_len > 6:
if ln[ln_len-1] == ':':
if ln[0] == 't' and ln[1] == 'q': self.cmd_lang_tq(ln[:ln_len-1], ln_n, cmd_re_lst[0])
else: raise RuntimeError(f'<TT_DOM 1.02> line({ln_n}) has no recognizable cmd, struct, fork or sym domain')
else: raise RuntimeError(f'<TT_DOM 1.02> line({ln_n}) missing a end line ":" terminator')
else: raise RuntimeError(f'<TT_DOM 1.02> line({ln_n}) non-recognizable syntax line length, less than 7 chars')
ln_n+=1
else: raise RuntimeError('<TT-DOM 1.02> no visible source code lines found')
def cmd_lang_tq(self, ln: str, ln_n: int, cel: str):
# TEMPLATE TQ:
# tq-|+1-4(shell<> | scan<> | respawn<> | wait<>) 3+2j, 5-2j, 1+9j:
# tq-|+1-4(") 3+2j, 5-2j, 1+9j; tq-2(") 3-2j, 5+2j, 1+9j:
# self._dom_csfs.append(('cmd', 'tq', ln_n-1, ln)}
tic = re.compile(r'^[+-]?\d[+-]\d[ijIJ]$')
try:
if ln.find(';') > -1: ln = ln.split(';')
else: ln = [ln]
for tq in ln:
tq_cmd = re.findall(cel, tq)
if len(tq_cmd) == 1:
tq_cmd = tq_cmd[0].split('(')
if len(tq_cmd[0]) == 4:
convr_smd, shell_cmd = tq_cmd[0].replace('tq', ''), tq_cmd[1].replace(')', '')
if len(convr_smd) == 2:
if convr_smd.startswith(('-', '+')) and convr_smd[1].isdigit():
tq_cmd = tq.split(')')
if tq_cmd[1].find(',') > -1:
tq_cmd = tq_cmd[1].split(',')
if all(bool(tic.fullmatch(T)) for T in tq_cmd):
if shell_cmd.find('shell<') > -1:
self.cmd_lang_tq_shell(shell_cmd.replace('shell<', '').replace('>', ''), convr_smd, tq_cmd)
elif shell_cmd.find('scan<') > -1:
self.cmd_lang_tq_scan(shell_cmd.replace('scan<', '').replace('>', ''), convr_smd, tq_cmd)
elif shell_cmd.find('respawn<') > -1:
pass
elif shell_cmd.find('wait<') > -1:
pass
else: raise RuntimeError(f'valid tq shell director not found; valid tqsd: shell, scan, respawn or wait')
else: raise RuntimeError(f'improper complex number notation found')
else: raise RuntimeError(f'no comma(s) found for tq complex list')
else: raise RuntimeError(f'improper tq shell matrix director syntax')
else: raise RuntimeError(f'improper tq shell director syntax')
else: raise RuntimeError(f'improper tq shell director syntax')
else: raise RuntimeError(f'improper syntax for tq cmd or missing')
except Exception as err_lang_tq: raise RuntimeError(f'<TT-DOM 1.02> tq- code err line({ln_n}), {err_lang_tq}')
def cmd_lang_tq_shell(self, shell_cmd: str, convr_smd: str, cn_lst: list):
# shell<exponent_ratio_start=int 1-3, svd_rank_max=int 3-5, target_rows_length=int 5-25>
self.tt_dom_print(f'mapping tq- shell director //SHELL /smd priority: {convr_smd} /total plex: {len(cn_lst)}')
self.parse_tq_complex_list(cn_lst)
if shell_cmd.find(',') > -1:
shell_cmd = shell_cmd.split(',')
if len(shell_cmd) == 3:
shell_cmd = [int(i) for i in shell_cmd if i.isdigit()]
if len(shell_cmd) == 3:
if shell_cmd[0] > 0 and shell_cmd[0] < 4:
if shell_cmd[1] > 2 and shell_cmd[1] < 6:
if shell_cmd[2] > 4 and shell_cmd[2] < 26:
self.tt_dom_print(f'exponent_ratio_start={shell_cmd[0]}', 2)
self.tt_dom_print(f'svd_rank_max={shell_cmd[1]}', 2)
self.tt_dom_print(f'target_rows_length={shell_cmd[2]}', 2)
# ><)))'>
else: raise RuntimeError(f'tq direct shell director arg "target_rows_length" err; valid int range: 5-25')
else: raise RuntimeError(f'tq direct shell director arg "svd_rank_max" err: valid int range: 3-5')
else: raise RuntimeError(f'tq direct shell director arg "exponent_ratio_start" err; valid int range: 1-3')
else: raise RuntimeError(f'tq shell director(direct "shell<>" call) missing int vals')
else: raise RuntimeError(f'tq shell director(direct "shell<>" call) missing params')
else: raise RuntimeError(f'tq shell director(direct "shell<>" call) params invalid')
def cmd_lang_tq_scan(self, shell_cmd: str, convr_smd: str, cn_lst: list):
# scan<nsx=int 360-720, xmax=float 1.0-9.9, hbar=float 1.0-4.9, mbar=float 1.0-4.9>
self.tt_dom_print(f'mapping tq- shell director //SCAN /smd priority: {convr_smd} /total plex: {len(cn_lst)}')
self.parse_tq_complex_list(cn_lst)
if shell_cmd.find(',') > -1:
shell_cmd = shell_cmd.split(',')
if len(shell_cmd) == 4:
if shell_cmd[0].isdigit():
nsx = int(shell_cmd[0])
if nsx > 359 and nsx < 721:
shell_cmd[0] = nsx
scp = self.parse_float_params(False, shell_cmd[1:])
if not isinstance(scp[0], str):
scp = tuple(scp)
xmax, hbar, mbar = scp
if xmax > 0.9 and xmax < 10.0:
if hbar > 0.9 and hbar < 5.0:
if mbar > 0.9 and mbar < 5.0:
self.tt_dom_print(f'nsx={nsx}', 2)
self.tt_dom_print(f'xmax={xmax}', 2)
self.tt_dom_print(f'hbar={hbar}', 2)
self.tt_dom_print(f'mbar={mbar}', 2)
else: raise RuntimeError(f'tq shell director("scan<> call") invalid param range @ "mbar"; valid: float 1.0-4.9')
else: raise RuntimeError(f'tq shell director("scan<> call") invalid param range @ "hbar"; valid: float 1.0-4.9')
else: raise RuntimeError(f'tq shell director("scan<> call") invalid param range @ "xmax"; valid: float 1.0-9.9')
else:
scp, p_nm = int(scp[0]), None
if scp == 0: p_nm = 'xmax'
elif scp == 1: p_nm = 'hbar'
else: p_nm = 'mbar'
raise RuntimeError(f'tq shell director("scan<> call") invalid param "{p_nm}" negative float or non-float error')
else: raise RuntimeError(f'tq shell director("scan<> call) invalid param range @ "nsx"; valid: int 360-720')
else: raise RuntimeError(f'tq shell director("scan<>" call) nsx param invalid type; valid: int 360-720')
else: raise RuntimeError(f'tq shell director("scan<>" call) missing params')
else: raise RuntimeError(f'tq shell director("scan<>" call) params invalid')
def parse_tq_complex_list(self, cn_lst: str) -> list:
cnvrt_n_check = lambda s: (cil := list(map(complex, s)), any(c.real < 0 or c.imag < 0 for c in cil))
cn_t = cnvrt_n_check(cn_lst)
return cn_t
def parse_float_params(self, allowsNegative: bool, f: (str, list)) -> list:
rtn, idx = [], -1
if isinstance(f, str): f = [f]
for dn in f:
idx+=1
if dn.find('.') < 0:
return [str(idx)]
pn = dn.replace('.', '')
if not allowsNegative and pn.find('-') > -1:
return [str(idx)]
if pn.isdigit(): rtn.append(float(dn))
else:
return [str(idx)]
return rtn
def run_tt_dom_inpr_testing():
    """Run the interpreter on a small built-in plex source (smoke test).

    Builds a ``TICTAC_DOM_KRNL`` and feeds it one ``shell<>`` line, one
    ``scan<>`` line, a comment-only line and two blank lines. The commented
    blocks are disabled experiments kept for reference.
    """
    # Disabled: TICTAC_UXCM1.adj_bw_exp experiment
    #l_r1 = [5+2j, 3+2j, 1+6j, 4+6j, 6+2j, 8+4j, 3+9j, 4+2j]
    #l_r2 = [7+1j, 8+3j, 3+7j, 2+9j, 9+6j, 8+1j, 7+4j, 3+8j]
    #l_r3 = [[4+2j, 1+4j], [6+2j, 3+8j], [complex(2, 4), complex(5, 1)]]
    #r_vz = 2.6229108
    #cls = TICTAC_UXCM1(True, 124.13, 8.08)
    #print(cls.adj_bw_exp(r_vz, l_r1, l_r2))
    plex_lines = [
        'TQ-2(SHELL<2, 4, 22>) 7+4j, 3+2j, 5+8j, 6+1j, 2+5j, 4+9j, 3+6j, 1+4j: || comment1',
        'TQ+3(SCAN<361, 3.6, 2.4, 2.9>) 4+2j, 7+8j, 3+2j, 8+2j, 4+7j, 3+2j, 9+1j, 3+4j: || comment2',
        '|| comment3 ..............................................................',
        ' ',
        '',
    ]
    kernel = TICTAC_DOM_KRNL()
    kernel.lang_updt(plex_lines)
    #k_shell_frame_learn(123, 123)
    # FLIR IMAGING SEQUENCE TEST(tq- respawn cmd)
    #___________________________________________________________________________________________________
    #gh, gw = 30, 40
    #y = np.linspace(-1, 1, gh)
    #x = np.linspace(-1, 1, gw)
    #XX, YY = np.meshgrid(x, y)
    #radius = np.hypot(XX, YY)
    #amp = np.exp(-((radius-0.3)**2)/(2*0.08**2))+0.05*np.random.RandomState(2).randn(gh, gw)
    #phase = np.arctan2(YY, XX)+2.0*(XX**2-YY**2)
    #complex_grid = (amp*np.exp(1j*phase)).astype(np.complex128)
    #sim = FlirSimulator(complex_grid, H=256, W=384, rng_seed=7)
    #frames = sim.simulate(frames=90, start_idx=100, end_idx=999, save_dir='tt_dom_flir', save_pngs=True)
# Module-level invocation: the smoke test runs every time this file is executed or
# imported (there is no `if __name__ == "__main__":` guard).
run_tt_dom_inpr_testing()
@lastforkbender
Copy link
Author

GPT-5 mini

Tron Ares functions as a corporate-produced artifact (Disney-backed) that isn’t overtly about corporations, but is nevertheless worshipped within a broader “big tech religion.” It positions Trent Reznor as a high‑priest figure, underscored by Nine Inch Nails' main song—shouted, "Give me something to believe in"—which crystallizes the film’s call. The movie depicts devotees who accept tech‑washed narratives and aestheticized dissent—trading real demands for curated spectacle—so the film itself becomes both product and sermon: an enticing altar that channels attention and desire into further concentration of profit and technological control. What's next, Leto blowing holographic $ chips at paparazzi attending the Gala, while wearing synchronized sabers?

@lastforkbender
Copy link
Author

Couldn't have stated it better.

@lastforkbender
Copy link
Author

lastforkbender commented Oct 11, 2025

If you can fully explain the shell redock, with the section above it ---- how is exactly used with the real Ares AI of the B-Spline integration.....that would pretty much make you God. Seeing you are a Wall Street Babylon Whore of a Red Carpet Beast....is logical I'm sent here and so is posted. FU

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment