Skip to content

Instantly share code, notes, and snippets.

@lastforkbender
Created September 28, 2025 08:44
Show Gist options
  • Select an option

  • Save lastforkbender/db155e1e75b9b92cd2812e52ffd4db37 to your computer and use it in GitHub Desktop.

Select an option

Save lastforkbender/db155e1e75b9b92cd2812e52ffd4db37 to your computer and use it in GitHub Desktop.
hybrid collision ast cube/spheres error chaining
# catpis.py - hybrid collision ast cube/spheres error chaining
#____________________________________________________________________________________
from collections import Counter
import random
import cmath
import math
import json
import time
import sys
import os
#____________________________________________________________________________________
#////////////////////////////////////////////////////////////////////////////////////
GRID_W = 8
GRID_H = 8
class MiniQ8P:
def set_aa_short_bus(self, vhs_com):
mtx, vhs, chs, chn = vhs_com[0], vhs_com[1], vhs_com[2], vhs_com[3]
t = time.ctime().replace(' ', '').split(':')
t = math.log(int(f'{"".join([c for c in t if c.isdigit()])}{t[1]}')*vhs)
mtx_len = len(mtx)
if mtx_len%2 != 0:
raise Exception(f'<catpis(mini-q8p)> variance matrices length({len(mtx)}) not even')
mtx = [self.aa_vhs_jammer(mtx[i], mtx[i+1], vhs, chs, chn) for i in range(mtx_len-1)]
return [self.solve_aa_short_bus_pvt_cycle(t, mtx[i]) for i in range(mtx_len-1)]
#___________________________________________________________________________________
def solve_aa_short_bus_pvt_cycle(self, t, mt):
hd, R, tlr, srv = mt[0], mt[1], mt[2], mt[3]
if tlr < srv: gr = self.get_pvt_jam_range(False, t, tlr, srv)
else: gr = self.get_pvt_jam_range(True, t, tlr, srv)
if gr+hd > abs(cmath.sqrt(R.real)*cmath.sqrt(R.imag))**hd/gr:
return math.floor(abs((t+gr)*cmath.cos(R))+tlr), math.floor((gr+hd*t)/t)
else:
return 1, math.floor(((gr*hd)/abs(cmath.sin(R)))/t)
#___________________________________________________________________________________
def get_pvt_jam_range(self, isRested, t, tlr, srv):
x = tlr*srv/math.sqrt(t)
if isRested:
return math.ceil(abs(cmath.sqrt(complex((x+t)/math.tanh(t*2), x*t)))/(tlr+srv))
return math.ceil(abs(cmath.sqrt(complex(x+t, x*t)))/math.pi)
#___________________________________________________________________________________
def aa_vhs_jammer(self, vrncA, vrncB, vhs, chs, chn):
c_ttl = (chs.real+chs.imag)+(chn.real+chn.imag)
if sum(vrncA) < c_ttl or sum(vrncB) < c_ttl:
raise ValueError(f'<catpis(mini-q8p)> @chs + @chn summation({c_ttl}) > listed variances')
hd = (int(vhs*sum(vrncA)*sum(vrncB)*math.pi)*int(math.sqrt(abs(chs)+abs(chn))))*math.pi
R = min(math.cos(abs(chs)+abs(chn)), math.sin(vhs+abs(chn)))
t, q = cmath.tan(((chs*chn)/math.pi)+R), math.pi/2
k = (vrncA[0]*vrncA[len(vrncA)-1]*vrncB[0]*vrncB[len(vrncB)-1])**t
ds, s, c, r, rv = math.ceil(abs(cmath.exp(R+chs-chn)))**vhs*t, hd, 1, set(), set()
t+=k**R
while s > -1:
for h in vrncA:
t+=h-ds
m = (t*k+s)-cmath.sqrt((t-ds)+c)-t+(ds*(c*2))
if m.real < 0:
q = abs(c-t)
for n in vrncB:
w = sum([math.sin(abs(m)+q) for x in vrncB])
a_ds, a_ks = abs(ds), abs(cmath.cos(k))
if ((w/(c/2))+(R*n))-a_ds <= (vhs+c+R)/(math.pi-abs(t)+a_ks):
r.add(w)
else:
t-=vhs
n = math.sin((abs(cmath.cos(t)+ds)//(c*abs(t))))
if n > 0.0:
n, cn = math.floor(n*c*abs(cmath.tan(k+m))), 0
lm = [complex((t*c)/(n+1)/q, -1) for x in range(n)]
r = list(r)
d = len(r)
if d > 0:
while cn < len(lm):
l = -(abs(lm[cn])-cn)/q
if cn >= d: cn = 0
l = l+r[cn]
if l > 0.0: rv.add(l)
cn+=1
r = set(r)
a_m = abs(m)
if a_m > 0: r.add(a_m/(abs(cmath.sqrt(chn))+math.pi))
s+=-1
c+=1
r, rv = list(r), list(rv)
rv.sort()
tlr = -math.log((abs(cmath.tan(t))/hd)*abs(cmath.log(cmath.sqrt(m+hd))))
if not rv:
rv = [random.uniform(n, (n*n)+tlr/(n+1)) for n in range(len(r)//3)]
return hd, R+cmath.log(t), tlr, sum(r)/(sum(rv)/math.pi)/(cn+1)
return hd, R+cmath.log(t), tlr, sum(r)/sum(rv)/(cn+1)
#____________________________________________________________________________________
def set_cfg():
rng, cfg_grid= random.Random(random.randint(1199, 1599)), []
t_rng = [(0.1, 0.7), (0.2, 0.9), (0.6, 1.2), (0.8, 1.6), (1.0, 1.9), (1.5, 2.8)]
for _ in range(8):
cfg_grid.append([rng.uniform(t_rng[i][0], t_rng[i][1]) for i in range(6)])
vhs = rng.uniform(0.100, 0.210)
chs, chn = complex(1.0, rng.uniform(0.1, 0.5)), complex(1.0, rng.uniform(0.1, 0.5))
mini_q8p = MiniQ8P()
sbnl = mini_q8p.set_aa_short_bus((cfg_grid, vhs, chs, chn))
sbnl_res = []
for l in sbnl:
n = l[0]+l[1]
if n < 0: n = -n
sbnl_res.append(n)
sbnl_res = [x//3 for x in sbnl_res]
eip = max(sbnl_res)/min(sbnl_res)/math.pi/(GRID_H*GRID_W)
mrd = int(math.exp(math.sqrt(max(sbnl_res)/(math.pi+rng.randint(9, 18)))))
tms = float(math.floor(math.log((max(sbnl_res)/2)-(min(sbnl_res)/2))))
brs = tms/(rng.uniform(1.10, 1.21))/min(sbnl_res)
return {'seed': None,
'steps': None,
'grid': {'w': GRID_W, 'h': GRID_H},
'sphere_count': min(sbnl_res),
'error_injection_prob': eip,
'max_reflection_depth': mrd//3,
'timestep': tms,
'bucket_resolution': brs}
#____________________________________________________________________________________
DEFAULT_SEED = random.randint(1110, 1599)
DEFAULT_STEPS = random.randint(1599, 2111)
IMAGE_SIZE = 256
CFG = set_cfg()
#____________________________________________________________________________________
#////////////////////////////////////////////////////////////////////////////////////
class CrateTapRNG:
def __init__(self, seed):
self.seed = seed
self.rng = random.Random(seed)
#____________________________________________________________________________________
def next(self):
return self.rng.random()
#____________________________________________________________________________________
def randint(self, a, b):
return self.rng.randint(a, b)
#____________________________________________________________________________________
def choice(self, seq):
return self.rng.choice(seq)
#____________________________________________________________________________________
def uniform(self, a, b):
return self.rng.uniform(a, b)
#____________________________________________________________________________________
def spawn_initial_grid(w, h, rng):
cubes, ids = {}, []
for y in range(h):
for x in range(w):
cid = f'C{x}_{y}'
cubes[cid] = {'id': cid,
'grid_pos': [x, y],
'eagle_pos': [x+(y*0.5),y*0.866],
'orientation': int(rng.randint(0, 3)),
'value': rng.randint(0, 255)}
ids.append(cid)
rng.rng.shuffle(ids)
return cubes, ids
#____________________________________________________________________________________
def spawn_spheres(count, rng, bounds):
spheres = {}
for i in range(count):
sid = f'S{i}'
spheres[sid] = {'id': sid,
'pos': [rng.uniform(0, bounds[0])+i, rng.uniform(0, bounds[1])],
'vel': [rng.uniform(-0.2, 0.9), rng.uniform(-0.9, 1.5)],
'r': rng.uniform(0.3, 1.2),
'mass': rng.uniform(0.5, 0.9)}
return spheres
#____________________________________________________________________________________
def ast_eval(node, z, rng, error_injection_prob, trace):
if node is None:
return complex(0, 0)
if 'const' in node:
return complex(node['const'])
if 'var' in node:
if node['var'] == 'z':
return complex(z)
raise ValueError('<catpis> unknown variable')
op = node.get('op')
if op in ('add', 'sub', 'mul' ,'div', 'pow'):
a = ast_eval(node['a'], z, rng, error_injection_prob, trace)
b = ast_eval(node['b'], z, rng, error_injection_prob, trace)
try:
if op == 'add':
return a+b
if op == 'sub':
return a-b
if op == 'mul':
return a*b
if op == 'div':
return a/b if b!=0 else complex(float('inf'))
if op == 'pow':
return complex(pow(a, b.real))
except Exception as e:
trace.append({'type':'eval_error','op':op,'err':str(e)})
return complex(float('nan'))
if op in ('sin', 'cos', 'abs', 'arg', 'conj'):
v = ast_eval(node['v'], z, rng, error_injection_prob, trace)
try:
if op == 'sin':
return complex(math.sin(v.real)*math.cosh(v.imag), math.cos(v.real)*math.sinh(v.imag))
if op == 'cos':
return complex(math.cos(v.real)*math.cosh(v.imag), -math.sin(v.real)*math.sinh(v.imag))
if op == 'abs':
return complex(abs(v), 0.0)
if op == 'arg':
return complex(math.atan2(v.imag, v.real), 0.0)
if op == 'conj':
return complex(v.real, -v.imag)
except Exception as e:
trace.append({'type':'eval_error','op':op,'err':str(e)})
return complex(float('nan'))
if op == 'noise':
r1, r2 = rng.next(), rng.next()
val = complex(r1-0.5, r2-0.5)
if rng.next() < error_injection_prob:
trace.append({'type':'injected_error','op':'noise','r1':r1,'r2':r2})
if rng.next() < 0.5:
return complex(float('nan'), val.imag)
return complex(-val.real, -val.imag)
return val
if op == 'mix':
a = ast_eval(node['a'], z, rng, error_injection_prob, trace)
b = ast_eval(node['b'], z, rng, error_injection_prob, trace)
t = float(ast_eval(node['t'], z, rng, error_injection_prob, trace).real)
return a*(1-t)+b*t
if op == 'cond':
cond = node['cond']
if cond.get('op') == 'gt':
A = ast_eval(cond['a'], z, rng, error_injection_prob, trace)
B = ast_eval(cond['b'], z, rng, error_injection_prob, trace)
if A.real > B.real:
return ast_eval(node['a'], z, rng, error_injection_prob, trace)
else:
return ast_eval(node['b'], z, rng, error_injection_prob, trace)
raise ValueError('<catpis> unknown ast op, "{op}"')
#____________________________________________________________________________________
def generate_ast(depth, rng):
if depth <= 0:
if rng.next() < rng.uniform(0.3, 0.9):
return {'var': 'z'}
return {'const': rng.randint(-3,3)}
r = rng.next()
if r < 0.15:
return {'op': 'noise'}
if r < 0.4:
op = rng.choice(['add', 'sub', 'mul', 'div', 'pow'])
return {'op': op, 'a': generate_ast(depth-1, rng), 'b': generate_ast(depth-1, rng)}
if r < 0.6:
op = rng.choice(['sin', 'cos', 'abs', 'arg', 'conj'])
return {'op': op, 'v': generate_ast(depth-1, rng)}
if r < 0.75:
return {'op': 'mix', 'a': generate_ast(depth-1,rng), 'b' :generate_ast(depth-1,rng), 't': {'const': rng.next()}}
return {'op': 'cond', 'cond': {'op': 'gt', 'a' :{ 'op': 'abs', 'v' :{'var' : 'z'}}, 'b' : {'const': rng.uniform(0.1,3.0)}},
'a': generate_ast(depth-1,rng), 'b': generate_ast(depth-1,rng)}
#____________________________________________________________________________________
def bucket_key_for_cube(cube, resolution=0.5, coord_sys='eagle'):
if coord_sys == 'grid':
gx, gy = cube['grid_pos']
return (int(gx), int(gy), cube['orientation'], cube['value']//16)
x, y = cube['eagle_pos']
print(f' :X={x}')
print(f' :Y={y}')
resolution+=random.uniform(0.1, 0.3)
print(f'RESOLUTION :: {resolution}')
bx, by = int(math.floor(x/resolution)), int(math.floor(y / resolution))
return (bx, by, cube['orientation'], cube['value']//16)
#____________________________________________________________________________________
def spawn_tensor_from_complex(out):
if out is None:
return None
r = out.real if (not math.isnan(out.real) if hasattr(math, 'isnan') else True) else float('nan')
i = out.imag if (not math.isnan(out.imag) if hasattr(math, 'isnan') else True) else float('nan')
try:
mag = math.hypot(r, i) if not (math.isnan(r) or math.isnan(i)) else float('nan')
arg = math.atan2(i, r) if not (math.isnan(r) or math.isnan(i)) else float('nan')
except:
mag = float('nan'); arg = float('nan')
return [r, i, mag, arg]
#____________________________________________________________________________________
def tensor_distance(a, b):
if a is None or b is None:
return float('inf')
s = 0.0
for x,y in zip(a,b):
if math.isnan(x) or math.isnan(y): s+=1e3
else:
try:
s+=(x-y)**2
except:
s+=random.randint(1, 5)
return math.sqrt(s)
#____________________________________________________________________________________
NODE_CACHE = {}
def eval_with_cache(node_name, ast, cube, rng, coord_sys='eagle', tolerance=0.5, policy='mutate'):
trace = []
z = complex(cube['eagle_pos'][0], cube['eagle_pos'][1])
out = ast_eval(ast, z, rng, CFG['error_injection_prob'], trace)
tensor = spawn_tensor_from_complex(out)
key = bucket_key_for_cube(cube, resolution=CFG['bucket_resolution'], coord_sys=coord_sys)
node_cache = NODE_CACHE.setdefault(node_name, {})
if key not in node_cache:
node_cache[key] = list(tensor) if tensor is not None else None
return out, trace, None
cached = node_cache[key]
dist = tensor_distance(cached, tensor)
if dist > tolerance:
mismatch = {'type': 'cache_mismatch', 'node': node_name, 'key': key,
'dist': dist, 'cached': cached, 'current': tensor}
r = rng.next()
if policy == 'mutate':
alpha = 0.3+(r*random.uniform(0.4, 0.9))
new_cache = []
for c,t in zip(cached, tensor):
if math.isnan(c): new_cache.append(t if not math.isnan(t) else 0.0)
elif math.isnan(t): new_cache.append(c)
else: new_cache.append(c*(1-alpha)+t*alpha)
node_cache[key] = new_cache
if tensor is not None and not math.isnan(tensor[0]):
out = complex(tensor[0]*(1+(r-0.5)*random.uniform(0.3, 0.9)), tensor[1]*(1+(r-0.5)*0.1))
elif policy == 'corrupt':
node_cache[key] = cached
out = complex(float('nan'), float('nan'))
elif policy == 'overwrite': node_cache[key] = list(tensor)
return out, trace, mismatch
return out, trace, None
#____________________________________________________________________________________
def apply_reflection_tree(cube_ids, nodes, cubes, rng, event_trace):
affected = set(cube_ids)
for depth, node in enumerate(nodes):
new_affected = set()
for cid in list(affected):
cube = cubes[cid]
coord_sys = node.get('coord_sys', 'eagle')
tol = node.get('tol', 0.5)
policy = node.get('policy', 'mutate')
out, trace, mismatch = eval_with_cache(node['name'], node['field'], cube, rng, coord_sys=coord_sys, tolerance=tol, policy=policy)
if trace:
event_trace.append({'type': 'eval_trace', 'node': node['name'], 'cube': cid, 'trace': trace})
if mismatch:
mismatch['cube'] = cid
event_trace.append(mismatch)
selected = False
try:
if math.isnan(out.real) or math.isnan(out.imag):
if rng.next() < 0.5: selected = True
else:
mag = math.hypot(out.real, out.imag)
selected = (mag%1.0) > 0.5
except:
if rng.next() < rng.uniform(0.5, 0.7): selected = True
if selected:
typ = node.get('type', 'reflect')
if typ == 'reflect':
angle = node.get('angle', 0.0)+(rng.next()-rng.uniform(0.1, 0.5))*0.1
x, y = cube['eagle_pos']
ca = math.cos(angle); sa = math.sin(angle)
rx = ca*x+sa*y
ry = -sa*x+ca*y
rx = -rx
x2 = ca*rx-sa*ry
y2 = sa*rx+ca*ry
cube['eagle_pos'] = [x2, y2]
cube['orientation'] = (cube['orientation']+1)%4
elif typ == 'rotate':
ang = node.get('angle', 0.5)+(rng.next()-0.5)*0.2
x, y = cube['eagle_pos']
ca = math.cos(ang); sa = math.sin(ang)
x2 = ca*x-sa*y
y2 = sa*x+ca*y
cube['eagle_pos'] = [x2, y2]
cube['orientation'] = (cube['orientation']+1)%4
new_affected.add(cid)
affected = new_affected if new_affected else affected
return list(affected)
#____________________________________________________________________________________
def step_spheres(spheres, cubes, rng, bounds, event_trace):
for sid, s in spheres.items():
s['pos'][0]+=s['vel'][0]*CFG['timestep']
s['pos'][1]+=s['vel'][1]*CFG['timestep']
for k in (0, 1):
if s['pos'][k] < 0: s['pos'][k]+=bounds[k]
if s['pos'][k] > bounds[k]: s['pos'][k]-=bounds[k]
for cid, cube in cubes.items():
cx, cy = cube['eagle_pos']
for sid, s in spheres.items():
dx, dy = s['pos'][0]-cx, s['pos'][1]-cy
dist = math.hypot(dx, dy)
if dist < s['r']+random.uniform(0.3, 0.5):
overlap = (s['r']+0.5)-dist
if dist == 0:
nx, ny = 1.0, 0.0
else:
nx, ny = dx/dist, dy/dist
push = overlap*random.uniform(0.3, 0.7)/s['mass']
cube['eagle_pos'][0]-=nx*push
cube['eagle_pos'][1]-=ny*push
cube['value'] = (cube['value']+int(overlap*random.randint(10, 20))+1)%256
event_trace.append({'type': 'collision', 'cube': cid, 'sphere': sid, 'overlap': overlap})
#____________________________________________________________________________________
def project_vectors_to_image(vectors, size):
w = h = size
grid = [[0 for _ in range(w)] for __ in range(h)]
if not vectors:
return grid
xs = [v[0] if len(v)>0 else 0.0 for v in vectors]
ys = [v[1] if len(v)>1 else 0.0 for v in vectors]
meanx = sum(xs)/len(xs); meany = sum(ys)/len(ys)
varx = sum((a-meanx)**2 for a in xs)/len(xs)
vary = sum((a-meany)**2 for a in ys)/len(ys)
if varx < 1e-12 and len(vectors[0])>2:
xs = [v[2] for v in vectors]
meanx = sum(xs)/len(xs)
if vary < 1e-12 and len(vectors[0])>3:
ys = [v[3] for v in vectors]
meany = sum(ys)/len(ys)
for xval, yval in zip(xs, ys):
nx = int(((xval - meanx) + random.uniform(0.5, 0.7))*(w-1))
ny = int(((yval-meany)+0.5)*(h-1))
if nx < 0: nx = 0
if ny < 0: ny = 0
if nx >= w: nx = w-1
if ny >= h: ny = h-1
grid[ny][nx]+=1
return grid
#____________________________________________________________________________________
def write_ppm(grid, filename):
h = len(grid); w = len(grid[0]) if h>0 else 0
flat = [cell for row in grid for cell in row]
maxv = max(flat) if flat else 1
with open(filename, 'w') as f:
f.write('P3\n{} {}\n255\n'.format(w, h))
for y in range(h):
row = grid[y]
for x in range(w):
v = int((row[x]/maxv)*255)
r = v
g = max(0, 255-abs(128-v)*2)
b = 255-v
f.write('{} {} {} '.format(r, g, b))
f.write('\n')
#____________________________________________________________________________________
def shannon_entropy(counter):
total = sum(counter.values())
if total == 0:
return 0.0
ent = 0.0
for c in counter.values():
p = c/total
if p > 0: ent-=p*math.log(p, 2)
return ent
#____________________________________________________________________________________
def run_sim(seed, steps):
mdl_pth = f'{os.path.dirname(os.path.abspath(__file__))}'
if not os.path.isdir(f'{mdl_pth}/cbsp_rand'):
os.makedirs(f'{mdl_pth}/cbsp_rand')
mdl_pth = f'{mdl_pth}/cbsp_rand'
rng = CrateTapRNG(seed)
CFG['seed'] = seed
CFG['steps'] = steps
cubes, order = spawn_initial_grid(CFG['grid']['w'], CFG['grid']['h'], rng)
bounds = [CFG['grid']['w']*1.5, CFG['grid']['h']*1.5]
spheres = spawn_spheres(CFG['sphere_count'], rng, bounds)
nodes = []
for i in range(4):
node = {'name': f'N{i}',
'type': 'reflect' if i%2==0 else 'rotate',
'angle': (i+1)*0.3,
'field': generate_ast(3, rng),
'coord_sys': 'grid' if (i%2==1) else 'eagle',
'policy': 'mutate' if (i%3)!=0 else 'corrupt',
'tol': 0.6-(i*0.1)}
nodes.append(node)
event_log = []
rnd_vectors = []
sequence_counter = Counter()
mismatch_counter = Counter()
cache_snapshot = {}
for step in range(steps):
step_events = {'step': step, 'time': time.time(), 'events': []}
rvec = (rng.next(), rng.next(), rng.next(), rng.next())
rnd_vectors.append(rvec)
step_events['rand_vector'] = rvec
pick_count = 1+(int((rvec[0]*10))%6)
picked = order[:pick_count]
step_events['picked'] = picked
trace_events = []
affected = apply_reflection_tree(picked, nodes, cubes, rng, trace_events)
for te in trace_events:
step_events['events'].append(te)
if te.get('type')=='cache_mismatch':
mismatch_counter[te['node']]+=1
step_events['affected'] = affected
step_spheres(spheres, cubes, rng, bounds, step_events['events'])
for cid in affected:
cube = cubes[cid]
token = f"{cid}@{int(round(cube['eagle_pos'][0]*100))}_{int(round(cube['eagle_pos'][1]*100))}_{cube['value']}"
sequence_counter[token]+=1
step_events['events'].append({'type': 'sequence_token', 'token': token})
event_log.append(step_events)
print('\nWriting cbsp random sim results files...')
grid = project_vectors_to_image(rnd_vectors, IMAGE_SIZE)
write_ppm(grid, f'{mdl_pth}/projection.ppm')
with open(f'{mdl_pth}/config.json', 'w') as f:
json.dump(CFG, f, indent=2)
#with open(f'{mdl_pth}/events.json', 'w') as f:
#json.dump(event_log, f, indent=2, default=str)
total_tokens = sum(sequence_counter.values())
unique_tokens = len(sequence_counter)
ent = shannon_entropy(sequence_counter)
top_tokens = sequence_counter.most_common(10)
with open(f'{mdl_pth}/analysis.txt', 'w') as f:
f.write(f'seed: {seed}\nsteps: {steps}\n')
f.write(f'total sequence tokens: {total_tokens}\nunique tokens: {unique_tokens}\n')
f.write(f'estimated Shannon entropy (bits): {ent:.4f}\n')
f.write('mismatch counts per node:\n')
for n,c in mismatch_counter.items():
f.write(f' {n}: {c}\n')
f.write('top tokens:\n')
for t,c in top_tokens:
f.write(f' {t}: {c}\n')
print('Wrote: config.json, projection.ppm, analysis.txt')
print(f'seed={seed} steps={steps} unique_tokens={unique_tokens} entropy={ent:.4f}')
#____________________________________________________________________________________
if __name__ == '__main__':
seed = DEFAULT_SEED
steps = DEFAULT_STEPS
if len(sys.argv) > 1:
try:
seed = int(sys.argv[1])
except:
seed = DEFAULT_SEED
if len(sys.argv) > 2:
try:
steps = int(sys.argv[2])
except:
steps = DEFAULT_STEPS
run_sim(seed, steps)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment