-
-
Save bczhc/611317bf451808a14df3c332b7df9978 to your computer and use it in GitHub Desktop.
对音频dft小玩具 #dft #fft #audio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //! Mainly wrote by Gemini. | |
| use anyhow::{Context, Result}; | |
| use clap::Parser; | |
| use hound::{SampleFormat, WavReader, WavSpec, WavWriter}; | |
| use num_complex::Complex64; | |
| use rustfft::FftPlanner; | |
| use std::env; | |
| use std::f64::consts::TAU; | |
| use std::path::PathBuf; | |
| #[derive(Parser, Debug)] | |
| #[command(author, version, about = "FFT Dual Output Processor")] | |
| struct Args { | |
| /// 输入原始 WAV 文件路径 | |
| input: String, | |
| /// 每个分块的单声道采样数 (建议 1024, 2048 或 4096) | |
| #[arg(short, long, default_value_t = 2048)] | |
| samples: usize, | |
| } | |
| fn main() -> Result<()> { | |
| let mut args = Args::parse(); | |
| // 定位家目录输出路径 | |
| let home = env::var("HOME").context("无法获取 HOME 环境变量")?; | |
| let out_freq_path = PathBuf::from(&home).join("out.wav"); | |
| let out_restored_path = PathBuf::from(&home).join("out-d.wav"); | |
| let mut reader = WavReader::open(&args.input)?; | |
| let spec = reader.spec(); | |
| let sample_rate = spec.sample_rate; | |
| if spec.channels != 2 { | |
| panic!("只支持立体声 WAV 文件"); | |
| } | |
| // 配置输出 Spec | |
| let out_spec = WavSpec { | |
| channels: 2, | |
| sample_rate, | |
| bits_per_sample: 16, | |
| sample_format: SampleFormat::Int, | |
| }; | |
| // 创建两个写入器 | |
| let mut freq_writer = WavWriter::create(&out_freq_path, out_spec)?; | |
| let mut restored_writer = WavWriter::create(&out_restored_path, out_spec)?; | |
| // 读取所有样本 | |
| let samples: Vec<i16> = reader.samples::<i16>().map(|s| s.unwrap()).collect(); | |
| println!("Total Samples: {}", samples.len()); | |
| println!("Total Samples (each channel): {}", samples.len() / 2); | |
| if args.samples == 0 { | |
| args.samples = samples.len() / 2; | |
| } | |
| println!("Chunk Size: {}, Sample Rate: {}", args.samples, sample_rate); | |
| let chunk_size = args.samples * 2; | |
| let n = args.samples as f64; | |
| // FFT 计划器 | |
| let mut planner = FftPlanner::new(); | |
| let fft = planner.plan_fft_forward(args.samples); | |
| let ifft = planner.plan_fft_inverse(args.samples); | |
| // 使用 chunks_exact 保证每一块长度一致,方便 FFT 处理 | |
| for (chunk_i, chunk) in samples.chunks_exact(chunk_size).enumerate() { | |
| if chunk_i % 100 == 0 { | |
| println!("Processing Chunk: {}", chunk_i); | |
| } | |
| // 1. 转换为复数 (L + Ri) | |
| let mut buffer: Vec<Complex64> = chunk | |
| .chunks_exact(2) | |
| .map(|p| Complex64::new(p[0] as f64 / i16::MAX as f64, p[1] as f64 / i16::MAX as f64)) | |
| .collect(); | |
| // 2. 执行 FFT (DFT) | |
| fft.process(&mut buffer); | |
| // 归一化并写入频域文件 (out.wav) | |
| for c in buffer.iter_mut() { | |
| *c /= n; // 归一化 | |
| let left = (c.re.clamp(-1.0, 1.0) * i16::MAX as f64) as i16; | |
| let right = (c.im.clamp(-1.0, 1.0) * i16::MAX as f64) as i16; | |
| freq_writer.write_sample(left)?; | |
| freq_writer.write_sample(right)?; | |
| } | |
| // 好玩的频域处理 | |
| freq_domain_process_mixed(&mut buffer, &args); | |
| // 3. 执行 IFFT (IDFT) 还原 | |
| // 直接使用刚才 buffer 里的频域数据进行逆变换 | |
| ifft.process(&mut buffer); | |
| // 写入还原文件 (out-d.wav) | |
| for (_i, c) in buffer.iter().enumerate() { | |
| let left = (c.re.clamp(-1.0, 1.0) * i16::MAX as f64) as i16; | |
| let right = (c.im.clamp(-1.0, 1.0) * i16::MAX as f64) as i16; | |
| restored_writer.write_sample(left)?; | |
| restored_writer.write_sample(right)?; | |
| } | |
| } | |
| freq_writer.finalize()?; | |
| restored_writer.finalize()?; | |
| println!("\n处理完成!"); | |
| println!("频域输出: {:?}", out_freq_path); | |
| println!("还原输出: {:?}", out_restored_path); | |
| Ok(()) | |
| } | |
| /// Apply manipulations on the "mixed" frequency-domain buffer. | |
| /// | |
| /// Typically, processing stereo audio requires two separate FFTs for the Left and Right channels. | |
| /// Instead, I packed the Left channel into the Real part and the Right channel into the | |
| /// Imaginary part of a single complex-valued sequence ($z[n] = L[n] + jR[n]$). | |
| /// | |
| /// Because the FFT output of this mixed sequence intertwines the two channels, | |
| /// I must "unpack" them using Hermitian symmetry properties to recover the individual | |
| /// spectrums ($X_L$ and $X_R$) before processing. After processing, I "repack" them | |
| /// so that a single IFFT can restore both channels simultaneously. | |
| fn freq_domain_process_mixed(buffer: &mut Vec<Complex64>, args: &Args) { | |
| let n = buffer.len(); | |
| let mut x_l = vec![Complex64::default(); n]; | |
| let mut x_r = vec![Complex64::default(); n]; | |
| // --- 1. Unpack: 从混杂的 buffer 中提取左右声道频谱 --- | |
| for k in 0..n { | |
| // 对称索引 k' = (N - k) % N | |
| let k_rev = (n - k) % n; | |
| let z_k = buffer[k]; | |
| let z_k_rev_conj = buffer[k_rev].conj(); | |
| // 提取左声道频谱: X_L(k) = (Z(k) + Z*(N-k)) / 2 | |
| x_l[k] = (z_k + z_k_rev_conj) * 0.5; | |
| // 提取右声道频谱: X_R(k) = (Z(k) - Z*(N-k)) / 2j | |
| // 注意: 除以 j 等于 乘以 -j | |
| let diff = z_k - z_k_rev_conj; | |
| x_r[k] = Complex64::new(diff.im, -diff.re) * 0.5; | |
| } | |
| // --- 2. 在这里进行你的频域处理 (例如滤波、增益等) --- | |
| // 示例:给右声道加个简单的低通或增益 | |
| freq_domain_process(&mut x_l, args); | |
| freq_domain_process(&mut x_r, args); | |
| // --- 3. Pack: 处理完后合回 buffer,准备做 IFFT --- | |
| // 恢复公式: Z(k) = X_L(k) + j * X_R(k) | |
| for k in 0..n { | |
| let j = Complex64::i(); | |
| buffer[k] = x_l[k] + j * x_r[k]; | |
| } | |
| } | |
| fn apply_hermitian_symmetry(buffer: &mut [Complex64]) { | |
| let n = buffer.len(); | |
| if n == 0 { return; } | |
| buffer[0] = Complex64::new(buffer[0].re, 0.0); | |
| let mid = n / 2; | |
| if n % 2 == 0 { | |
| buffer[mid] = Complex64::new(buffer[mid].re, 0.0); | |
| } | |
| for i in 1..((n + 1) / 2) { | |
| let mirror_idx = n - i; | |
| buffer[mirror_idx] = buffer[i].conj(); | |
| } | |
| } | |
| fn freq_domain_process(buffer: &mut [Complex64], args: &Args) { | |
| // 振幅量化:8-bit 数字化效果 | |
| // for x in buffer.iter_mut() { | |
| // let (r, theta) = x.to_polar(); | |
| // let quantized_r = (r * 10.0).round() / 10.0; // 这里的 10.0 决定了细节多少 | |
| // *x = Complex64::from_polar(quantized_r, theta); | |
| // } | |
| // // 简单的频谱左移(降低音调) | |
| // let shift = 10; // 移动的 bin 数量 | |
| // buffer.rotate_left(shift); | |
| // for x in buffer.iter_mut() { | |
| // let (r, _) = x.to_polar(); | |
| // 放弃原始相位,固定为 0 | |
| // *x = Complex64::new(r, 0.0); | |
| // } | |
| // // 你需要一个在循环外定义的变量来存储上一块的能量 | |
| // let mut prev_magnitudes = vec![0.0f64; args.samples]; | |
| // | |
| // // 在循环内部: | |
| // for x in buffer.iter_mut().enumerate() { | |
| // let (r, theta) = x.1.to_polar(); | |
| // // 让当前的振幅受上一块影响 (0.9 是反馈系数) | |
| // let smeared_r = r * 0.1 + prev_magnitudes[x.0] * 0.9; | |
| // prev_magnitudes[x.0] = smeared_r; | |
| // *x.1 = Complex64::from_polar(smeared_r, theta); | |
| // } | |
| // 翻转信号 | |
| // { | |
| // let n = buffer.len(); | |
| // let mid = n / 2; | |
| // | |
| // let target_freq_hz = 1000.0; | |
| // // 1. 计算目标频率对应的索引位置 (Bin index) | |
| // // 频率分辨率 df = sample_rate / n | |
| // let pivot_idx = (target_freq_hz * (n as f64) / args.samples as f64).round() as usize; | |
| // | |
| // // 确保 pivot 在合法范围内 (0..mid) | |
| // let pivot = pivot_idx.clamp(1, mid - 1); | |
| // | |
| // // 2. 翻转逻辑: | |
| // // 我们将 1..mid 这一段以 pivot 为中心进行“左右互换” | |
| // // 这里使用切片的 rotate_left/right 是最安全且高效的方法 | |
| // // 这种操作在数学上等同于在时域乘以一个复指数信号 | |
| // let part_to_flip = &mut buffer[1..mid]; | |
| // let len = part_to_flip.len(); | |
| // | |
| // // 这种翻转实质上是频移,我们可以根据 pivot 位置进行旋转 | |
| // part_to_flip.rotate_left(pivot % len); | |
| // | |
| // // 3. 重新应用共轭对称,保证时域信号仍为实数 | |
| // apply_hermitian_symmetry(buffer); | |
| // } | |
| // buffer.reverse(); | |
| // let mid = buffer.len() as f64 * 0.8; | |
| // for x in buffer[..(mid as usize)].iter_mut() { | |
| // *x = Complex64::new(0.0, 0.0); | |
| // } | |
| // buffer.shuffle(); | |
| // buffer.sort_by(|a,b| a.norm().partial_cmp(&b.norm()).unwrap()) | |
| let buffer_len = buffer.len(); | |
| for (idx, x) in buffer.iter_mut().enumerate() { | |
| let t = idx as f64 / buffer_len as f64 * TAU * 44100 as f64; | |
| let (r, theta) = x.to_polar(); | |
| // *x = Complex64::from_polar(r, theta * f64::sin(t)); | |
| *x = Complex64::from_polar(r * t, theta); | |
| } | |
| apply_hermitian_symmetry(buffer); | |
| } | |
| #[cfg(test)] | |
| mod test { | |
| use num_complex::Complex64; | |
| use rustfft::FftPlanner; | |
| #[test] | |
| fn test() { | |
| let mut planner = FftPlanner::new(); | |
| let fft = planner.plan_fft_forward(2); | |
| let mut input = [Complex64::new(0.0, 0.0), Complex64::new(1.0, 1.0)]; | |
| fft.process(&mut input); | |
| println!("{:?}", input); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment