"""Flight controller for the Crazyflie mission client. Owns the Crazyflie radio link on a single dedicated worker thread and exposes a small, thread-safe command API to the UI (connect / disconnect / arm / disarm / emergency_stop / shutdown). Keeping every cflib call on one thread means the UI thread never blocks on radio I/O and there is exactly one place that can command the motors. Flight strategy: this uses the firmware's *high-level commander* (takeoff / land / hover) together with the *supervisor* arming system. The high-level commander holds position autonomously in firmware once airborne, so a stalled or slow UI thread cannot make the drone drift or fall the way a missed stream of manual setpoints would. Position/altitude hold is provided by the Flow v2 deck. The API is intentionally tiny and all mutating commands go through a queue that only the worker thread consumes, except emergency_stop which *also* fires directly from the caller for minimum latency. """ from __future__ import annotations import collections import dataclasses import logging import queue import threading import time from dataclasses import dataclass from enum import Enum, auto import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.log import LogConfig from cflib.crazyflie.syncCrazyflie import SyncCrazyflie from cflib.crazyflie.syncLogger import SyncLogger from cflib.utils import uri_helper logger = logging.getLogger(__name__) class _StaleLogFilter(logging.Filter): """Drop cflib's benign "no LogEntry to handle id=N" warnings. Firmware streams log-data for a block id the host no longer tracks when a stale block is left over from a prior/parallel session. We already delete our blocks on disconnect, but a handful of in-flight packets can still arrive right after reconnecting; they're harmless (power-cycle to clear), so keep them out of the console rather than alarming the user. """ def filter(self, record: logging.LogRecord) -> bool: return 'no LogEntry to handle' not in record.getMessage() class LogBuffer(logging.Handler): """A thread-safe ring buffer of formatted log lines for the UI console. cflib logs from its own thread, so reads/writes are guarded by a lock. A monotonic version counter lets the UI cheaply detect when there's something new to render. """ def __init__(self, capacity: int = 500) -> None: super().__init__() self._lines: collections.deque = collections.deque(maxlen=capacity) self._lock = threading.Lock() self._version = 0 self.setFormatter(logging.Formatter( '%(asctime)s %(levelname).1s %(name)s: %(message)s', '%H:%M:%S')) def emit(self, record: logging.LogRecord) -> None: try: line = self.format(record) except Exception: return with self._lock: self._lines.append(line) self._version += 1 def snapshot(self) -> tuple[int, list[str]]: with self._lock: return self._version, list(self._lines) def clear(self) -> None: with self._lock: self._lines.clear() self._version += 1 # Single shared console buffer, attached to the root logger once. Firmware # console output is logged under its own name so it's easy to spot. LOG_BUFFER = LogBuffer() _CONSOLE_LOGGER = logging.getLogger('crazyflie.console') _LOGGING_READY = False def _install_console_logging() -> None: """Route cflib + app logs into LOG_BUFFER for the UI console (idempotent).""" global _LOGGING_READY if _LOGGING_READY: return logging.getLogger('cflib.crazyflie.log').addFilter(_StaleLogFilter()) root = logging.getLogger() if root.level == logging.NOTSET or root.level > logging.INFO: root.setLevel(logging.INFO) LOG_BUFFER.setLevel(logging.INFO) root.addHandler(LOG_BUFFER) _LOGGING_READY = True # Radio URI. Override with the CFLIB_URI environment variable. DEFAULT_URI = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7') # Mission parameters. TARGET_HEIGHT_M = 1.0 # default hover height above the ground MIN_HEIGHT_M = 0.2 # configurable hover-height bounds (Flow v2 range) MAX_HEIGHT_M = 2.0 CLIMB_RATE_MPS = 0.35 # ascent/descent speed used to time takeoff & land MIN_RAMP_S = 1.5 # floor on takeoff/land duration for low hovers MIN_SAFE_VBAT = 3.10 # refuse to arm below this pack voltage def clamp_height(height: float) -> float: """Clamp a requested hover height to the safe configurable range.""" return max(MIN_HEIGHT_M, min(MAX_HEIGHT_M, float(height))) def ramp_duration(height: float) -> float: """Time to climb/descend `height` metres at a gentle, consistent rate.""" return max(MIN_RAMP_S, abs(height) / CLIMB_RATE_MPS) # supervisor.info bitfield (see crazyflie-firmware src/modules/src/supervisor.c). SUP_CAN_ARM = 0x0001 SUP_IS_ARMED = 0x0002 SUP_AUTO_ARM = 0x0004 SUP_CAN_FLY = 0x0008 SUP_IS_FLYING = 0x0010 SUP_IS_TUMBLED = 0x0020 SUP_IS_LOCKED = 0x0040 SUP_IS_CRASHED = 0x0080 SUP_DECK_FAULT = 0x0800 # a deck reported a hardware fault class State(Enum): """High-level mission state, surfaced to the UI.""" DISCONNECTED = auto() CONNECTING = auto() READY = auto() # connected, on the ground, disarmed ARMING = auto() # transient: estimator reset + arming request ARMED = auto() # armed and idle on the ground, ready to LAUNCH FLYING = auto() # taking off or hovering at target height LANDING = auto() # controlled descent in progress EMERGENCY = auto() # motors cut; reboot required to fly again ERROR = auto() # connection or radio failure @dataclass class Telemetry: """Latest known drone state. Plain values; safe to copy across threads.""" vbat: float = 0.0 height: float = 0.0 # Estimator pose + raw optical-flow counts, for drift inspection. x: float = 0.0 y: float = 0.0 yaw: float = 0.0 flow_dx: int = 0 flow_dy: int = 0 flow_deck: bool | None = None # None = not yet probed can_arm: bool = False armed: bool = False flying: bool = False tumbled: bool = False locked: bool = False crashed: bool = False deck_fault: bool = False @dataclass class Update: """Immutable snapshot handed to the UI listener.""" state: State message: str telemetry: Telemetry class DroneController: """Threaded Crazyflie flight controller. Usage: c = DroneController(uri, listener=on_update) c.start() # init radio drivers + spawn worker c.connect() # open link c.arm() # reset estimator + arm; idle armed on the ground c.launch(1.0) # take off to 1 m and hover c.disarm() # land if flying, else disarm on the ground c.emergency_stop() # cut motors immediately c.shutdown() # land if needed, close link, stop worker """ def __init__(self, uri: str = DEFAULT_URI, listener=None, cache_dir: str = './cache'): self._uri = uri self._listener = listener or (lambda update: None) self._cache_dir = cache_dir self._cmd_q: queue.Queue[str] = queue.Queue() self._target_height = TARGET_HEIGHT_M # latched at arm time self._state = State.DISCONNECTED self._message = 'Disconnected' self._telemetry = Telemetry() # Latest immutable snapshot, replaced atomically in _emit() and read # without a lock by per-frame UIs via snapshot(). self._latest = Update(State.DISCONNECTED, 'Disconnected', Telemetry()) self._scf: SyncCrazyflie | None = None self._console_buf = '' # partial firmware console line self._log_configs: list = [] # active log blocks, deleted on teardown self._proto_v7 = False # supervisor/arming available? self._session_result = '' # 'disconnect' | 'shutdown' | '' self._lock = threading.Lock() # guards _scf access from other threads self._thread = threading.Thread( target=self._run, name='drone-worker', daemon=True) # ----------------------------------------------------------------- API --- @property def uri(self) -> str: return self._uri def set_listener(self, listener) -> None: """Register an optional push callback. Per-frame UIs can instead poll snapshot(); the listener is kept for event-driven consumers.""" self._listener = listener or (lambda update: None) def snapshot(self) -> Update: """Return the latest state/telemetry snapshot. Thread-safe and cheap; intended to be polled once per render frame.""" return self._latest @staticmethod def scan() -> list[str]: """URIs of Crazyflies reachable on any installed interface.""" try: return [entry[0] for entry in cflib.crtp.scan_interfaces()] except Exception: logger.warning('scan failed', exc_info=True) return [] def start(self) -> None: _install_console_logging() cflib.crtp.init_drivers() self._thread.start() def connect(self, uri: str | None = None) -> None: if uri: self._uri = uri self._cmd_q.put('connect') def disconnect(self) -> None: self._cmd_q.put('disconnect') def arm(self) -> None: """Arm the motors and hold on the ground (two-step flow). The drone idles armed in the ARMED state until launch() or disarm(); the firmware auto-disarms after ~30 s if it never flies.""" self._cmd_q.put('arm') def launch(self, height: float | None = None) -> None: """Take off from the ARMED state to `height` metres (clamped), then hover. Height is latched here, so the slider can be adjusted right up until launch.""" if height is not None: self._target_height = clamp_height(height) self._cmd_q.put('launch') def disarm(self) -> None: """Context-sensitive: land if flying, else disarm on the ground.""" self._cmd_q.put('disarm') def emergency_stop(self) -> None: # Queue it so the worker breaks out of any flight loop and updates # state, but ALSO fire immediately from the caller thread so motors # are cut with the least possible latency. self._cmd_q.put('estop') with self._lock: scf = self._scf if scf is not None: try: self._cut_motors(scf) except Exception: logger.exception('direct emergency stop failed') def shutdown(self) -> None: self._cmd_q.put('shutdown') def alive(self) -> bool: return self._thread.is_alive() def join(self, timeout: float | None = None) -> None: """Wait for the worker thread to finish (after shutdown()). Ensures the radio link is closed before the process exits so libusb is never torn down while the worker is still using it.""" if self._thread.is_alive(): self._thread.join(timeout) # ----------------------------------------------------------- worker loop - def _run(self) -> None: while True: cmd = self._cmd_q.get() if cmd == 'shutdown': return if cmd == 'connect': self._session() if self._session_result == 'shutdown': return # any other command while disconnected is ignored def _session(self) -> None: """Open the link and service commands until disconnect/shutdown.""" self._session_result = '' self._set_state(State.CONNECTING, f'Connecting to {self._uri} …') try: cf = Crazyflie(rw_cache=self._cache_dir) with SyncCrazyflie(self._uri, cf=cf) as scf: with self._lock: self._scf = scf self._proto_v7 = self._detect_protocol(scf) self._setup_logging(scf) self._probe_flow_deck(scf) self._set_state(State.READY, 'Connected — ready to arm') try: self._command_loop(scf) finally: # Delete our log blocks while the link is still open so the # firmware stops streaming them. Skipping this leaves blocks # registered in firmware until the next power-cycle, which # later shows up as "no LogEntry to handle id=N" warnings. self._teardown_logging() except Exception as exc: logger.exception('session error') self._set_state(State.ERROR, f'Connection error: {exc}') finally: with self._lock: self._scf = None if self._session_result != 'shutdown': self._set_state(State.DISCONNECTED, 'Disconnected') def _command_loop(self, scf: SyncCrazyflie) -> None: """Service commands on the ground (READY / ARMED); delegates the airborne phase to _do_launch().""" while True: # If we were armed on the ground but the firmware disarmed us # (≈30 s pre-flight timeout, a bump/tumble, or low battery), # fall back to READY so the UI reflects it. if (self._state == State.ARMED and self._proto_v7 and not self._telemetry.armed): self._set_state(State.READY, 'Auto-disarmed (armed too long or unsafe)') try: cmd = self._cmd_q.get(timeout=0.2) except queue.Empty: continue if cmd == 'arm': if self._state in (State.READY, State.EMERGENCY): self._do_arm(scf) elif cmd == 'launch': if self._state == State.ARMED: follow_up = self._do_launch(scf) if follow_up in ('disconnect', 'shutdown'): self._session_result = follow_up return elif cmd == 'disarm': if self._state == State.ARMED: self._do_disarm_ground(scf) elif cmd == 'disconnect': if self._state == State.ARMED: self._do_disarm_ground(scf) self._session_result = 'disconnect' return elif cmd == 'shutdown': if self._state == State.ARMED: self._do_disarm_ground(scf) self._session_result = 'shutdown' return elif cmd == 'estop': self._do_estop(scf) # 'connect' while on the ground: nothing to do. # --------------------------------------------------------------- flight -- def _do_arm(self, scf: SyncCrazyflie) -> None: """Run pre-flight checks and arm the motors; on success enter ARMED (idle on the ground). Does NOT take off — that's _do_launch().""" cf = scf.cf # --- pre-flight safety checks ------------------------------------- if self._telemetry.flow_deck is False: self._set_state(State.READY, 'Arm refused: no Flow v2 deck detected') return ok, why = self._battery_ok() if not ok: self._set_state(State.READY, f'Arm refused: {why}') return if self._telemetry.tumbled: self._set_state(State.READY, 'Arm refused: drone is tumbled — set it level') return if self._telemetry.locked: self._set_state(State.READY, 'Arm refused: supervisor locked — reboot the drone') return # --- reset the position estimator and let it settle --------------- self._set_state(State.ARMING, 'Resetting position estimator …') self._reset_estimator(scf) # --- arm (firmware supervisor) ------------------------------------ if self._proto_v7: # Don't gate on the `canArm` bit: stock firmware builds with # AUTO_ARMING=1, where the supervisor auto-arms the instant # pre-flight checks pass, so `canArm` is only ever true for a # single tick and a client essentially never observes it. The # real signal is the `armed` bit. Requesting arming is harmless # if already auto-armed (firmware no-ops it). cf.supervisor.send_arming_request(True) if not self._wait_until(lambda: self._telemetry.armed, 5.0): self._set_state(State.READY, f'Arm failed: {self._arm_failure_reason()}') return else: # Legacy firmware: unlock the thrust-safety latch with a zero # setpoint before the high-level commander will spin the motors. cf.commander.send_setpoint(0, 0, 0, 0) self._set_state(State.ARMED, 'Armed — ready to LAUNCH (auto-disarms ' 'in ~30 s if idle)') def _do_disarm_ground(self, scf: SyncCrazyflie) -> None: """Disarm without flying (back out of the ARMED state).""" if self._proto_v7: scf.cf.supervisor.send_arming_request(False) self._set_state(State.READY, 'Disarmed') def _do_launch(self, scf: SyncCrazyflie) -> str | None: """Take off from ARMED to self._target_height, then hover until told to stop. Returns 'disconnect'/'shutdown' if such a command arrived mid-flight (so the caller tears down after landing), else None.""" cf = scf.cf # The firmware may have auto-disarmed between ARM and LAUNCH. if self._proto_v7 and not self._telemetry.armed: self._set_state(State.READY, 'Launch aborted: no longer armed') return None height = self._target_height ascent_s = ramp_duration(height) self._set_state(State.FLYING, f'Taking off to {height:.2f} m …') cf.high_level_commander.takeoff(height, ascent_s) ascent_done_at = time.monotonic() + ascent_s hovering_announced = False while True: cmd = self._poll_cmd() if cmd == 'estop': self._do_estop(scf) return None if cmd in ('disarm', 'disconnect', 'shutdown'): self._land(scf) return cmd if cmd in ('disconnect', 'shutdown') else None # ignore 'arm'/'launch'/'connect' — already flying. if not hovering_announced and time.monotonic() >= ascent_done_at: self._set_state(State.FLYING, f'Hovering at {height:.2f} m') hovering_announced = True time.sleep(0.05) def _land(self, scf: SyncCrazyflie) -> None: """Controlled descent, then stop the commander and disarm.""" cf = scf.cf descent_s = ramp_duration(self._target_height) self._set_state(State.LANDING, 'Landing …') cf.high_level_commander.land(0.0, descent_s) end = time.monotonic() + descent_s while time.monotonic() < end: # Stay responsive to an emergency stop during descent. if self._poll_cmd() == 'estop': self._do_estop(scf) return time.sleep(0.05) cf.high_level_commander.stop() if self._proto_v7: cf.supervisor.send_arming_request(False) self._set_state(State.READY, 'Landed — on the ground, disarmed') def _do_estop(self, scf: SyncCrazyflie) -> None: try: self._cut_motors(scf) except Exception: logger.exception('emergency stop failed') self._set_state(State.EMERGENCY, 'EMERGENCY STOP — motors cut (reboot to fly again)') def _cut_motors(self, scf: SyncCrazyflie) -> None: """Cut motors immediately. Safe to call from any thread.""" cf = scf.cf if self._proto_v7: cf.supervisor.send_emergency_stop() else: cf.commander.send_stop_setpoint() try: cf.high_level_commander.stop() except Exception: pass # ----------------------------------------------------- estimator helper -- def _reset_estimator(self, scf: SyncCrazyflie) -> None: cf = scf.cf cf.param.set_value('kalman.resetEstimation', '1') time.sleep(0.1) cf.param.set_value('kalman.resetEstimation', '0') self._wait_for_position_estimator(scf) def _wait_for_position_estimator(self, scf: SyncCrazyflie, threshold: float = 0.001, timeout: float = 10.0) -> None: """Block until the Kalman position variance converges (or times out).""" log_conf = LogConfig(name='kalman_var', period_in_ms=100) log_conf.add_variable('kalman.varPX', 'float') log_conf.add_variable('kalman.varPY', 'float') log_conf.add_variable('kalman.varPZ', 'float') history = {'x': [1000.0] * 10, 'y': [1000.0] * 10, 'z': [1000.0] * 10} deadline = time.monotonic() + timeout try: with SyncLogger(scf, log_conf) as sync_log: for _, data, _ in sync_log: for key, var in (('x', 'kalman.varPX'), ('y', 'kalman.varPY'), ('z', 'kalman.varPZ')): history[key].append(data[var]) history[key].pop(0) spread = max(max(h) - min(h) for h in history.values()) if spread < threshold: return if time.monotonic() > deadline: logger.warning('estimator did not converge in %.0fs ' '(spread=%.4f); proceeding', timeout, spread) return except Exception: # Don't let a logging hiccup block takeoff entirely. logger.exception('estimator convergence wait failed') # ---------------------------------------------------------- log / param -- def _detect_protocol(self, scf: SyncCrazyflie) -> bool: try: return scf.cf.platform.get_protocol_version() >= 7 except Exception: logger.exception('could not read protocol version') return False def _setup_logging(self, scf: SyncCrazyflie) -> None: self._log_configs = [] # Mirror the firmware's console output into the UI console. The cf # object is recreated each session, so callbacks don't accumulate. self._console_buf = '' scf.cf.console.receivedChar.add_callback(self._on_console) tlm = LogConfig(name='mission_tlm', period_in_ms=200) tlm.add_variable('pm.vbat', 'float') tlm.add_variable('stateEstimate.z', 'float') scf.cf.log.add_config(tlm) tlm.data_received_cb.add_callback(self._on_telemetry) tlm.start() self._log_configs.append(tlm) if self._proto_v7: sup = LogConfig(name='mission_sup', period_in_ms=200) sup.add_variable('supervisor.info', 'uint16_t') scf.cf.log.add_config(sup) sup.data_received_cb.add_callback(self._on_supervisor) sup.start() self._log_configs.append(sup) # Drift-inspection block: estimator pose + raw flow counts. motion.* # only exists with a Flow deck attached, so set it up best-effort. try: diag = LogConfig(name='mission_diag', period_in_ms=200) diag.add_variable('stateEstimate.x', 'float') diag.add_variable('stateEstimate.y', 'float') diag.add_variable('stabilizer.yaw', 'float') diag.add_variable('motion.deltaX', 'int16_t') diag.add_variable('motion.deltaY', 'int16_t') scf.cf.log.add_config(diag) diag.data_received_cb.add_callback(self._on_diag) diag.start() self._log_configs.append(diag) except (KeyError, AttributeError) as exc: logger.warning('drift-inspection logging unavailable: %s', exc) def _teardown_logging(self) -> None: """Stop and delete our log blocks so firmware stops streaming them.""" for conf in getattr(self, '_log_configs', []): try: conf.stop() conf.delete() except Exception: logger.exception('failed to delete log config %s', conf.name) self._log_configs = [] def _on_console(self, text: str) -> None: # Firmware console arrives in chunks; emit one log line per newline. self._console_buf += text while '\n' in self._console_buf: line, self._console_buf = self._console_buf.split('\n', 1) line = line.rstrip('\r') if line: _CONSOLE_LOGGER.info(line) def _on_telemetry(self, _ts, data, _logconf) -> None: self._telemetry.vbat = data['pm.vbat'] self._telemetry.height = data['stateEstimate.z'] self._emit() def _on_diag(self, _ts, data, _logconf) -> None: t = self._telemetry t.x = data['stateEstimate.x'] t.y = data['stateEstimate.y'] t.yaw = data['stabilizer.yaw'] t.flow_dx = int(data['motion.deltaX']) t.flow_dy = int(data['motion.deltaY']) self._emit() def _on_supervisor(self, _ts, data, _logconf) -> None: info = data['supervisor.info'] t = self._telemetry t.can_arm = bool(info & SUP_CAN_ARM) t.armed = bool(info & SUP_IS_ARMED) t.flying = bool(info & SUP_IS_FLYING) t.tumbled = bool(info & SUP_IS_TUMBLED) t.locked = bool(info & SUP_IS_LOCKED) t.crashed = bool(info & SUP_IS_CRASHED) t.deck_fault = bool(info & SUP_DECK_FAULT) self._emit() def _probe_flow_deck(self, scf: SyncCrazyflie) -> None: try: value = scf.cf.param.get_value('deck.bcFlow2', timeout=5) self._telemetry.flow_deck = bool(int(value)) except Exception: logger.exception('could not read deck.bcFlow2') self._telemetry.flow_deck = None self._emit() def _arm_failure_reason(self) -> str: """Best-effort explanation for why the drone would not arm.""" t = self._telemetry if t.locked: return 'supervisor locked — reboot the drone' if t.tumbled: return 'drone is tumbled — set it level and still' if t.deck_fault: return 'a deck reported a hardware fault — reseat decks / reboot' if t.crashed: return 'supervisor in crashed state — flip level to recover' # Pre-flight checks never passed; usually means the drone is not level # and stationary, or a sensor is still calibrating. return ('pre-flight checks did not pass — place the drone level and ' 'stationary, then retry') # --------------------------------------------------------------- utils --- def _battery_ok(self) -> tuple[bool, str]: # Wait briefly for the first battery reading after connect. end = time.monotonic() + 1.5 while self._telemetry.vbat == 0.0 and time.monotonic() < end: time.sleep(0.05) v = self._telemetry.vbat if v and v < MIN_SAFE_VBAT: return False, f'battery low ({v:.2f} V, need {MIN_SAFE_VBAT:.2f} V)' return True, '' def _poll_cmd(self) -> str | None: try: return self._cmd_q.get_nowait() except queue.Empty: return None def _wait_until(self, predicate, timeout: float) -> bool: end = time.monotonic() + timeout while time.monotonic() < end: if predicate(): return True time.sleep(0.05) return False def _set_state(self, state: State, message: str) -> None: self._state = state self._message = message logger.info('[%s] %s', state.name, message) self._emit() def _emit(self) -> None: snapshot = Update( state=self._state, message=self._message, telemetry=dataclasses.replace(self._telemetry), ) self._latest = snapshot # atomic publish for snapshot() pollers try: self._listener(snapshot) except Exception: logger.exception('listener raised')