"""Flight + sensing controller for the Crazyflie range-mapping client. Owns the Crazyflie radio link on a single dedicated worker thread and exposes a small, thread-safe API to the UI (connect / disconnect / arm / scan / stop / emergency_stop / shutdown). Keeping every cflib call on one thread means the UI thread never blocks on radio I/O and there is exactly one place that can command the motors. The mission this controller flies is a *scan*: arm, lift off to a start height, then ascend slowly while rotating in place so the Multi-ranger deck's five ToF sensors sweep the drone's surroundings. Each range sample is combined with the estimator pose to project an obstacle point into a world frame, building a top-down point-cloud map of the immediate environment. Flight uses the firmware's *hover setpoint* stream (velocity-x/y, yaw-rate, absolute-z) together with the *supervisor* arming system. Position/altitude hold comes from the Flow v2 deck; the worker thread streams setpoints at 10 Hz so a slow or stalled UI cannot make the drone drift or fall. Sensor geometry (Crazyflie body frame: +x forward, +y left, +z up): front +x back -x left +y right -y up +z (ceiling) down -z (floor, from the Flow deck's z-ranger) The four horizontal sensors are what build the 2-D map; up/down are surfaced as ceiling/floor distances. """ from __future__ import annotations import collections import dataclasses import logging import math import queue import threading import time from dataclasses import dataclass from enum import Enum, auto import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.log import LogConfig from cflib.crazyflie.syncCrazyflie import SyncCrazyflie from cflib.crazyflie.syncLogger import SyncLogger from cflib.utils import uri_helper from .cloud_publisher import CloudPublisher, DEFAULT_ADDRESS logger = logging.getLogger(__name__) class _StaleLogFilter(logging.Filter): """Drop cflib's benign "no LogEntry to handle id=N" warnings. Firmware streams log-data for a block id the host no longer tracks when a stale block is left over from a prior/parallel session. We delete our blocks on disconnect, but a handful of in-flight packets can still arrive right after reconnecting; they're harmless (power-cycle to clear), so keep them out of the console rather than alarming the user. """ def filter(self, record: logging.LogRecord) -> bool: return 'no LogEntry to handle' not in record.getMessage() class LogBuffer(logging.Handler): """A thread-safe ring buffer of formatted log lines for the UI console. cflib logs from its own thread, so reads/writes are guarded by a lock. A monotonic version counter lets the UI cheaply detect when there's something new to render. """ def __init__(self, capacity: int = 500) -> None: super().__init__() self._lines: collections.deque = collections.deque(maxlen=capacity) self._lock = threading.Lock() self._version = 0 self.setFormatter(logging.Formatter( '%(asctime)s %(levelname).1s %(name)s: %(message)s', '%H:%M:%S')) def emit(self, record: logging.LogRecord) -> None: try: line = self.format(record) except Exception: return with self._lock: self._lines.append(line) self._version += 1 def snapshot(self) -> tuple[int, list[str]]: with self._lock: return self._version, list(self._lines) def clear(self) -> None: with self._lock: self._lines.clear() self._version += 1 # Single shared console buffer, attached to the root logger once. Firmware # console output is logged under its own name so it's easy to spot. LOG_BUFFER = LogBuffer() _CONSOLE_LOGGER = logging.getLogger('crazyflie.console') _LOGGING_READY = False def _install_console_logging() -> None: """Route cflib + app logs into LOG_BUFFER for the UI console (idempotent).""" global _LOGGING_READY if _LOGGING_READY: return logging.getLogger('cflib.crazyflie.log').addFilter(_StaleLogFilter()) root = logging.getLogger() if root.level == logging.NOTSET or root.level > logging.INFO: root.setLevel(logging.INFO) LOG_BUFFER.setLevel(logging.INFO) root.addHandler(LOG_BUFFER) _LOGGING_READY = True # Radio URI. Override with the CFLIB_URI environment variable. DEFAULT_URI = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7') # ---------------------------------------------------------------- scan params - # Defaults; all adjustable from the UI before a scan starts. Kept conservative # because the rotating scan is the part that previously caused a flyaway. DEFAULT_START_HEIGHT_M = 0.4 # lift-off / first sweep height DEFAULT_TOP_HEIGHT_M = 1.2 # final height after the ringed ascent DEFAULT_CLIMB_RATE_MPS = 0.15 # how slowly to move between ring heights DEFAULT_YAW_RATE_DPS = 20.0 # rotation speed during each discrete yaw step MIN_HEIGHT_M = 0.2 # Flow v2 deck altitude-hold bounds MAX_HEIGHT_M = 2.5 MIN_SAFE_VBAT = 3.10 # refuse to arm below this pack voltage # --- hardened rotating-scan safety envelope ------------------------------------ # The scan never spins continuously. It uses the firmware high-level commander # (which holds position autonomously) to take off, then rotates in discrete # "stop-and-stare" steps so the Flow-deck estimate re-converges between moves, # climbing in rings. Every parameter here errs toward caution. TAKEOFF_SETTLE_S = 2.5 # stabilise after takeoff before rotating YAW_STEP_DEG = 30.0 # rotation increment per stop-and-stare step YAW_SETTLE_S = 1.2 # pause at each bearing (also a clean static read) MAX_YAW_RATE_DPS = 30.0 # hard cap on per-step rotation speed RING_STEP_M = 0.3 # vertical spacing between sweep rings CEILING_MARGIN_M = 0.4 # stay at least this far below the sensed ceiling MIN_CLEARANCE_M = 0.25 # abort if any horizontal wall is closer than this DIVERGE_SPEED_MPS = 0.5 # raw horizontal estimate speed above this = abort HLC_PARAM = 'commander.enHighLevel' # Multi-ranger ToF validity. The VL53L1x sensors read out to ~4 m; a reading at # or beyond this (or zero) means "nothing in range", not an obstacle. RANGE_VALID_MAX_MM = 4000.0 # --- drift correction (zero-velocity update / ZUPT) ---------------------------- # A Flow v2 deck never produces a perfectly stable estimate: optical-flow noise # integrates into position and gyro bias into yaw, so a stationary drone slowly # "drifts". We detect rest from the *raw* sensors (gyro rate + optical-flow # counts, which don't depend on the drifting estimate) and, while at rest, pin # the corrected pose - folding the wander into an offset instead of the map. GYRO_STILL_DPS = 3.0 # max |gyro| (deg/s) to count as not rotating FLOW_STILL_COUNTS = 8.0 # max |flow delta| (counts) to count as not translating STILL_EMA = 0.6 # smoothing applied to the motion magnitudes # Minimum motion before another batch of map points is recorded - de-dupes the # cloud and keeps at-rest sensor noise out of it entirely. Height change is # included so a pure vertical sweep (ascending without rotating) still records. MIN_MOVE_M = 0.02 MIN_RISE_M = 0.05 MIN_TURN_DEG = 2.0 # Body-frame unit directions of the four horizontal ToF sensors (x fwd, y left). _HORIZONTAL_SENSORS = ( ('front', (1.0, 0.0)), ('back', (-1.0, 0.0)), ('left', (0.0, 1.0)), ('right', (0.0, -1.0)), ) # supervisor.info bitfield (see crazyflie-firmware src/modules/src/supervisor.c). SUP_CAN_ARM = 0x0001 SUP_IS_ARMED = 0x0002 SUP_AUTO_ARM = 0x0004 SUP_CAN_FLY = 0x0008 SUP_IS_FLYING = 0x0010 SUP_IS_TUMBLED = 0x0020 SUP_IS_LOCKED = 0x0040 SUP_IS_CRASHED = 0x0080 SUP_DECK_FAULT = 0x0800 # a deck reported a hardware fault def clamp_height(height: float) -> float: """Clamp a requested height to the safe Flow-deck range.""" return max(MIN_HEIGHT_M, min(MAX_HEIGHT_M, float(height))) @dataclass class ScanParams: """Latched-at-start parameters for one ascending, rotating scan.""" start_height: float = DEFAULT_START_HEIGHT_M top_height: float = DEFAULT_TOP_HEIGHT_M climb_rate: float = DEFAULT_CLIMB_RATE_MPS yaw_rate: float = DEFAULT_YAW_RATE_DPS class State(Enum): """High-level mission state, surfaced to the UI.""" DISCONNECTED = auto() CONNECTING = auto() READY = auto() # connected, on the ground, disarmed ARMING = auto() # transient: estimator reset + arming request ARMED = auto() # armed and idle on the ground, ready to SCAN SCANNING = auto() # airborne: ascending and rotating, building the map LANDING = auto() # controlled descent in progress EMERGENCY = auto() # motors cut; reboot required to fly again ERROR = auto() # connection or radio failure @dataclass class Telemetry: """Latest known drone state. Plain values; safe to copy across threads. The five ToF distances are in metres, or None when the sensor reads out-of-range (nothing close enough to map). """ vbat: float = 0.0 # Estimator pose (world frame, origin at lift-off). x: float = 0.0 y: float = 0.0 z: float = 0.0 yaw: float = 0.0 # degrees # Multi-ranger distances, metres or None. front: float | None = None back: float | None = None left: float | None = None right: float | None = None up: float | None = None down: float | None = None # Drift-correction state. still: bool = False # estimator-independent "at rest" detection drift_corrected: bool = True # ZUPT hold currently enabled dry_run: bool = False # props-off dry run (no motors) raw_x: float = 0.0 # uncorrected estimate, for comparison raw_y: float = 0.0 # Deck presence (None = not yet probed). multiranger: bool | None = None flow_deck: bool | None = None # Supervisor flags. can_arm: bool = False armed: bool = False flying: bool = False tumbled: bool = False locked: bool = False crashed: bool = False deck_fault: bool = False @dataclass class Update: """Immutable snapshot handed to the UI each frame.""" state: State message: str telemetry: Telemetry class RangeController: """Threaded Crazyflie controller that flies a scan and maps the surroundings. Usage: c = RangeController(uri, listener=on_update) c.start() # init radio drivers + spawn worker c.connect() # open link c.arm() # reset estimator + arm; idle armed on the ground c.scan() # lift off, ascend slowly while rotating c.stop() # land (from anywhere airborne) or disarm on ground c.emergency_stop() # cut motors immediately c.shutdown() # land if needed, close link, stop worker The accumulated map is read separately via map_snapshot(). """ def __init__(self, uri: str = DEFAULT_URI, listener=None, cache_dir: str = './cache'): self._uri = uri self._listener = listener or (lambda update: None) self._cache_dir = cache_dir self._cmd_q: queue.Queue[str] = queue.Queue() self._scan_params = ScanParams() self._state = State.DISCONNECTED self._message = 'Disconnected' self._telemetry = Telemetry() self._latest = Update(State.DISCONNECTED, 'Disconnected', Telemetry()) self._scf: SyncCrazyflie | None = None self._console_buf = '' self._log_configs: list = [] self._proto_v7 = False self._session_result = '' # Map accumulation is independent of flight: whenever this is set and # pose+range are streaming, hits are projected into the map. It defaults # on so handheld mapping works the moment you connect (hold the drone and # walk it around); the automated scan also turns it on. A plain bool is # GIL-atomic, so the UI flips it directly without going through the queue. self._recording = True self._lock = threading.Lock() # guards _scf access from other threads # --- drift correction (ZUPT) state, touched only on the cflib thread --- self._drift_correction = True self._raw_x = self._raw_y = self._raw_z = self._raw_yaw = 0.0 self._off_x = self._off_y = self._off_yaw = 0.0 # corrected = raw - off self._hold_x = self._hold_y = self._hold_yaw = 0.0 # frozen-while-still pose self._gyro_ema = 0.0 self._flow_ema = 0.0 self._still = False # Raw horizontal estimate speed, used to detect a diverging/flyaway # estimate during the scan (the drone is always commanded to hold (0,0), # so any sustained horizontal speed means the estimate is running off). self._est_speed = 0.0 self._last_pose_t: float | None = None self._last_raw_xy: tuple[float, float] | None = None # Props-off dry run: scan logic runs, no motors arm, no flight commands. self._dry_run = False # Accumulated obstacle point cloud (world frame), guarded by _map_lock. # Parallel deques so it drops straight into a DearPyGui scatter series. # This is the 2-D top-down slice (horizontal sensors only); the full 3-D # cloud (with height + ceiling/floor) is streamed to CloudView instead. self._map_lock = threading.Lock() self._map_x: collections.deque = collections.deque(maxlen=20000) self._map_y: collections.deque = collections.deque(maxlen=20000) self._map_version = 0 self._last_acc: tuple[float, float, float, float] | None = None # Optional 3-D point-cloud stream to a CloudView viewer (best-effort). self._publisher = CloudPublisher(source='rangeview', name='Crazyflie RangeView') self._thread = threading.Thread( target=self._run, name='range-worker', daemon=True) # ----------------------------------------------------------------- API --- @property def uri(self) -> str: return self._uri def set_listener(self, listener) -> None: self._listener = listener or (lambda update: None) def set_scan_params(self, start_height: float, top_height: float, climb_rate: float, yaw_rate: float) -> None: """Update the parameters used by the next scan() (ignored mid-scan).""" self._scan_params = ScanParams( start_height=clamp_height(start_height), top_height=clamp_height(top_height), climb_rate=max(0.02, float(climb_rate)), yaw_rate=float(yaw_rate)) def snapshot(self) -> Update: """Latest state/telemetry snapshot. Thread-safe and cheap; poll once a frame.""" return self._latest def map_snapshot(self) -> tuple[int, list[float], list[float]]: """Return (version, xs, ys) of the accumulated obstacle points. The version only changes when points are added or the map is cleared, so the UI can skip re-uploading an unchanged cloud. """ with self._map_lock: return self._map_version, list(self._map_x), list(self._map_y) def clear_map(self) -> None: with self._map_lock: self._map_x.clear() self._map_y.clear() self._map_version += 1 self._last_acc = None self._publisher.clear() logger.info('map cleared') @staticmethod def scan_uris() -> list[str]: """URIs of Crazyflies reachable on any installed interface.""" try: return [entry[0] for entry in cflib.crtp.scan_interfaces()] except Exception: logger.warning('scan failed', exc_info=True) return [] def start(self) -> None: _install_console_logging() cflib.crtp.init_drivers() self._thread.start() def connect(self, uri: str | None = None) -> None: if uri: self._uri = uri self._cmd_q.put('connect') def disconnect(self) -> None: self._cmd_q.put('disconnect') def set_recording(self, on: bool) -> None: """Enable/disable live map accumulation (independent of flight). On while you move the drone - by hand or under power - the map fills in.""" self._recording = bool(on) logger.info('map recording %s', 'ON' if on else 'OFF') @property def recording(self) -> bool: return self._recording def set_drift_correction(self, on: bool) -> None: """Enable/disable the zero-velocity hold that pins the pose while the drone is detected at rest (cancels stationary Flow-deck drift).""" self._drift_correction = bool(on) logger.info('drift correction %s', 'ON' if on else 'OFF') @property def drift_correction(self) -> bool: return self._drift_correction def set_streaming(self, enabled: bool, address: str | None = None) -> None: """Enable/disable streaming the 3-D cloud to a CloudView viewer at `address` (e.g. tcp://127.0.0.1:9870 or unix:///tmp/cloud.sock).""" self._publisher.configure(enabled, address) @property def streaming_connected(self) -> bool: return self._publisher.connected def set_dry_run(self, on: bool) -> None: """Enable/disable props-off dry-run mode. In dry run the scan walks its full state machine and logs every takeoff/go-to/land/abort decision but never arms the motors or sends a flight command, so you can validate the sequence on the bench with zero flight risk.""" self._dry_run = bool(on) self._telemetry.dry_run = self._dry_run logger.info('DRY RUN %s', 'ON (no motors will arm)' if on else 'OFF') @property def dry_run(self) -> bool: return self._dry_run def reset_origin(self) -> None: """Reset the Kalman estimator so the world origin is the drone's current spot, and clear the map. Use before a handheld mapping run to set 'here' as (0, 0). Serviced on the ground (READY/ARMED), ignored while flying.""" self._cmd_q.put('reset') def arm(self) -> None: """Arm the motors and hold on the ground. The drone idles armed until scan() or stop(); firmware auto-disarms after ~30 s if it never flies.""" self._cmd_q.put('arm') def scan(self) -> None: """Lift off and run the ascending, rotating scan (from ARMED).""" self._cmd_q.put('scan') def stop(self) -> None: """Context-sensitive: land if airborne, else disarm on the ground.""" self._cmd_q.put('stop') def emergency_stop(self) -> None: # Queue it so the worker breaks out of any flight loop and updates state, # but ALSO fire immediately from the caller thread for least latency. self._cmd_q.put('estop') with self._lock: scf = self._scf if scf is not None: try: self._cut_motors(scf) except Exception: logger.exception('direct emergency stop failed') def shutdown(self) -> None: self._cmd_q.put('shutdown') def alive(self) -> bool: return self._thread.is_alive() def join(self, timeout: float | None = None) -> None: if self._thread.is_alive(): self._thread.join(timeout) # ----------------------------------------------------------- worker loop - def _run(self) -> None: while True: cmd = self._cmd_q.get() if cmd == 'shutdown': return if cmd == 'connect': self._session() if self._session_result == 'shutdown': return # any other command while disconnected is ignored def _session(self) -> None: """Open the link and service commands until disconnect/shutdown.""" self._session_result = '' self._set_state(State.CONNECTING, f'Connecting to {self._uri} ...') try: cf = Crazyflie(rw_cache=self._cache_dir) with SyncCrazyflie(self._uri, cf=cf) as scf: with self._lock: self._scf = scf self._proto_v7 = self._detect_protocol(scf) self._setup_logging(scf) self._probe_decks(scf) # The firmware estimator integrates Flow-deck drift the whole # time it's powered, so a freshly-connected drone often reports a # wildly wrong position (tens of metres). Zero it on connect so # mapping starts from a clean origin; ZUPT then keeps it there. self._set_state(State.CONNECTING, 'Resetting estimator ...') self._reset_estimator(scf) self._set_state(State.READY, 'Connected - origin reset, ready') try: self._command_loop(scf) finally: # Delete log blocks while the link is open so firmware stops # streaming them (else "no LogEntry to handle" warnings). self._teardown_logging() except Exception as exc: logger.exception('session error') self._set_state(State.ERROR, f'Connection error: {exc}') finally: with self._lock: self._scf = None if self._session_result != 'shutdown': self._set_state(State.DISCONNECTED, 'Disconnected') def _command_loop(self, scf: SyncCrazyflie) -> None: """Service commands on the ground (READY / ARMED); the airborne phase is delegated to _do_scan().""" while True: # The firmware may have auto-disarmed us (~30 s pre-flight timeout, a # bump/tumble, or low battery) while idling armed on the ground. if (self._state == State.ARMED and self._proto_v7 and not self._telemetry.armed): self._set_state(State.READY, 'Auto-disarmed (armed too long or unsafe)') try: cmd = self._cmd_q.get(timeout=0.2) except queue.Empty: continue if cmd == 'reset': if self._state in (State.READY, State.ARMED, State.EMERGENCY): self._set_state(self._state, 'Resetting estimator / origin ...') self._reset_estimator(scf) self._set_state(self._state, 'Origin reset to current spot') elif cmd == 'arm': if self._state in (State.READY, State.EMERGENCY): self._do_arm(scf) elif cmd == 'scan': if self._state == State.ARMED: follow_up = self._do_scan(scf) if follow_up in ('disconnect', 'shutdown'): self._session_result = follow_up return elif cmd == 'stop': if self._state == State.ARMED: self._do_disarm_ground(scf) elif cmd == 'disconnect': if self._state == State.ARMED: self._do_disarm_ground(scf) self._session_result = 'disconnect' return elif cmd == 'shutdown': if self._state == State.ARMED: self._do_disarm_ground(scf) self._session_result = 'shutdown' return elif cmd == 'estop': self._do_estop(scf) # --------------------------------------------------------------- flight -- def _do_arm(self, scf: SyncCrazyflie) -> None: """Pre-flight checks + arm; on success enter ARMED (idle on the ground). Does NOT take off - that's _do_scan().""" cf = scf.cf if self._telemetry.flow_deck is False: self._set_state(State.READY, 'Arm refused: no Flow v2 deck detected') return ok, why = self._battery_ok() if not ok: self._set_state(State.READY, f'Arm refused: {why}') return if self._telemetry.tumbled: self._set_state(State.READY, 'Arm refused: drone is tumbled - set it level') return if self._telemetry.locked: self._set_state(State.READY, 'Arm refused: supervisor locked - reboot the drone') return if self._telemetry.multiranger is False: # Not fatal to flight, but the whole point is the map - warn loudly. logger.warning('no Multi-ranger deck detected - scan will collect no ' 'range data') self._set_state(State.ARMING, 'Resetting position estimator ...') self._reset_estimator(scf) if self._dry_run: # Props-off: run the pre-flight checks and estimator reset for # realism, but never actually arm. SCAN then walks the sequence. self._set_state(State.ARMED, 'DRY RUN armed (simulated - motors NOT armed)') return if self._proto_v7: # Don't gate on canArm: stock firmware auto-arms the instant # pre-flight checks pass, so canArm is only ever true for one tick. # The real signal is the armed bit; requesting arming is a harmless # no-op if already auto-armed. cf.supervisor.send_arming_request(True) if not self._wait_until(lambda: self._telemetry.armed, 5.0): self._set_state(State.READY, f'Arm failed: {self._arm_failure_reason()}') return else: # Legacy firmware: a zero setpoint unlocks the thrust-safety latch. cf.commander.send_setpoint(0, 0, 0, 0) self._set_state(State.ARMED, 'Armed - ready to SCAN (auto-disarms in ' '~30 s if idle)') def _do_disarm_ground(self, scf: SyncCrazyflie) -> None: """Disarm without flying (back out of ARMED).""" if self._proto_v7 and not self._dry_run: scf.cf.supervisor.send_arming_request(False) self._set_state(State.READY, 'Disarmed') def _do_scan(self, scf: SyncCrazyflie) -> str | None: """Fly the hardened, stop-and-stare rotating scan and build the map. Sequence: high-level-commander takeoff -> confirm a stable hover -> for each ring height, rotate one full turn in discrete YAW_STEP_DEG steps (pausing to let the estimate settle and the sensors take a clean static reading at each bearing) -> climb to the next ring -> land. The drone is always commanded to hold the takeoff point (0, 0); it never translates. Safety: aborts to a controlled landing on estimate divergence, wall proximity, or loss of the flying state, and to an emergency stop on a tumble/crash. Returns 'disconnect'/'shutdown' if such a command arrived mid-flight (caller tears down after landing), else None. WARNING: rotation on a Flow deck is inherently unstable. This is experimental - bench-test with props off, then tether, before free flight near obstacles. """ cf = scf.cf p = self._scan_params dry = self._dry_run if not self._proto_v7 and not dry: self._set_state(State.ARMED, 'Scan needs current firmware ' '(supervisor + high-level commander)') return None if not self._telemetry.armed and not dry: self._set_state(State.READY, 'Scan aborted: no longer armed') return None if dry: logger.info('[DRY RUN] scan walkthrough - no motors, no flight ' 'commands; live sensors still drive the safety guards') else: logger.warning('EXPERIMENTAL rotating scan starting - keep the area ' 'clear; EMERGENCY STOP (SPACE) is always available') try: cf.param.set_value(HLC_PARAM, '1') # ensure HLC is enabled except Exception: logger.exception('could not enable the high-level commander') self._recording = True hlc = cf.high_level_commander tag = '[DRY RUN] ' if dry else '' start = clamp_height(p.start_height) top = clamp_height(max(p.top_height, start)) climb = max(0.05, p.climb_rate) step_rate = max(5.0, min(p.yaw_rate, MAX_YAW_RATE_DPS)) # --- takeoff (firmware holds position autonomously) --------------- ascent = max(2.0, start / climb) self._set_state(State.SCANNING, f'{tag}Taking off to {start:.2f} m ...') self._fly(lambda: hlc.takeoff(start, ascent), f'takeoff to {start:.2f} m over {ascent:.1f}s') res = self._scan_wait(scf, ascent + TAKEOFF_SETTLE_S) if res != 'ok': return self._finish_scan(scf, res) # Don't rotate until the estimate is actually settled and we're flying. # (In dry run nothing is airborne, so skip the airborne-stability gate.) if not dry and not self._wait_until( lambda: self._est_speed < DIVERGE_SPEED_MPS and self._telemetry.flying, 3.0): logger.warning('estimate unsettled after takeoff - landing') return self._finish_scan(scf, 'abort') held_z = start heading = 0.0 # degrees # Ring heights from start up to top, RING_STEP_M apart. rings: list[float] = [] z = start while z < top - 1e-3: rings.append(z) z += RING_STEP_M rings.append(top) for ring_z in rings: target = self._ceiling_capped(ring_z) if target < held_z - 0.03: logger.warning('ceiling within margin - not climbing past ' '%.2f m', held_z) target = held_z # Climb/descend to the ring height, holding heading. if abs(target - held_z) > 0.03: dur = max(1.5, abs(target - held_z) / climb) self._set_state(State.SCANNING, f'{tag}Moving to {target:.2f} m ...') self._fly( lambda t=target, d=dur: hlc.go_to(0.0, 0.0, t, math.radians(heading), d), f'go_to z={target:.2f} m, yaw={heading:+.0f} deg over {dur:.1f}s') res = self._scan_wait(scf, dur + 0.5) if res != 'ok': return self._finish_scan(scf, res) held_z = target # One full revolution, stop-and-stare. steps = max(1, round(360.0 / YAW_STEP_DEG)) self._set_state(State.SCANNING, f'{tag}Sweeping at {held_z:.2f} m ({len(rings)} rings)') for _ in range(steps): heading = _wrap_deg(heading + YAW_STEP_DEG) dur = max(1.5, YAW_STEP_DEG / step_rate) self._fly( lambda h=heading, d=dur: hlc.go_to(0.0, 0.0, held_z, math.radians(h), d), f'go_to bearing {heading:+.0f} deg at {held_z:.2f} m ' f'over {dur:.1f}s') # The settle pause is also when the sensors sit still and read # cleanly at this bearing. res = self._scan_wait(scf, dur + YAW_SETTLE_S) if res != 'ok': return self._finish_scan(scf, res) return self._finish_scan(scf, 'done') def _fly(self, action, description: str) -> None: """Execute a flight command, or just log it in dry-run mode.""" if self._dry_run: logger.info('[DRY RUN] %s', description) return action() # ---- scan safety helpers ----------------------------------------------- def _scan_guard(self, scf: SyncCrazyflie) -> str | None: """Check abort conditions once. Returns an action string or None. 'estop' -> cut motors (tumble/crash or user) 'abort' -> controlled land (divergence/proximity) 'stop'/'disconnect'/'shutdown' -> user command (controlled land) """ cmd = self._poll_cmd() if cmd == 'estop': return 'estop' if cmd in ('stop', 'disconnect', 'shutdown'): return cmd t = self._telemetry if t.tumbled or t.crashed: logger.warning('tumble/crash detected - emergency stop') return 'estop' if self._est_speed > DIVERGE_SPEED_MPS: logger.warning('estimate diverging (%.2f m/s while holding) - ' 'aborting to land', self._est_speed) return 'abort' near = [d for d in (t.front, t.back, t.left, t.right) if d is not None] if near and min(near) < MIN_CLEARANCE_M: logger.warning('obstacle within %.2f m - aborting to land', min(near)) return 'abort' return None def _scan_wait(self, scf: SyncCrazyflie, duration: float) -> str: """Sleep up to `duration` while polling the safety guard. Returns 'ok' if the interval elapsed cleanly, else the guard's action string.""" if self._dry_run: # No real motion to wait for; just briefly poll the guards so live # sensor readings can still trigger (and be seen) in the walkthrough. duration = min(duration, 0.4) end = time.monotonic() + duration while time.monotonic() < end: action = self._scan_guard(scf) if action: return action time.sleep(0.05) return 'ok' def _finish_scan(self, scf: SyncCrazyflie, reason: str) -> str | None: """Resolve a scan exit: emergency stop, or a controlled landing.""" if reason == 'estop': self._do_estop(scf) return None self._land_hlc(scf) if reason in ('disconnect', 'shutdown'): return reason if reason == 'abort': self._set_state(State.READY, 'Scan aborted for safety - landed') return None def _ceiling_capped(self, target_z: float) -> float: """Clamp a target height to stay CEILING_MARGIN_M below the sensed ceiling (from the up-facing range), if there is one.""" up = self._telemetry.up if up is None: return target_z ceiling = self._telemetry.z + up return min(target_z, max(MIN_HEIGHT_M, ceiling - CEILING_MARGIN_M)) def _land_hlc(self, scf: SyncCrazyflie) -> None: """Controlled high-level-commander landing, staying responsive to an emergency stop and to a tumble during descent.""" cf = scf.cf if self._dry_run: logger.info('[DRY RUN] land and disarm') self._set_state(State.READY, 'DRY RUN complete - landed (simulated)') return self._set_state(State.LANDING, 'Landing ...') z = max(self._telemetry.z, 0.1) dur = max(2.0, z / max(0.05, self._scan_params.climb_rate)) try: cf.high_level_commander.land(0.0, dur, yaw=None) except Exception: logger.exception('high-level land failed - cutting motors') self._do_estop(scf) return end = time.monotonic() + dur while time.monotonic() < end: cmd = self._poll_cmd() if cmd == 'estop' or self._telemetry.tumbled or self._telemetry.crashed: self._do_estop(scf) return time.sleep(0.05) try: cf.high_level_commander.stop() except Exception: pass if self._proto_v7: cf.supervisor.send_arming_request(False) self._set_state(State.READY, 'Landed - on the ground, disarmed') def _do_estop(self, scf: SyncCrazyflie) -> None: try: self._cut_motors(scf) except Exception: logger.exception('emergency stop failed') if self._dry_run: self._set_state(State.READY, 'DRY RUN: emergency stop (no motors)') else: self._set_state(State.EMERGENCY, 'EMERGENCY STOP - motors cut (reboot to fly again)') def _cut_motors(self, scf: SyncCrazyflie) -> None: """Cut motors immediately. Safe to call from any thread.""" if self._dry_run: # Nothing is ever armed in dry run; don't send a flight command. logger.info('[DRY RUN] emergency stop (no motors to cut)') return cf = scf.cf if self._proto_v7: cf.supervisor.send_emergency_stop() else: cf.commander.send_stop_setpoint() # ----------------------------------------------------- estimator helper -- def _reset_estimator(self, scf: SyncCrazyflie) -> None: cf = scf.cf cf.param.set_value('kalman.resetEstimation', '1') time.sleep(0.1) cf.param.set_value('kalman.resetEstimation', '0') self._wait_for_position_estimator(scf) # A fresh estimator means a fresh world origin; the old map no longer # lines up, so start it clean and drop any accumulated drift offset. self.clear_map() self._reset_drift() def _reset_drift(self) -> None: """Zero the ZUPT offsets and rest detector (called on an origin reset).""" self._off_x = self._off_y = self._off_yaw = 0.0 self._hold_x = self._hold_y = self._hold_yaw = 0.0 self._gyro_ema = self._flow_ema = 0.0 self._still = False self._last_acc = None def _wait_for_position_estimator(self, scf: SyncCrazyflie, threshold: float = 0.001, timeout: float = 10.0) -> None: """Block until the Kalman position variance converges (or times out).""" log_conf = LogConfig(name='kalman_var', period_in_ms=100) log_conf.add_variable('kalman.varPX', 'float') log_conf.add_variable('kalman.varPY', 'float') log_conf.add_variable('kalman.varPZ', 'float') history = {'x': [1000.0] * 10, 'y': [1000.0] * 10, 'z': [1000.0] * 10} deadline = time.monotonic() + timeout try: with SyncLogger(scf, log_conf) as sync_log: for _, data, _ in sync_log: for key, var in (('x', 'kalman.varPX'), ('y', 'kalman.varPY'), ('z', 'kalman.varPZ')): history[key].append(data[var]) history[key].pop(0) spread = max(max(h) - min(h) for h in history.values()) if spread < threshold: return if time.monotonic() > deadline: logger.warning('estimator did not converge in %.0fs ' '(spread=%.4f); proceeding', timeout, spread) return except Exception: logger.exception('estimator convergence wait failed') # ---------------------------------------------------------- log / param -- def _detect_protocol(self, scf: SyncCrazyflie) -> bool: try: return scf.cf.platform.get_protocol_version() >= 7 except Exception: logger.exception('could not read protocol version') return False def _setup_logging(self, scf: SyncCrazyflie) -> None: self._log_configs = [] self._console_buf = '' scf.cf.console.receivedChar.add_callback(self._on_console) # Pose block: estimator x/y/z + heading. Kept separate from the range # block because all ten variables together exceed one CRTP log packet. pose = LogConfig(name='rv_pose', period_in_ms=100) pose.add_variable('stateEstimate.x', 'float') pose.add_variable('stateEstimate.y', 'float') pose.add_variable('stateEstimate.z', 'float') pose.add_variable('stabilizer.yaw', 'float') scf.cf.log.add_config(pose) pose.data_received_cb.add_callback(self._on_pose) pose.start() self._log_configs.append(pose) # Multi-ranger block: the five ToF sensors + the Flow deck's z-ranger. # Best-effort: without the deck these variables are absent and the whole # block is rejected, which is fine - the map just stays empty. try: rng = LogConfig(name='rv_range', period_in_ms=100) for var in ('range.front', 'range.back', 'range.left', 'range.right', 'range.up', 'range.zrange'): rng.add_variable(var, 'uint16_t') scf.cf.log.add_config(rng) rng.data_received_cb.add_callback(self._on_range) rng.start() self._log_configs.append(rng) except (KeyError, AttributeError) as exc: logger.warning('range logging unavailable (Multi-ranger deck?): %s', exc) # Raw-motion block for at-rest detection: gyro rates + optical-flow # counts. These come straight off the sensors, so (unlike the estimate) # they read ~0 when the drone is genuinely still. Best-effort: flow # counts need the Flow deck, without which ZUPT simply never engages. try: mot = LogConfig(name='rv_motion', period_in_ms=100) mot.add_variable('gyro.x', 'float') mot.add_variable('gyro.y', 'float') mot.add_variable('gyro.z', 'float') mot.add_variable('motion.deltaX', 'int16_t') mot.add_variable('motion.deltaY', 'int16_t') scf.cf.log.add_config(mot) mot.data_received_cb.add_callback(self._on_motion) mot.start() self._log_configs.append(mot) except (KeyError, AttributeError) as exc: logger.warning('motion logging unavailable (Flow deck?): %s - drift ' 'correction disabled', exc) # Battery + supervisor status. sup = LogConfig(name='rv_status', period_in_ms=200) sup.add_variable('pm.vbat', 'float') if self._proto_v7: sup.add_variable('supervisor.info', 'uint16_t') scf.cf.log.add_config(sup) sup.data_received_cb.add_callback(self._on_status) sup.start() self._log_configs.append(sup) def _teardown_logging(self) -> None: for conf in getattr(self, '_log_configs', []): try: conf.stop() conf.delete() except Exception: logger.exception('failed to delete log config %s', conf.name) self._log_configs = [] def _on_console(self, text: str) -> None: self._console_buf += text while '\n' in self._console_buf: line, self._console_buf = self._console_buf.split('\n', 1) line = line.rstrip('\r') if line: _CONSOLE_LOGGER.info(line) def _on_pose(self, _ts, data, _logconf) -> None: self._raw_x = data['stateEstimate.x'] self._raw_y = data['stateEstimate.y'] self._raw_z = data['stateEstimate.z'] self._raw_yaw = data['stabilizer.yaw'] # Track raw horizontal speed for divergence detection (smoothed). now = time.monotonic() if self._last_pose_t is not None and self._last_raw_xy is not None: dt = now - self._last_pose_t if dt > 1e-3: inst = math.hypot(self._raw_x - self._last_raw_xy[0], self._raw_y - self._last_raw_xy[1]) / dt self._est_speed = 0.5 * self._est_speed + 0.5 * inst self._last_pose_t = now self._last_raw_xy = (self._raw_x, self._raw_y) self._apply_correction() self._emit() def _on_motion(self, _ts, data, _logconf) -> None: """Update the at-rest detector from raw gyro + optical-flow counts.""" gmag = max(abs(data['gyro.x']), abs(data['gyro.y']), abs(data['gyro.z'])) fmag = max(abs(data['motion.deltaX']), abs(data['motion.deltaY'])) self._gyro_ema = STILL_EMA * self._gyro_ema + (1 - STILL_EMA) * gmag self._flow_ema = STILL_EMA * self._flow_ema + (1 - STILL_EMA) * fmag self._still = (self._gyro_ema < GYRO_STILL_DPS and self._flow_ema < FLOW_STILL_COUNTS) def _apply_correction(self) -> None: """Map the raw estimate to a drift-corrected pose and publish it. While at rest with correction on, the corrected pose is pinned to the spot it last held: the raw-minus-corrected offset is updated each sample so any estimator wander is cancelled. While moving, the offset is frozen so real motion passes straight through, and the current corrected pose is remembered as the point to freeze at the next rest. """ t = self._telemetry rx, ry, rz, ryaw = self._raw_x, self._raw_y, self._raw_z, self._raw_yaw pin = self._drift_correction and self._still if pin: self._off_x = rx - self._hold_x self._off_y = ry - self._hold_y self._off_yaw = _wrap_deg(ryaw - self._hold_yaw) cx = rx - self._off_x cy = ry - self._off_y cyaw = _wrap_deg(ryaw - self._off_yaw) if not pin: self._hold_x, self._hold_y, self._hold_yaw = cx, cy, cyaw t.x, t.y, t.z, t.yaw = cx, cy, rz, cyaw t.raw_x, t.raw_y = rx, ry t.still = self._still t.drift_corrected = self._drift_correction def _on_range(self, _ts, data, _logconf) -> None: t = self._telemetry t.front = _range_m(data['range.front']) t.back = _range_m(data['range.back']) t.left = _range_m(data['range.left']) t.right = _range_m(data['range.right']) t.up = _range_m(data['range.up']) t.down = _range_m(data['range.zrange']) # Project the horizontal hits into the world frame and add to the map. # Uses the most recent corrected pose (a separate, equally fast block); # the small skew is negligible against a slow ascent/rotation. Only add a # batch once the drone has actually moved, so at-rest noise stays out. if self._recording and self._moved_enough(t): self._accumulate(t) self._last_acc = (t.x, t.y, t.z, t.yaw) self._emit() def _moved_enough(self, t: Telemetry) -> bool: if self._last_acc is None: return True lx, ly, lz, lyaw = self._last_acc return (math.hypot(t.x - lx, t.y - ly) >= MIN_MOVE_M or abs(t.z - lz) >= MIN_RISE_M or abs(_wrap_deg(t.yaw - lyaw)) >= MIN_TURN_DEG) def _accumulate(self, t: Telemetry) -> None: yaw = math.radians(t.yaw) cos_y, sin_y = math.cos(yaw), math.sin(yaw) new_x: list[float] = [] # 2-D top-down slice (horizontal sensors) new_y: list[float] = [] cloud: list[tuple[float, float, float]] = [] # full 3-D batch to stream for name, (bx, by) in _HORIZONTAL_SENSORS: dist = getattr(t, name) if dist is None: continue # Rotate the body-frame sensor direction into the world frame. The # horizontal beams sit at the drone's current height. wx = cos_y * bx - sin_y * by wy = sin_y * bx + cos_y * by px, py = t.x + wx * dist, t.y + wy * dist new_x.append(px) new_y.append(py) cloud.append((px, py, t.z)) # Up/down sensors give ceiling/floor points (3-D only; they would just # clutter the 2-D map at the drone's own position). if t.up is not None: cloud.append((t.x, t.y, t.z + t.up)) if t.down is not None: cloud.append((t.x, t.y, t.z - t.down)) if new_x: with self._map_lock: self._map_x.extend(new_x) self._map_y.extend(new_y) self._map_version += 1 if cloud: self._publisher.add_points(cloud) def _on_status(self, _ts, data, _logconf) -> None: t = self._telemetry t.vbat = data['pm.vbat'] if 'supervisor.info' in data: info = data['supervisor.info'] t.can_arm = bool(info & SUP_CAN_ARM) t.armed = bool(info & SUP_IS_ARMED) t.flying = bool(info & SUP_IS_FLYING) t.tumbled = bool(info & SUP_IS_TUMBLED) t.locked = bool(info & SUP_IS_LOCKED) t.crashed = bool(info & SUP_IS_CRASHED) t.deck_fault = bool(info & SUP_DECK_FAULT) self._emit() def _probe_decks(self, scf: SyncCrazyflie) -> None: for param, attr in (('deck.bcFlow2', 'flow_deck'), ('deck.bcMultiranger', 'multiranger')): try: value = scf.cf.param.get_value(param, timeout=5) setattr(self._telemetry, attr, bool(int(value))) except Exception: logger.exception('could not read %s', param) setattr(self._telemetry, attr, None) self._emit() def _arm_failure_reason(self) -> str: t = self._telemetry if t.locked: return 'supervisor locked - reboot the drone' if t.tumbled: return 'drone is tumbled - set it level and still' if t.deck_fault: return 'a deck reported a hardware fault - reseat decks / reboot' if t.crashed: return 'supervisor in crashed state - flip level to recover' return ('pre-flight checks did not pass - place the drone level and ' 'stationary, then retry') # --------------------------------------------------------------- utils --- def _battery_ok(self) -> tuple[bool, str]: end = time.monotonic() + 1.5 while self._telemetry.vbat == 0.0 and time.monotonic() < end: time.sleep(0.05) v = self._telemetry.vbat if v and v < MIN_SAFE_VBAT: return False, f'battery low ({v:.2f} V, need {MIN_SAFE_VBAT:.2f} V)' return True, '' def _poll_cmd(self) -> str | None: try: return self._cmd_q.get_nowait() except queue.Empty: return None def _wait_until(self, predicate, timeout: float) -> bool: end = time.monotonic() + timeout while time.monotonic() < end: if predicate(): return True time.sleep(0.05) return False def _set_state(self, state: State, message: str) -> None: self._state = state self._message = message logger.info('[%s] %s', state.name, message) self._emit() def _emit(self) -> None: snapshot = Update( state=self._state, message=self._message, telemetry=dataclasses.replace(self._telemetry), ) self._latest = snapshot # atomic publish for snapshot() pollers try: self._listener(snapshot) except Exception: logger.exception('listener raised') def _wrap_deg(angle: float) -> float: """Wrap an angle in degrees to (-180, 180].""" return (angle + 180.0) % 360.0 - 180.0 def _range_m(raw_mm) -> float | None: """Convert a raw Multi-ranger reading (mm) to metres, or None if out of range (zero, or at/over the sensor's ~4 m ceiling = nothing to map).""" mm = float(raw_mm) if mm <= 0.0 or mm >= RANGE_VALID_MAX_MM: return None return mm / 1000.0