Created September 29, 2025 02:37
Minimal cognitive B-spline limit-cycle model
# cognitive_bspline_cycle.py
#_____________________________________________________________________________________
# Minimal cognitive B-spline limit-cycle model:
#
# • Adaptive conjunction matrix M mapping Theta -> Phi
# • Weighted Recursive Least Squares (RLS) updates for M, using activations as weights
# • E-coupling: <||Phi-M(Theta)||^2> as extended merge dynamics
# • New-node initialization from a finite-difference curvature prediction
#
# Run the simulation via _run1() or _run2() at the bottom; the latter writes CSV files.
#_____________________________________________________________________________________
import os
import csv
import math
import random
#_____________________________________________________________________________________
# Basis params:
ETA = 0.05          # gradient descent step size for F
ALPHA = 1.0         # weight on cognitive curvature
BETA = 2.0          # weight on coupling term
GAMMA = 0.1         # activation decay
LAMBDA = 0.5        # activation coupling strength
EPS = 0.15          # unitary fold rotation angle
FOLD_INTERVAL = 5   # apply unitary fold every N steps
STEPS = 4           # number of simulation steps (small demo default)
SEED = 1            # RNG seed for reproducibility
random.seed(SEED)
#_____________________________________________________________________________________
# Recursive Least Squares & M-adaptation params:
ETA_REG = 1e-2          # regularizer eta for the initial covariance
RLS_FORGET = 0.995      # forgetting factor (lambda) in covariance update, close to 1
RLS_INIT_WEIGHT = 0.1   # synthetic weight for new-node bootstrap
M_DECAY = 1e-3          # slow decay toward prior M0
ALPHA_M = 0.4           # learning rate fallback for gradient M updates if needed
#_____________________________________________________________________________________
def clamp(v, lo=-1e6, hi=1e6):
    return max(lo, min(hi, v))
def discrete_curvature_2node(x1, x2):
    return (x2-x1)**2
def grad_curvature_2node(x1, x2):
    return -2.0*(x2-x1), 2.0*(x2-x1)
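#_____________________________________________________________________________________
# Illustrative sanity check (our addition, not part of the original gist and not
# called by default): verify grad_curvature_2node against a central finite
# difference of discrete_curvature_2node. The _demo_* name is hypothetical.
def _demo_grad_check(x1=0.3, x2=-0.7, h=1e-6):
    g1, g2 = grad_curvature_2node(x1, x2)
    fd1 = (discrete_curvature_2node(x1+h, x2)-discrete_curvature_2node(x1-h, x2))/(2*h)
    fd2 = (discrete_curvature_2node(x1, x2+h)-discrete_curvature_2node(x1, x2-h))/(2*h)
    print(f"analytic=({g1:.6f}, {g2:.6f})  finite-diff=({fd1:.6f}, {fd2:.6f})")
#_____________________________________________________________________________________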
def mat_vec_mul(A, v):
    return [A[0][0]*v[0]+A[0][1]*v[1], A[1][0]*v[0]+A[1][1]*v[1]]
def vec_outer(u, v):
    return [[u[0]*v[0], u[0]*v[1]], [u[1]*v[0], u[1]*v[1]]]
def mat_add(A, B):
    return [[A[0][0]+B[0][0], A[0][1]+B[0][1]], [A[1][0]+B[1][0], A[1][1]+B[1][1]]]
def mat_sub(A, B):
    return [[A[0][0]-B[0][0], A[0][1]-B[0][1]], [A[1][0]-B[1][0], A[1][1]-B[1][1]]]
def mat_scale(A, s):
    return [[A[0][0]*s, A[0][1]*s], [A[1][0]*s, A[1][1]*s]]
def outer_to_vec(o):
    return [o[0][0], o[0][1], o[1][0], o[1][1]]
#_____________________________________________________________________________________
class State:
    def __init__(self, theta=None, phi=None, activations=None, momenta=None, M=None, C=None):
        self.theta = theta if theta is not None else [random.uniform(-1, 1) for _ in range(2)]
        self.phi = phi if phi is not None else [random.uniform(-1, 1) for _ in range(2)]
        self.a = activations if activations is not None else [0.0, 0.0]
        self.momenta = momenta if momenta is not None else [0.0, 0.0, 0.0, 0.0]
        # 2x2 conjunction matrix M; defaults to the identity
        self.M = M if M is not None else [[1.0, 0.0], [0.0, 1.0]]
        # RLS inverse covariance C, approximating (sum w Theta Theta^T + eta I)^{-1}
        self.C = C if C is not None else [[1.0/ETA_REG, 0.0], [0.0, 1.0/ETA_REG]]
        self.M0 = [[1.0, 0.0], [0.0, 1.0]]
    #_____________________________________________________________________________________
    def copy(self):
        return State(self.theta[:], self.phi[:], self.a[:], self.momenta[:],
                     [row[:] for row in self.M], [row[:] for row in self.C])
#_____________________________________________________________________________________
def rls_update(state, Theta, Phi, w):
    # Adaptive M; weighted RLS update per sample:
    #  • RLS with forgetting factor & per-sample weight w
    #  • Maintains C approximating the inverse covariance
    #  C <- (1/forget)*(C-k Theta^T C) with k = (C Theta)/(1/w+Theta^T C Theta)
    #  M <- M+(Phi-M Theta) k^T
    if w <= 0:
        return
    # Scale Theta and Phi by sqrt(w) to incorporate the weight
    s = math.sqrt(w)
    th, ph = [s*Theta[0], s*Theta[1]], [s*Phi[0], s*Phi[1]]
    C = state.C
    Cth = mat_vec_mul(C, th)
    thCth = th[0]*Cth[0]+th[1]*Cth[1]
    dnm = 1.0+thCth
    k = [Cth[0]/dnm, Cth[1]/dnm]
    # Residual in the scaled variables: ph-M th = sqrt(w)*(Phi-M Theta), so the
    # rank-1 update below is exactly the weighted update stated above
    Mth = mat_vec_mul(state.M, th)
    r = [ph[0]-Mth[0], ph[1]-Mth[1]]
    # Rank-1 update: M <- M+r k^T
    state.M[0][0] += r[0]*k[0]; state.M[0][1] += r[0]*k[1]
    state.M[1][0] += r[1]*k[0]; state.M[1][1] += r[1]*k[1]
    # Update C: C <- C-k (th^T C); C is symmetric, so th^T C = (C th)^T = Cth^T
    # and the rank-1 correction is outer(k, Cth)
    outer_k_Cth = [[k[0]*Cth[0], k[0]*Cth[1]], [k[1]*Cth[0], k[1]*Cth[1]]]
    C_new = mat_sub(C, outer_k_Cth)
    # Forgetting: scale by 1/forget so old data is gradually discounted
    C_new = mat_scale(C_new, 1.0/RLS_FORGET)
    state.C = C_new
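#_____________________________________________________________________________________
# Illustrative check (our addition, not called by default): with constant weight
# w=1 and noise-free samples (Theta, M_true Theta), repeated rls_update calls
# should drive state.M toward M_true. This is a sketch of how the weighted RLS
# update above behaves; M_true and the sampling scheme are assumptions.
def _demo_rls_recovery(n_samples=200):
    M_true = [[0.8, -0.3], [0.2, 1.1]]
    st = State()
    for _ in range(n_samples):
        Theta = [random.uniform(-1, 1), random.uniform(-1, 1)]
        Phi = mat_vec_mul(M_true, Theta)
        rls_update(st, Theta, Phi, 1.0)
    print("M_true:", M_true)
    print("M_est :", st.M)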
#_____________________________________________________________________________________
def F_step_with_M(state):
    # Open dynamics F: one gradient-descent step on
    # E = ALPHA*K(Theta)+BETA*||Phi-M Theta||^2 (the unitary fold D is applied separately)
    theta, phi, a = state.theta, state.phi, state.a
    # Curvature gradient for Theta
    dK1, dK2 = grad_curvature_2node(theta[0], theta[1])
    # Coupling gradient wrt Theta: d/dTheta = -2 M^T (Phi-M Theta)
    M = state.M
    Mth = mat_vec_mul(M, theta)
    resid = [phi[0]-Mth[0], phi[1]-Mth[1]]
    # M^T residual
    Mt_res = [M[0][0]*resid[0]+M[1][0]*resid[1], M[0][1]*resid[0]+M[1][1]*resid[1]]
    g_theta = [ALPHA*dK1-2.0*BETA*Mt_res[0], ALPHA*dK2-2.0*BETA*Mt_res[1]]
    # Coupling gradient wrt Phi: +2 (Phi-M Theta)
    g_phi = [2.0*BETA*resid[0], 2.0*BETA*resid[1]]
    # Gradient descent update
    new_theta = [theta[i]-ETA*g_theta[i] for i in range(2)]
    new_phi = [phi[i]-ETA*g_phi[i] for i in range(2)]
    # Activations driven by average position and residual magnitude:
    # a strong residual drives adjustment
    new_a = []
    for i in range(2):
        drive = LAMBDA*(0.5*(new_theta[i]+new_phi[i])+0.5*abs(resid[i]))
        ai = a[i]+ETA*(-GAMMA*a[i]+drive)
        new_a.append(ai)
    # Damp momenta
    new_momenta = [0.99*m for m in state.momenta]
    s = State(new_theta, new_phi, new_a, new_momenta,
              [row[:] for row in state.M], [row[:] for row in state.C])
    # Decay M slightly toward the prior M0 to avoid drift
    for i in range(2):
        for j in range(2):
            s.M[i][j] = (1.0-M_DECAY)*s.M[i][j]+M_DECAY*s.M0[i][j]
    return s
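#_____________________________________________________________________________________
# Illustrative check (our addition, not called by default): since F_step_with_M
# takes a gradient-descent step on E = ALPHA*K(Theta)+BETA*||Phi-M Theta||^2,
# with the small ETA used here E should typically decrease step over step. The
# small M-decay toward M0 shifts the objective slightly each step, so exact
# monotonicity is not guaranteed.
def _demo_energy_descent(n_steps=10):
    def coupling_energy(st):
        Mth = mat_vec_mul(st.M, st.theta)
        resid = [st.phi[0]-Mth[0], st.phi[1]-Mth[1]]
        K = discrete_curvature_2node(st.theta[0], st.theta[1])
        return ALPHA*K+BETA*(resid[0]**2+resid[1]**2)
    st = State()
    for k in range(n_steps):
        e0 = coupling_energy(st)
        st = F_step_with_M(st)
        print(f"step {k+1}: E {e0:.6f} -> {coupling_energy(st):.6f}")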
#_____________________________________________________________________________________
def unitary_fold(state, eps=EPS):
    # Pair each position with a momentum as a complex amplitude, then mix the
    # (theta_i, phi_i) pairs with the unitary 2x2 rotation [[c, i*s], [i*s, c]]
    z0 = complex(state.theta[0], state.momenta[0])
    z1 = complex(state.theta[1], state.momenta[1])
    z2 = complex(state.phi[0], state.momenta[2])
    z3 = complex(state.phi[1], state.momenta[3])
    def mix_pair(za, zb, angle):
        c, s = math.cos(angle), math.sin(angle)
        return c*za+1j*s*zb, 1j*s*za+c*zb
    z0p, z2p = mix_pair(z0, z2, eps)
    z1p, z3p = mix_pair(z1, z3, eps)
    new_theta = [z0p.real, z1p.real]
    new_phi = [z2p.real, z3p.real]
    new_momenta = [z0p.imag, z1p.imag, z2p.imag, z3p.imag]
    return State(new_theta, new_phi, state.a[:], new_momenta,
                 [row[:] for row in state.M], [row[:] for row in state.C])
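#_____________________________________________________________________________________
# Illustrative check (our addition, not called by default): the mixing matrix
# [[c, i*s], [i*s, c]] satisfies U U^dagger = I, so each fold preserves
# |z_theta|^2+|z_phi|^2 per mixed pair, i.e. the combined position/momentum norm.
def _demo_fold_norm(eps=EPS):
    st = State()
    before = sum(t*t for t in st.theta)+sum(p*p for p in st.phi)+sum(m*m for m in st.momenta)
    st2 = unitary_fold(st, eps)
    after = sum(t*t for t in st2.theta)+sum(p*p for p in st2.phi)+sum(m*m for m in st2.momenta)
    print(f"norm before={before:.12f} after={after:.12f}")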
#_____________________________________________________________________________________
def projection_error_M(state):
    # Return the projection error ||Phi-M Theta||
    Mth = mat_vec_mul(state.M, state.theta)
    return math.sqrt((state.phi[0]-Mth[0])**2+(state.phi[1]-Mth[1])**2)
#_____________________________________________________________________________________
def initialize_new_node(state):
    # Finite-difference curvature prediction:
    # If a new Phi node were activated, predict its initial value by
    # finite-difference extrapolation from the existing nodes. Illustrative for
    # the 2-node case here: phi_new ~ phi+0.1*(phi[1]-phi[0])
    th, ph = state.theta, state.phi
    # Predict delta from the neighbor difference
    delta_phi = [ph[1]-ph[0], ph[1]-ph[0]]
    # Synthetic target: Phi_pred = phi+small delta
    Phi_pred = [ph[0]+0.1*delta_phi[0], ph[1]+0.1*delta_phi[1]]
    Theta_sample = th[:]
    # Bootstrap with a small-weight RLS update
    rls_update(state, Theta_sample, Phi_pred, RLS_INIT_WEIGHT)
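#_____________________________________________________________________________________
# Illustrative check (our addition, not called by default): the bootstrap is one
# low-weight RLS step toward the extrapolated target. Since that target lies
# close to the current Phi, the projection error against Phi typically shrinks.
def _demo_new_node_bootstrap():
    st = State()
    err_before = projection_error_M(st)
    initialize_new_node(st)
    err_after = projection_error_M(st)
    print(f"proj_err vs current Phi: {err_before:.4f} -> {err_after:.4f}")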
#_____________________________________________________________________________________
def run_simulation_with_M(steps=STEPS, fold_interval=FOLD_INTERVAL):
    s, history = State(), []
    for k in range(steps):
        # Simulate occasional new-node activation
        if k % 50 == 0 and k > 0:
            initialize_new_node(s)
        # Open step with M
        s = F_step_with_M(s)
        # Weighted RLS update on the current sample; the scalar weight w is the
        # positive part of the mean activation
        w = 0.5*(max(0.0, s.a[0])+max(0.0, s.a[1]))
        rls_update(s, s.theta, s.phi, w)
        # ...fold occasionally
        if (k+1) % fold_interval == 0:
            s = unitary_fold(s, EPS)
        history.append({'step': k+1,
                        'theta': s.theta[:],
                        'phi': s.phi[:],
                        'a': s.a[:],
                        'M': [row[:] for row in s.M],
                        'proj_err': projection_error_M(s)})
        if k % (steps//10 or 1) == 0:
            print(f"step {k+1}: proj_err={history[-1]['proj_err']:.4f} a={s.a}")
    return history
#_____________________________________________________________________________________
def simple_plot_series(history, key, width=60):
    vals = [entry[key] for entry in history]
    vmin, vmax = min(vals), max(vals)
    if abs(vmax-vmin) < 1e-9:
        vmax = vmin+1.0
    print(f"\nSeries '{key}': min={vmin:.4f} max={vmax:.4f}")
    for i, v in enumerate(vals):
        pos = int((v-vmin)/(vmax-vmin)*(width-1))
        line = "."*pos+"*"+"."*(width-1-pos)
        print(f"{i+1:4d} {line} {v:.4f}")
#_____________________________________________________________________________________
def run_and_log(beta, eps, run_id, steps, fold_interval, out_dir):
    # F_step_with_M reads the module-level BETA, so install this run's beta
    # there; eps is passed directly to unitary_fold below
    global BETA
    BETA = beta
    s = State()
    os.makedirs(out_dir, exist_ok=True)
    filename = os.path.join(out_dir, f'run_beta{beta:.3f}_eps{eps:.3f}_id{run_id}.csv')
    with open(filename, 'w', newline='') as csvfile:
        fieldnames = ['step',
                      'theta0', 'theta1',
                      'phi0', 'phi1',
                      'a0', 'a1',
                      'M00', 'M01', 'M10', 'M11',
                      'proj_err']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for k in range(steps):
            # Occasional new-node bootstrap via a small-weight RLS update
            if k % 50 == 0 and k > 0:
                theta, phi = s.theta, s.phi
                Phi_pred = [phi[0]+0.05*(phi[1]-phi[0]), phi[1]+0.05*(phi[1]-phi[0])]
                rls_update(s, theta, Phi_pred, RLS_INIT_WEIGHT)
            s = F_step_with_M(s)
            w = 0.5*(max(0.0, s.a[0])+max(0.0, s.a[1]))
            rls_update(s, s.theta, s.phi, w)
            if (k+1) % fold_interval == 0:
                s = unitary_fold(s, eps)
            writer.writerow({'step': k+1,
                             'theta0': s.theta[0], 'theta1': s.theta[1],
                             'phi0': s.phi[0], 'phi1': s.phi[1],
                             'a0': s.a[0], 'a1': s.a[1],
                             'M00': s.M[0][0], 'M01': s.M[0][1],
                             'M10': s.M[1][0], 'M11': s.M[1][1],
                             'proj_err': projection_error_M(s)})
    return filename
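#_____________________________________________________________________________________
# Illustrative read-back sketch (our addition, not called by default): load one
# of the CSVs produced by run_and_log with the standard library; 'path' is
# assumed to be a filename returned above.
def _demo_read_csv(path):
    with open(path, newline='') as f:
        rows = list(csv.DictReader(f))
    # DictReader yields strings; convert the column of interest to float
    errs = [float(row['proj_err']) for row in rows]
    print(f"{len(rows)} rows, final proj_err={errs[-1]:.4f}")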
#_____________________________________________________________________________________
def sweep_and_save(betas, epss, runs_per_setting, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    files, run_id = [], 0
    for beta in betas:
        for eps in epss:
            for r in range(runs_per_setting):
                run_id += 1
                print(f'Running beta={beta:.3f}, eps={eps:.3f}, run {r+1}/{runs_per_setting}')
                fname = run_and_log(beta, eps, run_id, STEPS, FOLD_INTERVAL, out_dir)
                files.append(fname)
    print(f'Completed runs. CSV files saved in {out_dir}:')
    return files
#_____________________________________________________________________________________
def _run1():
    # Console-only run: prints a summary and an ASCII series plot; writes no CSV files
    print('Running cognitive B-spline simulation with adaptive conjunction matrix M...')
    hist = run_simulation_with_M(STEPS, FOLD_INTERVAL)
    last = hist[-1]
    print('\nFinal state summary:')
    print(f"Theta: {last['theta']}")
    print(f"Phi: {last['phi']}")
    print(f"Activations: {last['a']}")
    print(f"M: {last['M']}")
    print(f"Projection error (||Phi - M Theta||): {last['proj_err']:.6f}")
    simple_plot_series(hist[-80:], 'proj_err')
_run1()
#_____________________________________________________________________________________
def _run2():
    # Writes CSV files under <this module's directory>/CBCM
    out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'CBCM')
    # Example parameter ranges to sweep
    betas = [0.0, 0.5, 1.0, 2.0, 4.0]
    epss = [0.0, 0.05, 0.15, 0.3]
    files = sweep_and_save(betas, epss, 1, out_dir)
    print(f'Files generated: {files}')
#_run2()