Created
September 28, 2025 08:44
-
-
Save lastforkbender/db155e1e75b9b92cd2812e52ffd4db37 to your computer and use it in GitHub Desktop.
hybrid collision ast cube/spheres error chaining
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # catpis.py - hybrid collision ast cube/spheres error chaining | |
| #____________________________________________________________________________________ | |
| from collections import Counter | |
| import random | |
| import cmath | |
| import math | |
| import json | |
| import time | |
| import sys | |
| import os | |
| #____________________________________________________________________________________ | |
| #//////////////////////////////////////////////////////////////////////////////////// | |
# Grid dimensions in cells. set_cfg() stores them under CFG['grid'] and also
# divides the error-injection probability by GRID_H*GRID_W.
GRID_W = 8
GRID_H = 8
class MiniQ8P:
    """Turn a matrix of float "variance" rows into a list of integer pairs.

    set_aa_short_bus() is the entry point; the other methods are its helpers.

    NOTE(review): the listing this block was taken from had lost all
    indentation. The nesting below is a reconstruction -- the structure inside
    aa_vhs_jammer() in particular must be confirmed against the real file.
    """
    def set_aa_short_bus(self, vhs_com):
        """Run the jammer over adjacent row pairs and solve each result.

        vhs_com is indexed 0..3 as (mtx, vhs, chs, chn). mtx must have an even
        number of rows, otherwise Exception is raised. Returns a list with
        len(mtx)-1 entries, each the 2-tuple produced by
        solve_aa_short_bus_pvt_cycle().
        """
        mtx, vhs, chs, chn = vhs_com[0], vhs_com[1], vhs_com[2], vhs_com[3]
        # Wall-clock derived scalar: the ctime string (spaces removed) is split
        # on ':'; only the pieces that are entirely digits are concatenated,
        # then piece [1] is appended once more before the int() conversion.
        t = time.ctime().replace(' ', '').split(':')
        t = math.log(int(f'{"".join([c for c in t if c.isdigit()])}{t[1]}')*vhs)
        mtx_len = len(mtx)
        if mtx_len%2 != 0:
            raise Exception(f'<catpis(mini-q8p)> variance matrices length({len(mtx)}) not even')
        # Pairs overlap: (row 0, row 1), (row 1, row 2), ... so the rebuilt mtx
        # has mtx_len-1 entries, which is what the second range() walks.
        mtx = [self.aa_vhs_jammer(mtx[i], mtx[i+1], vhs, chs, chn) for i in range(mtx_len-1)]
        return [self.solve_aa_short_bus_pvt_cycle(t, mtx[i]) for i in range(mtx_len-1)]
    #___________________________________________________________________________________
    def solve_aa_short_bus_pvt_cycle(self, t, mt):
        """Reduce one jammer result to a pair of integers.

        mt is indexed 0..3 as (hd, R, tlr, srv), i.e. the tuple returned by
        aa_vhs_jammer(). t is the scalar computed in set_aa_short_bus().
        Returns (int, int); the first element is the constant 1 on the else
        branch.
        """
        hd, R, tlr, srv = mt[0], mt[1], mt[2], mt[3]
        # isRested is True exactly when tlr >= srv.
        if tlr < srv: gr = self.get_pvt_jam_range(False, t, tlr, srv)
        else: gr = self.get_pvt_jam_range(True, t, tlr, srv)
        if gr+hd > abs(cmath.sqrt(R.real)*cmath.sqrt(R.imag))**hd/gr:
            return math.floor(abs((t+gr)*cmath.cos(R))+tlr), math.floor((gr+hd*t)/t)
        else:
            return 1, math.floor(((gr*hd)/abs(cmath.sin(R)))/t)
    #___________________________________________________________________________________
    def get_pvt_jam_range(self, isRested, t, tlr, srv):
        """Return an integer range value (math.ceil result) from t, tlr and srv.

        isRested selects between two formulas; both take the magnitude of a
        complex square root built from x = tlr*srv/sqrt(t).
        """
        x = tlr*srv/math.sqrt(t)
        if isRested:
            return math.ceil(abs(cmath.sqrt(complex((x+t)/math.tanh(t*2), x*t)))/(tlr+srv))
        return math.ceil(abs(cmath.sqrt(complex(x+t, x*t)))/math.pi)
    #___________________________________________________________________________________
    def aa_vhs_jammer(self, vrncA, vrncB, vhs, chs, chn):
        """Combine two variance rows into the 4-tuple (hd, R', tlr, ratio).

        vrncA / vrncB are sequences of numbers (summed and indexed below);
        chs / chn are read through .real/.imag and abs(). Raises ValueError
        when either row sums to less than the combined real+imag parts of chs
        and chn. The returned tuple is what solve_aa_short_bus_pvt_cycle()
        unpacks as (hd, R, tlr, srv).
        """
        c_ttl = (chs.real+chs.imag)+(chn.real+chn.imag)
        if sum(vrncA) < c_ttl or sum(vrncB) < c_ttl:
            raise ValueError(f'<catpis(mini-q8p)> @chs + @chn summation({c_ttl}) > listed variances')
        hd = (int(vhs*sum(vrncA)*sum(vrncB)*math.pi)*int(math.sqrt(abs(chs)+abs(chn))))*math.pi
        R = min(math.cos(abs(chs)+abs(chn)), math.sin(vhs+abs(chn)))
        t, q = cmath.tan(((chs*chn)/math.pi)+R), math.pi/2
        k = (vrncA[0]*vrncA[len(vrncA)-1]*vrncB[0]*vrncB[len(vrncB)-1])**t
        # s counts down from hd (loop bound), c counts up from 1;
        # r and rv are the two accumulator sets reduced at the end.
        ds, s, c, r, rv = math.ceil(abs(cmath.exp(R+chs-chn)))**vhs*t, hd, 1, set(), set()
        t+=k**R
        # NOTE(review): everything inside this while loop is re-indented by
        # hand; which statements belong to `for h`, `if m.real < 0` and
        # `if n > 0.0` could not be verified -- confirm before relying on it.
        while s > -1:
            for h in vrncA:
                t+=h-ds
                m = (t*k+s)-cmath.sqrt((t-ds)+c)-t+(ds*(c*2))
                if m.real < 0:
                    q = abs(c-t)
                    for n in vrncB:
                        # w does not depend on x; it is the same sine term
                        # added once per element of vrncB.
                        w = sum([math.sin(abs(m)+q) for x in vrncB])
                        a_ds, a_ks = abs(ds), abs(cmath.cos(k))
                        if ((w/(c/2))+(R*n))-a_ds <= (vhs+c+R)/(math.pi-abs(t)+a_ks):
                            r.add(w)
                        else:
                            t-=vhs
                    # n is reused here: no longer the vrncB loop variable.
                    n = math.sin((abs(cmath.cos(t)+ds)//(c*abs(t))))
                    if n > 0.0:
                        n, cn = math.floor(n*c*abs(cmath.tan(k+m))), 0
                        lm = [complex((t*c)/(n+1)/q, -1) for x in range(n)]
                        # r is temporarily a list so it can be indexed by cn.
                        r = list(r)
                        d = len(r)
                        if d > 0:
                            # NOTE(review): cn is reset to 0 whenever it reaches
                            # d, so termination when len(lm) > d is unclear --
                            # confirm.
                            while cn < len(lm):
                                l = -(abs(lm[cn])-cn)/q
                                if cn >= d: cn = 0
                                l = l+r[cn]
                                if l > 0.0: rv.add(l)
                                cn+=1
                        r = set(r)
                a_m = abs(m)
                if a_m > 0: r.add(a_m/(abs(cmath.sqrt(chn))+math.pi))
            s+=-1
            c+=1
        r, rv = list(r), list(rv)
        rv.sort()
        tlr = -math.log((abs(cmath.tan(t))/hd)*abs(cmath.log(cmath.sqrt(m+hd))))
        # NOTE(review): cn is only assigned inside the `n > 0.0` branch above;
        # if that branch never ran, both return lines would fail on cn --
        # confirm.
        if not rv:
            # Fallback when nothing was collected in rv: synthesise
            # len(r)//3 random values instead.
            rv = [random.uniform(n, (n*n)+tlr/(n+1)) for n in range(len(r)//3)]
            return hd, R+cmath.log(t), tlr, sum(r)/(sum(rv)/math.pi)/(cn+1)
        return hd, R+cmath.log(t), tlr, sum(r)/sum(rv)/(cn+1)
| #____________________________________________________________________________________ | |
def set_cfg():
    """Build the simulation configuration dictionary.

    A private random.Random (seeded from the global generator) produces an
    8x6 matrix of floats plus vhs/chs/chn, which MiniQ8P.set_aa_short_bus()
    reduces to integer pairs. The absolute pair sums, floor-divided by 3,
    drive every derived setting. 'seed' and 'steps' are left as None for the
    caller to fill in.
    """
    seeded = random.Random(random.randint(1199, 1599))
    # One (low, high) span per matrix column.
    spans = ((0.1, 0.7), (0.2, 0.9), (0.6, 1.2), (0.8, 1.6), (1.0, 1.9), (1.5, 2.8))
    rows = [[seeded.uniform(low, high) for low, high in spans] for _ in range(8)]
    vhs = seeded.uniform(0.100, 0.210)
    chs = complex(1.0, seeded.uniform(0.1, 0.5))
    chn = complex(1.0, seeded.uniform(0.1, 0.5))

    pairs = MiniQ8P().set_aa_short_bus((rows, vhs, chs, chn))
    totals = [abs(first+second)//3 for first, second in pairs]
    biggest, smallest = max(totals), min(totals)

    # The seeded draws below must stay in this order (randint, then uniform).
    injection_prob = biggest/smallest/math.pi/(GRID_H*GRID_W)
    reflection_depth = int(math.exp(math.sqrt(biggest/(math.pi+seeded.randint(9, 18)))))
    timestep = float(math.floor(math.log((biggest/2)-(smallest/2))))
    bucket_res = timestep/(seeded.uniform(1.10, 1.21))/smallest

    return {'seed': None,
            'steps': None,
            'grid': {'w': GRID_W, 'h': GRID_H},
            'sphere_count': smallest,
            'error_injection_prob': injection_prob,
            'max_reflection_depth': reflection_depth//3,
            'timestep': timestep,
            'bucket_resolution': bucket_res}
| #____________________________________________________________________________________ | |
# Fallback seed and step count, drawn from the global RNG at import time; the
# __main__ block uses them when no (valid) command-line value is given.
DEFAULT_SEED = random.randint(1110, 1599)
DEFAULT_STEPS = random.randint(1599, 2111)
# Side length handed to project_vectors_to_image() by run_sim().
IMAGE_SIZE = 256
# Simulation configuration, computed once at import time by set_cfg().
CFG = set_cfg()
| #____________________________________________________________________________________ | |
| #//////////////////////////////////////////////////////////////////////////////////// | |
class CrateTapRNG:
    """Seeded random source: a thin facade over a private random.Random.

    The wrapped generator stays reachable as ``self.rng`` for callers that
    need an operation not forwarded here.
    """

    def __init__(self, seed):
        """Remember *seed* and create the generator seeded with it."""
        self.seed = seed
        self.rng = random.Random(seed)
    #____________________________________________________________________________________
    def next(self) -> float:
        """Return the next float from the generator's random()."""
        generator = self.rng
        return generator.random()
    #____________________________________________________________________________________
    def randint(self, a, b):
        """Forward to random.Random.randint(a, b)."""
        generator = self.rng
        return generator.randint(a, b)
    #____________________________________________________________________________________
    def choice(self, seq):
        """Forward to random.Random.choice(seq)."""
        generator = self.rng
        return generator.choice(seq)
    #____________________________________________________________________________________
    def uniform(self, a, b):
        """Forward to random.Random.uniform(a, b)."""
        generator = self.rng
        return generator.uniform(a, b)
| #____________________________________________________________________________________ | |
def spawn_initial_grid(w, h, rng):
    """Create the w*h cube records and a shuffled list of their ids.

    rng must offer randint(a, b) and expose the underlying generator as
    rng.rng (its shuffle() is used). Returns (cubes, ids): cubes maps
    'C{x}_{y}' to the cube dict, ids holds the same keys in shuffled order.
    """
    cubes = {}
    for row in range(h):
        for col in range(w):
            name = f'C{col}_{row}'
            # Draw order matters for reproducibility: orientation, then value.
            facing = int(rng.randint(0, 3))
            payload = rng.randint(0, 255)
            cubes[name] = {'id': name,
                           'grid_pos': [col, row],
                           'eagle_pos': [col+(row*0.5), row*0.866],
                           'orientation': facing,
                           'value': payload}
    # Dict keys iterate in insertion order, i.e. row-major creation order.
    ids = list(cubes)
    rng.rng.shuffle(ids)
    return cubes, ids
| #____________________________________________________________________________________ | |
def spawn_spheres(count, rng, bounds):
    """Create *count* sphere records keyed 'S0', 'S1', ...

    rng must offer uniform(a, b); bounds is indexed [0] and [1] as the upper
    limits for the initial x / y draw. The x position is additionally shifted
    by the sphere index.
    """
    spheres = {}
    for idx in range(count):
        name = f'S{idx}'
        # Six draws per sphere, always in this order.
        pos_x = rng.uniform(0, bounds[0])+idx
        pos_y = rng.uniform(0, bounds[1])
        vel_x = rng.uniform(-0.2, 0.9)
        vel_y = rng.uniform(-0.9, 1.5)
        radius = rng.uniform(0.3, 1.2)
        mass = rng.uniform(0.5, 0.9)
        spheres[name] = {'id': name,
                         'pos': [pos_x, pos_y],
                         'vel': [vel_x, vel_y],
                         'r': radius,
                         'mass': mass}
    return spheres
| #____________________________________________________________________________________ | |
def ast_eval(node, z, rng, error_injection_prob, trace):
    """Evaluate a dict-based expression tree at the complex point *z*.

    Node shapes handled:
      * None                      -> 0j
      * {'const': c}              -> complex(c)
      * {'var': 'z'}              -> complex(z)
      * {'op': add|sub|mul|div|pow, 'a': ..., 'b': ...}
      * {'op': sin|cos|abs|arg|conj, 'v': ...}
      * {'op': 'noise'}           -> random value; may be corrupted
      * {'op': 'mix', 'a', 'b', 't'} -> linear blend a*(1-t)+b*t
      * {'op': 'cond', 'cond': {'op': 'gt', 'a', 'b'}, 'a', 'b'}

    rng is only used by 'noise' and must offer next(). Arithmetic failures in
    binary/unary ops are appended to *trace* and yield a NaN complex instead
    of raising. *trace* also receives a record for every injected noise error.

    Raises ValueError for an unknown variable name or an unknown op.
    """
    if node is None:
        return complex(0, 0)
    if 'const' in node:
        return complex(node['const'])
    if 'var' in node:
        if node['var'] == 'z':
            return complex(z)
        raise ValueError('<catpis> unknown variable')
    op = node.get('op')
    if op in ('add', 'sub', 'mul', 'div', 'pow'):
        a = ast_eval(node['a'], z, rng, error_injection_prob, trace)
        b = ast_eval(node['b'], z, rng, error_injection_prob, trace)
        try:
            if op == 'add':
                return a+b
            if op == 'sub':
                return a-b
            if op == 'mul':
                return a*b
            if op == 'div':
                # Division by zero maps to +inf rather than raising.
                return a/b if b != 0 else complex(float('inf'))
            if op == 'pow':
                # Only the real part of the exponent is used.
                return complex(pow(a, b.real))
        except Exception as e:
            trace.append({'type': 'eval_error', 'op': op, 'err': str(e)})
            return complex(float('nan'))
    if op in ('sin', 'cos', 'abs', 'arg', 'conj'):
        v = ast_eval(node['v'], z, rng, error_injection_prob, trace)
        try:
            if op == 'sin':
                return complex(math.sin(v.real)*math.cosh(v.imag), math.cos(v.real)*math.sinh(v.imag))
            if op == 'cos':
                return complex(math.cos(v.real)*math.cosh(v.imag), -math.sin(v.real)*math.sinh(v.imag))
            if op == 'abs':
                return complex(abs(v), 0.0)
            if op == 'arg':
                return complex(math.atan2(v.imag, v.real), 0.0)
            if op == 'conj':
                return complex(v.real, -v.imag)
        except Exception as e:
            trace.append({'type': 'eval_error', 'op': op, 'err': str(e)})
            return complex(float('nan'))
    if op == 'noise':
        r1, r2 = rng.next(), rng.next()
        val = complex(r1-0.5, r2-0.5)
        if rng.next() < error_injection_prob:
            trace.append({'type': 'injected_error', 'op': 'noise', 'r1': r1, 'r2': r2})
            # Injected error: either poison the real part or flip the sign.
            if rng.next() < 0.5:
                return complex(float('nan'), val.imag)
            return complex(-val.real, -val.imag)
        return val
    if op == 'mix':
        a = ast_eval(node['a'], z, rng, error_injection_prob, trace)
        b = ast_eval(node['b'], z, rng, error_injection_prob, trace)
        t = float(ast_eval(node['t'], z, rng, error_injection_prob, trace).real)
        return a*(1-t)+b*t
    if op == 'cond':
        cond = node['cond']
        if cond.get('op') == 'gt':
            A = ast_eval(cond['a'], z, rng, error_injection_prob, trace)
            B = ast_eval(cond['b'], z, rng, error_injection_prob, trace)
            # Only the real parts are compared.
            if A.real > B.real:
                return ast_eval(node['a'], z, rng, error_injection_prob, trace)
            else:
                return ast_eval(node['b'], z, rng, error_injection_prob, trace)
    # The message previously lacked the f-prefix and printed "{op}" verbatim.
    raise ValueError(f'<catpis> unknown ast op, "{op}"')
| #____________________________________________________________________________________ | |
def generate_ast(depth, rng):
    """Randomly build an expression tree of at most *depth* operator levels.

    rng must offer next(), uniform(a, b), randint(a, b) and choice(seq).
    At depth <= 0 a leaf is produced ({'var': 'z'} or an integer constant);
    otherwise one draw from next() selects noise / binary / unary / mix /
    cond. The order of rng calls is part of the contract for seeded runs.
    """
    if depth <= 0:
        # next() is drawn before the uniform() threshold it is compared to.
        roll = rng.next()
        threshold = rng.uniform(0.3, 0.9)
        if roll < threshold:
            return {'var': 'z'}
        return {'const': rng.randint(-3, 3)}

    pick = rng.next()
    below = depth-1

    if pick < 0.15:
        return {'op': 'noise'}

    if pick < 0.4:
        name = rng.choice(['add', 'sub', 'mul', 'div', 'pow'])
        left = generate_ast(below, rng)
        right = generate_ast(below, rng)
        return {'op': name, 'a': left, 'b': right}

    if pick < 0.6:
        name = rng.choice(['sin', 'cos', 'abs', 'arg', 'conj'])
        return {'op': name, 'v': generate_ast(below, rng)}

    if pick < 0.75:
        left = generate_ast(below, rng)
        right = generate_ast(below, rng)
        weight = rng.next()
        return {'op': 'mix', 'a': left, 'b': right, 't': {'const': weight}}

    # cond: the |z| threshold is drawn before either branch is generated.
    limit = rng.uniform(0.1, 3.0)
    test = {'op': 'gt', 'a': {'op': 'abs', 'v': {'var': 'z'}}, 'b': {'const': limit}}
    when_true = generate_ast(below, rng)
    when_false = generate_ast(below, rng)
    return {'op': 'cond', 'cond': test, 'a': when_true, 'b': when_false}
| #____________________________________________________________________________________ | |
def bucket_key_for_cube(cube, resolution=0.5, coord_sys='eagle'):
    """Return the 4-tuple cache key (cell_x, cell_y, orientation, value//16).

    With coord_sys == 'grid' the cell is the integer grid position. For any
    other value the 'eagle_pos' coordinates are floor-divided by *resolution*
    after a random 0.1..0.3 (global random module) has been added to it, and
    the coordinates and effective resolution are printed.
    """
    if coord_sys == 'grid':
        col, row = cube['grid_pos']
        cell = (int(col), int(row))
    else:
        ex, ey = cube['eagle_pos']
        print(f' :X={ex}')
        print(f' :Y={ey}')
        # Jitter comes from the module-level RNG, not from a seeded source.
        resolution = resolution+random.uniform(0.1, 0.3)
        print(f'RESOLUTION :: {resolution}')
        cell = (int(math.floor(ex/resolution)), int(math.floor(ey/resolution)))
    return cell+(cube['orientation'], cube['value']//16)
| #____________________________________________________________________________________ | |
def spawn_tensor_from_complex(out):
    """Expand a complex number into [real, imag, magnitude, argument].

    Returns None when *out* is None. If either component is NaN, or if the
    magnitude/argument computation fails, both magnitude and argument are NaN
    while the real/imag entries are passed through unchanged.
    """
    if out is None:
        return None
    real_part, imag_part = out.real, out.imag
    nan = float('nan')
    if math.isnan(real_part) or math.isnan(imag_part):
        return [real_part, imag_part, nan, nan]
    try:
        mag = math.hypot(real_part, imag_part)
        arg = math.atan2(imag_part, real_part)
    except (OverflowError, ValueError):
        # Narrowed from a bare except: only arithmetic range errors are
        # expected here; KeyboardInterrupt/SystemExit must not be swallowed.
        mag = arg = nan
    return [real_part, imag_part, mag, arg]
| #____________________________________________________________________________________ | |
def tensor_distance(a, b):
    """Return a Euclidean-style distance between two numeric sequences.

    * If either argument is None the distance is +inf.
    * Pairs (taken with zip, so the shorter sequence bounds the comparison)
      where either value is NaN add a fixed penalty of 1e3 to the squared sum.
    * If squaring a difference overflows, a random penalty of 1..5 (global
      random module) is added instead.
    """
    if a is None or b is None:
        return float('inf')
    total = 0.0
    for x, y in zip(a, b):
        if math.isnan(x) or math.isnan(y):
            total += 1e3
            continue
        try:
            total += (x-y)**2
        except OverflowError:
            # Narrowed from a bare except: float ** 2 can only fail by
            # overflowing, and nothing else should be silently absorbed.
            total += random.randint(1, 5)
    return math.sqrt(total)
| #____________________________________________________________________________________ | |
# node name -> {bucket key -> cached tensor}; module-wide, never cleared here.
NODE_CACHE = {}
def eval_with_cache(node_name, ast, cube, rng, coord_sys='eagle', tolerance=0.5, policy='mutate'):
    """Evaluate *ast* at the cube's eagle position and reconcile with the cache.

    Returns (out, trace, mismatch). mismatch is None on a first sighting of
    the bucket key or when the cached tensor is within *tolerance*; otherwise
    it is a 'cache_mismatch' record and *policy* decides what happens:

      'mutate'    -- blend cache and current tensor, perturb the output
      'corrupt'   -- keep the cache, replace the output with NaN+NaNj
      'overwrite' -- replace the cache with the current tensor

    One rng.next() draw is consumed on every mismatch regardless of policy.
    """
    trace = []
    pos = cube['eagle_pos']
    z = complex(pos[0], pos[1])
    out = ast_eval(ast, z, rng, CFG['error_injection_prob'], trace)
    tensor = spawn_tensor_from_complex(out)
    key = bucket_key_for_cube(cube, resolution=CFG['bucket_resolution'], coord_sys=coord_sys)
    per_node = NODE_CACHE.setdefault(node_name, {})

    if key not in per_node:
        per_node[key] = None if tensor is None else list(tensor)
        return out, trace, None

    cached = per_node[key]
    dist = tensor_distance(cached, tensor)
    # Written as "not >" on purpose: a NaN distance must count as a match.
    if not dist > tolerance:
        return out, trace, None

    mismatch = {'type': 'cache_mismatch', 'node': node_name, 'key': key,
                'dist': dist, 'cached': cached, 'current': tensor}
    r = rng.next()

    if policy == 'mutate':
        alpha = 0.3+(r*random.uniform(0.4, 0.9))
        blended = []
        for old, new in zip(cached, tensor):
            if math.isnan(old):
                blended.append(0.0 if math.isnan(new) else new)
            elif math.isnan(new):
                blended.append(old)
            else:
                blended.append(old*(1-alpha)+new*alpha)
        per_node[key] = blended
        if tensor is not None and not math.isnan(tensor[0]):
            real = tensor[0]*(1+(r-0.5)*random.uniform(0.3, 0.9))
            imag = tensor[1]*(1+(r-0.5)*0.1)
            out = complex(real, imag)
    elif policy == 'corrupt':
        per_node[key] = cached
        out = complex(float('nan'), float('nan'))
    elif policy == 'overwrite':
        per_node[key] = list(tensor)

    return out, trace, mismatch
| #____________________________________________________________________________________ | |
def apply_reflection_tree(cube_ids, nodes, cubes, rng, event_trace):
    """Push the given cubes through each node in turn, transforming some.

    For every node each currently-affected cube is evaluated through
    eval_with_cache(); traces and cache mismatches are appended to
    *event_trace*. A cube is "selected" when the fractional part of |out|
    exceeds 0.5 (or by a coin flip when out contains NaN). Selected cubes are
    reflected or rotated in place according to node['type'] and carried to
    the next node. If no cube was selected the affected set is kept as is.
    Returns the final affected ids as a list.
    """
    survivors = set(cube_ids)
    for node in nodes:
        chosen = set()
        for cid in list(survivors):
            cube = cubes[cid]
            coord_sys = node.get('coord_sys', 'eagle')
            tol = node.get('tol', 0.5)
            policy = node.get('policy', 'mutate')
            out, trace, mismatch = eval_with_cache(node['name'], node['field'], cube, rng,
                                                   coord_sys=coord_sys, tolerance=tol, policy=policy)
            if trace:
                event_trace.append({'type': 'eval_trace', 'node': node['name'], 'cube': cid, 'trace': trace})
            if mismatch:
                mismatch['cube'] = cid
                event_trace.append(mismatch)

            try:
                if math.isnan(out.real) or math.isnan(out.imag):
                    selected = rng.next() < 0.5
                else:
                    selected = (math.hypot(out.real, out.imag)%1.0) > 0.5
            except:
                # Bare except kept deliberately so behaviour is unchanged.
                selected = rng.next() < rng.uniform(0.5, 0.7)
            if not selected:
                continue

            kind = node.get('type', 'reflect')
            if kind == 'reflect':
                angle = node.get('angle', 0.0)+(rng.next()-rng.uniform(0.1, 0.5))*0.1
                x, y = cube['eagle_pos']
                ca, sa = math.cos(angle), math.sin(angle)
                # Rotate into the axis frame, mirror x, rotate back.
                rx = -(ca*x+sa*y)
                ry = -sa*x+ca*y
                cube['eagle_pos'] = [ca*rx-sa*ry, sa*rx+ca*ry]
                cube['orientation'] = (cube['orientation']+1)%4
            elif kind == 'rotate':
                ang = node.get('angle', 0.5)+(rng.next()-0.5)*0.2
                x, y = cube['eagle_pos']
                ca, sa = math.cos(ang), math.sin(ang)
                cube['eagle_pos'] = [ca*x-sa*y, sa*x+ca*y]
                cube['orientation'] = (cube['orientation']+1)%4
            chosen.add(cid)
        if chosen:
            survivors = chosen
    return list(survivors)
| #____________________________________________________________________________________ | |
def step_spheres(spheres, cubes, rng, bounds, event_trace):
    """Advance every sphere one timestep, then resolve sphere/cube contacts.

    Positions move by vel*CFG['timestep'] and are wrapped once per axis into
    [0, bounds[axis]]. A cube closer to a sphere than r + random(0.3..0.5)
    is pushed away from it, has its 'value' bumped (mod 256) and a
    'collision' record is appended to *event_trace*. Randomness here comes
    from the global random module; *rng* is accepted but not used.
    """
    for ball in spheres.values():
        for axis in (0, 1):
            ball['pos'][axis] += ball['vel'][axis]*CFG['timestep']
        for axis in (0, 1):
            limit = bounds[axis]
            if ball['pos'][axis] < 0:
                ball['pos'][axis] += limit
            if ball['pos'][axis] > limit:
                ball['pos'][axis] -= limit

    for cid, cube in cubes.items():
        # Snapshot taken once per cube: later pushes within this cube's pass
        # do not change the reference point used for the remaining spheres.
        cx, cy = cube['eagle_pos']
        for sid, ball in spheres.items():
            dx = ball['pos'][0]-cx
            dy = ball['pos'][1]-cy
            dist = math.hypot(dx, dy)
            if not dist < ball['r']+random.uniform(0.3, 0.5):
                continue
            overlap = (ball['r']+0.5)-dist
            if dist == 0:
                nx, ny = 1.0, 0.0
            else:
                nx, ny = dx/dist, dy/dist
            push = overlap*random.uniform(0.3, 0.7)/ball['mass']
            cube['eagle_pos'][0] -= nx*push
            cube['eagle_pos'][1] -= ny*push
            cube['value'] = (cube['value']+int(overlap*random.randint(10, 20))+1)%256
            event_trace.append({'type': 'collision', 'cube': cid, 'sphere': sid, 'overlap': overlap})
| #____________________________________________________________________________________ | |
def project_vectors_to_image(vectors, size):
    """Bin vectors into a size x size hit-count grid (list of row lists).

    Components 0 and 1 are used as x / y; when one of them has (near-)zero
    variance and the first vector is long enough, component 2 / 3 is used
    instead. Each point is centred on the mean, offset (x by a random
    0.5..0.7 from the global random module, y by 0.5), scaled by size-1 and
    clamped into the grid.
    """
    w = h = size
    grid = [[0]*w for _ in range(h)]
    if not vectors:
        return grid

    count = len(vectors)
    xs = [vec[0] if len(vec) > 0 else 0.0 for vec in vectors]
    ys = [vec[1] if len(vec) > 1 else 0.0 for vec in vectors]
    meanx = sum(xs)/len(xs)
    meany = sum(ys)/len(ys)
    varx = sum((a-meanx)**2 for a in xs)/len(xs)
    vary = sum((a-meany)**2 for a in ys)/len(ys)

    first_len = len(vectors[0])
    if varx < 1e-12 and first_len > 2:
        xs = [vec[2] for vec in vectors]
        meanx = sum(xs)/len(xs)
    if vary < 1e-12 and first_len > 3:
        ys = [vec[3] for vec in vectors]
        meany = sum(ys)/len(ys)

    for idx in range(count):
        col = int(((xs[idx] - meanx) + random.uniform(0.5, 0.7))*(w-1))
        row = int(((ys[idx]-meany)+0.5)*(h-1))
        col = max(col, 0)
        row = max(row, 0)
        if col >= w:
            col = w-1
        if row >= h:
            row = h-1
        grid[row][col] += 1
    return grid
| #____________________________________________________________________________________ | |
def write_ppm(grid, filename):
    """Write *grid* (rows of non-negative counts) as a plain-text P3 PPM.

    Each count is scaled to 0..255 against the largest count in the grid and
    mapped to a colour: red rises with the value, blue falls, green peaks
    around the midpoint. An empty grid produces a header-only 0x0 image.

    An all-zero grid used to divide by zero; the peak now falls back to 1 so
    such a grid renders as uniformly "empty" pixels.
    """
    h = len(grid)
    w = len(grid[0]) if h > 0 else 0
    # `or 1` covers both the empty grid and the all-zero grid.
    peak = max((cell for row in grid for cell in row), default=0) or 1
    with open(filename, 'w') as f:
        f.write('P3\n{} {}\n255\n'.format(w, h))
        for y in range(h):
            row = grid[y]
            pixels = []
            for x in range(w):
                v = int((row[x]/peak)*255)
                g = max(0, 255-abs(128-v)*2)
                pixels.append('{} {} {} '.format(v, g, 255-v))
            # One image row per text line, same layout as before.
            f.write(''.join(pixels))
            f.write('\n')
| #____________________________________________________________________________________ | |
def shannon_entropy(counter):
    """Return the Shannon entropy, in bits, of the counts in *counter*.

    *counter* is any mapping whose values() are counts. An empty mapping (or
    one whose counts sum to zero) yields 0.0; zero counts contribute nothing.
    """
    counts = counter.values()
    total = sum(counts)
    if total == 0:
        return 0.0
    entropy = 0.0
    for occurrences in counts:
        share = occurrences/total
        if not share > 0:
            continue
        entropy -= share*math.log(share, 2)
    return entropy
| #____________________________________________________________________________________ | |
def run_sim(seed, steps):
    """Run the cube/sphere simulation and write its result files.

    Output goes to a 'cbsp_rand' directory next to this module (created when
    missing): projection.ppm, config.json and analysis.txt. CFG['seed'] and
    CFG['steps'] are overwritten with the arguments. Each step draws a
    4-float random vector, pushes the first 1..6 ids of the shuffled order
    through the four-node reflection tree, advances the spheres, and counts a
    position/value token for every affected cube.
    """
    here = f'{os.path.dirname(os.path.abspath(__file__))}'
    out_dir = f'{here}/cbsp_rand'
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    rng = CrateTapRNG(seed)
    CFG['seed'] = seed
    CFG['steps'] = steps
    grid_w, grid_h = CFG['grid']['w'], CFG['grid']['h']
    cubes, order = spawn_initial_grid(grid_w, grid_h, rng)
    bounds = [grid_w*1.5, grid_h*1.5]
    spheres = spawn_spheres(CFG['sphere_count'], rng, bounds)

    # Four nodes alternating reflect/rotate and eagle/grid coordinates.
    nodes = []
    for i in range(4):
        odd = i%2 == 1
        nodes.append({'name': f'N{i}',
                      'type': 'rotate' if odd else 'reflect',
                      'angle': (i+1)*0.3,
                      'field': generate_ast(3, rng),
                      'coord_sys': 'grid' if odd else 'eagle',
                      'policy': 'corrupt' if (i%3) == 0 else 'mutate',
                      'tol': 0.6-(i*0.1)})

    event_log = []
    rnd_vectors = []
    sequence_counter = Counter()
    mismatch_counter = Counter()

    for step in range(steps):
        record = {'step': step, 'time': time.time(), 'events': []}
        events = record['events']
        rvec = (rng.next(), rng.next(), rng.next(), rng.next())
        rnd_vectors.append(rvec)
        record['rand_vector'] = rvec
        picked = order[:1+(int((rvec[0]*10))%6)]
        record['picked'] = picked

        tree_events = []
        affected = apply_reflection_tree(picked, nodes, cubes, rng, tree_events)
        for entry in tree_events:
            events.append(entry)
            if entry.get('type') == 'cache_mismatch':
                mismatch_counter[entry['node']] += 1
        record['affected'] = affected

        step_spheres(spheres, cubes, rng, bounds, events)

        for cid in affected:
            cube = cubes[cid]
            token = f"{cid}@{int(round(cube['eagle_pos'][0]*100))}_{int(round(cube['eagle_pos'][1]*100))}_{cube['value']}"
            sequence_counter[token] += 1
            events.append({'type': 'sequence_token', 'token': token})
        event_log.append(record)

    print('\nWriting cbsp random sim results files...')
    write_ppm(project_vectors_to_image(rnd_vectors, IMAGE_SIZE), f'{out_dir}/projection.ppm')
    with open(f'{out_dir}/config.json', 'w') as f:
        json.dump(CFG, f, indent=2)
    # The per-step event log is collected but its dump stays disabled:
    #with open(f'{out_dir}/events.json', 'w') as f:
        #json.dump(event_log, f, indent=2, default=str)

    total_tokens = sum(sequence_counter.values())
    unique_tokens = len(sequence_counter)
    ent = shannon_entropy(sequence_counter)

    report = [f'seed: {seed}\nsteps: {steps}\n',
              f'total sequence tokens: {total_tokens}\nunique tokens: {unique_tokens}\n',
              f'estimated Shannon entropy (bits): {ent:.4f}\n',
              'mismatch counts per node:\n']
    report.extend(f' {n}: {c}\n' for n, c in mismatch_counter.items())
    report.append('top tokens:\n')
    report.extend(f' {t}: {c}\n' for t, c in sequence_counter.most_common(10))
    with open(f'{out_dir}/analysis.txt', 'w') as f:
        f.writelines(report)

    print('Wrote: config.json, projection.ppm, analysis.txt')
    print(f'seed={seed} steps={steps} unique_tokens={unique_tokens} entropy={ent:.4f}')
| #____________________________________________________________________________________ | |
if __name__ == '__main__':
    # Usage: catpis.py [seed] [steps]
    # Each argument is optional; a value that is not an integer falls back to
    # the corresponding default instead of aborting.
    seed = DEFAULT_SEED
    steps = DEFAULT_STEPS
    if len(sys.argv) > 1:
        try:
            seed = int(sys.argv[1])
        except ValueError:
            # Narrowed from a bare except: int() on a str only raises
            # ValueError, and Ctrl-C must not be swallowed here.
            seed = DEFAULT_SEED
    if len(sys.argv) > 2:
        try:
            steps = int(sys.argv[2])
        except ValueError:
            steps = DEFAULT_STEPS
    run_sim(seed, steps)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment