"""Crazyflie communication layer for the motor test stand. Wraps cflib so the UI only deals with a small, thread-safe surface: connect/disconnect, arm/disarm, and writing raw motor PWM values. Motor override is done through the firmware ``motorPowerSet`` param group: motorPowerSet.enable = 0 normal flight control (no override) = 1 override each motor individually via m1..m4 = 2 override all motors with the m1 value motorPowerSet.m1..m4 = 0..65535 raw PWM per motor Current firmware arms through the supervisor via a platform arming request (``cf.platform.send_arming_request(True)``); the older ``system.forceArm`` param no longer exists. ``motor.batCompensation = 0`` would disable battery compensation so a commanded PWM maps directly to output, for repeatable measurements. Both the arming request and those legacy params are sent best-effort, so firmware with either mechanism works. All cflib callbacks run on cflib's own thread, so shared state is guarded by a lock and exposed through getters. Param writes are sent from the caller's thread, which cflib handles safely. """ from __future__ import annotations import logging import collections import threading from dataclasses import dataclass, field from typing import Callable, Optional import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.log import LogConfig logger = logging.getLogger(__name__) # Firmware console output is logged under this name so it's easy to spot. _CONSOLE_LOGGER = logging.getLogger("crazyflie.console") PWM_MAX = 65535 # motorPowerSet.enable values ENABLE_OFF = 0 ENABLE_INDIVIDUAL = 1 ENABLE_ALL = 2 class _StaleLogFilter(logging.Filter): """Drop cflib's benign "no LogEntry to handle id=N" warnings. Firmware streams log-data for a block id the host no longer tracks when a stale block is left over from a prior/parallel session. It's harmless and not actionable from here (power-cycle the Crazyflie to clear it), so we keep it out of the console instead of alarming the user. """ def filter(self, record: logging.LogRecord) -> bool: return "no LogEntry to handle" not in record.getMessage() class LogBuffer(logging.Handler): """A thread-safe ring buffer of formatted log lines for the UI console. cflib logs from its own thread, so reads/writes are guarded by a lock. A monotonic version counter lets the UI cheaply detect when there's something new to render. """ def __init__(self, capacity: int = 500) -> None: super().__init__() self._lines: collections.deque = collections.deque(maxlen=capacity) self._lock = threading.Lock() self._version = 0 self.setFormatter(logging.Formatter( "%(asctime)s %(levelname).1s %(name)s: %(message)s", "%H:%M:%S")) def emit(self, record: logging.LogRecord) -> None: try: line = self.format(record) except Exception: return with self._lock: self._lines.append(line) self._version += 1 def snapshot(self) -> tuple[int, list[str]]: with self._lock: return self._version, list(self._lines) def clear(self) -> None: with self._lock: self._lines.clear() self._version += 1 # Single shared console buffer, attached to the root logger once. LOG_BUFFER = LogBuffer() _LOGGING_READY = False def _setup_logging() -> None: """Route cflib + app logs into LOG_BUFFER and the console (idempotent).""" global _LOGGING_READY if _LOGGING_READY: return logging.getLogger("cflib.crazyflie.log").addFilter(_StaleLogFilter()) root = logging.getLogger() if root.level == logging.NOTSET or root.level > logging.INFO: root.setLevel(logging.INFO) LOG_BUFFER.setLevel(logging.INFO) root.addHandler(LOG_BUFFER) _LOGGING_READY = True @dataclass(frozen=True) class EstimatorState: """Immutable snapshot of estimator pose + raw flow counts. Published atomically: the cflib log callback builds a fresh instance and assigns it to a single attribute, which is atomic under the GIL, so the render thread can read it lock-free. """ valid: bool = False # True once at least one sample has arrived has_flow: bool = False # True if a Flow deck supplied motion.* counts x: float = 0.0 # metres, origin at takeoff y: float = 0.0 z: float = 0.0 # laser-ranger height yaw: float = 0.0 # degrees dx: int = 0 # raw flow counts since last sample dy: int = 0 @dataclass class LinkState: """Snapshot of link status, copied out under lock for the UI.""" status: str = "disconnected" # disconnected | connecting | connected | failed uri: Optional[str] = None message: str = "" armed: bool = False vbat: float = 0.0 link_quality: float = 0.0 missing_params: list = field(default_factory=list) class CrazyflieLink: """Thread-safe wrapper around a single Crazyflie connection.""" def __init__(self) -> None: _setup_logging() cflib.crtp.init_drivers() self._cf = Crazyflie(rw_cache="./.cache") self._lock = threading.Lock() self._state = LinkState() self._log: Optional[LogConfig] = None self._est_log: Optional[LogConfig] = None self._est_has_flow = False # Lock-free snapshot: written by the cflib thread, read by the renderer. self._estimator = EstimatorState() self._console_buf = "" # partial firmware console line self._cf.console.receivedChar.add_callback(self._on_console) self._cf.connected.add_callback(self._on_connected) self._cf.fully_connected.add_callback(self._on_fully_connected) self._cf.disconnected.add_callback(self._on_disconnected) self._cf.connection_failed.add_callback(self._on_connection_failed) self._cf.connection_lost.add_callback(self._on_connection_lost) # link_statistics is the current home for link quality; fall back to the # deprecated top-level callback on older cflib. link_stats = getattr(self._cf, "link_statistics", None) if link_stats is not None and hasattr(link_stats, "link_quality_updated"): link_stats.link_quality_updated.add_callback(self._on_link_quality) else: # pragma: no cover - older cflib self._cf.link_quality_updated.add_callback(self._on_link_quality) # ------------------------------------------------------------------ # Discovery / connection # ------------------------------------------------------------------ @staticmethod def scan() -> list[str]: """Return URIs of Crazyflies reachable on any installed interface.""" try: return [entry[0] for entry in cflib.crtp.scan_interfaces()] except Exception as exc: # pragma: no cover - hardware dependent logger.warning("scan failed: %s", exc) return [] def connect(self, uri: str) -> None: logger.info("connecting to %s", uri) with self._lock: self._state = LinkState(status="connecting", uri=uri, message=f"connecting to {uri}...") self._cf.open_link(uri) def disconnect(self) -> None: # Make sure nothing is left spinning before we drop the link. try: if self._state.status == "connected": self.disarm() finally: self._cf.close_link() # ------------------------------------------------------------------ # Arming / motor control # ------------------------------------------------------------------ def arm(self, enable_mode: int) -> list[str]: """Arm the motor override. Returns the list of *critical* missing params. Only ``motorPowerSet.*`` is required. The legacy ``system.forceArm`` / ``motor.batCompensation`` params don't exist on current firmware (the supervisor handles arming via a platform request instead), so they are set best-effort and their absence is not reported. """ missing: list[str] = [] # Modern supervisor arming (current firmware). self._send_arming(True) # Legacy / optional params — best-effort, silent if absent. self._set_param("motor.batCompensation", 0) self._set_param("system.forceArm", 1) # Critical override params: report these if the firmware lacks them. for i in range(1, 5): self._set_param(f"motorPowerSet.m{i}", 0, missing) self._set_param("motorPowerSet.enable", enable_mode, missing) with self._lock: self._state.armed = True self._state.missing_params = missing mode_name = "all-linked" if enable_mode == ENABLE_ALL else "individual" logger.info("ARMED (%s mode)", mode_name) if missing: logger.warning("firmware missing motor params: %s", ", ".join(missing)) return missing def disarm(self) -> None: for i in range(1, 5): self._set_param(f"motorPowerSet.m{i}", 0) self._set_param("motorPowerSet.enable", ENABLE_OFF) self._set_param("system.forceArm", 0) self._send_arming(False) with self._lock: self._state.armed = False logger.info("disarmed") def emergency_stop(self) -> None: """Immediately cut all motors. Safe to call from any thread/state.""" try: for i in range(1, 5): self._set_param(f"motorPowerSet.m{i}", 0) self._set_param("motorPowerSet.enable", ENABLE_OFF) self._set_param("system.forceArm", 0) self._send_arming(False) finally: with self._lock: self._state.armed = False self._state.message = "EMERGENCY STOP" logger.warning("EMERGENCY STOP") def _send_arming(self, armed: bool) -> None: """Send a supervisor arming request, best-effort (older fw lacks it).""" try: self._cf.platform.send_arming_request(armed) except Exception as exc: # pragma: no cover - hardware dependent logger.debug("arming request (%s) failed: %s", armed, exc) def set_motor(self, index: int, pwm: int) -> None: """Set a single motor (1..4) to a raw PWM value (0..65535).""" self._set_param(f"motorPowerSet.m{index}", _clamp_pwm(pwm)) def set_all(self, pwm: int) -> None: """Set all motors at once (requires ENABLE_ALL mode).""" self._set_param("motorPowerSet.m1", _clamp_pwm(pwm)) # ------------------------------------------------------------------ # State access # ------------------------------------------------------------------ def snapshot(self) -> LinkState: with self._lock: return LinkState( status=self._state.status, uri=self._state.uri, message=self._state.message, armed=self._state.armed, vbat=self._state.vbat, link_quality=self._state.link_quality, missing_params=list(self._state.missing_params), ) @property def estimator(self) -> EstimatorState: """Latest estimator snapshot (lock-free atomic read of one reference).""" return self._estimator # ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ def _has_param(self, name: str) -> bool: # Use get_element(group, name) rather than get_element_by_complete_name: # the latter logs a noisy "Unable to find variable" warning on a miss, # and we probe optional params on purpose. try: group, var = name.split(".", 1) return self._cf.param.toc.get_element(group, var) is not None except Exception: return False def _set_param(self, name: str, value, missing: Optional[list] = None) -> bool: if not self._has_param(name): if missing is not None: missing.append(name) return False try: self._cf.param.set_value(name, value) return True except Exception as exc: # pragma: no cover - hardware dependent logger.warning("failed to set %s=%s: %s", name, value, exc) return False # --- cflib callbacks (run on cflib's thread) ---------------------- def _on_console(self, text: str) -> None: # Firmware console arrives in chunks; emit one log line per newline. self._console_buf += text while "\n" in self._console_buf: line, self._console_buf = self._console_buf.split("\n", 1) line = line.rstrip("\r") if line: _CONSOLE_LOGGER.info(line) def _on_connected(self, uri: str) -> None: with self._lock: self._state.message = f"connected to {uri}, downloading TOC..." def _on_fully_connected(self, uri: str) -> None: with self._lock: self._state.status = "connected" self._state.uri = uri self._state.message = f"connected: {uri}" logger.info("fully connected: %s", uri) self._start_logging() self._start_estimator_logging() def _on_disconnected(self, uri: str) -> None: self._stop_logging() self._stop_estimator_logging() self._estimator = EstimatorState() # reset to "no data" with self._lock: # Don't clobber a "failed" message with a routine disconnect. if self._state.status != "failed": self._state.status = "disconnected" self._state.message = "disconnected" self._state.armed = False self._state.vbat = 0.0 self._state.link_quality = 0.0 def _on_connection_failed(self, uri: str, msg: str) -> None: with self._lock: self._state.status = "failed" self._state.message = f"connection failed: {msg}" self._state.armed = False def _on_connection_lost(self, uri: str, msg: str) -> None: with self._lock: self._state.status = "failed" self._state.message = f"connection lost: {msg}" self._state.armed = False def _on_link_quality(self, quality: float) -> None: with self._lock: self._state.link_quality = quality # --- telemetry logging -------------------------------------------- def _start_logging(self) -> None: # Never stack blocks: tear down any previous one first. self._stop_logging() lg = LogConfig(name="teststand", period_in_ms=200) try: lg.add_variable("pm.vbat", "float") self._cf.log.add_config(lg) lg.data_received_cb.add_callback(self._on_log_data) lg.start() self._log = lg except (KeyError, AttributeError) as exc: logger.warning("could not start battery logging: %s", exc) self._log = None def _stop_logging(self) -> None: if self._log is not None: try: self._log.stop() self._log.delete() except Exception: pass self._log = None def _on_log_data(self, timestamp, data, logconf) -> None: with self._lock: self._state.vbat = data.get("pm.vbat", self._state.vbat) # --- estimator-state logging -------------------------------------- def _start_estimator_logging(self) -> None: """Start the estimator/pose block, best-effort. Tries the full block (incl. Flow-deck motion.* counts) first; if those variables are absent (no deck) the whole block would be rejected, so we fall back to a pose-only block. Either way pose + yaw still stream. """ self._stop_estimator_logging() core = [("stateEstimate.x", "float"), ("stateEstimate.y", "float"), ("stateEstimate.z", "float"), ("stabilizer.yaw", "float")] flow = [("motion.deltaX", "int16_t"), ("motion.deltaY", "int16_t")] for variables, has_flow in ((core + flow, True), (core, False)): lg = LogConfig(name="estimator", period_in_ms=200) for name, vtype in variables: lg.add_variable(name, vtype) try: self._cf.log.add_config(lg) lg.data_received_cb.add_callback(self._on_estimator_data) lg.start() self._est_log = lg self._est_has_flow = has_flow logger.info("estimator logging started (flow=%s)", has_flow) return except (KeyError, AttributeError) as exc: logger.info("estimator block %s unavailable: %s", "with flow" if has_flow else "pose-only", exc) self._est_log = None def _stop_estimator_logging(self) -> None: if self._est_log is not None: try: self._est_log.stop() self._est_log.delete() except Exception: pass self._est_log = None def _on_estimator_data(self, timestamp, data, logconf) -> None: # Build a fresh immutable snapshot and publish it atomically (single # reference assignment under the GIL — no lock needed). self._estimator = EstimatorState( valid=True, has_flow=self._est_has_flow, x=data.get("stateEstimate.x", 0.0), y=data.get("stateEstimate.y", 0.0), z=data.get("stateEstimate.z", 0.0), yaw=data.get("stabilizer.yaw", 0.0), dx=int(data.get("motion.deltaX", 0)), dy=int(data.get("motion.deltaY", 0)), ) def _clamp_pwm(value) -> int: return max(0, min(PWM_MAX, int(value)))