Crazyflie motor test stand GUI (cflib + DearPyGui)

uv-managed Python app for ramping Crazyflie 2.1 / Bolt motors on a
thrust test stand over a Crazyradio.

- cf_link.py: thread-safe cflib wrapper — scan/connect, supervisor
  arming (send_arming_request) with legacy forceArm fallback, motor
  override via motorPowerSet params, battery + estimator-state logging,
  firmware-console capture, and a log buffer for the UI console.
- app.py: DearPyGui UI with a per-frame ramp loop.
  * Linked / individual motor control, sliders + numeric entry (percent)
  * Max-throttle limit, ramp rate, EMERGENCY STOP (button + SPACE)
  * ARM button: disabled when disconnected, green when ready, yellow
    when armed
  * Telemetry: estimator-state panel (pose/yaw/flow), throttle + battery
    plots that start at zero and only scroll while armed (freeze on disarm)
  * Live console panel (app + cflib + firmware messages)

Benign cflib warnings (stale log blocks, missing optional params) are
filtered; safety teardown disarms on disconnect/quit.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-23 15:12:31 -07:00
commit c27d91c857
9 changed files with 1521 additions and 0 deletions

435
test_stand/app.py Normal file
View File

@@ -0,0 +1,435 @@
"""DearPyGui front-end for the Crazyflie motor test stand.
The render loop is driven manually so motor ramps advance smoothly every
frame: sliders set per-motor *targets*, and each frame the current output is
moved toward its target at the configured ramp rate, then the resulting PWM is
pushed to the Crazyflie only when it changes.
Safety model:
* Motors never move until you press ARM (and throttle starts at 0).
* A max-throttle limit clamps every target.
* EMERGENCY STOP (button or SPACE) cuts all motors instantly.
* Disconnecting or losing the link disarms automatically.
"""
from __future__ import annotations
import time
import dearpygui.dearpygui as dpg
from .cf_link import (CrazyflieLink, LOG_BUFFER, PWM_MAX,
ENABLE_INDIVIDUAL, ENABLE_ALL)
NUM_MOTORS = 4
PLOT_WINDOW_S = 30.0 # seconds of history to keep in the plots
class MotorChannel:
"""Tracks a motor's target (slider) and ramped current output."""
def __init__(self) -> None:
self.current = 0.0 # % actually being output (after ramp)
self.last_pwm = -1 # last value sent to the radio (-1 = never)
def step(self, target_pct: float, rate_pps: float, dt: float) -> None:
delta = target_pct - self.current
max_step = rate_pps * dt
if abs(delta) <= max_step:
self.current = target_pct
else:
self.current += max_step if delta > 0 else -max_step
@property
def pwm(self) -> int:
return int(round(self.current / 100.0 * PWM_MAX))
class TestStandApp:
def __init__(self) -> None:
self.link = CrazyflieLink()
self.motors = [MotorChannel() for _ in range(NUM_MOTORS)]
self._last_frame = time.monotonic()
self._armed_mode = None # enable mode currently armed with, or None
self._plot_t0 = None # monotonic time of arming; None while disarmed
self._plot_t: list[float] = []
self._plot_throttle: list[float] = []
self._plot_vbat: list[float] = []
self._console_version = -1
# ------------------------------------------------------------------
# UI construction
# ------------------------------------------------------------------
def build(self) -> None:
dpg.create_context()
with dpg.theme() as estop_theme:
with dpg.theme_component(dpg.mvButton):
dpg.add_theme_color(dpg.mvThemeCol_Button, (170, 30, 30))
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (210, 40, 40))
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (240, 60, 60))
self._estop_theme = estop_theme
# ARM button: green when ready to arm, yellow once armed.
with dpg.theme() as arm_ready_theme:
with dpg.theme_component(dpg.mvButton):
dpg.add_theme_color(dpg.mvThemeCol_Button, (35, 130, 45))
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (45, 160, 55))
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (55, 185, 65))
self._arm_ready_theme = arm_ready_theme
with dpg.theme() as armed_theme:
with dpg.theme_component(dpg.mvButton):
dpg.add_theme_color(dpg.mvThemeCol_Button, (200, 170, 30))
dpg.add_theme_color(dpg.mvThemeCol_ButtonHovered, (225, 195, 45))
dpg.add_theme_color(dpg.mvThemeCol_ButtonActive, (245, 215, 60))
dpg.add_theme_color(dpg.mvThemeCol_Text, (20, 20, 20))
self._armed_theme = armed_theme
with dpg.window(tag="primary"):
self._build_connection()
dpg.add_separator()
self._build_safety()
dpg.add_separator()
self._build_motors()
dpg.add_separator()
self._build_plots()
dpg.add_separator()
self._build_console()
# SPACE = emergency stop, available anywhere.
with dpg.handler_registry():
dpg.add_key_press_handler(dpg.mvKey_Spacebar, callback=self._on_estop)
dpg.create_viewport(title="Crazyflie Motor Test Stand", width=900, height=1010)
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.set_primary_window("primary", True)
def _build_connection(self) -> None:
dpg.add_text("Connection")
with dpg.group(horizontal=True):
dpg.add_combo([], tag="uri_combo", width=320,
default_value="(scan for devices)")
dpg.add_button(label="Scan", callback=self._on_scan)
dpg.add_button(label="Connect", tag="connect_btn",
callback=self._on_connect_toggle)
with dpg.group(horizontal=True):
dpg.add_text("Status:")
dpg.add_text("disconnected", tag="status_text")
with dpg.group(horizontal=True):
dpg.add_text("Battery:")
dpg.add_text("-- V", tag="vbat_text")
dpg.add_text(" Link:")
dpg.add_text("--", tag="link_text")
def _build_safety(self) -> None:
dpg.add_text("Safety")
with dpg.group(horizontal=True):
dpg.add_button(label="ARM", tag="arm_btn", width=120,
callback=self._on_arm_toggle, enabled=False)
estop = dpg.add_button(label="EMERGENCY STOP (SPACE)", width=260,
callback=self._on_estop)
dpg.bind_item_theme(estop, self._estop_theme)
dpg.add_slider_float(label="Max throttle limit (%)", tag="max_limit",
default_value=30.0, min_value=0.0, max_value=100.0,
width=300)
dpg.add_slider_float(label="Ramp rate (%/s)", tag="ramp_rate",
default_value=20.0, min_value=1.0, max_value=200.0,
width=300)
dpg.add_text("", tag="warn_text", color=(230, 180, 40))
def _build_motors(self) -> None:
dpg.add_text("Motors")
dpg.add_radio_button(("Linked (all together)", "Individual"),
tag="mode_radio", default_value="Linked (all together)",
horizontal=True, callback=self._on_mode_change)
with dpg.group(horizontal=True):
dpg.add_slider_float(label="", tag="master", default_value=0.0,
min_value=0.0, max_value=100.0, width=360,
callback=self._sync_from_slider, user_data="master")
dpg.add_input_float(tag="master_input", default_value=0.0,
min_value=0.0, max_value=100.0, step=1.0,
format="%.1f", width=110, min_clamped=True,
max_clamped=True, on_enter=True,
callback=self._sync_from_input, user_data="master")
dpg.add_text("Master throttle (%)")
with dpg.group(horizontal=True):
dpg.add_button(label="All to 0", callback=lambda: self._set_all_targets(0))
dpg.add_text(" (type a value or drag; output ramps at the rate above)")
for i in range(NUM_MOTORS):
with dpg.group(horizontal=True):
dpg.add_slider_float(label="", tag=f"motor_{i}", default_value=0.0,
min_value=0.0, max_value=100.0, width=280,
callback=self._sync_from_slider,
user_data=f"motor_{i}")
dpg.add_input_float(tag=f"motor_{i}_input", default_value=0.0,
min_value=0.0, max_value=100.0, step=1.0,
format="%.1f", width=110, min_clamped=True,
max_clamped=True, on_enter=True,
callback=self._sync_from_input,
user_data=f"motor_{i}")
dpg.add_text(f"M{i + 1} (%)")
dpg.add_text("out: 0.0% (0)", tag=f"motor_out_{i}")
def _build_plots(self) -> None:
dpg.add_text("Telemetry")
dpg.add_text("Estimator state")
dpg.add_text("Pos (m): --", tag="est_pos")
dpg.add_text("Yaw: --", tag="est_yaw")
dpg.add_text("Flow counts: --", tag="est_flow")
dpg.add_separator()
with dpg.plot(label="Output throttle (avg %)", height=180, width=-1):
dpg.add_plot_axis(dpg.mvXAxis, label="t (s)", tag="thr_x")
dpg.set_axis_limits("thr_x", 0, PLOT_WINDOW_S)
with dpg.plot_axis(dpg.mvYAxis, label="%", tag="thr_y"):
dpg.set_axis_limits("thr_y", 0, 100)
dpg.add_line_series([], [], tag="thr_series")
with dpg.plot(label="Battery (V)", height=180, width=-1):
dpg.add_plot_axis(dpg.mvXAxis, label="t (s)", tag="vbat_x")
dpg.set_axis_limits("vbat_x", 0, PLOT_WINDOW_S)
with dpg.plot_axis(dpg.mvYAxis, label="V", tag="vbat_y"):
dpg.set_axis_limits("vbat_y", 2.8, 4.3)
dpg.add_line_series([], [], tag="vbat_series")
def _build_console(self) -> None:
with dpg.group(horizontal=True):
dpg.add_text("Console")
dpg.add_button(label="Clear", callback=lambda: LOG_BUFFER.clear())
with dpg.child_window(tag="console_child", height=170, width=-1):
dpg.add_text("", tag="console_text", wrap=860)
# ------------------------------------------------------------------
# Callbacks
# ------------------------------------------------------------------
def _on_scan(self) -> None:
uris = self.link.scan()
if uris:
dpg.configure_item("uri_combo", items=uris)
dpg.set_value("uri_combo", uris[0])
else:
dpg.configure_item("uri_combo", items=[])
dpg.set_value("uri_combo", "(none found)")
def _on_connect_toggle(self) -> None:
state = self.link.snapshot()
if state.status in ("connected", "connecting"):
self.link.disconnect()
return
uri = dpg.get_value("uri_combo")
if uri and uri.startswith(("radio://", "usb://")):
self.link.connect(uri)
def _on_arm_toggle(self) -> None:
state = self.link.snapshot()
if state.armed:
self._disarm()
else:
self._arm()
def _arm(self) -> None:
mode = self._enable_mode()
missing = self.link.arm(mode)
self._armed_mode = mode
if missing:
dpg.set_value("warn_text",
"firmware missing params: " + ", ".join(missing))
else:
dpg.set_value("warn_text", "")
def _disarm(self) -> None:
self.link.disarm()
self._armed_mode = None
self._set_all_targets(0)
def _on_estop(self, *_) -> None:
self.link.emergency_stop()
self._armed_mode = None
self._set_all_targets(0)
for m in self.motors:
m.current = 0.0
def _on_mode_change(self, *_) -> None:
# Re-arm with the matching enable mode if currently armed.
if self._armed_mode is not None:
self._arm()
def _sync_from_slider(self, sender, app_data, user_data) -> None:
# Slider moved -> mirror its value into the paired text box.
dpg.set_value(f"{user_data}_input", app_data)
def _sync_from_input(self, sender, app_data, user_data) -> None:
# Text box committed -> clamp and mirror into the slider.
value = max(0.0, min(100.0, app_data))
dpg.set_value(user_data, value)
dpg.set_value(f"{user_data}_input", value)
def _set_target(self, base_tag: str, value: float) -> None:
dpg.set_value(base_tag, value)
dpg.set_value(f"{base_tag}_input", value)
def _set_all_targets(self, value: float) -> None:
self._set_target("master", value)
for i in range(NUM_MOTORS):
self._set_target(f"motor_{i}", value)
# ------------------------------------------------------------------
# Per-frame update
# ------------------------------------------------------------------
def _enable_mode(self) -> int:
linked = dpg.get_value("mode_radio").startswith("Linked")
return ENABLE_ALL if linked else ENABLE_INDIVIDUAL
def _targets(self) -> list[float]:
"""Effective per-motor targets after mode + max-limit are applied."""
max_pct = dpg.get_value("max_limit")
linked = dpg.get_value("mode_radio").startswith("Linked")
if linked:
t = dpg.get_value("master")
raw = [t] * NUM_MOTORS
else:
raw = [dpg.get_value(f"motor_{i}") for i in range(NUM_MOTORS)]
return [max(0.0, min(max_pct, v)) for v in raw]
def update(self) -> None:
now = time.monotonic()
dt = now - self._last_frame
self._last_frame = now
state = self.link.snapshot()
rate = dpg.get_value("ramp_rate")
targets = self._targets()
armed = state.armed and state.status == "connected"
linked = dpg.get_value("mode_radio").startswith("Linked")
for i, m in enumerate(self.motors):
# When disarmed, force everything to zero so nothing lingers.
m.step(targets[i] if armed else 0.0, rate, dt)
# Push changed PWM values to the radio.
if armed:
if linked:
pwm = self.motors[0].pwm
if pwm != self.motors[0].last_pwm:
self.link.set_all(pwm)
for m in self.motors:
m.last_pwm = pwm
else:
for i, m in enumerate(self.motors):
if m.pwm != m.last_pwm:
self.link.set_motor(i + 1, m.pwm)
m.last_pwm = m.pwm
self._refresh_widgets(state, armed)
self._refresh_estimator()
self._refresh_plots(now, state, armed)
self._refresh_console()
def _refresh_estimator(self) -> None:
est = self.link.estimator # atomic read of the published snapshot
if not est.valid:
dpg.set_value("est_pos", "Pos (m): --")
dpg.set_value("est_yaw", "Yaw: --")
dpg.set_value("est_flow", "Flow counts: --")
return
dpg.set_value("est_pos",
f"Pos (m): x {est.x:+.2f} y {est.y:+.2f} z {est.z:+.2f}")
dpg.set_value("est_yaw", f"Yaw: {est.yaw:+.1f} deg")
if est.has_flow:
dpg.set_value("est_flow", f"Flow counts: dx {est.dx:+d} dy {est.dy:+d}")
else:
dpg.set_value("est_flow", "Flow counts: (no Flow deck)")
def _refresh_console(self) -> None:
version, lines = LOG_BUFFER.snapshot()
if version == self._console_version:
return
self._console_version = version
# Keep following the tail only if the user is already at the bottom.
try:
at_bottom = (dpg.get_y_scroll("console_child")
>= dpg.get_y_scroll_max("console_child") - 2.0)
except Exception:
at_bottom = True
dpg.set_value("console_text", "\n".join(lines))
if at_bottom:
dpg.set_y_scroll("console_child", -1.0)
def _refresh_widgets(self, state, armed: bool) -> None:
dpg.set_value("status_text", state.message or state.status)
dpg.set_value("vbat_text", f"{state.vbat:.2f} V" if state.vbat else "-- V")
dpg.set_value("link_text", f"{state.link_quality:.0f}%")
connected = state.status == "connected"
dpg.configure_item("connect_btn",
label="Disconnect" if state.status in ("connected", "connecting")
else "Connect")
dpg.configure_item("arm_btn", enabled=connected,
label="DISARM" if state.armed else "ARM")
# Disabled when not connectable; green when ready to arm; yellow when armed.
if not connected:
dpg.bind_item_theme("arm_btn", 0)
elif state.armed:
dpg.bind_item_theme("arm_btn", self._armed_theme)
else:
dpg.bind_item_theme("arm_btn", self._arm_ready_theme)
for i, m in enumerate(self.motors):
dpg.set_value(f"motor_out_{i}", f"out: {m.current:5.1f}% ({m.pwm})")
def _refresh_plots(self, now: float, state, armed: bool) -> None:
# Only collect/scroll while armed; the trace freezes when disarmed.
if not armed:
self._plot_t0 = None
return
if self._plot_t0 is None:
# Arming transition: restart the time axis from zero.
self._plot_t0 = now
self._plot_t.clear()
self._plot_throttle.clear()
self._plot_vbat.clear()
t = now - self._plot_t0
avg = sum(m.current for m in self.motors) / NUM_MOTORS
self._plot_t.append(t)
self._plot_throttle.append(avg)
self._plot_vbat.append(state.vbat)
# Drop history outside the rolling window.
while self._plot_t and self._plot_t[0] < t - PLOT_WINDOW_S:
self._plot_t.pop(0)
self._plot_throttle.pop(0)
self._plot_vbat.pop(0)
dpg.set_value("thr_series", [self._plot_t, self._plot_throttle])
dpg.set_value("vbat_series", [self._plot_t, self._plot_vbat])
dpg.set_axis_limits("thr_x", max(0, t - PLOT_WINDOW_S), max(PLOT_WINDOW_S, t))
dpg.set_axis_limits("vbat_x", max(0, t - PLOT_WINDOW_S), max(PLOT_WINDOW_S, t))
# ------------------------------------------------------------------
# Run loop
# ------------------------------------------------------------------
def run(self) -> None:
self.build()
try:
while dpg.is_dearpygui_running():
self.update()
dpg.render_dearpygui_frame()
finally:
# Best-effort: never leave the bench armed.
try:
self.link.emergency_stop()
self.link.disconnect()
except Exception:
pass
dpg.destroy_context()
def main() -> None:
TestStandApp().run()
if __name__ == "__main__":
main()