#!/usr/bin/env python3 """DearPyGui front-end for the Crazyflie range-mapping client. Two independent parts, reflecting how the data is captured: 1. Mapping - projects the Multi-ranger ToF distances through the drone's position estimate into a top-down point-cloud map of the surroundings. This works whenever pose + range are streaming, so you can simply *hold the drone in your hand* and walk it around to build a map; no flight required. 2. Flight (automated scan) - an optional capture profile: arm, lift off, then ascend slowly while rotating in place so the sensors sweep a full circle on their own. The render loop is driven manually (same pattern as the test-stand / mission clients): every frame we poll the controller's thread-safe snapshot() and the map, and refresh the widgets. All radio I/O stays on the controller's worker thread. Run: python -m rangeview CFLIB_URI=radio://0/80/2M/E7E7E7E7E7 python -m rangeview # custom URI """ from __future__ import annotations import math import os import shutil import subprocess import dearpygui.dearpygui as dpg from .cloud_publisher import DEFAULT_ADDRESS from .controller import ( DEFAULT_CLIMB_RATE_MPS, DEFAULT_START_HEIGHT_M, DEFAULT_TOP_HEIGHT_M, DEFAULT_URI, DEFAULT_YAW_RATE_DPS, LOG_BUFFER, MAX_HEIGHT_M, MIN_HEIGHT_M, RangeController, State, ) # Status text colour per state (R, G, B). STATE_COLOUR = { State.DISCONNECTED: (150, 150, 150), State.CONNECTING: (220, 180, 60), State.READY: (60, 200, 90), State.ARMING: (220, 180, 60), State.ARMED: (90, 150, 235), State.SCANNING: (60, 200, 90), State.LANDING: (220, 180, 60), State.EMERGENCY: (235, 70, 70), State.ERROR: (235, 70, 70), } # States in which a link exists (connected through on-the-ground emergency). CONNECTED_STATES = {State.READY, State.ARMING, State.ARMED, State.SCANNING, State.LANDING, State.EMERGENCY} ARMABLE_STATES = {State.READY, State.EMERGENCY} # primary: ARM (green) SCANNABLE_STATES = {State.ARMED} # primary: SCAN (blue) FLYING_STATES = {State.SCANNING} # primary: LAND (yellow) # Estimator reset / origin is a ground operation only. RESETTABLE_STATES = {State.READY, State.ARMED, State.EMERGENCY} # Scan-parameter sliders lock once airborne or mid-arm transient. PARAMS_LOCKED_STATES = {State.ARMING, State.SCANNING, State.LANDING} # Map sensor colours, matching the live-ray series in the plot. RAY_COLOUR = { 'front': (90, 200, 255), 'back': (255, 170, 90), 'left': (140, 255, 140), 'right': (255, 140, 200), } # Heading arrow length and drone marker, in metres / plot units. HEADING_LEN_M = 0.35 class RangeViewApp: def __init__(self) -> None: self.controller = RangeController( os.environ.get('CFLIB_URI', DEFAULT_URI)) self._console_version = -1 # last LOG_BUFFER version rendered self._map_version = -1 # last map version uploaded to the plot # ------------------------------------------------------------------ # UI construction # ------------------------------------------------------------------ def build(self) -> None: dpg.create_context() self._load_font() self._build_themes() with dpg.window(tag='primary'): with dpg.group(horizontal=True): # Left column: connection, telemetry, mapping + flight controls. with dpg.child_window(width=440, autosize_y=True): self._build_connection() dpg.add_separator() self._build_status() dpg.add_separator() self._build_ranges() dpg.add_separator() self._build_mapping() dpg.add_separator() self._build_flight() dpg.add_separator() self._build_console() # Right column: the map fills the rest of the window. with dpg.child_window(width=-1, autosize_y=True): self._build_map() # SPACE = emergency stop, available anywhere. with dpg.handler_registry(): dpg.add_key_press_handler(dpg.mvKey_Spacebar, callback=self._on_estop) dpg.create_viewport(title='Crazyflie RangeView', width=1180, height=820) dpg.setup_dearpygui() dpg.show_viewport() dpg.set_primary_window('primary', True) def _load_font(self) -> None: """Bind Brass Mono as the UI font when available (falls back silently to the DearPyGui default). A real mono font also renders the box/glyph characters cleanly; the UI strings themselves are kept ASCII.""" path = _find_brass_mono() if not path: return with dpg.font_registry(): font = dpg.add_font(path, 16) dpg.bind_font(font) def _build_themes(self) -> None: def button_theme(base, hover, active, text=None): with dpg.theme() as theme: with dpg.theme_component(dpg.mvButton): dpg.add_theme_color(dpg.mvThemeCol_Button, base) dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, hover) dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, active) if text is not None: dpg.add_theme_color(dpg.mvThemeCol_Text, text) return theme self._estop_theme = button_theme((170, 30, 30), (210, 40, 40), (240, 60, 60)) self._arm_theme = button_theme((30, 120, 50), (40, 150, 65), (50, 175, 80)) self._scan_theme = button_theme((40, 95, 185), (55, 115, 210), (70, 135, 230)) self._land_theme = button_theme((205, 170, 40), (225, 190, 55), (240, 205, 70), text=(35, 30, 0)) def _build_connection(self) -> None: dpg.add_text('Connection') with dpg.group(horizontal=True): dpg.add_combo([self.controller.uri], tag='uri_combo', width=250, default_value=self.controller.uri) dpg.add_button(label='Scan', tag='scan_btn', callback=self._on_scan) dpg.add_button(label='Connect', tag='connect_btn', callback=self._on_connect_toggle) def _build_status(self) -> None: dpg.add_text('Status') dpg.add_text('Disconnected', tag='status_text', color=STATE_COLOUR[State.DISCONNECTED]) with dpg.group(horizontal=True): dpg.add_text('Battery:') dpg.add_text('-- V', tag='vbat_text') dpg.add_text(' Height:') dpg.add_text('-- m', tag='height_text') with dpg.group(horizontal=True): dpg.add_text('Multi-ranger:') dpg.add_text('unknown', tag='mr_text') dpg.add_text(' Flow v2:') dpg.add_text('unknown', tag='flow_text') with dpg.group(horizontal=True): dpg.add_text('Pos (m):') dpg.add_text('x -- y -- z --', tag='pos_text') dpg.add_text(' Yaw:') dpg.add_text('-- deg', tag='yaw_text') with dpg.group(horizontal=True): dpg.add_text('Motion:') dpg.add_text('--', tag='motion_text') def _build_ranges(self) -> None: dpg.add_text('ToF distances (m)') # Horizontal sensors, colour-matched to their map rays. with dpg.group(horizontal=True): for name in ('front', 'back', 'left', 'right'): dpg.add_text(f'{name[0].upper()}:') dpg.add_text('--', tag=f'rng_{name}', color=RAY_COLOUR[name]) dpg.add_spacer(width=6) with dpg.group(horizontal=True): dpg.add_text('Up (ceiling):') dpg.add_text('--', tag='rng_up') dpg.add_text(' Down (floor):') dpg.add_text('--', tag='rng_down') def _build_mapping(self) -> None: dpg.add_text('Mapping') dpg.add_text('Hold the drone and move it, or run a scan - both fill the ' 'map.', wrap=420, color=(150, 150, 150)) with dpg.group(horizontal=True): dpg.add_checkbox(label='Record', tag='record_chk', default_value=self.controller.recording, callback=self._on_record_toggle) dpg.add_button(label='Reset origin', tag='reset_btn', callback=self._on_reset_origin, enabled=False) dpg.add_button(label='Clear map', callback=lambda: self.controller.clear_map()) dpg.add_checkbox(label='Drift correction (hold pose when still)', tag='drift_chk', default_value=self.controller.drift_correction, callback=self._on_drift_toggle) dpg.add_text('Points: 0', tag='points_text') # 3-D stream to a CloudView viewer (the 2-D map above stays the live # diagnostic; the full 3-D cloud goes out over the socket). with dpg.group(horizontal=True): dpg.add_checkbox(label='Stream 3D to', tag='stream_chk', default_value=False, callback=self._on_stream_change) dpg.add_input_text(tag='stream_addr', width=210, default_value=DEFAULT_ADDRESS, callback=self._on_stream_change, on_enter=True) dpg.add_text('viewer: off', tag='stream_status', color=(150, 150, 150)) def _build_flight(self) -> None: dpg.add_text('Flight (automated scan)') dpg.add_text('EXPERIMENTAL - rotating flight on a Flow deck is risky. ' 'Bench-test props-off first; keep SPACE (e-stop) ready and ' 'the area clear.', wrap=420, color=(235, 170, 70)) dpg.add_slider_float(label='Start height', tag='start_slider', default_value=DEFAULT_START_HEIGHT_M, min_value=MIN_HEIGHT_M, max_value=MAX_HEIGHT_M, format='%.2f m', width=-110, callback=self._on_param_change) dpg.add_slider_float(label='Top height', tag='top_slider', default_value=DEFAULT_TOP_HEIGHT_M, min_value=MIN_HEIGHT_M, max_value=MAX_HEIGHT_M, format='%.2f m', width=-110, callback=self._on_param_change) dpg.add_slider_float(label='Climb rate', tag='climb_slider', default_value=DEFAULT_CLIMB_RATE_MPS, min_value=0.05, max_value=0.5, format='%.2f m/s', width=-110, callback=self._on_param_change) dpg.add_slider_float(label='Yaw rate', tag='yaw_slider', default_value=DEFAULT_YAW_RATE_DPS, min_value=15.0, max_value=120.0, format='%.0f deg/s', width=-110, callback=self._on_param_change) dpg.add_checkbox(label='Dry run (props off - walk the scan, no motors)', tag='dryrun_chk', default_value=self.controller.dry_run, callback=self._on_dryrun_toggle) # Primary action: ARM (green) -> SCAN (blue) -> LAND (yellow). dpg.add_button(label='ARM', tag='arm_btn', width=-1, height=42, callback=self._on_primary, enabled=False) with dpg.group(horizontal=True): dpg.add_button(label='Disarm', tag='disarm_btn', width=140, callback=self._on_disarm, enabled=False) estop = dpg.add_button(label='EMERGENCY STOP (SPACE)', tag='estop_btn', width=-1, height=42, callback=self._on_estop) dpg.bind_item_theme(estop, self._estop_theme) def _build_console(self) -> None: with dpg.group(horizontal=True): dpg.add_text('Console') dpg.add_button(label='Clear', callback=lambda: LOG_BUFFER.clear()) with dpg.child_window(tag='console_child', width=-1, height=150): dpg.add_text('', tag='console_text', wrap=410) def _build_map(self) -> None: with dpg.group(horizontal=True): dpg.add_text('Map (top-down - looking down on the drone)') dpg.add_spacer(width=12) dpg.add_text('extent +/-') dpg.add_slider_float(tag='extent_slider', default_value=3.0, min_value=1.0, max_value=8.0, format='%.1f m', width=160) with dpg.plot(tag='map_plot', height=-1, width=-1, equal_aspects=True): dpg.add_plot_legend() dpg.add_plot_axis(dpg.mvXAxis, label='x - forward (m)', tag='map_x') with dpg.plot_axis(dpg.mvYAxis, label='y - left (m)', tag='map_y'): dpg.add_scatter_series([], [], label='obstacles', tag='map_points') # Live sensor rays from the drone's current pose. for name in ('front', 'back', 'left', 'right'): dpg.add_line_series([], [], label=name, tag=f'ray_{name}') dpg.add_line_series([], [], label='heading', tag='heading_line') dpg.add_scatter_series([0.0], [0.0], label='drone', tag='drone_pt') # Series styling: small grey obstacle dots, coloured rays, bold drone. self._style_series('map_points', (200, 200, 80), marker_size=2) self._style_series('drone_pt', (60, 220, 90), marker_size=6, marker=dpg.mvPlotMarker_Circle) self._style_series('heading_line', (60, 220, 90), weight=2.0) for name in ('front', 'back', 'left', 'right'): self._style_series(f'ray_{name}', RAY_COLOUR[name], weight=1.5) def _style_series(self, tag, colour, marker_size=3, weight=1.0, marker=dpg.mvPlotMarker_Circle) -> None: with dpg.theme() as theme: with dpg.theme_component(dpg.mvScatterSeries): dpg.add_theme_color(dpg.mvPlotCol_Line, colour, category=dpg.mvThemeCat_Plots) dpg.add_theme_color(dpg.mvPlotCol_MarkerFill, colour, category=dpg.mvThemeCat_Plots) dpg.add_theme_color(dpg.mvPlotCol_MarkerOutline, colour, category=dpg.mvThemeCat_Plots) dpg.add_theme_style(dpg.mvPlotStyleVar_Marker, marker, category=dpg.mvThemeCat_Plots) dpg.add_theme_style(dpg.mvPlotStyleVar_MarkerSize, marker_size, category=dpg.mvThemeCat_Plots) with dpg.theme_component(dpg.mvLineSeries): dpg.add_theme_color(dpg.mvPlotCol_Line, colour, category=dpg.mvThemeCat_Plots) dpg.add_theme_style(dpg.mvPlotStyleVar_LineWeight, weight, category=dpg.mvThemeCat_Plots) dpg.bind_item_theme(tag, theme) # ------------------------------------------------------------------ # Callbacks # ------------------------------------------------------------------ def _on_scan(self) -> None: uris = self.controller.scan_uris() if uris: dpg.configure_item('uri_combo', items=uris) dpg.set_value('uri_combo', uris[0]) else: dpg.configure_item('uri_combo', items=[self.controller.uri]) dpg.set_value('uri_combo', '(none found)') def _on_connect_toggle(self) -> None: state = self.controller.snapshot().state if state in CONNECTED_STATES or state == State.CONNECTING: self.controller.disconnect() return uri = dpg.get_value('uri_combo') if uri and uri.startswith(('radio://', 'usb://')): self.controller.connect(uri) def _on_record_toggle(self, sender, value) -> None: self.controller.set_recording(value) def _on_reset_origin(self) -> None: self.controller.reset_origin() def _on_drift_toggle(self, sender, value) -> None: self.controller.set_drift_correction(value) def _on_dryrun_toggle(self, sender, value) -> None: self.controller.set_dry_run(value) def _on_stream_change(self, *_) -> None: self.controller.set_streaming(dpg.get_value('stream_chk'), dpg.get_value('stream_addr').strip()) def _on_param_change(self, *_) -> None: self.controller.set_scan_params( dpg.get_value('start_slider'), dpg.get_value('top_slider'), dpg.get_value('climb_slider'), dpg.get_value('yaw_slider')) def _on_primary(self) -> None: state = self.controller.snapshot().state if state in ARMABLE_STATES: self._on_param_change() # latch the latest slider values self.controller.arm() elif state in SCANNABLE_STATES: self.controller.scan() elif state in FLYING_STATES: self.controller.stop() # land def _on_disarm(self) -> None: if self.controller.snapshot().state in SCANNABLE_STATES: self.controller.stop() def _on_estop(self, *_) -> None: self.controller.emergency_stop() # ------------------------------------------------------------------ # Per-frame update # ------------------------------------------------------------------ def update(self) -> None: upd = self.controller.snapshot() state, t = upd.state, upd.telemetry dpg.set_value('status_text', upd.message or state.name) dpg.configure_item('status_text', color=STATE_COLOUR.get(state, (200, 200, 200))) dpg.set_value('vbat_text', f'{t.vbat:.2f} V' if t.vbat else '-- V') dpg.set_value('height_text', f'{t.z:.2f} m') dpg.set_value('mr_text', _deck_label(t.multiranger)) dpg.set_value('flow_text', _deck_label(t.flow_deck)) dpg.set_value('pos_text', f'x {t.x:+.2f} y {t.y:+.2f} z {t.z:+.2f}') dpg.set_value('yaw_text', f'{t.yaw:+.1f} deg') # Motion / drift-hold indicator: shows when the pose is being pinned, and # the residual raw drift it is absorbing so you can see it working. if t.still and t.drift_corrected: drift = math.hypot(t.raw_x - t.x, t.raw_y - t.y) dpg.set_value('motion_text', f'still - pose held (absorbing {drift * 100:.0f} cm drift)') dpg.configure_item('motion_text', color=(90, 150, 235)) elif t.still: dpg.set_value('motion_text', 'still (correction off)') dpg.configure_item('motion_text', color=(150, 150, 150)) else: dpg.set_value('motion_text', 'moving') dpg.configure_item('motion_text', color=(60, 200, 90)) for name in ('front', 'back', 'left', 'right', 'up', 'down'): dpg.set_value(f'rng_{name}', _range_label(getattr(t, name))) # 3-D stream status. if not dpg.get_value('stream_chk'): dpg.set_value('stream_status', 'viewer: off') dpg.configure_item('stream_status', color=(150, 150, 150)) elif self.controller.streaming_connected: dpg.set_value('stream_status', 'viewer: connected') dpg.configure_item('stream_status', color=(60, 200, 90)) else: dpg.set_value('stream_status', 'viewer: waiting for connection...') dpg.configure_item('stream_status', color=(220, 180, 60)) self._refresh_buttons(state) self._refresh_map(t) self._refresh_console() def _refresh_buttons(self, state: State) -> None: connecting = state == State.CONNECTING connected = state in CONNECTED_STATES dpg.configure_item( 'connect_btn', enabled=not connecting, label='Disconnect' if (connected or connecting) else 'Connect') dpg.configure_item('scan_btn', enabled=not (connected or connecting)) dpg.configure_item('uri_combo', enabled=not (connected or connecting)) dpg.configure_item('reset_btn', enabled=state in RESETTABLE_STATES) locked = state in PARAMS_LOCKED_STATES for tag in ('start_slider', 'top_slider', 'climb_slider', 'yaw_slider'): dpg.configure_item(tag, enabled=not locked) # Dry run swaps the lock to keep the params editable and re-labels the # buttons so it's obvious nothing will fly. dry = dpg.get_value('dryrun_chk') # Primary button across the ARM -> SCAN -> LAND flow. if state in ARMABLE_STATES: dpg.configure_item('arm_btn', enabled=True, label='ARM (dry run)' if dry else 'ARM') dpg.bind_item_theme('arm_btn', self._arm_theme) elif state in SCANNABLE_STATES: top = dpg.get_value('top_slider') label = (f'DRY-RUN SCAN (walk to {top:.2f} m, no motors)' if dry else f'SCAN (!) (rotating, to {top:.2f} m)') dpg.configure_item('arm_btn', enabled=True, label=label) dpg.bind_item_theme('arm_btn', self._scan_theme) elif state in FLYING_STATES: dpg.configure_item('arm_btn', enabled=True, label='LAND') dpg.bind_item_theme('arm_btn', self._land_theme) else: dpg.configure_item('arm_btn', enabled=False, label='ARM') dpg.bind_item_theme('arm_btn', 0) dpg.configure_item('disarm_btn', enabled=state in SCANNABLE_STATES) dpg.configure_item('estop_btn', enabled=connected) def _refresh_map(self, t) -> None: # Re-upload the accumulated cloud only when it actually changed. version, xs, ys = self.controller.map_snapshot() if version != self._map_version: self._map_version = version dpg.set_value('map_points', [xs, ys]) dpg.set_value('points_text', f'Points: {len(xs)}') # Drone marker, heading arrow, and live rays from the current pose. dpg.set_value('drone_pt', [[t.x], [t.y]]) yaw = math.radians(t.yaw) hx = t.x + HEADING_LEN_M * math.cos(yaw) hy = t.y + HEADING_LEN_M * math.sin(yaw) dpg.set_value('heading_line', [[t.x, hx], [t.y, hy]]) for name, (bx, by) in (('front', (1.0, 0.0)), ('back', (-1.0, 0.0)), ('left', (0.0, 1.0)), ('right', (0.0, -1.0))): dist = getattr(t, name) if dist is None: dpg.set_value(f'ray_{name}', [[], []]) continue wx = math.cos(yaw) * bx - math.sin(yaw) * by wy = math.sin(yaw) * bx + math.cos(yaw) * by dpg.set_value(f'ray_{name}', [[t.x, t.x + wx * dist], [t.y, t.y + wy * dist]]) # Keep the view centred on the origin with a user-set extent (equal # aspect keeps it square regardless of window shape). ext = dpg.get_value('extent_slider') dpg.set_axis_limits('map_x', -ext, ext) dpg.set_axis_limits('map_y', -ext, ext) def _refresh_console(self) -> None: version, lines = LOG_BUFFER.snapshot() if version == self._console_version: return self._console_version = version try: at_bottom = (dpg.get_y_scroll('console_child') >= dpg.get_y_scroll_max('console_child') - 2.0) except Exception: at_bottom = True dpg.set_value('console_text', '\n'.join(lines)) if at_bottom: dpg.set_y_scroll('console_child', -1.0) # ------------------------------------------------------------------ # Run loop # ------------------------------------------------------------------ def run(self) -> None: self.controller.start() self.build() try: while dpg.is_dearpygui_running(): self.update() dpg.render_dearpygui_frame() finally: # Never leave the drone airborne: ask the controller to land (if # flying) and close the link, and wait for the worker to finish so # libusb is never torn down mid-call. Bounded so we can't hang. try: self.controller.shutdown() self.controller.join(timeout=12) except Exception: pass dpg.destroy_context() def _find_brass_mono() -> str | None: """Locate the Brass Mono regular TTF, or None if it isn't installed.""" candidates = [ os.path.expanduser('~/.local/share/fonts/BrassMono-Regular.ttf'), '/usr/share/fonts/truetype/brassmono/BrassMono-Regular.ttf', '/usr/local/share/fonts/BrassMono-Regular.ttf', ] for path in candidates: if os.path.isfile(path): return path # Fall back to fontconfig, which resolves by family name. if shutil.which('fc-match'): try: out = subprocess.run(['fc-match', '-f', '%{file}', 'Brass Mono'], capture_output=True, text=True, timeout=3) path = out.stdout.strip() if path and os.path.isfile(path) and 'brass' in path.lower(): return path except Exception: pass return None def _deck_label(present) -> str: if present is None: return 'unknown' return 'attached' if present else 'NOT DETECTED' def _range_label(dist) -> str: return f'{dist:.2f}' if dist is not None else '--' def main() -> None: RangeViewApp().run() if __name__ == '__main__': main()