"""DearPyGui front-end for the Crazyflie motor test stand. The render loop is driven manually so motor ramps advance smoothly every frame: sliders set per-motor *targets*, and each frame the current output is moved toward its target at the configured ramp rate, then the resulting PWM is pushed to the Crazyflie only when it changes. Safety model: * Motors never move until you press ARM (and throttle starts at 0). * A max-throttle limit clamps every target. * EMERGENCY STOP (button or SPACE) cuts all motors instantly. * Disconnecting or losing the link disarms automatically. """ from __future__ import annotations import time import dearpygui.dearpygui as dpg from .cf_link import (CrazyflieLink, LOG_BUFFER, PWM_MAX, ENABLE_INDIVIDUAL, ENABLE_ALL) NUM_MOTORS = 4 PLOT_WINDOW_S = 30.0 # seconds of history to keep in the plots class MotorChannel: """Tracks a motor's target (slider) and ramped current output.""" def __init__(self) -> None: self.current = 0.0 # % actually being output (after ramp) self.last_pwm = -1 # last value sent to the radio (-1 = never) def step(self, target_pct: float, rate_pps: float, dt: float) -> None: delta = target_pct - self.current max_step = rate_pps * dt if abs(delta) <= max_step: self.current = target_pct else: self.current += max_step if delta > 0 else -max_step @property def pwm(self) -> int: return int(round(self.current / 100.0 * PWM_MAX)) class TestStandApp: def __init__(self) -> None: self.link = CrazyflieLink() self.motors = [MotorChannel() for _ in range(NUM_MOTORS)] self._last_frame = time.monotonic() self._armed_mode = None # enable mode currently armed with, or None self._plot_t0 = None # monotonic time of arming; None while disarmed self._plot_t: list[float] = [] self._plot_throttle: list[float] = [] self._plot_vbat: list[float] = [] self._console_version = -1 # ------------------------------------------------------------------ # UI construction # ------------------------------------------------------------------ def build(self) -> None: dpg.create_context() with dpg.theme() as estop_theme: with dpg.theme_component(dpg.mvButton): dpg.add_theme_color(dpg.mvThemeCol_Button, (170, 30, 30)) dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (210, 40, 40)) dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (240, 60, 60)) self._estop_theme = estop_theme # ARM button: green when ready to arm, yellow once armed. with dpg.theme() as arm_ready_theme: with dpg.theme_component(dpg.mvButton): dpg.add_theme_color(dpg.mvThemeCol_Button, (35, 130, 45)) dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (45, 160, 55)) dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (55, 185, 65)) self._arm_ready_theme = arm_ready_theme with dpg.theme() as armed_theme: with dpg.theme_component(dpg.mvButton): dpg.add_theme_color(dpg.mvThemeCol_Button, (200, 170, 30)) dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (225, 195, 45)) dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (245, 215, 60)) dpg.add_theme_color(dpg.mvThemeCol_Text, (20, 20, 20)) self._armed_theme = armed_theme with dpg.window(tag="primary"): self._build_connection() dpg.add_separator() self._build_safety() dpg.add_separator() self._build_motors() dpg.add_separator() self._build_plots() dpg.add_separator() self._build_console() # SPACE = emergency stop, available anywhere. with dpg.handler_registry(): dpg.add_key_press_handler(dpg.mvKey_Spacebar, callback=self._on_estop) dpg.create_viewport(title="Crazyflie Motor Test Stand", width=900, height=1010) dpg.setup_dearpygui() dpg.show_viewport() dpg.set_primary_window("primary", True) def _build_connection(self) -> None: dpg.add_text("Connection") with dpg.group(horizontal=True): dpg.add_combo([], tag="uri_combo", width=320, default_value="(scan for devices)") dpg.add_button(label="Scan", callback=self._on_scan) dpg.add_button(label="Connect", tag="connect_btn", callback=self._on_connect_toggle) with dpg.group(horizontal=True): dpg.add_text("Status:") dpg.add_text("disconnected", tag="status_text") with dpg.group(horizontal=True): dpg.add_text("Battery:") dpg.add_text("-- V", tag="vbat_text") dpg.add_text(" Link:") dpg.add_text("--", tag="link_text") def _build_safety(self) -> None: dpg.add_text("Safety") with dpg.group(horizontal=True): dpg.add_button(label="ARM", tag="arm_btn", width=120, callback=self._on_arm_toggle, enabled=False) estop = dpg.add_button(label="EMERGENCY STOP (SPACE)", width=260, callback=self._on_estop) dpg.bind_item_theme(estop, self._estop_theme) dpg.add_slider_float(label="Max throttle limit (%)", tag="max_limit", default_value=30.0, min_value=0.0, max_value=100.0, width=300) dpg.add_slider_float(label="Ramp rate (%/s)", tag="ramp_rate", default_value=20.0, min_value=1.0, max_value=200.0, width=300) dpg.add_text("", tag="warn_text", color=(230, 180, 40)) def _build_motors(self) -> None: dpg.add_text("Motors") dpg.add_radio_button(("Linked (all together)", "Individual"), tag="mode_radio", default_value="Linked (all together)", horizontal=True, callback=self._on_mode_change) with dpg.group(horizontal=True): dpg.add_slider_float(label="", tag="master", default_value=0.0, min_value=0.0, max_value=100.0, width=360, callback=self._sync_from_slider, user_data="master") dpg.add_input_float(tag="master_input", default_value=0.0, min_value=0.0, max_value=100.0, step=1.0, format="%.1f", width=110, min_clamped=True, max_clamped=True, on_enter=True, callback=self._sync_from_input, user_data="master") dpg.add_text("Master throttle (%)") with dpg.group(horizontal=True): dpg.add_button(label="All to 0", callback=lambda: self._set_all_targets(0)) dpg.add_text(" (type a value or drag; output ramps at the rate above)") for i in range(NUM_MOTORS): with dpg.group(horizontal=True): dpg.add_slider_float(label="", tag=f"motor_{i}", default_value=0.0, min_value=0.0, max_value=100.0, width=280, callback=self._sync_from_slider, user_data=f"motor_{i}") dpg.add_input_float(tag=f"motor_{i}_input", default_value=0.0, min_value=0.0, max_value=100.0, step=1.0, format="%.1f", width=110, min_clamped=True, max_clamped=True, on_enter=True, callback=self._sync_from_input, user_data=f"motor_{i}") dpg.add_text(f"M{i + 1} (%)") dpg.add_text("out: 0.0% (0)", tag=f"motor_out_{i}") def _build_plots(self) -> None: dpg.add_text("Telemetry") dpg.add_text("Estimator state") dpg.add_text("Pos (m): --", tag="est_pos") dpg.add_text("Yaw: --", tag="est_yaw") dpg.add_text("Flow counts: --", tag="est_flow") dpg.add_separator() with dpg.plot(label="Output throttle (avg %)", height=180, width=-1): dpg.add_plot_axis(dpg.mvXAxis, label="t (s)", tag="thr_x") dpg.set_axis_limits("thr_x", 0, PLOT_WINDOW_S) with dpg.plot_axis(dpg.mvYAxis, label="%", tag="thr_y"): dpg.set_axis_limits("thr_y", 0, 100) dpg.add_line_series([], [], tag="thr_series") with dpg.plot(label="Battery (V)", height=180, width=-1): dpg.add_plot_axis(dpg.mvXAxis, label="t (s)", tag="vbat_x") dpg.set_axis_limits("vbat_x", 0, PLOT_WINDOW_S) with dpg.plot_axis(dpg.mvYAxis, label="V", tag="vbat_y"): dpg.set_axis_limits("vbat_y", 2.8, 4.3) dpg.add_line_series([], [], tag="vbat_series") def _build_console(self) -> None: with dpg.group(horizontal=True): dpg.add_text("Console") dpg.add_button(label="Clear", callback=lambda: LOG_BUFFER.clear()) with dpg.child_window(tag="console_child", height=170, width=-1): dpg.add_text("", tag="console_text", wrap=860) # ------------------------------------------------------------------ # Callbacks # ------------------------------------------------------------------ def _on_scan(self) -> None: uris = self.link.scan() if uris: dpg.configure_item("uri_combo", items=uris) dpg.set_value("uri_combo", uris[0]) else: dpg.configure_item("uri_combo", items=[]) dpg.set_value("uri_combo", "(none found)") def _on_connect_toggle(self) -> None: state = self.link.snapshot() if state.status in ("connected", "connecting"): self.link.disconnect() return uri = dpg.get_value("uri_combo") if uri and uri.startswith(("radio://", "usb://")): self.link.connect(uri) def _on_arm_toggle(self) -> None: state = self.link.snapshot() if state.armed: self._disarm() else: self._arm() def _arm(self) -> None: mode = self._enable_mode() missing = self.link.arm(mode) self._armed_mode = mode if missing: dpg.set_value("warn_text", "firmware missing params: " + ", ".join(missing)) else: dpg.set_value("warn_text", "") def _disarm(self) -> None: self.link.disarm() self._armed_mode = None self._set_all_targets(0) def _on_estop(self, *_) -> None: self.link.emergency_stop() self._armed_mode = None self._set_all_targets(0) for m in self.motors: m.current = 0.0 def _on_mode_change(self, *_) -> None: # Re-arm with the matching enable mode if currently armed. if self._armed_mode is not None: self._arm() def _sync_from_slider(self, sender, app_data, user_data) -> None: # Slider moved -> mirror its value into the paired text box. dpg.set_value(f"{user_data}_input", app_data) def _sync_from_input(self, sender, app_data, user_data) -> None: # Text box committed -> clamp and mirror into the slider. value = max(0.0, min(100.0, app_data)) dpg.set_value(user_data, value) dpg.set_value(f"{user_data}_input", value) def _set_target(self, base_tag: str, value: float) -> None: dpg.set_value(base_tag, value) dpg.set_value(f"{base_tag}_input", value) def _set_all_targets(self, value: float) -> None: self._set_target("master", value) for i in range(NUM_MOTORS): self._set_target(f"motor_{i}", value) # ------------------------------------------------------------------ # Per-frame update # ------------------------------------------------------------------ def _enable_mode(self) -> int: linked = dpg.get_value("mode_radio").startswith("Linked") return ENABLE_ALL if linked else ENABLE_INDIVIDUAL def _targets(self) -> list[float]: """Effective per-motor targets after mode + max-limit are applied.""" max_pct = dpg.get_value("max_limit") linked = dpg.get_value("mode_radio").startswith("Linked") if linked: t = dpg.get_value("master") raw = [t] * NUM_MOTORS else: raw = [dpg.get_value(f"motor_{i}") for i in range(NUM_MOTORS)] return [max(0.0, min(max_pct, v)) for v in raw] def update(self) -> None: now = time.monotonic() dt = now - self._last_frame self._last_frame = now state = self.link.snapshot() rate = dpg.get_value("ramp_rate") targets = self._targets() armed = state.armed and state.status == "connected" linked = dpg.get_value("mode_radio").startswith("Linked") for i, m in enumerate(self.motors): # When disarmed, force everything to zero so nothing lingers. m.step(targets[i] if armed else 0.0, rate, dt) # Push changed PWM values to the radio. if armed: if linked: pwm = self.motors[0].pwm if pwm != self.motors[0].last_pwm: self.link.set_all(pwm) for m in self.motors: m.last_pwm = pwm else: for i, m in enumerate(self.motors): if m.pwm != m.last_pwm: self.link.set_motor(i + 1, m.pwm) m.last_pwm = m.pwm self._refresh_widgets(state, armed) self._refresh_estimator() self._refresh_plots(now, state, armed) self._refresh_console() def _refresh_estimator(self) -> None: est = self.link.estimator # atomic read of the published snapshot if not est.valid: dpg.set_value("est_pos", "Pos (m): --") dpg.set_value("est_yaw", "Yaw: --") dpg.set_value("est_flow", "Flow counts: --") return dpg.set_value("est_pos", f"Pos (m): x {est.x:+.2f} y {est.y:+.2f} z {est.z:+.2f}") dpg.set_value("est_yaw", f"Yaw: {est.yaw:+.1f} deg") if est.has_flow: dpg.set_value("est_flow", f"Flow counts: dx {est.dx:+d} dy {est.dy:+d}") else: dpg.set_value("est_flow", "Flow counts: (no Flow deck)") def _refresh_console(self) -> None: version, lines = LOG_BUFFER.snapshot() if version == self._console_version: return self._console_version = version # Keep following the tail only if the user is already at the bottom. try: at_bottom = (dpg.get_y_scroll("console_child") >= dpg.get_y_scroll_max("console_child") - 2.0) except Exception: at_bottom = True dpg.set_value("console_text", "\n".join(lines)) if at_bottom: dpg.set_y_scroll("console_child", -1.0) def _refresh_widgets(self, state, armed: bool) -> None: dpg.set_value("status_text", state.message or state.status) dpg.set_value("vbat_text", f"{state.vbat:.2f} V" if state.vbat else "-- V") dpg.set_value("link_text", f"{state.link_quality:.0f}%") connected = state.status == "connected" dpg.configure_item("connect_btn", label="Disconnect" if state.status in ("connected", "connecting") else "Connect") dpg.configure_item("arm_btn", enabled=connected, label="DISARM" if state.armed else "ARM") # Disabled when not connectable; green when ready to arm; yellow when armed. if not connected: dpg.bind_item_theme("arm_btn", 0) elif state.armed: dpg.bind_item_theme("arm_btn", self._armed_theme) else: dpg.bind_item_theme("arm_btn", self._arm_ready_theme) for i, m in enumerate(self.motors): dpg.set_value(f"motor_out_{i}", f"out: {m.current:5.1f}% ({m.pwm})") def _refresh_plots(self, now: float, state, armed: bool) -> None: # Only collect/scroll while armed; the trace freezes when disarmed. if not armed: self._plot_t0 = None return if self._plot_t0 is None: # Arming transition: restart the time axis from zero. self._plot_t0 = now self._plot_t.clear() self._plot_throttle.clear() self._plot_vbat.clear() t = now - self._plot_t0 avg = sum(m.current for m in self.motors) / NUM_MOTORS self._plot_t.append(t) self._plot_throttle.append(avg) self._plot_vbat.append(state.vbat) # Drop history outside the rolling window. while self._plot_t and self._plot_t[0] < t - PLOT_WINDOW_S: self._plot_t.pop(0) self._plot_throttle.pop(0) self._plot_vbat.pop(0) dpg.set_value("thr_series", [self._plot_t, self._plot_throttle]) dpg.set_value("vbat_series", [self._plot_t, self._plot_vbat]) dpg.set_axis_limits("thr_x", max(0, t - PLOT_WINDOW_S), max(PLOT_WINDOW_S, t)) dpg.set_axis_limits("vbat_x", max(0, t - PLOT_WINDOW_S), max(PLOT_WINDOW_S, t)) # ------------------------------------------------------------------ # Run loop # ------------------------------------------------------------------ def run(self) -> None: self.build() try: while dpg.is_dearpygui_running(): self.update() dpg.render_dearpygui_frame() finally: # Best-effort: never leave the bench armed. try: self.link.emergency_stop() self.link.disconnect() except Exception: pass dpg.destroy_context() def main() -> None: TestStandApp().run() if __name__ == "__main__": main()