Files
cf-autohover/mission_client.py
K. Isom eed9778e59 Initial checkpoint: Crazyflie mission client (DearPyGui)
Two-step ARM -> LAUNCH -> LAND flight flow for a Crazyflie + Flow v2 deck:

- controller.py: cflib link + flight logic on one worker thread with a
  command queue. Supervisor arming, Kalman estimator reset/convergence
  wait, high-level-commander takeoff/land, configurable hover height
  (0.2-2.0 m), emergency stop, graceful land-on-shutdown, thread-safe
  snapshot()/scan(). States: DISCONNECTED/CONNECTING/READY/ARMING/ARMED/
  FLYING/LANDING/EMERGENCY/ERROR.
- mission_client.py: DearPyGui front-end, manual render loop polling
  snapshot() each frame. Context button ARM(green)->LAUNCH(blue)->LAND
  (yellow), secondary Disarm, hover-height slider, console log, and an
  Estimator state panel (x/y/z, yaw, flow counts) for drift inspection.
- uv-managed (pyproject.toml + uv.lock); run `uv run mission_client.py`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 15:44:07 -07:00

364 lines
15 KiB
Python

#!/usr/bin/env python3
"""DearPyGui cockpit for the Crazyflie mission client.
Pops up a UI to fly a Bitcraze Crazyflie with a Flow v2 deck. Flight is a
two-step flow on one context button: ARM arms the drone on the ground, LAUNCH
takes off to the height set on the hover-height slider (holding position with
the Flow deck), and LAND brings it back down. Disarm backs out of ARMED
without taking off; EMERGENCY STOP (button or SPACE) cuts the motors
immediately.
Run:
python mission_client.py
CFLIB_URI=radio://0/80/2M/E7E7E7E7E7 python mission_client.py # custom URI
The render loop is driven manually (same pattern as the test-stand app): every
frame we poll the controller's thread-safe snapshot() and refresh the widgets.
All radio I/O stays on the controller's worker thread.
"""
from __future__ import annotations
import os
import sys
import dearpygui.dearpygui as dpg
# Allow `python mission_client.py` from any working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from controller import ( # noqa: E402
DEFAULT_URI,
LOG_BUFFER,
MAX_HEIGHT_M,
MIN_HEIGHT_M,
TARGET_HEIGHT_M,
DroneController,
State,
)
# Palette used for the status line; every entry is an (R, G, B) tuple.
_GREY = (150, 150, 150)
_AMBER = (220, 180, 60)
_GREEN = (60, 200, 90)
_BLUE = (90, 150, 235)
_RED = (235, 70, 70)

# Colour of the status text for each controller state.
STATE_COLOUR = {
    State.DISCONNECTED: _GREY,
    State.CONNECTING: _AMBER,
    State.READY: _GREEN,
    State.ARMING: _AMBER,
    State.ARMED: _BLUE,
    State.FLYING: _GREEN,
    State.LANDING: _AMBER,
    State.EMERGENCY: _RED,
    State.ERROR: _RED,
}

# States in which a link exists (connected through to on-the-ground
# emergency). CONNECTING and ERROR are deliberately not members.
CONNECTED_STATES = {
    State.READY,
    State.ARMING,
    State.ARMED,
    State.FLYING,
    State.LANDING,
    State.EMERGENCY,
}

# Gates for the single primary button of the two-step flow.
ARMABLE_STATES = {State.READY, State.EMERGENCY}  # button reads ARM (green)
LAUNCHABLE_STATES = {State.ARMED}                # button reads LAUNCH (blue)
FLYING_STATES = {State.FLYING}                   # button reads LAND (yellow)

# States in which the hover-height slider is locked (airborne or transient).
HEIGHT_LOCKED_STATES = {
    State.ARMING,
    State.FLYING,
    State.LANDING,
}
class MissionClientApp:
    """DearPyGui front-end that drives a ``DroneController``.

    The app owns one controller and one primary window. ``run()`` starts the
    controller, builds the UI and then renders frames manually; before each
    frame ``update()`` copies the controller's latest ``snapshot()`` into the
    widgets. Button callbacks only forward requests to the controller.
    """

    def __init__(self) -> None:
        """Create the controller for ``$CFLIB_URI`` (``DEFAULT_URI`` if unset)."""
        self.controller = DroneController(
            os.environ.get('CFLIB_URI', DEFAULT_URI))
        self._console_version = -1  # last LOG_BUFFER version rendered

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def build(self) -> None:
        """Create the DearPyGui context, themes, widgets and viewport."""
        dpg.create_context()

        # Emergency stop: red.
        self._estop_theme = self._button_theme(
            (170, 30, 30), (210, 40, 40), (240, 60, 60))
        # Armable: green ARM.
        self._arm_theme = self._button_theme(
            (30, 120, 50), (40, 150, 65), (50, 175, 80))
        # Flying: yellow LAND, with darkened label text for contrast.
        self._disarm_theme = self._button_theme(
            (205, 170, 40), (225, 190, 55), (240, 205, 70),
            text=(35, 30, 0))
        # Armed-on-ground: blue LAUNCH.
        self._launch_theme = self._button_theme(
            (40, 95, 185), (55, 115, 210), (70, 135, 230))

        with dpg.window(tag='primary'):
            # Controls stacked on top; the console fills the space below so
            # it is roughly the same height as the rest of the UI above it.
            sections = (self._build_connection, self._build_status,
                        self._build_diagnostics, self._build_controls)
            for build_section in sections:
                build_section()
                dpg.add_separator()
            self._build_console()

        # SPACE = emergency stop, available anywhere.
        # NOTE(review): this handler is not gated on connection state, so
        # SPACE still calls emergency_stop() while the on-screen button is
        # disabled in update().
        with dpg.handler_registry():
            dpg.add_key_press_handler(dpg.mvKey_Spacebar,
                                      callback=self._on_estop)

        dpg.create_viewport(title='Crazyflie Mission Client',
                            width=560, height=820)
        dpg.setup_dearpygui()
        dpg.show_viewport()
        dpg.set_primary_window('primary', True)

    @staticmethod
    def _button_theme(normal, hovered, active, text=None):
        """Create a button theme and return its DearPyGui item id.

        ``normal``, ``hovered`` and ``active`` are (R, G, B) tuples for the
        three button colour slots; ``text`` optionally overrides the label
        colour.
        """
        with dpg.theme() as theme:
            with dpg.theme_component(dpg.mvButton):
                dpg.add_theme_color(dpg.mvThemeCol_Button, normal)
                dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, hovered)
                dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, active)
                if text is not None:
                    dpg.add_theme_color(dpg.mvThemeCol_Text, text)
        return theme

    def _build_connection(self) -> None:
        """Add the URI combo plus the Scan and Connect buttons."""
        dpg.add_text('Connection')
        with dpg.group(horizontal=True):
            dpg.add_combo([self.controller.uri], tag='uri_combo', width=320,
                          default_value=self.controller.uri)
            dpg.add_button(label='Scan', tag='scan_btn',
                           callback=self._on_scan)
            dpg.add_button(label='Connect', tag='connect_btn',
                           callback=self._on_connect_toggle)

    def _build_status(self) -> None:
        """Add the status line and the battery/height/deck/supervisor rows."""
        dpg.add_text('Status')
        dpg.add_text('Disconnected', tag='status_text',
                     color=STATE_COLOUR[State.DISCONNECTED])
        with dpg.group(horizontal=True):
            dpg.add_text('Battery:')
            dpg.add_text('-- V', tag='vbat_text')
            dpg.add_text(' Height:')
            dpg.add_text('-- m', tag='height_text')
        with dpg.group(horizontal=True):
            dpg.add_text('Flow v2 deck:')
            dpg.add_text('unknown', tag='flow_text')
        with dpg.group(horizontal=True):
            dpg.add_text('Supervisor:')
            dpg.add_text('idle', tag='sup_text')

    def _build_diagnostics(self) -> None:
        """Add the estimator pose and flow-count readouts."""
        # Live estimator pose + flow counts for diagnosing circular drift:
        # a steady yaw drift or a circular x/y trace points at a
        # heading-estimate problem; flow counts near zero suggest a poor
        # surface for the deck.
        dpg.add_text('Estimator state')
        with dpg.group(horizontal=True):
            dpg.add_text('Pos (m):')
            dpg.add_text('x -- y -- z --', tag='diag_pos')
        with dpg.group(horizontal=True):
            dpg.add_text('Yaw:')
            dpg.add_text('-- deg', tag='diag_yaw')
        with dpg.group(horizontal=True):
            dpg.add_text('Flow counts:')
            dpg.add_text('dx -- dy --', tag='diag_flow')

    def _build_controls(self) -> None:
        """Add the hover-height slider and the three flight buttons."""
        dpg.add_text('Flight')
        dpg.add_slider_float(label='Hover height', tag='height_slider',
                             default_value=TARGET_HEIGHT_M,
                             min_value=MIN_HEIGHT_M, max_value=MAX_HEIGHT_M,
                             format='%.2f m', width=-80)
        # Primary action button; label/theme/enabled all driven per frame in
        # update(): ARM (green) -> LAUNCH (blue) -> LAND (yellow).
        dpg.add_button(label='ARM', tag='arm_btn', width=-1, height=46,
                       callback=self._on_primary, enabled=False)
        # Secondary: back out of ARMED without launching. Enabled only in
        # ARMED.
        dpg.add_button(label='Disarm', tag='disarm_btn', width=-1,
                       callback=self._on_disarm, enabled=False)
        estop = dpg.add_button(label='EMERGENCY STOP (SPACE)',
                               tag='estop_btn', width=-1, height=46,
                               callback=self._on_estop)
        dpg.bind_item_theme(estop, self._estop_theme)

    def _build_console(self) -> None:
        """Add the console header row and the scrolling log region."""
        # Header row, then a scrolling region that fills the remaining height
        # below the controls (height=-1) — same width as the rest of the UI.
        with dpg.group(horizontal=True):
            dpg.add_text('Console')
            dpg.add_button(label='Clear',
                           callback=lambda: LOG_BUFFER.clear())
        with dpg.child_window(tag='console_child', width=-1, height=-1):
            dpg.add_text('', tag='console_text', wrap=520)

    # ------------------------------------------------------------------
    # Callbacks
    # ------------------------------------------------------------------
    def _on_scan(self) -> None:
        """Refill the URI combo from ``controller.scan()``."""
        found = self.controller.scan()
        if found:
            items, selected = found, found[0]
        else:
            # Keep the configured URI selectable, but show that the scan
            # came back empty; _on_connect_toggle ignores this placeholder.
            items, selected = [self.controller.uri], '(none found)'
        dpg.configure_item('uri_combo', items=items)
        dpg.set_value('uri_combo', selected)

    def _on_connect_toggle(self) -> None:
        """Disconnect if linked or connecting; otherwise connect to the URI."""
        state = self.controller.snapshot().state
        if state in CONNECTED_STATES or state == State.CONNECTING:
            self.controller.disconnect()
            return
        uri = dpg.get_value('uri_combo')
        # Only radio:// and usb:// values are passed to the controller; this
        # rejects the '(none found)' placeholder.
        if uri and uri.startswith(('radio://', 'usb://')):
            self.controller.connect(uri)

    def _on_primary(self) -> None:
        """Run the primary button's context action: ARM -> LAUNCH -> LAND."""
        state = self.controller.snapshot().state
        if state in ARMABLE_STATES:
            self.controller.arm()
        elif state in LAUNCHABLE_STATES:
            self.controller.launch(dpg.get_value('height_slider'))
        elif state in FLYING_STATES:
            self.controller.disarm()  # land

    def _on_disarm(self) -> None:
        """Secondary button: back out of ARMED on the ground (no takeoff)."""
        if self.controller.snapshot().state in LAUNCHABLE_STATES:
            self.controller.disarm()

    def _on_estop(self, *_) -> None:
        """Forward an emergency stop; accepts and ignores callback args."""
        self.controller.emergency_stop()

    # ------------------------------------------------------------------
    # Per-frame update
    # ------------------------------------------------------------------
    def update(self) -> None:
        """Copy the controller's current snapshot into every widget."""
        snap = self.controller.snapshot()
        state, telemetry = snap.state, snap.telemetry
        self._update_status(state, snap.message, telemetry)
        self._update_diagnostics(telemetry)
        self._update_connection(state)
        self._update_flight_controls(state)
        self._refresh_console()

    def _update_status(self, state, message, t) -> None:
        """Refresh the status line and battery/height/deck/supervisor text."""
        # The controller's message wins over the bare state name.
        dpg.set_value('status_text', message or state.name)
        dpg.configure_item('status_text',
                           color=STATE_COLOUR.get(state, (200, 200, 200)))
        dpg.set_value('vbat_text', f'{t.vbat:.2f} V' if t.vbat else '-- V')
        dpg.set_value('height_text', f'{t.height:.2f} m')
        # flow_deck is tri-state: None means not known yet.
        if t.flow_deck is None:
            deck_text = 'unknown'
        elif t.flow_deck:
            deck_text = 'attached'
        else:
            deck_text = 'NOT DETECTED'
        dpg.set_value('flow_text', deck_text)
        dpg.set_value('sup_text', self._supervisor_text(t))

    def _update_diagnostics(self, t) -> None:
        """Refresh the drift-inspection readout (pose, yaw, flow counts)."""
        dpg.set_value('diag_pos',
                      f'x {t.x:+.3f} y {t.y:+.3f} z {t.height:+.3f}')
        dpg.set_value('diag_yaw', f'{t.yaw:+.1f} deg')
        dpg.set_value('diag_flow', f'dx {t.flow_dx:+d} dy {t.flow_dy:+d}')

    def _update_connection(self, state) -> None:
        """Refresh the Connect/Disconnect button, Scan button and URI combo."""
        connecting = state == State.CONNECTING
        link_busy = connecting or state in CONNECTED_STATES
        dpg.configure_item(
            'connect_btn', enabled=not connecting,
            label='Disconnect' if link_busy else 'Connect')
        dpg.configure_item('scan_btn', enabled=not link_busy)
        dpg.configure_item('uri_combo', enabled=not link_busy)

    def _update_flight_controls(self, state) -> None:
        """Refresh the height slider and the ARM/LAUNCH/LAND, Disarm, E-stop
        buttons for ``state``.
        """
        # Hover height is a pre-launch setting; lock the slider once airborne
        # or mid-arm transient.
        dpg.configure_item('height_slider',
                           enabled=state not in HEIGHT_LOCKED_STATES)

        # Primary button — frame-driven across the two-step flow:
        #   armable   -> green "ARM"
        #   armed     -> blue "LAUNCH (to N m)"
        #   flying    -> yellow "LAND"
        #   otherwise -> disabled, no theme (default greyed)
        if state in ARMABLE_STATES:
            enabled, label, theme = True, 'ARM', self._arm_theme
        elif state in LAUNCHABLE_STATES:
            height = dpg.get_value('height_slider')
            enabled, label = True, f'LAUNCH (to {height:.2f} m)'
            theme = self._launch_theme
        elif state in FLYING_STATES:
            enabled, label, theme = True, 'LAND', self._disarm_theme
        else:
            enabled, label, theme = False, 'ARM', 0
        dpg.configure_item('arm_btn', enabled=enabled, label=label)
        dpg.bind_item_theme('arm_btn', theme)

        # Secondary Disarm button: only usable while armed on the ground.
        dpg.configure_item('disarm_btn', enabled=state in LAUNCHABLE_STATES)
        dpg.configure_item('estop_btn', enabled=state in CONNECTED_STATES)

    def _refresh_console(self) -> None:
        """Re-render the console text when LOG_BUFFER's version has changed."""
        version, lines = LOG_BUFFER.snapshot()
        if version == self._console_version:
            return
        self._console_version = version
        # Keep following the tail only if the user is already at the bottom.
        try:
            at_bottom = (dpg.get_y_scroll('console_child')
                         >= dpg.get_y_scroll_max('console_child') - 2.0)
        except Exception:
            # Best effort: if the scroll position can't be read, follow the
            # tail rather than fail the frame.
            at_bottom = True
        dpg.set_value('console_text', '\n'.join(lines))
        if at_bottom:
            dpg.set_y_scroll('console_child', -1.0)

    @staticmethod
    def _supervisor_text(t) -> str:
        """Return the set supervisor flags of ``t`` as text, or ``'idle'``.

        ``t`` is any object exposing the boolean-ish attributes ``locked``,
        ``tumbled``, ``crashed``, ``deck_fault``, ``armed`` and ``flying``.
        Flags are listed in that order, joined with ``', '``.
        """
        flag_labels = (
            ('locked', 'LOCKED'),
            ('tumbled', 'TUMBLED'),
            ('crashed', 'CRASHED'),
            ('deck_fault', 'DECK-FAULT'),
            ('armed', 'armed'),
            ('flying', 'flying'),
        )
        active = [label for attr, label in flag_labels if getattr(t, attr)]
        return ', '.join(active) if active else 'idle'

    # ------------------------------------------------------------------
    # Run loop
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Start the controller, build the UI and render until it closes.

        The controller is always shut down on the way out, including when
        building the UI or rendering a frame raises.
        """
        self.controller.start()
        try:
            # build() is inside the try so that a UI construction failure
            # still stops the already-started controller worker.
            self.build()
            while dpg.is_dearpygui_running():
                self.update()
                dpg.render_dearpygui_frame()
        finally:
            # Never leave the drone airborne: ask the controller to land (if
            # flying) and close the link, and wait for the worker to finish
            # so libusb is never torn down mid-call.
            self._shutdown_controller()
            dpg.destroy_context()

    def _shutdown_controller(self) -> None:
        """Best-effort controller shutdown followed by a bounded join.

        The two steps are guarded separately so a failing ``shutdown()``
        does not skip the ``join()``. Failures are reported on stderr
        instead of being raised, so exit always proceeds.
        """
        try:
            self.controller.shutdown()
        except Exception as exc:
            print(f'controller shutdown failed: {exc!r}', file=sys.stderr)
        try:
            # Bounded so we can't hang on exit.
            self.controller.join(timeout=12)
        except Exception as exc:
            print(f'controller join failed: {exc!r}', file=sys.stderr)
def main() -> None:
    """Create the mission client application and run it until it exits."""
    app = MissionClientApp()
    app.run()


# Script entry point: only start the UI when executed directly.
if __name__ == '__main__':
    main()