Created
February 13, 2026 05:03
-
-
Save mrpollo/1b08846f939bbb582969afdeafc63828 to your computer and use it in GitHub Desktop.
MAVSDK test for PX4 FlightTaskOrbit - verifies orbit mode in SIH SITL simulation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Test FlightTaskOrbit via MAVSDK on PX4 SITL with SIH. | |
| Verifies that orbit mode activates and the vehicle flies a circular path. | |
| """ | |
| import asyncio | |
| import math | |
| import sys | |
| from mavsdk import System | |
| from mavsdk.action import OrbitYawBehavior | |
| async def main(): | |
| drone = System() | |
| print("Connecting to drone on udp://:14540...") | |
| await drone.connect(system_address="udp://:14540") | |
| # Wait for connection | |
| print("Waiting for drone to connect...") | |
| async for state in drone.core.connection_state(): | |
| if state.is_connected: | |
| print("Connected to drone!") | |
| break | |
| # Wait for global position estimate | |
| print("Waiting for global position estimate...") | |
| async for health in drone.telemetry.health(): | |
| if health.is_global_position_ok and health.is_home_position_ok: | |
| print("Global position estimate OK") | |
| break | |
| # Start flight mode monitoring in background | |
| flight_modes = [] | |
| mode_task = asyncio.create_task(monitor_flight_mode(drone, flight_modes)) | |
| # Arm | |
| print("\n[1] Arming...") | |
| await drone.action.arm() | |
| print(" Armed!") | |
| # Takeoff | |
| print("[2] Taking off to 10m...") | |
| await drone.action.set_takeoff_altitude(10.0) | |
| await drone.action.takeoff() | |
| # Wait for takeoff to complete | |
| print(" Waiting for altitude > 8m...") | |
| async for position in drone.telemetry.position(): | |
| if position.relative_altitude_m > 8.0: | |
| print(f" Reached altitude: {position.relative_altitude_m:.1f}m") | |
| break | |
| # Stabilize | |
| print("[3] Stabilizing hover for 5 seconds...") | |
| await asyncio.sleep(5) | |
| # Get current position for orbit center | |
| center_lat = None | |
| center_lon = None | |
| center_alt_abs = None | |
| async for position in drone.telemetry.position(): | |
| center_lat = position.latitude_deg | |
| center_lon = position.longitude_deg | |
| center_alt_abs = position.absolute_altitude_m | |
| print(f"[4] Current position: lat={center_lat:.7f}, lon={center_lon:.7f}, abs_alt={center_alt_abs:.1f}m") | |
| break | |
| # Start orbit with explicit lat/lon/alt instead of NaN | |
| orbit_radius_m = 20.0 | |
| orbit_velocity_ms = 3.0 # slower for better tracking | |
| print(f"[5] Commanding orbit: radius={orbit_radius_m}m, velocity={orbit_velocity_ms}m/s") | |
| print(f" Center: lat={center_lat}, lon={center_lon}, alt={center_alt_abs}") | |
| try: | |
| result = await drone.action.do_orbit( | |
| radius_m=orbit_radius_m, | |
| velocity_ms=orbit_velocity_ms, | |
| yaw_behavior=OrbitYawBehavior.HOLD_FRONT_TO_CIRCLE_CENTER, | |
| latitude_deg=center_lat, | |
| longitude_deg=center_lon, | |
| absolute_altitude_m=center_alt_abs, | |
| ) | |
| print(f" Orbit command accepted!") | |
| except Exception as e: | |
| print(f" ERROR: Orbit command failed: {e}") | |
| await drone.action.land() | |
| return False | |
| # Monitor telemetry for 45 seconds | |
| print("\n[6] Monitoring orbit for 45 seconds...") | |
| positions = [] | |
| start_time = asyncio.get_event_loop().time() | |
| sample_count = 0 | |
| async for position in drone.telemetry.position(): | |
| elapsed = asyncio.get_event_loop().time() - start_time | |
| if elapsed > 45.0: | |
| break | |
| lat = position.latitude_deg | |
| lon = position.longitude_deg | |
| alt = position.relative_altitude_m | |
| # Calculate distance from orbit center | |
| dlat = math.radians(lat - center_lat) | |
| dlon = math.radians(lon - center_lon) * math.cos(math.radians(center_lat)) | |
| dist_from_center = math.sqrt(dlat**2 + dlon**2) * 6371000.0 | |
| # Calculate angle from center | |
| angle_rad = math.atan2(dlon * 6371000.0, dlat * 6371000.0) | |
| angle_deg = math.degrees(angle_rad) | |
| positions.append({ | |
| 'time': elapsed, | |
| 'lat': lat, | |
| 'lon': lon, | |
| 'alt': alt, | |
| 'dist': dist_from_center, | |
| 'angle': angle_deg | |
| }) | |
| sample_count += 1 | |
| if sample_count % 50 == 0: | |
| modes_str = ', '.join(set(m[1] for m in flight_modes[-5:])) if flight_modes else 'unknown' | |
| print(f" t={elapsed:5.1f}s: alt={alt:5.1f}m, dist={dist_from_center:5.1f}m, angle={angle_deg:6.1f}deg, mode={modes_str}") | |
| mode_task.cancel() | |
| try: | |
| await mode_task | |
| except asyncio.CancelledError: | |
| pass | |
| # Analyze results | |
| print("\n[7] Analysis") | |
| print(f" Total samples: {len(positions)}") | |
| print(f" Flight modes observed: {sorted(set(m[1] for m in flight_modes))}") | |
| # Separate into phases: | |
| # - transition (first 10s): vehicle moves from center to orbit radius | |
| # - steady orbit (10-45s): should be at constant radius | |
| transition = [p for p in positions if p['time'] < 10.0] | |
| steady = [p for p in positions if p['time'] >= 10.0] | |
| if steady: | |
| steady_dists = [p['dist'] for p in steady] | |
| avg_dist = sum(steady_dists) / len(steady_dists) | |
| min_dist = min(steady_dists) | |
| max_dist = max(steady_dists) | |
| std_dist = math.sqrt(sum((d - avg_dist)**2 for d in steady_dists) / len(steady_dists)) | |
| print(f"\n Steady-state distance from center (t>10s):") | |
| print(f" Average: {avg_dist:.1f}m (expected: {orbit_radius_m}m)") | |
| print(f" Min: {min_dist:.1f}m") | |
| print(f" Max: {max_dist:.1f}m") | |
| print(f" StdDev: {std_dist:.1f}m") | |
| # Check angular coverage (should cover near 360 degrees for a full orbit) | |
| steady_angles = [p['angle'] for p in steady] | |
| angle_range = max(steady_angles) - min(steady_angles) | |
| # Check for angle wrapping | |
| if angle_range > 300: | |
| # Good coverage | |
| print(f" Angular coverage: {angle_range:.0f} degrees (good)") | |
| else: | |
| print(f" Angular coverage: {angle_range:.0f} degrees") | |
| # Determine if this is actually an orbit | |
| is_orbiting = ( | |
| avg_dist > 5.0 and # moved away from center | |
| std_dist < avg_dist * 0.5 # distance is reasonably stable | |
| ) | |
| is_at_right_radius = abs(avg_dist - orbit_radius_m) < orbit_radius_m * 0.5 # within 50% of target | |
| else: | |
| is_orbiting = False | |
| is_at_right_radius = False | |
| # Land | |
| print("\n[8] Landing...") | |
| await drone.action.land() | |
| print(" Waiting for disarm...") | |
| timeout = 30 | |
| try: | |
| armed_count = 0 | |
| async for armed in drone.telemetry.armed(): | |
| if not armed: | |
| print(" Disarmed!") | |
| break | |
| armed_count += 1 | |
| if armed_count > 300: | |
| print(" Timeout waiting for disarm, killing motors...") | |
| await drone.action.kill() | |
| break | |
| except Exception as e: | |
| print(f" Disarm error: {e}") | |
| # Final verdict | |
| print("\n" + "=" * 50) | |
| print("TEST RESULTS") | |
| print("=" * 50) | |
| unique_modes = sorted(set(m[1] for m in flight_modes)) | |
| print(f"Flight modes: {unique_modes}") | |
| if is_orbiting and is_at_right_radius: | |
| print(f"PASS: Vehicle orbited at {avg_dist:.1f}m radius (target: {orbit_radius_m}m)") | |
| print(" FlightTaskOrbit::activate() fix is verified working correctly.") | |
| return True | |
| elif is_orbiting: | |
| print(f"PASS (with deviation): Vehicle orbited at {avg_dist:.1f}m radius (target: {orbit_radius_m}m)") | |
| print(" FlightTaskOrbit::activate() accepted the command and the vehicle moved.") | |
| return True | |
| else: | |
| print("FAIL: Vehicle did not achieve stable orbit") | |
| return False | |
| async def monitor_flight_mode(drone, flight_modes): | |
| """Monitor flight mode changes in background.""" | |
| try: | |
| async for mode in drone.telemetry.flight_mode(): | |
| mode_str = str(mode) | |
| flight_modes.append((asyncio.get_event_loop().time(), mode_str)) | |
| except asyncio.CancelledError: | |
| pass | |
| if __name__ == "__main__": | |
| success = asyncio.run(main()) | |
| sys.exit(0 if success else 1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment