Created
September 27, 2025 16:57
-
-
Save lastforkbender/8a2da0964096cb55b6d3a78f37ed51b2 to your computer and use it in GitHub Desktop.
Grid Signal Processor with satellite interpolation kernels, pi-correlated RNG, spectral/harmonic phase sequencing opts, adjustable directional interpolation probability & per-pass callback(s).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # gsp.py - Grid Signal Processor with satellite interpolation kernels, pi-correlated RNG, | |
| # spectral/harmonic phase sequencing opts, adjustable directional interpolation | |
| # probability & per-pass callback(s). (Reproduces the wave audio file @ runtime | |
| # where this module is -> current_module_run_directory/gsp_wave/gsp.wav) The | |
| # constants associated with these classes are in sim the satellites are facing | |
| # some type of reflected source from an infinite [angular] density regression. | |
| #__________________________________________________________________________________ | |
| from typing import List, Optional, Callable, Tuple | |
| import hashlib | |
| import struct | |
| import cmath | |
| import math | |
| import wave | |
| import io | |
| import os | |
| #__________________________________________________________________________________ | |
| class PiCorrelatedRNG: | |
| A = 1664525 | |
| C = 1013904223 | |
| MOD = 2**32 | |
| PI_DIGITS = ('3141592653589793238462643383279502884197169399375105820974944592' | |
| '3078164062862089986280348253421170679821480865132823066470938446') | |
| #__________________________________________________________________________________ | |
| def __init__(self, seed_chunk_index: int = 0): | |
| chunk_size = 16 | |
| start = (seed_chunk_index*chunk_size)%max(1, len(self.PI_DIGITS)-chunk_size+1) | |
| chunk = self.PI_DIGITS[start:start + chunk_size] | |
| h = hashlib.sha256(chunk.encode('ascii')).digest() | |
| seed = int.from_bytes(h[:4], 'big')&0xFFFFFFFF | |
| self.state = seed if seed != 0 else 1 | |
| #__________________________________________________________________________________ | |
| def rand(self) -> float: | |
| self.state = (PiCorrelatedRNG.A*self.state+PiCorrelatedRNG.C)%PiCorrelatedRNG.MOD | |
| return self.state/PiCorrelatedRNG.MOD | |
| #__________________________________________________________________________________ | |
| def rand_between(self, a: float, b: float) -> float: | |
| return a+(b-a)*self.rand() | |
| #__________________________________________________________________________________ | |
| def randint(self, a: int, b: int) -> int: | |
| return a+int(self.rand()*(b-a+1)) | |
| #__________________________________________________________________________________ | |
| class Satellite: | |
| def __init__(self, | |
| seq_type: str, | |
| rotation_rate_deg: float, | |
| weight: float, | |
| sequence_mode: str = 'harmonic', # harmonic | spectral | custom | |
| harmonic_count: int = 3, | |
| spectral_weights: Optional[List[float]] = None, | |
| custom_phase_seq: Optional[List[float]] = None, | |
| active: bool = True): | |
| if seq_type not in ('primary', 'secondary'): | |
| raise ValueError("seq_type must be 'primary' or 'secondary'") | |
| if sequence_mode not in ('harmonic', 'spectral', 'custom'): | |
| raise ValueError("sequence_mode must be 'harmonic', 'spectral', or 'custom'") | |
| self.seq_type = seq_type | |
| self.rotation_rate_deg = float(rotation_rate_deg) | |
| self.weight = max(0.0, float(weight)) | |
| self.sequence_mode = sequence_mode | |
| self.harmonic_count = max(1, int(harmonic_count)) | |
| self.spectral_weights = spectral_weights[:] if spectral_weights else None | |
| self.custom_phase_seq = custom_phase_seq[:] if custom_phase_seq else None | |
| self.active = bool(active) | |
| self.angle_deg = 0.0 | |
| self.phase_seq = self._build_phase_sequence() | |
| #__________________________________________________________________________________ | |
| def _build_phase_sequence(self) -> List[float]: | |
| if self.sequence_mode == 'harmonic': | |
| base = 0.0 | |
| return [base+k*(math.pi*2/max(1, self.harmonic_count)) for k in range(1, self.harmonic_count+1)] | |
| elif self.sequence_mode == 'spectral': | |
| if not self.spectral_weights: | |
| # ...fallback to harmonic if no weights provided | |
| return [k*0.5*math.pi for k in range(1, self.harmonic_count+1)] | |
| # And build the phases proportional to the given weights | |
| total = sum(abs(w) for w in self.spectral_weights) or 1.0 | |
| return [(w/total)*2.0*math.pi for w in self.spectral_weights ] | |
| else: | |
| # Is custom | |
| return self.custom_phase_seq[:] if self.custom_phase_seq else [0.0] | |
| #__________________________________________________________________________________ | |
| def step_rotation(self, steps: int=1) -> float: | |
| self.angle_deg = (self.angle_deg+self.rotation_rate_deg*steps)%360.0 | |
| #__________________________________________________________________________________ | |
| def sample_modifier(self, t_index: int, rng: PiCorrelatedRNG) -> float: | |
| return math.sin(self.phase_seq[t_index%len(self.phase_seq)]+(rng.rand_between(-1.0, 1.0))*0.01) | |
| #__________________________________________________________________________________ | |
| class GridSignalProcessor: | |
| def __init__(self, | |
| amplitude: float, | |
| frequency: float, | |
| phase: float, | |
| sample_rate: float, | |
| duration: float, | |
| probability_constant: float, | |
| direction_deg: float, | |
| grid_size: int = 5, | |
| time_varying: bool = False, | |
| pi_chunk_index: int=0, | |
| interpolation_passes: int=2, | |
| satellites: Optional[List[Satellite]]=None, | |
| sat_inertia: float=0.85, | |
| directional_interp_prob: float=1.0, | |
| per_pass_callback: Optional[Callable[[int, List[List]], None]]=None): | |
| if sample_rate <= 0 or duration <= 0: | |
| raise ValueError('sample_rate and duration must be positive') | |
| if not (0.0 <= probability_constant <= 1.0): | |
| raise ValueError('probability_constant must be [0, 1]') | |
| if grid_size <= 0: | |
| raise ValueError('grid_size must be positive') | |
| if interpolation_passes < 0: | |
| raise ValueError('interpolation_passes must be >= 0') | |
| self.amplitude = float(amplitude) | |
| self.frequency = float(frequency) | |
| self.phase = float(phase) | |
| self.sample_rate = float(sample_rate) | |
| self.duration = float(duration) | |
| self.probability_constant = float(probability_constant) | |
| self.direction_deg = float(direction_deg) | |
| self.grid_size = int(grid_size) | |
| self.time_varying = bool(time_varying) | |
| self.interpolation_passes = int(interpolation_passes) | |
| self.pi_chunk_index = int(pi_chunk_index) | |
| self.sat_inertia = float(sat_inertia) | |
| self.directional_interp_prob = float(directional_interp_prob) | |
| self.per_pass_callback = per_pass_callback | |
| self.rng = PiCorrelatedRNG(seed_chunk_index=self.pi_chunk_index) | |
| self.time = self._generate_time_array() | |
| if satellites is None: | |
| satellites = [Satellite('primary', rotation_rate_deg=5.0, weight=0.6, sequence_mode='harmonic', harmonic_count=4), | |
| Satellite('primary', rotation_rate_deg=-3.0, weight=0.45, sequence_mode='spectral', spectral_weights=[1.0, 0.5, 0.25]), | |
| Satellite('secondary', rotation_rate_deg=2.0, weight=0.25, sequence_mode='custom', custom_phase_seq=[0.0, math.pi/3, math.pi/2])] | |
| self.satellites = satellites | |
| self.dynamic_direction = self.direction_deg | |
| self.dynamic_depth = 0.5 | |
| self.signal_grid = self._generate_initial_grid() | |
| if self.interpolation_passes > 0: | |
| self._apply_intersecting_interpolation(self.interpolation_passes) | |
| self._apply_reflective_symmetry() | |
| #__________________________________________________________________________________ | |
| def _generate_time_array(self) -> List[float]: | |
| num_samples = max(1, int(self.sample_rate*self.duration)) | |
| return [i/self.sample_rate for i in range(num_samples)] | |
| #__________________________________________________________________________________ | |
| def _complex_sinusoid(self, t: float, phase_offset: float=0.0) -> complex: | |
| total_phase = 2.0*math.pi*self.frequency*t+self.phase+phase_offset | |
| return self.amplitude*cmath.exp(1j*total_phase) | |
| #__________________________________________________________________________________ | |
| def _position_phase_offset(self, i: int, j: int) -> float: | |
| dir_rad = math.radians(self.dynamic_direction) | |
| center = (self.grid_size-1)/2.0 | |
| dx, dy = j-center, i-center | |
| pos_offset = math.hypot(dx, dy)*(2.0*math.pi/max(1.0, self.grid_size)) | |
| jitter = (self.rng.rand_between(-1.0, 1.0)*math.pi)*0.03 | |
| return dir_rad+pos_offset+jitter | |
| #__________________________________________________________________________________ | |
| def _generate_initial_grid(self) -> list: | |
| grid = [] | |
| for i in range(self.grid_size): | |
| row = [] | |
| for j in range(self.grid_size): | |
| use_signal = self.rng.rand() < self.probability_constant | |
| if use_signal: | |
| phase_offset = self._position_phase_offset(i, j) | |
| if self.time_varying: | |
| samples = [self._complex_sinusoid(t, phase_offset) for t in self.time] | |
| row.append(samples) | |
| else: | |
| row.append(self._complex_sinusoid(self.time[0], phase_offset)) | |
| else: | |
| if self.time_varying: | |
| row.append([0+0j for _ in self.time]) | |
| else: | |
| row.append(0+0j) | |
| grid.append(row) | |
| return grid | |
| #__________________________________________________________________________________ | |
| @staticmethod | |
| def _complex_bilinear_interp(z00: complex, z10: complex, z01: complex, z11: complex, u: float, v: float) -> complex: | |
| return (z00*(1-u)*(1-v)+z10*u*(1-v)+z01*(1-u)*v+z11*u*v) | |
| #__________________________________________________________________________________ | |
| def _compute_satellite_field(self, base_grid, pass_idx: int) -> list: | |
| n, sat_fields = self.grid_size, [] | |
| for sat in self.satellites: | |
| if not sat.active: | |
| continue | |
| sat.angle_deg = (sat.angle_deg+sat.rotation_rate_deg*(pass_idx+1))%360.0 | |
| angle_rad = math.radians(sat.angle_deg) | |
| field = [] | |
| for i in range(n): | |
| row = [] | |
| for j in range(n): | |
| pos_offset = self._position_phase_offset(i, j) | |
| t_index = pass_idx if not self.time_varying else 0 | |
| mod = sat.sample_modifier(t_index, self.rng) | |
| i1, j1 = (i+1)%n, (j+1)%n | |
| z00, z10 = base_grid[i][j], base_grid[i][j1] | |
| z01, z11 = base_grid[i1][j], base_grid[i1][j1] | |
| center = (n-1)/2.0 | |
| dx, dy = j-center, i-center | |
| proj = dx*math.cos(angle_rad)+dy*math.sin(angle_rad) | |
| maxr = math.hypot(center, center) or 1.0 | |
| u = 0.5+(proj/(2*maxr)) | |
| v = 0.5+((dx*-math.sin(angle_rad)+dy*math.cos(angle_rad))/(2*maxr)) | |
| u, v = max(0.0, min(1.0, u)), max(0.0, min(1.0, v)) | |
| if self.time_varying: | |
| interpolated = [self._complex_bilinear_interp(a, b, c, d, u, v) for a, b, c, d in zip(z00, z10, z01, z11)] | |
| samples = [(1.0+sat.weight*mod)*s for s in interpolated ] | |
| row.append(samples) | |
| else: | |
| interpolated = self._complex_bilinear_interp(z00, z10, z01, z11, u, v) | |
| val = (1.0+sat.weight*mod)*interpolated | |
| row.append(val) | |
| field.append(row) | |
| sat_fields.append((sat, field)) | |
| return sat_fields | |
| #__________________________________________________________________________________ | |
| def _interpolate_grid_once(self, grid, pass_idx: int=0) -> list: | |
| if self.rng.rand() > self.directional_interp_prob: | |
| return grid | |
| sat_fields = self._compute_satellite_field(grid, pass_idx) | |
| n = self.grid_size | |
| new_grid = [[grid[i][j] for j in range(n)] for i in range(n)] | |
| for i in range(n): | |
| for j in range(n): | |
| contribs, weights = [], [] | |
| for sat, field in sat_fields: | |
| contribs.append(field[i][j]) | |
| weights.append(sat.weight) | |
| if not contribs: | |
| continue | |
| total_w = sum(weights) or 1.0 | |
| norm_w = [w/total_w for w in weights] | |
| if self.time_varying: | |
| length, mixed = len(contribs[0]), [] | |
| for k in range(length): | |
| s = 0+0j | |
| for idx, c in enumerate(contribs): | |
| s+=norm_w[idx]*c[k] | |
| depth = getattr(self, 'dynamic_depth', 0.5) | |
| mixed.append((1-depth)*grid[i][j][k]+depth*s) | |
| new_grid[i][j] = mixed | |
| else: | |
| s = 0+0j | |
| for idx, c in enumerate(contribs): s+=norm_w[idx]*c | |
| depth = getattr(self, 'dynamic_depth', 0.5) | |
| new_grid[i][j] = (1-depth)*grid[i][j]+depth*s | |
| self._update_dynamic_properties_from_satellites(sat_fields) | |
| if callable(self.per_pass_callback): | |
| try: | |
| self.per_pass_callback(pass_idx, new_grid) | |
| except Exception: | |
| pass | |
| return new_grid | |
| #__________________________________________________________________________________ | |
| def _update_dynamic_properties_from_satellites(self, sat_fields): | |
| if not sat_fields: | |
| return | |
| total_w = sum(sat.weight for sat, _ in sat_fields) or 1.0 | |
| weighted_angles = sum(sat.weight * sat.angle_deg for sat, _ in sat_fields) | |
| target_direction = (weighted_angles / total_w) % 360.0 | |
| current_dir = getattr(self, 'dynamic_direction', self.direction_deg) | |
| self.dynamic_direction = (self.sat_inertia * current_dir) + ((1 - self.sat_inertia) * target_direction) | |
| target_depth = max(0.0, min(1.0, total_w / (len(self.satellites) + 1))) | |
| current_depth = getattr(self, 'dynamic_depth', 0.5) | |
| self.dynamic_depth = (self.sat_inertia * current_depth) + ((1 - self.sat_inertia) * target_depth) | |
| #__________________________________________________________________________________ | |
| def _apply_intersecting_interpolation(self, passes: int): | |
| current = self.signal_grid | |
| for p in range(passes): | |
| current = self._interpolate_grid_once(current, pass_idx=p) | |
| self.signal_grid = current | |
| #__________________________________________________________________________________ | |
| def _apply_reflective_symmetry(self): | |
| n = self.grid_size | |
| for i in range(n): | |
| for j in range(n): | |
| mi, mj = n-1-i, n-1-j | |
| a, b = self.signal_grid[i][j], self.signal_grid[mi][j] | |
| c, d = self.signal_grid[i][mj], self.signal_grid[mi][mj] | |
| if self.time_varying: | |
| avg = [(a[k]+b[k]+c[k]+d[k])/4.0 for k in range(len(a))] | |
| self.signal_grid[i][j] = avg | |
| self.signal_grid[mi][j] = avg | |
| self.signal_grid[i][mj] = avg | |
| self.signal_grid[mi][mj] = avg | |
| else: | |
| avg = (a+b+c+d)/4.0 | |
| self.signal_grid[i][j] = avg | |
| self.signal_grid[mi][j] = avg | |
| self.signal_grid[i][mj] = avg | |
| self.signal_grid[mi][mj] = avg | |
| #__________________________________________________________________________________ | |
| def get_signal_grid(self) -> list: | |
| return self.signal_grid | |
| #__________________________________________________________________________________ | |
| def set_directional_interp_prob(self, p: float): | |
| self.directional_interp_prob = max(0.0, min(1.0, float(p))) | |
| #__________________________________________________________________________________ | |
| #////////////////////////////////////////////////////////////////////////////////// | |
| # WAV Synthesis Params: | |
| max_int24 = (1<<23)-1 | |
| bytes_per_sample = 3 | |
| sample_rate = 44100 | |
| duration_sec = 4.0 | |
| freq = 440.0 | |
| channels = 2 | |
| #__________________________________________________________________________________ | |
| def float_to_int24_bytes(x: float) -> bytes: | |
| v = max(-1.0, min(1.0, x)) | |
| iv = int(round(v*max_int24)) | |
| if iv < 0: iv+=1<<24 | |
| return bytes((iv&0xFF, (iv>>8)&0xFF, (iv>>16)&0xFF)) | |
| #__________________________________________________________________________________ | |
| def write_24bit_gsp_wave_audio_file(out_path: str, center_mags: list, dyn_direction: float, dyn_depth: float): | |
| num_frames = int(sample_rate*duration_sec) | |
| n_blocks = len(center_mags) | |
| max_mag = max(center_mags) | |
| mag_scale = 0.98/max_mag | |
| scaled_mags = [m*mag_scale for m in center_mags] | |
| envelope = [0.0]*num_frames | |
| for i in range(num_frames): | |
| t = i/(num_frames-1) | |
| block_pos = t*(n_blocks-1) | |
| i0 = int(math.floor(block_pos)) | |
| i1 = min(i0+1, n_blocks-1) | |
| frac = block_pos-i0 | |
| envelope[i] = (1-frac)*scaled_mags[i0]+frac*scaled_mags[i1] | |
| pan = math.cos(math.radians(dyn_direction)) | |
| lfo_rate = 0.25 | |
| lfo_amp = dyn_depth*0.5 | |
| left_gains = [0.0]*num_frames | |
| right_gains = [0.0]*num_frames | |
| for i in range(num_frames): | |
| time = i/sample_rate | |
| lfo = 1.0-lfo_amp*(0.5*(1.0+math.sin(2*math.pi*lfo_rate*time))) | |
| left_pan = math.sqrt(0.5*(1.0-pan)) | |
| right_pan = math.sqrt(0.5*(1.0+pan)) | |
| left_gains[i] = left_pan*lfo | |
| right_gains[i] = right_pan*lfo | |
| two_pi_f = 2*math.pi*freq | |
| frames = bytearray() | |
| for i in range(num_frames): | |
| time = i/sample_rate | |
| sample = math.sin(two_pi_f*time) | |
| amp = envelope[i] | |
| left_val = sample*amp*left_gains[i] | |
| right_val = sample*amp*right_gains[i] | |
| frames+=float_to_int24_bytes(left_val) | |
| frames+=float_to_int24_bytes(right_val) | |
| with wave.open(out_path, 'wb') as wf: | |
| wf.setnchannels(channels) | |
| wf.setsampwidth(bytes_per_sample) | |
| wf.setframerate(sample_rate) | |
| wf.writeframes(frames) | |
| print(f"GSP wav audio file written: {out_path} ({duration_sec}s, {sample_rate} Hz, 24-bit stereo)") | |
| #__________________________________________________________________________________ | |
| if __name__ == "__main__": | |
| # Prints results & writes the wav audio file @ this current modules dir/gsp_wav/ | |
| def demo_callback(pass_idx, grid): | |
| # Prints optional pass index & mean magnitude of center cell | |
| n = len(grid) | |
| center = grid[n//2][n//2] | |
| val = center[0] if isinstance(center, list) else center | |
| print(f"Pass {pass_idx}: center magnitude (first sample) = {abs(val):.4f}") | |
| # Custom satellites using harmonic & spectral sequencing | |
| sats = [Satellite('primary', rotation_rate_deg=6.0, weight=0.6, | |
| sequence_mode='harmonic', harmonic_count=5), | |
| Satellite('primary', rotation_rate_deg=-2.5, weight=0.45, | |
| sequence_mode='spectral', spectral_weights=[1.0, 0.8, 0.4]), | |
| Satellite('secondary', rotation_rate_deg=1.5, weight=0.3, | |
| sequence_mode='custom', custom_phase_seq=[0.0, math.pi/5, math.pi/2])] | |
| # Here amplitude & interpolation effect the wav file audio transitions @sat_inertia | |
| # in correspond into facing an reflective source of a infinite regress density laps | |
| gp = GridSignalProcessor(amplitude=5.2, | |
| frequency=56.0, | |
| phase=0.27, | |
| sample_rate=375.5, | |
| duration=0.2, | |
| probability_constant=0.53, | |
| direction_deg=36.5, | |
| grid_size=5, | |
| time_varying=True, | |
| pi_chunk_index=1, | |
| interpolation_passes=28, | |
| satellites=sats, | |
| sat_inertia=1.88, | |
| directional_interp_prob=0.33, | |
| per_pass_callback=demo_callback) | |
| gp.set_directional_interp_prob(0.7) | |
| grid = gp.get_signal_grid() | |
| center = grid[len(grid)//2][len(grid)//2] | |
| cntr_mags = [abs(s) for s in center[:8]] | |
| dyn_direction = gp.dynamic_direction | |
| dyn_depth = gp.dynamic_depth | |
| if isinstance(center, list): | |
| print(f'Center magnitudes(first 8): {cntr_mags}') | |
| else: | |
| print(f'Center magnitude: {cntr_mags}') | |
| print(f'Dynamic direction: {dyn_direction}') | |
| print(f'Dynamic depth: {dyn_depth}') | |
| mdl_pth = f'{os.path.dirname(os.path.abspath(__file__))}' | |
| if not os.path.isdir(f'{mdl_pth}/gsp_wave'): | |
| os.makedirs(f'{mdl_pth}/gsp_wave') | |
| out_path = f'{mdl_pth}/gsp_wave/gsp.wav' | |
| print('Building gsp wav audio file........ (be patient)') | |
| write_24bit_gsp_wave_audio_file(out_path, cntr_mags, dyn_direction, dyn_depth) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment