Skip to content

Instantly share code, notes, and snippets.

@lastforkbender
Created September 27, 2025 16:57
Show Gist options
  • Select an option

  • Save lastforkbender/8a2da0964096cb55b6d3a78f37ed51b2 to your computer and use it in GitHub Desktop.

Select an option

Save lastforkbender/8a2da0964096cb55b6d3a78f37ed51b2 to your computer and use it in GitHub Desktop.
Grid Signal Processor with satellite interpolation kernels, pi-correlated RNG, spectral/harmonic phase sequencing opts, adjustable directional interpolation probability & per-pass callback(s).
# gsp.py - Grid Signal Processor with satellite interpolation kernels, pi-correlated RNG,
# spectral/harmonic phase sequencing opts, adjustable directional interpolation
# probability & per-pass callback(s). (Reproduces the wave audio file @ runtime
# where this module is -> current_module_run_directory/gsp_wave/gsp.wav) The
# constants associated with these classes are in sim the satellites are facing
# some type of reflected source from an infinite [angular] density regression.
#__________________________________________________________________________________
from typing import List, Optional, Callable, Tuple
import hashlib
import struct
import cmath
import math
import wave
import io
import os
#__________________________________________________________________________________
class PiCorrelatedRNG:
A = 1664525
C = 1013904223
MOD = 2**32
PI_DIGITS = ('3141592653589793238462643383279502884197169399375105820974944592'
'3078164062862089986280348253421170679821480865132823066470938446')
#__________________________________________________________________________________
def __init__(self, seed_chunk_index: int = 0):
chunk_size = 16
start = (seed_chunk_index*chunk_size)%max(1, len(self.PI_DIGITS)-chunk_size+1)
chunk = self.PI_DIGITS[start:start + chunk_size]
h = hashlib.sha256(chunk.encode('ascii')).digest()
seed = int.from_bytes(h[:4], 'big')&0xFFFFFFFF
self.state = seed if seed != 0 else 1
#__________________________________________________________________________________
def rand(self) -> float:
self.state = (PiCorrelatedRNG.A*self.state+PiCorrelatedRNG.C)%PiCorrelatedRNG.MOD
return self.state/PiCorrelatedRNG.MOD
#__________________________________________________________________________________
def rand_between(self, a: float, b: float) -> float:
return a+(b-a)*self.rand()
#__________________________________________________________________________________
def randint(self, a: int, b: int) -> int:
return a+int(self.rand()*(b-a+1))
#__________________________________________________________________________________
class Satellite:
def __init__(self,
seq_type: str,
rotation_rate_deg: float,
weight: float,
sequence_mode: str = 'harmonic', # harmonic | spectral | custom
harmonic_count: int = 3,
spectral_weights: Optional[List[float]] = None,
custom_phase_seq: Optional[List[float]] = None,
active: bool = True):
if seq_type not in ('primary', 'secondary'):
raise ValueError("seq_type must be 'primary' or 'secondary'")
if sequence_mode not in ('harmonic', 'spectral', 'custom'):
raise ValueError("sequence_mode must be 'harmonic', 'spectral', or 'custom'")
self.seq_type = seq_type
self.rotation_rate_deg = float(rotation_rate_deg)
self.weight = max(0.0, float(weight))
self.sequence_mode = sequence_mode
self.harmonic_count = max(1, int(harmonic_count))
self.spectral_weights = spectral_weights[:] if spectral_weights else None
self.custom_phase_seq = custom_phase_seq[:] if custom_phase_seq else None
self.active = bool(active)
self.angle_deg = 0.0
self.phase_seq = self._build_phase_sequence()
#__________________________________________________________________________________
def _build_phase_sequence(self) -> List[float]:
if self.sequence_mode == 'harmonic':
base = 0.0
return [base+k*(math.pi*2/max(1, self.harmonic_count)) for k in range(1, self.harmonic_count+1)]
elif self.sequence_mode == 'spectral':
if not self.spectral_weights:
# ...fallback to harmonic if no weights provided
return [k*0.5*math.pi for k in range(1, self.harmonic_count+1)]
# And build the phases proportional to the given weights
total = sum(abs(w) for w in self.spectral_weights) or 1.0
return [(w/total)*2.0*math.pi for w in self.spectral_weights ]
else:
# Is custom
return self.custom_phase_seq[:] if self.custom_phase_seq else [0.0]
#__________________________________________________________________________________
def step_rotation(self, steps: int=1) -> float:
self.angle_deg = (self.angle_deg+self.rotation_rate_deg*steps)%360.0
#__________________________________________________________________________________
def sample_modifier(self, t_index: int, rng: PiCorrelatedRNG) -> float:
return math.sin(self.phase_seq[t_index%len(self.phase_seq)]+(rng.rand_between(-1.0, 1.0))*0.01)
#__________________________________________________________________________________
class GridSignalProcessor:
def __init__(self,
amplitude: float,
frequency: float,
phase: float,
sample_rate: float,
duration: float,
probability_constant: float,
direction_deg: float,
grid_size: int = 5,
time_varying: bool = False,
pi_chunk_index: int=0,
interpolation_passes: int=2,
satellites: Optional[List[Satellite]]=None,
sat_inertia: float=0.85,
directional_interp_prob: float=1.0,
per_pass_callback: Optional[Callable[[int, List[List]], None]]=None):
if sample_rate <= 0 or duration <= 0:
raise ValueError('sample_rate and duration must be positive')
if not (0.0 <= probability_constant <= 1.0):
raise ValueError('probability_constant must be [0, 1]')
if grid_size <= 0:
raise ValueError('grid_size must be positive')
if interpolation_passes < 0:
raise ValueError('interpolation_passes must be >= 0')
self.amplitude = float(amplitude)
self.frequency = float(frequency)
self.phase = float(phase)
self.sample_rate = float(sample_rate)
self.duration = float(duration)
self.probability_constant = float(probability_constant)
self.direction_deg = float(direction_deg)
self.grid_size = int(grid_size)
self.time_varying = bool(time_varying)
self.interpolation_passes = int(interpolation_passes)
self.pi_chunk_index = int(pi_chunk_index)
self.sat_inertia = float(sat_inertia)
self.directional_interp_prob = float(directional_interp_prob)
self.per_pass_callback = per_pass_callback
self.rng = PiCorrelatedRNG(seed_chunk_index=self.pi_chunk_index)
self.time = self._generate_time_array()
if satellites is None:
satellites = [Satellite('primary', rotation_rate_deg=5.0, weight=0.6, sequence_mode='harmonic', harmonic_count=4),
Satellite('primary', rotation_rate_deg=-3.0, weight=0.45, sequence_mode='spectral', spectral_weights=[1.0, 0.5, 0.25]),
Satellite('secondary', rotation_rate_deg=2.0, weight=0.25, sequence_mode='custom', custom_phase_seq=[0.0, math.pi/3, math.pi/2])]
self.satellites = satellites
self.dynamic_direction = self.direction_deg
self.dynamic_depth = 0.5
self.signal_grid = self._generate_initial_grid()
if self.interpolation_passes > 0:
self._apply_intersecting_interpolation(self.interpolation_passes)
self._apply_reflective_symmetry()
#__________________________________________________________________________________
def _generate_time_array(self) -> List[float]:
num_samples = max(1, int(self.sample_rate*self.duration))
return [i/self.sample_rate for i in range(num_samples)]
#__________________________________________________________________________________
def _complex_sinusoid(self, t: float, phase_offset: float=0.0) -> complex:
total_phase = 2.0*math.pi*self.frequency*t+self.phase+phase_offset
return self.amplitude*cmath.exp(1j*total_phase)
#__________________________________________________________________________________
def _position_phase_offset(self, i: int, j: int) -> float:
dir_rad = math.radians(self.dynamic_direction)
center = (self.grid_size-1)/2.0
dx, dy = j-center, i-center
pos_offset = math.hypot(dx, dy)*(2.0*math.pi/max(1.0, self.grid_size))
jitter = (self.rng.rand_between(-1.0, 1.0)*math.pi)*0.03
return dir_rad+pos_offset+jitter
#__________________________________________________________________________________
def _generate_initial_grid(self) -> list:
grid = []
for i in range(self.grid_size):
row = []
for j in range(self.grid_size):
use_signal = self.rng.rand() < self.probability_constant
if use_signal:
phase_offset = self._position_phase_offset(i, j)
if self.time_varying:
samples = [self._complex_sinusoid(t, phase_offset) for t in self.time]
row.append(samples)
else:
row.append(self._complex_sinusoid(self.time[0], phase_offset))
else:
if self.time_varying:
row.append([0+0j for _ in self.time])
else:
row.append(0+0j)
grid.append(row)
return grid
#__________________________________________________________________________________
@staticmethod
def _complex_bilinear_interp(z00: complex, z10: complex, z01: complex, z11: complex, u: float, v: float) -> complex:
return (z00*(1-u)*(1-v)+z10*u*(1-v)+z01*(1-u)*v+z11*u*v)
#__________________________________________________________________________________
def _compute_satellite_field(self, base_grid, pass_idx: int) -> list:
n, sat_fields = self.grid_size, []
for sat in self.satellites:
if not sat.active:
continue
sat.angle_deg = (sat.angle_deg+sat.rotation_rate_deg*(pass_idx+1))%360.0
angle_rad = math.radians(sat.angle_deg)
field = []
for i in range(n):
row = []
for j in range(n):
pos_offset = self._position_phase_offset(i, j)
t_index = pass_idx if not self.time_varying else 0
mod = sat.sample_modifier(t_index, self.rng)
i1, j1 = (i+1)%n, (j+1)%n
z00, z10 = base_grid[i][j], base_grid[i][j1]
z01, z11 = base_grid[i1][j], base_grid[i1][j1]
center = (n-1)/2.0
dx, dy = j-center, i-center
proj = dx*math.cos(angle_rad)+dy*math.sin(angle_rad)
maxr = math.hypot(center, center) or 1.0
u = 0.5+(proj/(2*maxr))
v = 0.5+((dx*-math.sin(angle_rad)+dy*math.cos(angle_rad))/(2*maxr))
u, v = max(0.0, min(1.0, u)), max(0.0, min(1.0, v))
if self.time_varying:
interpolated = [self._complex_bilinear_interp(a, b, c, d, u, v) for a, b, c, d in zip(z00, z10, z01, z11)]
samples = [(1.0+sat.weight*mod)*s for s in interpolated ]
row.append(samples)
else:
interpolated = self._complex_bilinear_interp(z00, z10, z01, z11, u, v)
val = (1.0+sat.weight*mod)*interpolated
row.append(val)
field.append(row)
sat_fields.append((sat, field))
return sat_fields
#__________________________________________________________________________________
def _interpolate_grid_once(self, grid, pass_idx: int=0) -> list:
if self.rng.rand() > self.directional_interp_prob:
return grid
sat_fields = self._compute_satellite_field(grid, pass_idx)
n = self.grid_size
new_grid = [[grid[i][j] for j in range(n)] for i in range(n)]
for i in range(n):
for j in range(n):
contribs, weights = [], []
for sat, field in sat_fields:
contribs.append(field[i][j])
weights.append(sat.weight)
if not contribs:
continue
total_w = sum(weights) or 1.0
norm_w = [w/total_w for w in weights]
if self.time_varying:
length, mixed = len(contribs[0]), []
for k in range(length):
s = 0+0j
for idx, c in enumerate(contribs):
s+=norm_w[idx]*c[k]
depth = getattr(self, 'dynamic_depth', 0.5)
mixed.append((1-depth)*grid[i][j][k]+depth*s)
new_grid[i][j] = mixed
else:
s = 0+0j
for idx, c in enumerate(contribs): s+=norm_w[idx]*c
depth = getattr(self, 'dynamic_depth', 0.5)
new_grid[i][j] = (1-depth)*grid[i][j]+depth*s
self._update_dynamic_properties_from_satellites(sat_fields)
if callable(self.per_pass_callback):
try:
self.per_pass_callback(pass_idx, new_grid)
except Exception:
pass
return new_grid
#__________________________________________________________________________________
def _update_dynamic_properties_from_satellites(self, sat_fields):
if not sat_fields:
return
total_w = sum(sat.weight for sat, _ in sat_fields) or 1.0
weighted_angles = sum(sat.weight * sat.angle_deg for sat, _ in sat_fields)
target_direction = (weighted_angles / total_w) % 360.0
current_dir = getattr(self, 'dynamic_direction', self.direction_deg)
self.dynamic_direction = (self.sat_inertia * current_dir) + ((1 - self.sat_inertia) * target_direction)
target_depth = max(0.0, min(1.0, total_w / (len(self.satellites) + 1)))
current_depth = getattr(self, 'dynamic_depth', 0.5)
self.dynamic_depth = (self.sat_inertia * current_depth) + ((1 - self.sat_inertia) * target_depth)
#__________________________________________________________________________________
def _apply_intersecting_interpolation(self, passes: int):
current = self.signal_grid
for p in range(passes):
current = self._interpolate_grid_once(current, pass_idx=p)
self.signal_grid = current
#__________________________________________________________________________________
def _apply_reflective_symmetry(self):
n = self.grid_size
for i in range(n):
for j in range(n):
mi, mj = n-1-i, n-1-j
a, b = self.signal_grid[i][j], self.signal_grid[mi][j]
c, d = self.signal_grid[i][mj], self.signal_grid[mi][mj]
if self.time_varying:
avg = [(a[k]+b[k]+c[k]+d[k])/4.0 for k in range(len(a))]
self.signal_grid[i][j] = avg
self.signal_grid[mi][j] = avg
self.signal_grid[i][mj] = avg
self.signal_grid[mi][mj] = avg
else:
avg = (a+b+c+d)/4.0
self.signal_grid[i][j] = avg
self.signal_grid[mi][j] = avg
self.signal_grid[i][mj] = avg
self.signal_grid[mi][mj] = avg
#__________________________________________________________________________________
def get_signal_grid(self) -> list:
return self.signal_grid
#__________________________________________________________________________________
def set_directional_interp_prob(self, p: float):
self.directional_interp_prob = max(0.0, min(1.0, float(p)))
#__________________________________________________________________________________
#//////////////////////////////////////////////////////////////////////////////////
# WAV Synthesis Params:
max_int24 = (1<<23)-1
bytes_per_sample = 3
sample_rate = 44100
duration_sec = 4.0
freq = 440.0
channels = 2
#__________________________________________________________________________________
def float_to_int24_bytes(x: float) -> bytes:
v = max(-1.0, min(1.0, x))
iv = int(round(v*max_int24))
if iv < 0: iv+=1<<24
return bytes((iv&0xFF, (iv>>8)&0xFF, (iv>>16)&0xFF))
#__________________________________________________________________________________
def write_24bit_gsp_wave_audio_file(out_path: str, center_mags: list, dyn_direction: float, dyn_depth: float):
num_frames = int(sample_rate*duration_sec)
n_blocks = len(center_mags)
max_mag = max(center_mags)
mag_scale = 0.98/max_mag
scaled_mags = [m*mag_scale for m in center_mags]
envelope = [0.0]*num_frames
for i in range(num_frames):
t = i/(num_frames-1)
block_pos = t*(n_blocks-1)
i0 = int(math.floor(block_pos))
i1 = min(i0+1, n_blocks-1)
frac = block_pos-i0
envelope[i] = (1-frac)*scaled_mags[i0]+frac*scaled_mags[i1]
pan = math.cos(math.radians(dyn_direction))
lfo_rate = 0.25
lfo_amp = dyn_depth*0.5
left_gains = [0.0]*num_frames
right_gains = [0.0]*num_frames
for i in range(num_frames):
time = i/sample_rate
lfo = 1.0-lfo_amp*(0.5*(1.0+math.sin(2*math.pi*lfo_rate*time)))
left_pan = math.sqrt(0.5*(1.0-pan))
right_pan = math.sqrt(0.5*(1.0+pan))
left_gains[i] = left_pan*lfo
right_gains[i] = right_pan*lfo
two_pi_f = 2*math.pi*freq
frames = bytearray()
for i in range(num_frames):
time = i/sample_rate
sample = math.sin(two_pi_f*time)
amp = envelope[i]
left_val = sample*amp*left_gains[i]
right_val = sample*amp*right_gains[i]
frames+=float_to_int24_bytes(left_val)
frames+=float_to_int24_bytes(right_val)
with wave.open(out_path, 'wb') as wf:
wf.setnchannels(channels)
wf.setsampwidth(bytes_per_sample)
wf.setframerate(sample_rate)
wf.writeframes(frames)
print(f"GSP wav audio file written: {out_path} ({duration_sec}s, {sample_rate} Hz, 24-bit stereo)")
#__________________________________________________________________________________
if __name__ == "__main__":
# Prints results & writes the wav audio file @ this current modules dir/gsp_wav/
def demo_callback(pass_idx, grid):
# Prints optional pass index & mean magnitude of center cell
n = len(grid)
center = grid[n//2][n//2]
val = center[0] if isinstance(center, list) else center
print(f"Pass {pass_idx}: center magnitude (first sample) = {abs(val):.4f}")
# Custom satellites using harmonic & spectral sequencing
sats = [Satellite('primary', rotation_rate_deg=6.0, weight=0.6,
sequence_mode='harmonic', harmonic_count=5),
Satellite('primary', rotation_rate_deg=-2.5, weight=0.45,
sequence_mode='spectral', spectral_weights=[1.0, 0.8, 0.4]),
Satellite('secondary', rotation_rate_deg=1.5, weight=0.3,
sequence_mode='custom', custom_phase_seq=[0.0, math.pi/5, math.pi/2])]
# Here amplitude & interpolation effect the wav file audio transitions @sat_inertia
# in correspond into facing an reflective source of a infinite regress density laps
gp = GridSignalProcessor(amplitude=5.2,
frequency=56.0,
phase=0.27,
sample_rate=375.5,
duration=0.2,
probability_constant=0.53,
direction_deg=36.5,
grid_size=5,
time_varying=True,
pi_chunk_index=1,
interpolation_passes=28,
satellites=sats,
sat_inertia=1.88,
directional_interp_prob=0.33,
per_pass_callback=demo_callback)
gp.set_directional_interp_prob(0.7)
grid = gp.get_signal_grid()
center = grid[len(grid)//2][len(grid)//2]
cntr_mags = [abs(s) for s in center[:8]]
dyn_direction = gp.dynamic_direction
dyn_depth = gp.dynamic_depth
if isinstance(center, list):
print(f'Center magnitudes(first 8): {cntr_mags}')
else:
print(f'Center magnitude: {cntr_mags}')
print(f'Dynamic direction: {dyn_direction}')
print(f'Dynamic depth: {dyn_depth}')
mdl_pth = f'{os.path.dirname(os.path.abspath(__file__))}'
if not os.path.isdir(f'{mdl_pth}/gsp_wave'):
os.makedirs(f'{mdl_pth}/gsp_wave')
out_path = f'{mdl_pth}/gsp_wave/gsp.wav'
print('Building gsp wav audio file........ (be patient)')
write_24bit_gsp_wave_audio_file(out_path, cntr_mags, dyn_direction, dyn_depth)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment