Two-step ARM -> LAUNCH -> LAND flight flow for a Crazyflie + Flow v2 deck: - controller.py: cflib link + flight logic on one worker thread with a command queue. Supervisor arming, Kalman estimator reset/convergence wait, high-level-commander takeoff/land, configurable hover height (0.2-2.0 m), emergency stop, graceful land-on-shutdown, thread-safe snapshot()/scan(). States: DISCONNECTED/CONNECTING/READY/ARMING/ARMED/ FLYING/LANDING/EMERGENCY/ERROR. - mission_client.py: DearPyGui front-end, manual render loop polling snapshot() each frame. Context button ARM(green)->LAUNCH(blue)->LAND (yellow), secondary Disarm, hover-height slider, console log, and an Estimator state panel (x/y/z, yaw, flow counts) for drift inspection. - uv-managed (pyproject.toml + uv.lock); run `uv run mission_client.py`. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
364 lines
15 KiB
Python
364 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""DearPyGui cockpit for the Crazyflie mission client.
|
|
|
|
Pops up a UI to fly a Bitcraze Crazyflie with a Flow v2 deck. When you ARM, the
|
|
drone resets its position estimator, arms, and lifts off to 1 m, holding
|
|
position with the Flow deck. DISARM lands it gently; EMERGENCY STOP (button or
|
|
SPACE) cuts the motors immediately.
|
|
|
|
Run:
|
|
python mission_client.py
|
|
CFLIB_URI=radio://0/80/2M/E7E7E7E7E7 python mission_client.py # custom URI
|
|
|
|
The render loop is driven manually (same pattern as the test-stand app): every
|
|
frame we poll the controller's thread-safe snapshot() and refresh the widgets.
|
|
All radio I/O stays on the controller's worker thread.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
|
|
import dearpygui.dearpygui as dpg
|
|
|
|
# Allow `python mission_client.py` from any working directory.
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from controller import ( # noqa: E402
|
|
DEFAULT_URI,
|
|
LOG_BUFFER,
|
|
MAX_HEIGHT_M,
|
|
MIN_HEIGHT_M,
|
|
TARGET_HEIGHT_M,
|
|
DroneController,
|
|
State,
|
|
)
|
|
|
|
# Status text colour per state (R, G, B).
|
|
STATE_COLOUR = {
|
|
State.DISCONNECTED: (150, 150, 150),
|
|
State.CONNECTING: (220, 180, 60),
|
|
State.READY: (60, 200, 90),
|
|
State.ARMING: (220, 180, 60),
|
|
State.ARMED: (90, 150, 235),
|
|
State.FLYING: (60, 200, 90),
|
|
State.LANDING: (220, 180, 60),
|
|
State.EMERGENCY: (235, 70, 70),
|
|
State.ERROR: (235, 70, 70),
|
|
}
|
|
|
|
# States in which a link exists (connected through to on-the-ground emergency).
|
|
CONNECTED_STATES = {State.READY, State.ARMING, State.ARMED, State.FLYING,
|
|
State.LANDING, State.EMERGENCY}
|
|
# Two-step flow gates for the primary button:
|
|
ARMABLE_STATES = {State.READY, State.EMERGENCY} # ARM (green)
|
|
LAUNCHABLE_STATES = {State.ARMED} # LAUNCH (blue)
|
|
FLYING_STATES = {State.FLYING} # LAND (yellow)
|
|
# States where the hover-height slider is locked (airborne or transient).
|
|
HEIGHT_LOCKED_STATES = {State.ARMING, State.FLYING, State.LANDING}
|
|
|
|
|
|
class MissionClientApp:
|
|
def __init__(self) -> None:
|
|
self.controller = DroneController(
|
|
os.environ.get('CFLIB_URI', DEFAULT_URI))
|
|
self._console_version = -1 # last LOG_BUFFER version rendered
|
|
|
|
# ------------------------------------------------------------------
|
|
# UI construction
|
|
# ------------------------------------------------------------------
|
|
def build(self) -> None:
|
|
dpg.create_context()
|
|
|
|
with dpg.theme() as estop_theme:
|
|
with dpg.theme_component(dpg.mvButton):
|
|
dpg.add_theme_color(dpg.mvThemeCol_Button, (170, 30, 30))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (210, 40, 40))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (240, 60, 60))
|
|
self._estop_theme = estop_theme
|
|
|
|
with dpg.theme() as arm_theme:
|
|
with dpg.theme_component(dpg.mvButton):
|
|
dpg.add_theme_color(dpg.mvThemeCol_Button, (30, 120, 50))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (40, 150, 65))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (50, 175, 80))
|
|
self._arm_theme = arm_theme
|
|
|
|
# Flying: yellow LAND, with darkened label text for contrast.
|
|
with dpg.theme() as disarm_theme:
|
|
with dpg.theme_component(dpg.mvButton):
|
|
dpg.add_theme_color(dpg.mvThemeCol_Button, (205, 170, 40))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (225, 190, 55))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (240, 205, 70))
|
|
dpg.add_theme_color(dpg.mvThemeCol_Text, (35, 30, 0))
|
|
self._disarm_theme = disarm_theme
|
|
|
|
# Armed-on-ground: blue LAUNCH.
|
|
with dpg.theme() as launch_theme:
|
|
with dpg.theme_component(dpg.mvButton):
|
|
dpg.add_theme_color(dpg.mvThemeCol_Button, (40, 95, 185))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (55, 115, 210))
|
|
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (70, 135, 230))
|
|
self._launch_theme = launch_theme
|
|
|
|
with dpg.window(tag='primary'):
|
|
# Controls stacked on top; the console fills the space below so it
|
|
# is roughly the same height as the rest of the UI above it.
|
|
self._build_connection()
|
|
dpg.add_separator()
|
|
self._build_status()
|
|
dpg.add_separator()
|
|
self._build_diagnostics()
|
|
dpg.add_separator()
|
|
self._build_controls()
|
|
dpg.add_separator()
|
|
self._build_console()
|
|
|
|
# SPACE = emergency stop, available anywhere.
|
|
with dpg.handler_registry():
|
|
dpg.add_key_press_handler(dpg.mvKey_Spacebar,
|
|
callback=self._on_estop)
|
|
|
|
dpg.create_viewport(title='Crazyflie Mission Client',
|
|
width=560, height=820)
|
|
dpg.setup_dearpygui()
|
|
dpg.show_viewport()
|
|
dpg.set_primary_window('primary', True)
|
|
|
|
def _build_connection(self) -> None:
|
|
dpg.add_text('Connection')
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_combo([self.controller.uri], tag='uri_combo', width=320,
|
|
default_value=self.controller.uri)
|
|
dpg.add_button(label='Scan', tag='scan_btn', callback=self._on_scan)
|
|
dpg.add_button(label='Connect', tag='connect_btn',
|
|
callback=self._on_connect_toggle)
|
|
|
|
def _build_status(self) -> None:
|
|
dpg.add_text('Status')
|
|
dpg.add_text('Disconnected', tag='status_text',
|
|
color=STATE_COLOUR[State.DISCONNECTED])
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Battery:')
|
|
dpg.add_text('-- V', tag='vbat_text')
|
|
dpg.add_text(' Height:')
|
|
dpg.add_text('-- m', tag='height_text')
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Flow v2 deck:')
|
|
dpg.add_text('unknown', tag='flow_text')
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Supervisor:')
|
|
dpg.add_text('idle', tag='sup_text')
|
|
|
|
def _build_diagnostics(self) -> None:
|
|
# Live estimator pose + flow counts for diagnosing circular drift:
|
|
# a steady yaw drift or a circular x/y trace points at a heading-estimate
|
|
# problem; flow counts near zero suggest a poor surface for the deck.
|
|
dpg.add_text('Estimator state')
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Pos (m):')
|
|
dpg.add_text('x -- y -- z --', tag='diag_pos')
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Yaw:')
|
|
dpg.add_text('-- deg', tag='diag_yaw')
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Flow counts:')
|
|
dpg.add_text('dx -- dy --', tag='diag_flow')
|
|
|
|
def _build_controls(self) -> None:
|
|
dpg.add_text('Flight')
|
|
dpg.add_slider_float(label='Hover height', tag='height_slider',
|
|
default_value=TARGET_HEIGHT_M,
|
|
min_value=MIN_HEIGHT_M, max_value=MAX_HEIGHT_M,
|
|
format='%.2f m', width=-80)
|
|
# Primary action button; label/theme/enabled all driven per frame in
|
|
# update(): ARM (green) -> LAUNCH (blue) -> LAND (yellow).
|
|
dpg.add_button(label='ARM', tag='arm_btn', width=-1, height=46,
|
|
callback=self._on_primary, enabled=False)
|
|
# Secondary: back out of ARMED without launching. Enabled only in ARMED.
|
|
dpg.add_button(label='Disarm', tag='disarm_btn', width=-1,
|
|
callback=self._on_disarm, enabled=False)
|
|
estop = dpg.add_button(label='EMERGENCY STOP (SPACE)', tag='estop_btn',
|
|
width=-1, height=46, callback=self._on_estop)
|
|
dpg.bind_item_theme(estop, self._estop_theme)
|
|
|
|
def _build_console(self) -> None:
|
|
# Header row, then a scrolling region that fills the remaining height
|
|
# below the controls (height=-1) — same width as the rest of the UI.
|
|
with dpg.group(horizontal=True):
|
|
dpg.add_text('Console')
|
|
dpg.add_button(label='Clear', callback=lambda: LOG_BUFFER.clear())
|
|
with dpg.child_window(tag='console_child', width=-1, height=-1):
|
|
dpg.add_text('', tag='console_text', wrap=520)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Callbacks
|
|
# ------------------------------------------------------------------
|
|
def _on_scan(self) -> None:
|
|
uris = self.controller.scan()
|
|
if uris:
|
|
dpg.configure_item('uri_combo', items=uris)
|
|
dpg.set_value('uri_combo', uris[0])
|
|
else:
|
|
dpg.configure_item('uri_combo', items=[self.controller.uri])
|
|
dpg.set_value('uri_combo', '(none found)')
|
|
|
|
def _on_connect_toggle(self) -> None:
|
|
state = self.controller.snapshot().state
|
|
if state in CONNECTED_STATES or state == State.CONNECTING:
|
|
self.controller.disconnect()
|
|
return
|
|
uri = dpg.get_value('uri_combo')
|
|
if uri and uri.startswith(('radio://', 'usb://')):
|
|
self.controller.connect(uri)
|
|
|
|
def _on_primary(self) -> None:
|
|
# Context action for the single primary button: ARM -> LAUNCH -> LAND.
|
|
state = self.controller.snapshot().state
|
|
if state in ARMABLE_STATES:
|
|
self.controller.arm()
|
|
elif state in LAUNCHABLE_STATES:
|
|
self.controller.launch(dpg.get_value('height_slider'))
|
|
elif state in FLYING_STATES:
|
|
self.controller.disarm() # land
|
|
|
|
def _on_disarm(self) -> None:
|
|
# Secondary button: back out of ARMED on the ground (no takeoff).
|
|
if self.controller.snapshot().state in LAUNCHABLE_STATES:
|
|
self.controller.disarm()
|
|
|
|
def _on_estop(self, *_) -> None:
|
|
self.controller.emergency_stop()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Per-frame update
|
|
# ------------------------------------------------------------------
|
|
def update(self) -> None:
|
|
upd = self.controller.snapshot()
|
|
state, t = upd.state, upd.telemetry
|
|
|
|
dpg.set_value('status_text', upd.message or state.name)
|
|
dpg.configure_item('status_text',
|
|
color=STATE_COLOUR.get(state, (200, 200, 200)))
|
|
|
|
dpg.set_value('vbat_text', f'{t.vbat:.2f} V' if t.vbat else '-- V')
|
|
dpg.set_value('height_text', f'{t.height:.2f} m')
|
|
|
|
if t.flow_deck is None:
|
|
dpg.set_value('flow_text', 'unknown')
|
|
elif t.flow_deck:
|
|
dpg.set_value('flow_text', 'attached')
|
|
else:
|
|
dpg.set_value('flow_text', 'NOT DETECTED')
|
|
|
|
dpg.set_value('sup_text', self._supervisor_text(t))
|
|
|
|
# Drift-inspection readout.
|
|
dpg.set_value('diag_pos',
|
|
f'x {t.x:+.3f} y {t.y:+.3f} z {t.height:+.3f}')
|
|
dpg.set_value('diag_yaw', f'{t.yaw:+.1f} deg')
|
|
dpg.set_value('diag_flow', f'dx {t.flow_dx:+d} dy {t.flow_dy:+d}')
|
|
|
|
# Connect / Disconnect button.
|
|
connecting = state == State.CONNECTING
|
|
connected = state in CONNECTED_STATES
|
|
dpg.configure_item(
|
|
'connect_btn', enabled=not connecting,
|
|
label='Disconnect' if (connected or connecting) else 'Connect')
|
|
dpg.configure_item('scan_btn', enabled=not (connected or connecting))
|
|
dpg.configure_item('uri_combo', enabled=not (connected or connecting))
|
|
|
|
# Hover height is a pre-launch setting; lock the slider once airborne
|
|
# or mid-arm transient.
|
|
dpg.configure_item('height_slider',
|
|
enabled=state not in HEIGHT_LOCKED_STATES)
|
|
|
|
# Primary button — frame-driven across the two-step flow:
|
|
# armable -> green "ARM"
|
|
# armed -> blue "LAUNCH (to N m)"
|
|
# flying -> yellow "LAND"
|
|
# otherwise -> disabled, no theme (default greyed)
|
|
if state in ARMABLE_STATES:
|
|
dpg.configure_item('arm_btn', enabled=True, label='ARM')
|
|
dpg.bind_item_theme('arm_btn', self._arm_theme)
|
|
elif state in LAUNCHABLE_STATES:
|
|
height = dpg.get_value('height_slider')
|
|
dpg.configure_item('arm_btn', enabled=True,
|
|
label=f'LAUNCH (to {height:.2f} m)')
|
|
dpg.bind_item_theme('arm_btn', self._launch_theme)
|
|
elif state in FLYING_STATES:
|
|
dpg.configure_item('arm_btn', enabled=True, label='LAND')
|
|
dpg.bind_item_theme('arm_btn', self._disarm_theme)
|
|
else:
|
|
dpg.configure_item('arm_btn', enabled=False, label='ARM')
|
|
dpg.bind_item_theme('arm_btn', 0)
|
|
|
|
# Secondary Disarm button: only usable while armed on the ground.
|
|
dpg.configure_item('disarm_btn', enabled=state in LAUNCHABLE_STATES)
|
|
|
|
dpg.configure_item('estop_btn', enabled=connected)
|
|
|
|
self._refresh_console()
|
|
|
|
def _refresh_console(self) -> None:
|
|
version, lines = LOG_BUFFER.snapshot()
|
|
if version == self._console_version:
|
|
return
|
|
self._console_version = version
|
|
# Keep following the tail only if the user is already at the bottom.
|
|
try:
|
|
at_bottom = (dpg.get_y_scroll('console_child')
|
|
>= dpg.get_y_scroll_max('console_child') - 2.0)
|
|
except Exception:
|
|
at_bottom = True
|
|
dpg.set_value('console_text', '\n'.join(lines))
|
|
if at_bottom:
|
|
dpg.set_y_scroll('console_child', -1.0)
|
|
|
|
@staticmethod
|
|
def _supervisor_text(t) -> str:
|
|
flags = []
|
|
if t.locked:
|
|
flags.append('LOCKED')
|
|
if t.tumbled:
|
|
flags.append('TUMBLED')
|
|
if t.crashed:
|
|
flags.append('CRASHED')
|
|
if t.deck_fault:
|
|
flags.append('DECK-FAULT')
|
|
if t.armed:
|
|
flags.append('armed')
|
|
if t.flying:
|
|
flags.append('flying')
|
|
return ', '.join(flags) if flags else 'idle'
|
|
|
|
# ------------------------------------------------------------------
|
|
# Run loop
|
|
# ------------------------------------------------------------------
|
|
def run(self) -> None:
|
|
self.controller.start()
|
|
self.build()
|
|
try:
|
|
while dpg.is_dearpygui_running():
|
|
self.update()
|
|
dpg.render_dearpygui_frame()
|
|
finally:
|
|
# Never leave the drone airborne: ask the controller to land (if
|
|
# flying) and close the link, and wait for the worker to finish so
|
|
# libusb is never torn down mid-call. Bounded so we can't hang.
|
|
try:
|
|
self.controller.shutdown()
|
|
self.controller.join(timeout=12)
|
|
except Exception:
|
|
pass
|
|
dpg.destroy_context()
|
|
|
|
|
|
def main() -> None:
|
|
MissionClientApp().run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|