Last active
February 13, 2026 02:18
-
-
Save shrimo/f60299e52bfab2277013421c2e0629da to your computer and use it in GitHub Desktop.
EKF 3D Simulation with IMU and Visual Measurements
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| EKF 3D Simulation with IMU and Visual Measurements | |
| Simulates a 3D trajectory and performs Extended Kalman Filter (EKF) state estimation | |
| using high-frequency IMU (accelerometer) data and lower-frequency visual measurements. | |
| Physical model: | |
| State vector: x = [px, py, pz, vx, vy, vz] | |
| IMU input: linear acceleration a (integrated to propagate velocity) | |
| Visual input: noisy position measurements (camera/VIO) | |
| State transition (constant-acceleration model over dt): | |
| px_k+1 = px_k + vx_k*dt + 0.5*ax_k*dt^2 | |
| vx_k+1 = vx_k + ax_k*dt | |
| (same for y, z) | |
| EKF flow per timestep: | |
| 1. Predict: propagate x and P using F(dt) and control input u = a_imu | |
| 2. Update (when visual available): correct x and P using H and R_vis | |
| """ | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from matplotlib.animation import FuncAnimation | |
| import time | |
| np.random.seed(int(time.time())) | |
| # ============================================================ | |
| # Ground-Truth Trajectory Generator | |
| # ============================================================ | |
| def make_ground_truth(time: np.ndarray): | |
| """ | |
| Build a smooth sinusoidal 3D ground-truth trajectory. | |
| Returns: | |
| pos : (N, 3) true positions | |
| vel : (N, 3) true velocities (analytical derivative) | |
| accel: (N, 3) true accelerations (analytical second derivative) | |
| """ | |
| t = time | |
| # Position: x(t) = A*sin(omega*t) | |
| pos = np.column_stack([ | |
| 0.5 * np.sin(2 * np.pi * 0.5 * t), | |
| 0.5 * np.cos(2 * np.pi * 0.3 * t), | |
| 0.2 * np.sin(2 * np.pi * 0.7 * t), | |
| ]) | |
| # Velocity: analytical derivative of pos | |
| vel = np.column_stack([ | |
| 0.5 * 2 * np.pi * 0.5 * np.cos(2 * np.pi * 0.5 * t), | |
| -0.5 * 2 * np.pi * 0.3 * np.sin(2 * np.pi * 0.3 * t), | |
| 0.2 * 2 * np.pi * 0.7 * np.cos(2 * np.pi * 0.7 * t), | |
| ]) | |
| # Acceleration: analytical second derivative of pos | |
| accel = np.column_stack([ | |
| -0.5 * (2 * np.pi * 0.5) ** 2 * np.sin(2 * np.pi * 0.5 * t), | |
| -0.5 * (2 * np.pi * 0.3) ** 2 * np.cos(2 * np.pi * 0.3 * t), | |
| -0.2 * (2 * np.pi * 0.7) ** 2 * np.sin(2 * np.pi * 0.7 * t), | |
| ]) | |
| return pos, vel, accel | |
| # ============================================================ | |
| # IMU Data Generator | |
| # ============================================================ | |
| class IMUDataGenerator: | |
| """ | |
| Simulates a 3-axis accelerometer. | |
| A real IMU measures specific force (linear acceleration) in the body frame. | |
| Here we work in a world-aligned frame and add additive white Gaussian noise, | |
| which models sensor noise and vibration. | |
| noise_std [m/s^2]: standard deviation of accelerometer noise per axis. | |
| """ | |
| def __init__(self, true_accel: np.ndarray, noise_std: float = 0.3): | |
| self.true_accel = true_accel # (N, 3) true accelerations [m/s^2] | |
| self.noise_std = noise_std # [m/s^2] typical MEMS accelerometer noise | |
| def generate(self) -> np.ndarray: | |
| """Return noisy acceleration measurements (N, 3).""" | |
| return self.true_accel + np.random.randn(*self.true_accel.shape) * self.noise_std | |
| # ============================================================ | |
| # Visual (Camera) Data Generator | |
| # ============================================================ | |
| class VisualDataGenerator: | |
| """ | |
| Simulates sparse visual position measurements (e.g. from VIO or GPS). | |
| Measurements arrive at a much lower rate than IMU and represent | |
| noisy 3D position observations. | |
| noise_std [m]: standard deviation of position measurement noise. | |
| """ | |
| def __init__(self, true_pos: np.ndarray, indices: np.ndarray, noise_std: float = 0.05): | |
| self.true_pos = true_pos # (N, 3) full ground-truth positions | |
| self.indices = indices # timestep indices where visual data arrives | |
| self.noise_std = noise_std # [m] camera/GPS position noise | |
| def generate(self) -> np.ndarray: | |
| """Return noisy position measurements at visual_indices, shape (M, 3).""" | |
| return self.true_pos[self.indices] + \ | |
| np.random.randn(len(self.indices), 3) * self.noise_std | |
| # ============================================================ | |
| # EKF Class — Constant-Acceleration Motion Model | |
| # ============================================================ | |
| class EKF3D: | |
| """ | |
| Extended Kalman Filter for 6-DOF position + velocity estimation. | |
| State: x = [px, py, pz, vx, vy, vz] (6,) | |
| Control input: u = a_imu (3,) [m/s^2] | |
| Measurement: z = [px, py, pz] (3,) (visual only) | |
| State transition (ZOH, constant-acceleration over dt): | |
| F = [ I dt*I ] | |
| [ 0 I ] | |
| Control matrix (how acceleration enters the state): | |
| B = [ 0.5*dt^2 * I ] | |
| [ dt * I ] | |
| Measurement matrix (observe position only): | |
| H = [ I | 0 ] | |
| """ | |
| def __init__(self, dt: float, R_vis: np.ndarray, | |
| accel_noise_std: float, init_pos: np.ndarray): | |
| self.dt = dt | |
| # ---- State and covariance ---- | |
| # Initialise at the first true position with zero velocity | |
| self.x = np.zeros(6) | |
| self.x[:3] = init_pos # start at the true initial position | |
| self.P = np.eye(6) * 0.1 # initial uncertainty | |
| # ---- State-transition matrix F ---- | |
| self.F = np.block([ | |
| [np.eye(3), dt * np.eye(3)], | |
| [np.zeros((3, 3)), np.eye(3)], | |
| ]) | |
| # ---- Control-input matrix B (acceleration → state) ---- | |
| self.B = np.vstack([ | |
| 0.5 * dt ** 2 * np.eye(3), # position update: 0.5*a*dt^2 | |
| dt * np.eye(3), # velocity update: a*dt | |
| ]) | |
| # ---- Process noise Q ---- | |
| # Continuous white-noise acceleration model discretised over dt. | |
| # sigma_a^2 is the variance of the IMU acceleration noise [m^2/s^4]. | |
| sigma_a2 = accel_noise_std ** 2 | |
| # Standard Van Loan / discrete white noise acceleration covariance: | |
| Q_pos = 0.25 * dt ** 4 * sigma_a2 * np.eye(3) | |
| Q_pv = 0.5 * dt ** 3 * sigma_a2 * np.eye(3) | |
| Q_vel = dt ** 2 * sigma_a2 * np.eye(3) | |
| self.Q = np.block([ | |
| [Q_pos, Q_pv], | |
| [Q_pv, Q_vel], | |
| ]) | |
| # ---- Measurement matrix H (observe position only) ---- | |
| self.H = np.block([np.eye(3), np.zeros((3, 3))]) | |
| # ---- Measurement noise covariance R ---- | |
| self.R = R_vis # (3, 3) position measurement noise from camera | |
| # ---- History storage ---- | |
| self.ekf_positions = [] # (N, 6) full state history | |
| self.ekf_cov_diag = [] # (N, 6) diagonal of P at each step | |
| def step(self, a_imu: np.ndarray, z: np.ndarray = None): | |
| """ | |
| One EKF cycle. | |
| Args: | |
| a_imu: (3,) IMU acceleration measurement at current timestep [m/s^2] | |
| z: (3,) visual position measurement, or None if unavailable | |
| """ | |
| # ---- Predict step ---- | |
| # Propagate state using the motion model with IMU as control input | |
| self.x = self.F @ self.x + self.B @ a_imu | |
| # Propagate covariance (F is linear, so EKF == KF here) | |
| self.P = self.F @ self.P @ self.F.T + self.Q | |
| # ---- Update step (only when a visual measurement is available) ---- | |
| if z is not None: | |
| y = z - self.H @ self.x # innovation (measurement residual) | |
| S = self.H @ self.P @ self.H.T + self.R # innovation covariance | |
| K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain | |
| self.x = self.x + K @ y # corrected state | |
| # Joseph form for numerical stability: (I - KH) P (I - KH)^T + K R K^T | |
| I_KH = np.eye(6) - K @ self.H | |
| self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T | |
| # Store full state and covariance diagonal for post-processing | |
| self.ekf_positions.append(self.x.copy()) | |
| self.ekf_cov_diag.append(np.diag(self.P).copy()) | |
| # ============================================================ | |
| # Main Simulator | |
| # ============================================================ | |
| class EKFSimulator: | |
| """ | |
| Orchestrates trajectory generation, sensor simulation, EKF, and animation. | |
| """ | |
| def __init__(self, total_time: float = 5.0, imu_freq: int = 250, | |
| visual_freq: int = 50): | |
| self.dt = 1.0 / imu_freq | |
| self.steps = int(total_time / self.dt) | |
| self.time = np.linspace(0, total_time, self.steps) | |
| # Ground-truth position, velocity, acceleration | |
| self.true_pos, self.true_vel, self.true_accel = make_ground_truth(self.time) | |
| # IMU: measures acceleration with noise [m/s^2] | |
| imu_noise_std = 0.3 # realistic MEMS accelerometer noise | |
| self.imu_data = IMUDataGenerator(self.true_accel, noise_std=imu_noise_std).generate() | |
| # Visual: sparse position measurements at lower rate | |
| vis_noise_std = 0.05 # [m] typical stereo-camera or GPS noise | |
| self.visual_indices = np.linspace(0, self.steps - 1, visual_freq, dtype=int) | |
| self.visual_data = VisualDataGenerator( | |
| self.true_pos, self.visual_indices, noise_std=vis_noise_std | |
| ).generate() | |
| # EKF: initialise at the true starting position | |
| R_vis = np.eye(3) * vis_noise_std ** 2 | |
| self.ekf = EKF3D( | |
| dt=self.dt, | |
| R_vis=R_vis, | |
| accel_noise_std=imu_noise_std, | |
| init_pos=self.true_pos[0], | |
| ) | |
| # -------------------------------------------------------- | |
| def run_ekf(self): | |
| """Run the full EKF forward pass over all timesteps.""" | |
| visual_map = {i: z for i, z in zip(self.visual_indices, self.visual_data)} | |
| for k in range(self.steps): | |
| z = visual_map.get(k, None) # visual measurement or None | |
| self.ekf.step(self.imu_data[k], z) # pass IMU acceleration, not position | |
| # Convert history lists to arrays for indexing | |
| self.ekf.ekf_positions = np.array(self.ekf.ekf_positions) # (N, 6) | |
| self.ekf.ekf_cov_diag = np.array(self.ekf.ekf_cov_diag) # (N, 6) | |
| # -------------------------------------------------------- | |
| def animate(self): | |
| """Render a two-panel animation: 3D trajectory and 2D position ± 2σ bands.""" | |
| fig = plt.figure(figsize=(16, 6)) | |
| ax1 = fig.add_subplot(121, projection='3d') | |
| ax2 = fig.add_subplot(122) | |
| # ---- Left panel: 3D trajectory ---- | |
| ax1.set_title("3D Trajectory + EKF Posterior") | |
| ax1.set_xlim(-0.6, 0.6) | |
| ax1.set_ylim(-0.6, 0.6) | |
| ax1.set_zlim(-0.3, 0.3) | |
| ax1.set_xlabel("X [m]") | |
| ax1.set_ylabel("Y [m]") | |
| ax1.set_zlabel("Z [m]") | |
| ax1.plot(*self.true_pos.T, 'k--', linewidth=1, label='True') | |
| imu_sc = ax1.scatter([], [], [], color='orange', alpha=0.3, s=4, label='IMU accel (integrated)') | |
| vis_sc = ax1.scatter([], [], [], color='blue', alpha=0.7, s=20, label='Visual meas.') | |
| ekf_line, = ax1.plot([], [], [], color='green', linewidth=2, label='EKF estimate') | |
| ax1.legend(fontsize=8) | |
| # ---- Right panel: per-axis position + 2σ confidence intervals ---- | |
| ax2.set_title("EKF Position Estimates ±2σ") | |
| ax2.set_xlabel("Time [s]") | |
| ax2.set_ylabel("Position [m]") | |
| colors = ['r', 'g', 'b'] | |
| axis_names = ['px', 'py', 'pz'] | |
| lines = [ax2.plot([], [], color=c, label=f'EKF {n}')[0] | |
| for c, n in zip(colors, axis_names)] | |
| # Ground-truth dashed references (all three axes) | |
| for i, c in enumerate(colors): | |
| ax2.plot(self.time, self.true_pos[:, i], '--', color=c, alpha=0.3, linewidth=1) | |
| meas_sc = [ax2.scatter([], [], color=c, s=25, alpha=0.8, zorder=5) for c in colors] | |
| ax2.legend(fontsize=8) | |
| # Dense array to look up visual measurements by frame index | |
| visual_map_arr = np.zeros_like(self.true_pos) | |
| visual_map_arr[self.visual_indices] = self.visual_data | |
| # Boolean mask to identify frames that actually carry a visual measurement | |
| has_visual = np.zeros(self.steps, dtype=bool) | |
| has_visual[self.visual_indices] = True | |
| def update(frame): | |
| # ---- 3D panel ---- | |
| # Show EKF-estimated positions as a proxy for integrated IMU trajectory | |
| ekf_pos = self.ekf.ekf_positions[:frame + 1, :3] | |
| imu_sc._offsets3d = (ekf_pos[:, 0], ekf_pos[:, 1], ekf_pos[:, 2]) | |
| # Show visual measurements received so far | |
| vis_mask = has_visual[:frame + 1] | |
| vis_pts = visual_map_arr[:frame + 1][vis_mask] | |
| if len(vis_pts): | |
| vis_sc._offsets3d = (vis_pts[:, 0], vis_pts[:, 1], vis_pts[:, 2]) | |
| ekf_line.set_data(ekf_pos[:, 0], ekf_pos[:, 1]) | |
| ekf_line.set_3d_properties(ekf_pos[:, 2]) | |
| # ---- 2D panel ---- | |
| # Remove previous fill_between patches before redrawing | |
| for coll in ax2.collections: | |
| coll.remove() | |
| t_now = self.time[:frame + 1] | |
| for i in range(3): | |
| pos = self.ekf.ekf_positions[:frame + 1, i] | |
| # Per-timestep 2σ confidence band from saved covariance history | |
| std = np.sqrt(np.maximum(self.ekf.ekf_cov_diag[:frame + 1, i], 0)) | |
| lines[i].set_data(t_now, pos) | |
| ax2.fill_between(t_now, pos - 2 * std, pos + 2 * std, | |
| color=colors[i], alpha=0.15) | |
| # Visual measurement scatter: only the points up to current frame | |
| vis_idx = np.where((self.visual_indices <= frame))[0] | |
| if len(vis_idx): | |
| meas_sc[i].set_offsets( | |
| np.c_[self.time[self.visual_indices[vis_idx]], | |
| self.visual_data[vis_idx, i]] | |
| ) | |
| else: | |
| meas_sc[i].set_offsets(np.empty((0, 2))) | |
| ax2.relim() | |
| ax2.autoscale_view() | |
| ani = FuncAnimation(fig, update, frames=self.steps, interval=30, blit=False) | |
| plt.tight_layout() | |
| plt.show() | |
| # ============================================================ | |
| # Entry Point | |
| # ============================================================ | |
| if __name__ == "__main__": | |
| sim = EKFSimulator(total_time=5.0, imu_freq=250, visual_freq=50) | |
| sim.run_ekf() | |
| sim.animate() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment