#!/usr/bin/env python3
"""DearPyGui cockpit for the Crazyflie mission client.

Pops up a UI to fly a Bitcraze Crazyflie with a Flow v2 deck. The primary
button walks a two-step flow: ARM arms the drone on the ground, LAUNCH lifts
off to the hover height chosen on the slider (holding position with the Flow
deck), and LAND brings it gently back down. Disarm backs out of the armed
state without taking off; EMERGENCY STOP (button or SPACE) cuts the motors
immediately. The controller takes care of resetting the position estimator
before flight.

Run:
    python mission_client.py
    CFLIB_URI=radio://0/80/2M/E7E7E7E7E7 python mission_client.py   # custom URI

The render loop is driven manually (same pattern as the test-stand app): every
frame we poll the controller's thread-safe snapshot() and refresh the widgets.
All radio I/O stays on the controller's worker thread.
"""

from __future__ import annotations

import os
import sys

import dearpygui.dearpygui as dpg

# Allow `python mission_client.py` from any working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from controller import (  # noqa: E402
    DEFAULT_URI,
    LOG_BUFFER,
    MAX_HEIGHT_M,
    MIN_HEIGHT_M,
    TARGET_HEIGHT_M,
    DroneController,
    State,
)

# Status text colour per state (R, G, B).
STATE_COLOUR = {
    State.DISCONNECTED: (150, 150, 150),
    State.CONNECTING: (220, 180, 60),
    State.READY: (60, 200, 90),
    State.ARMING: (220, 180, 60),
    State.ARMED: (90, 150, 235),
    State.FLYING: (60, 200, 90),
    State.LANDING: (220, 180, 60),
    State.EMERGENCY: (235, 70, 70),
    State.ERROR: (235, 70, 70),
}

# States in which a link exists (connected through to on-the-ground emergency).
CONNECTED_STATES = {State.READY, State.ARMING, State.ARMED, State.FLYING,
                    State.LANDING, State.EMERGENCY}

# Two-step flow gates for the primary button:
ARMABLE_STATES = {State.READY, State.EMERGENCY}   # ARM (green)
LAUNCHABLE_STATES = {State.ARMED}                 # LAUNCH (blue)
FLYING_STATES = {State.FLYING}                    # LAND (yellow)

# States where the hover-height slider is locked (airborne or transient).
HEIGHT_LOCKED_STATES = {State.ARMING, State.FLYING, State.LANDING}


class MissionClientApp:
    """Dear PyGui front-end that drives a ``DroneController``.

    The app owns the controller, builds the widget tree once in ``build()``,
    and then refreshes every widget from ``controller.snapshot()`` on each
    frame of the manual render loop in ``run()``.
    """

    def __init__(self) -> None:
        """Create the controller for the URI in ``CFLIB_URI`` (or the default)."""
        self.controller = DroneController(
            os.environ.get('CFLIB_URI', DEFAULT_URI))
        self._console_version = -1   # last LOG_BUFFER version rendered

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def build(self) -> None:
        """Create the DPG context, button themes, widgets and viewport."""
        dpg.create_context()

        with dpg.theme() as estop_theme:
            with dpg.theme_component(dpg.mvButton):
                dpg.add_theme_color(dpg.mvThemeCol_Button, (170, 30, 30))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (210, 40, 40))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (240, 60, 60))
        self._estop_theme = estop_theme

        with dpg.theme() as arm_theme:
            with dpg.theme_component(dpg.mvButton):
                dpg.add_theme_color(dpg.mvThemeCol_Button, (30, 120, 50))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (40, 150, 65))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (50, 175, 80))
        self._arm_theme = arm_theme

        # Flying: yellow LAND, with darkened label text for contrast.
        with dpg.theme() as disarm_theme:
            with dpg.theme_component(dpg.mvButton):
                dpg.add_theme_color(dpg.mvThemeCol_Button, (205, 170, 40))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (225, 190, 55))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (240, 205, 70))
                dpg.add_theme_color(dpg.mvThemeCol_Text, (35, 30, 0))
        self._disarm_theme = disarm_theme

        # Armed-on-ground: blue LAUNCH.
        with dpg.theme() as launch_theme:
            with dpg.theme_component(dpg.mvButton):
                dpg.add_theme_color(dpg.mvThemeCol_Button, (40, 95, 185))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (55, 115, 210))
                dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (70, 135, 230))
        self._launch_theme = launch_theme

        with dpg.window(tag='primary'):
            # Controls stacked on top; the console fills the space below so it
            # is roughly the same height as the rest of the UI above it.
            self._build_connection()
            dpg.add_separator()
            self._build_status()
            dpg.add_separator()
            self._build_diagnostics()
            dpg.add_separator()
            self._build_controls()
            dpg.add_separator()
            self._build_console()

        # SPACE = emergency stop, available anywhere.
        with dpg.handler_registry():
            dpg.add_key_press_handler(dpg.mvKey_Spacebar,
                                      callback=self._on_estop)

        dpg.create_viewport(title='Crazyflie Mission Client',
                            width=560, height=820)
        dpg.setup_dearpygui()
        dpg.show_viewport()
        dpg.set_primary_window('primary', True)

    def _build_connection(self) -> None:
        """Add the URI combo plus the Scan and Connect buttons."""
        dpg.add_text('Connection')
        with dpg.group(horizontal=True):
            dpg.add_combo([self.controller.uri], tag='uri_combo', width=320,
                          default_value=self.controller.uri)
            dpg.add_button(label='Scan', tag='scan_btn',
                           callback=self._on_scan)
            dpg.add_button(label='Connect', tag='connect_btn',
                           callback=self._on_connect_toggle)

    def _build_status(self) -> None:
        """Add the status line and battery/height/deck/supervisor readouts."""
        dpg.add_text('Status')
        dpg.add_text('Disconnected', tag='status_text',
                     color=STATE_COLOUR[State.DISCONNECTED])
        with dpg.group(horizontal=True):
            dpg.add_text('Battery:')
            dpg.add_text('-- V', tag='vbat_text')
            dpg.add_text(' Height:')
            dpg.add_text('-- m', tag='height_text')
        with dpg.group(horizontal=True):
            dpg.add_text('Flow v2 deck:')
            dpg.add_text('unknown', tag='flow_text')
        with dpg.group(horizontal=True):
            dpg.add_text('Supervisor:')
            dpg.add_text('idle', tag='sup_text')

    def _build_diagnostics(self) -> None:
        """Add the estimator pose, yaw and flow-count readouts."""
        # Live estimator pose + flow counts for diagnosing circular drift:
        # a steady yaw drift or a circular x/y trace points at a heading-estimate
        # problem; flow counts near zero suggest a poor surface for the deck.
        dpg.add_text('Estimator state')
        with dpg.group(horizontal=True):
            dpg.add_text('Pos (m):')
            dpg.add_text('x -- y -- z --', tag='diag_pos')
        with dpg.group(horizontal=True):
            dpg.add_text('Yaw:')
            dpg.add_text('-- deg', tag='diag_yaw')
        with dpg.group(horizontal=True):
            dpg.add_text('Flow counts:')
            dpg.add_text('dx -- dy --', tag='diag_flow')

    def _build_controls(self) -> None:
        """Add the hover-height slider and the ARM / Disarm / E-STOP buttons."""
        dpg.add_text('Flight')
        dpg.add_slider_float(label='Hover height', tag='height_slider',
                             default_value=TARGET_HEIGHT_M,
                             min_value=MIN_HEIGHT_M, max_value=MAX_HEIGHT_M,
                             format='%.2f m', width=-80)
        # Primary action button; label/theme/enabled all driven per frame in
        # update(): ARM (green) -> LAUNCH (blue) -> LAND (yellow).
        dpg.add_button(label='ARM', tag='arm_btn', width=-1, height=46,
                       callback=self._on_primary, enabled=False)
        # Secondary: back out of ARMED without launching. Enabled only in ARMED.
        dpg.add_button(label='Disarm', tag='disarm_btn', width=-1,
                       callback=self._on_disarm, enabled=False)
        estop = dpg.add_button(label='EMERGENCY STOP (SPACE)', tag='estop_btn',
                               width=-1, height=46, callback=self._on_estop)
        dpg.bind_item_theme(estop, self._estop_theme)

    def _build_console(self) -> None:
        """Add the console header row and the scrolling log region."""
        # Header row, then a scrolling region that fills the remaining height
        # below the controls (height=-1) — same width as the rest of the UI.
        with dpg.group(horizontal=True):
            dpg.add_text('Console')
            dpg.add_button(label='Clear', callback=lambda: LOG_BUFFER.clear())
        with dpg.child_window(tag='console_child', width=-1, height=-1):
            dpg.add_text('', tag='console_text', wrap=520)

    # ------------------------------------------------------------------
    # Callbacks
    # ------------------------------------------------------------------
    def _on_scan(self) -> None:
        """Scan for drones and repopulate the URI combo with the results."""
        uris = self.controller.scan()
        if uris:
            dpg.configure_item('uri_combo', items=uris)
            dpg.set_value('uri_combo', uris[0])
        else:
            # Nothing found: keep the controller's URI selectable and show a
            # placeholder that _on_connect_toggle will refuse to connect to.
            dpg.configure_item('uri_combo', items=[self.controller.uri])
            dpg.set_value('uri_combo', '(none found)')

    def _on_connect_toggle(self) -> None:
        """Disconnect when linked or connecting; otherwise connect to the combo URI."""
        state = self.controller.snapshot().state
        if state in CONNECTED_STATES or state == State.CONNECTING:
            self.controller.disconnect()
            return
        uri = dpg.get_value('uri_combo')
        # Only real link URIs are accepted; placeholder text is ignored.
        if uri and uri.startswith(('radio://', 'usb://')):
            self.controller.connect(uri)

    def _on_primary(self) -> None:
        """Run the primary button's action for the current state."""
        # Context action for the single primary button: ARM -> LAUNCH -> LAND.
        state = self.controller.snapshot().state
        if state in ARMABLE_STATES:
            self.controller.arm()
        elif state in LAUNCHABLE_STATES:
            self.controller.launch(dpg.get_value('height_slider'))
        elif state in FLYING_STATES:
            self.controller.disarm()   # land

    def _on_disarm(self) -> None:
        """Disarm, but only while armed on the ground."""
        # Secondary button: back out of ARMED on the ground (no takeoff).
        if self.controller.snapshot().state in LAUNCHABLE_STATES:
            self.controller.disarm()

    def _on_estop(self, *_) -> None:
        """Trigger an emergency stop.

        Extra positional arguments are accepted and ignored so the same
        method serves both the button and the SPACE key handler.
        """
        self.controller.emergency_stop()

    # ------------------------------------------------------------------
    # Per-frame update
    # ------------------------------------------------------------------
    def update(self) -> None:
        """Refresh every widget from the controller's current snapshot."""
        upd = self.controller.snapshot()
        state, t = upd.state, upd.telemetry

        dpg.set_value('status_text', upd.message or state.name)
        dpg.configure_item('status_text',
                           color=STATE_COLOUR.get(state, (200, 200, 200)))

        dpg.set_value('vbat_text', f'{t.vbat:.2f} V' if t.vbat else '-- V')
        dpg.set_value('height_text', f'{t.height:.2f} m')

        # flow_deck is tri-state: None (not known yet), truthy, or falsy.
        if t.flow_deck is None:
            dpg.set_value('flow_text', 'unknown')
        elif t.flow_deck:
            dpg.set_value('flow_text', 'attached')
        else:
            dpg.set_value('flow_text', 'NOT DETECTED')

        dpg.set_value('sup_text', self._supervisor_text(t))

        # Drift-inspection readout.
        dpg.set_value('diag_pos',
                      f'x {t.x:+.3f} y {t.y:+.3f} z {t.height:+.3f}')
        dpg.set_value('diag_yaw', f'{t.yaw:+.1f} deg')
        dpg.set_value('diag_flow', f'dx {t.flow_dx:+d} dy {t.flow_dy:+d}')

        # Connect / Disconnect button.
        connecting = state == State.CONNECTING
        connected = state in CONNECTED_STATES
        dpg.configure_item(
            'connect_btn',
            enabled=not connecting,
            label='Disconnect' if (connected or connecting) else 'Connect')
        dpg.configure_item('scan_btn', enabled=not (connected or connecting))
        dpg.configure_item('uri_combo', enabled=not (connected or connecting))

        # Hover height is a pre-launch setting; lock the slider once airborne
        # or mid-arm transient.
        dpg.configure_item('height_slider',
                           enabled=state not in HEIGHT_LOCKED_STATES)

        # Primary button — frame-driven across the two-step flow:
        #   armable   -> green  "ARM"
        #   armed     -> blue   "LAUNCH (to N m)"
        #   flying    -> yellow "LAND"
        #   otherwise -> disabled, no theme (default greyed)
        if state in ARMABLE_STATES:
            dpg.configure_item('arm_btn', enabled=True, label='ARM')
            dpg.bind_item_theme('arm_btn', self._arm_theme)
        elif state in LAUNCHABLE_STATES:
            height = dpg.get_value('height_slider')
            dpg.configure_item('arm_btn', enabled=True,
                               label=f'LAUNCH (to {height:.2f} m)')
            dpg.bind_item_theme('arm_btn', self._launch_theme)
        elif state in FLYING_STATES:
            dpg.configure_item('arm_btn', enabled=True, label='LAND')
            dpg.bind_item_theme('arm_btn', self._disarm_theme)
        else:
            dpg.configure_item('arm_btn', enabled=False, label='ARM')
            dpg.bind_item_theme('arm_btn', 0)

        # Secondary Disarm button: only usable while armed on the ground.
        dpg.configure_item('disarm_btn', enabled=state in LAUNCHABLE_STATES)

        dpg.configure_item('estop_btn', enabled=connected)

        self._refresh_console()

    def _refresh_console(self) -> None:
        """Re-render the console text when LOG_BUFFER's version has changed."""
        version, lines = LOG_BUFFER.snapshot()
        if version == self._console_version:
            return
        self._console_version = version
        # Keep following the tail only if the user is already at the bottom.
        try:
            at_bottom = (dpg.get_y_scroll('console_child')
                         >= dpg.get_y_scroll_max('console_child') - 2.0)
        except Exception:
            # Scroll state unavailable: fall back to following the tail.
            at_bottom = True
        dpg.set_value('console_text', '\n'.join(lines))
        if at_bottom:
            dpg.set_y_scroll('console_child', -1.0)

    @staticmethod
    def _supervisor_text(t) -> str:
        """Return the set supervisor flags of *t* as comma-separated text.

        Fault flags come first (upper case), then ``armed``/``flying``;
        ``'idle'`` is returned when no flag is set.
        """
        flags = []
        if t.locked:
            flags.append('LOCKED')
        if t.tumbled:
            flags.append('TUMBLED')
        if t.crashed:
            flags.append('CRASHED')
        if t.deck_fault:
            flags.append('DECK-FAULT')
        if t.armed:
            flags.append('armed')
        if t.flying:
            flags.append('flying')
        return ', '.join(flags) if flags else 'idle'

    # ------------------------------------------------------------------
    # Run loop
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Start the controller, build the UI and run the render loop.

        Whatever ends the loop (window closed, exception while building the
        UI or during a frame), the controller is asked to shut down and its
        worker is joined before the DPG context is destroyed.
        """
        import traceback

        self.controller.start()
        try:
            # build() lives inside the try so a UI construction failure still
            # shuts the already-started controller down.
            self.build()
            while dpg.is_dearpygui_running():
                self.update()
                dpg.render_dearpygui_frame()
        finally:
            # Never leave the drone airborne: ask the controller to land (if
            # flying) and close the link, and wait for the worker to finish so
            # libusb is never torn down mid-call. Bounded so we can't hang.
            # Each step is guarded separately so a failing shutdown() cannot
            # skip the join; failures are reported but stay best-effort.
            try:
                self.controller.shutdown()
            except Exception:
                traceback.print_exc()
            try:
                self.controller.join(timeout=12)
            except Exception:
                traceback.print_exc()
            dpg.destroy_context()


def main() -> None:
    """Create the app and run it until the window is closed."""
    MissionClientApp().run()


if __name__ == '__main__':
    main()