Files
cf-rangeview/rangeview/controller.py
K. Isom 356218e8a2 Initial commit: RangeView Crazyflie ToF mapping client
DearPyGui app that visualizes the Multi-ranger ToF sensors and builds a top-down map of the drone's surroundings. Handheld and experimental hardened-scan capture, Flow-deck drift correction (ZUPT), props-off dry-run mode, and optional 3D point-cloud streaming to CloudView.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 14:09:01 -07:00

1226 lines
51 KiB
Python

"""Flight + sensing controller for the Crazyflie range-mapping client.
Owns the Crazyflie radio link on a single dedicated worker thread and exposes a
small, thread-safe API to the UI (connect / disconnect / arm / scan / stop /
emergency_stop / shutdown). Keeping every cflib call on one thread means the UI
thread never blocks on radio I/O and there is exactly one place that can command
the motors.
The mission this controller flies is a *scan*: arm, lift off to a start height,
then ascend slowly while rotating in place so the Multi-ranger deck's five ToF
sensors sweep the drone's surroundings. Each range sample is combined with the
estimator pose to project an obstacle point into a world frame, building a
top-down point-cloud map of the immediate environment.
Flight uses the firmware's *hover setpoint* stream (velocity-x/y, yaw-rate,
absolute-z) together with the *supervisor* arming system. Position/altitude hold
comes from the Flow v2 deck; the worker thread streams setpoints at 10 Hz so a
slow or stalled UI cannot make the drone drift or fall.
Sensor geometry (Crazyflie body frame: +x forward, +y left, +z up):
front +x back -x left +y right -y
up +z (ceiling) down -z (floor, from the Flow deck's z-ranger)
The four horizontal sensors are what build the 2-D map; up/down are surfaced as
ceiling/floor distances.
"""
from __future__ import annotations
import collections
import dataclasses
import logging
import math
import queue
import threading
import time
from dataclasses import dataclass
from enum import Enum, auto
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.utils import uri_helper
from .cloud_publisher import CloudPublisher, DEFAULT_ADDRESS
logger = logging.getLogger(__name__)
class _StaleLogFilter(logging.Filter):
"""Drop cflib's benign "no LogEntry to handle id=N" warnings.
Firmware streams log-data for a block id the host no longer tracks when a
stale block is left over from a prior/parallel session. We delete our blocks
on disconnect, but a handful of in-flight packets can still arrive right
after reconnecting; they're harmless (power-cycle to clear), so keep them out
of the console rather than alarming the user.
"""
def filter(self, record: logging.LogRecord) -> bool:
return 'no LogEntry to handle' not in record.getMessage()
class LogBuffer(logging.Handler):
"""A thread-safe ring buffer of formatted log lines for the UI console.
cflib logs from its own thread, so reads/writes are guarded by a lock. A
monotonic version counter lets the UI cheaply detect when there's something
new to render.
"""
def __init__(self, capacity: int = 500) -> None:
super().__init__()
self._lines: collections.deque = collections.deque(maxlen=capacity)
self._lock = threading.Lock()
self._version = 0
self.setFormatter(logging.Formatter(
'%(asctime)s %(levelname).1s %(name)s: %(message)s', '%H:%M:%S'))
def emit(self, record: logging.LogRecord) -> None:
try:
line = self.format(record)
except Exception:
return
with self._lock:
self._lines.append(line)
self._version += 1
def snapshot(self) -> tuple[int, list[str]]:
with self._lock:
return self._version, list(self._lines)
def clear(self) -> None:
with self._lock:
self._lines.clear()
self._version += 1
# Single shared console buffer, attached to the root logger once. Firmware
# console output is logged under its own name so it's easy to spot.
LOG_BUFFER = LogBuffer()
_CONSOLE_LOGGER = logging.getLogger('crazyflie.console')
_LOGGING_READY = False
def _install_console_logging() -> None:
"""Route cflib + app logs into LOG_BUFFER for the UI console (idempotent)."""
global _LOGGING_READY
if _LOGGING_READY:
return
logging.getLogger('cflib.crazyflie.log').addFilter(_StaleLogFilter())
root = logging.getLogger()
if root.level == logging.NOTSET or root.level > logging.INFO:
root.setLevel(logging.INFO)
LOG_BUFFER.setLevel(logging.INFO)
root.addHandler(LOG_BUFFER)
_LOGGING_READY = True
# Radio URI. Override with the CFLIB_URI environment variable.
DEFAULT_URI = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7')
# ---------------------------------------------------------------- scan params -
# Defaults; all adjustable from the UI before a scan starts. Kept conservative
# because the rotating scan is the part that previously caused a flyaway.
DEFAULT_START_HEIGHT_M = 0.4 # lift-off / first sweep height
DEFAULT_TOP_HEIGHT_M = 1.2 # final height after the ringed ascent
DEFAULT_CLIMB_RATE_MPS = 0.15 # how slowly to move between ring heights
DEFAULT_YAW_RATE_DPS = 20.0 # rotation speed during each discrete yaw step
MIN_HEIGHT_M = 0.2 # Flow v2 deck altitude-hold bounds
MAX_HEIGHT_M = 2.5
MIN_SAFE_VBAT = 3.10 # refuse to arm below this pack voltage
# --- hardened rotating-scan safety envelope ------------------------------------
# The scan never spins continuously. It uses the firmware high-level commander
# (which holds position autonomously) to take off, then rotates in discrete
# "stop-and-stare" steps so the Flow-deck estimate re-converges between moves,
# climbing in rings. Every parameter here errs toward caution.
TAKEOFF_SETTLE_S = 2.5 # stabilise after takeoff before rotating
YAW_STEP_DEG = 30.0 # rotation increment per stop-and-stare step
YAW_SETTLE_S = 1.2 # pause at each bearing (also a clean static read)
MAX_YAW_RATE_DPS = 30.0 # hard cap on per-step rotation speed
RING_STEP_M = 0.3 # vertical spacing between sweep rings
CEILING_MARGIN_M = 0.4 # stay at least this far below the sensed ceiling
MIN_CLEARANCE_M = 0.25 # abort if any horizontal wall is closer than this
DIVERGE_SPEED_MPS = 0.5 # raw horizontal estimate speed above this = abort
HLC_PARAM = 'commander.enHighLevel'
# Multi-ranger ToF validity. The VL53L1x sensors read out to ~4 m; a reading at
# or beyond this (or zero) means "nothing in range", not an obstacle.
RANGE_VALID_MAX_MM = 4000.0
# --- drift correction (zero-velocity update / ZUPT) ----------------------------
# A Flow v2 deck never produces a perfectly stable estimate: optical-flow noise
# integrates into position and gyro bias into yaw, so a stationary drone slowly
# "drifts". We detect rest from the *raw* sensors (gyro rate + optical-flow
# counts, which don't depend on the drifting estimate) and, while at rest, pin
# the corrected pose - folding the wander into an offset instead of the map.
GYRO_STILL_DPS = 3.0 # max |gyro| (deg/s) to count as not rotating
FLOW_STILL_COUNTS = 8.0 # max |flow delta| (counts) to count as not translating
STILL_EMA = 0.6 # smoothing applied to the motion magnitudes
# Minimum motion before another batch of map points is recorded - de-dupes the
# cloud and keeps at-rest sensor noise out of it entirely. Height change is
# included so a pure vertical sweep (ascending without rotating) still records.
MIN_MOVE_M = 0.02
MIN_RISE_M = 0.05
MIN_TURN_DEG = 2.0
# Body-frame unit directions of the four horizontal ToF sensors (x fwd, y left).
_HORIZONTAL_SENSORS = (
('front', (1.0, 0.0)),
('back', (-1.0, 0.0)),
('left', (0.0, 1.0)),
('right', (0.0, -1.0)),
)
# supervisor.info bitfield (see crazyflie-firmware src/modules/src/supervisor.c).
SUP_CAN_ARM = 0x0001
SUP_IS_ARMED = 0x0002
SUP_AUTO_ARM = 0x0004
SUP_CAN_FLY = 0x0008
SUP_IS_FLYING = 0x0010
SUP_IS_TUMBLED = 0x0020
SUP_IS_LOCKED = 0x0040
SUP_IS_CRASHED = 0x0080
SUP_DECK_FAULT = 0x0800 # a deck reported a hardware fault
def clamp_height(height: float) -> float:
"""Clamp a requested height to the safe Flow-deck range."""
return max(MIN_HEIGHT_M, min(MAX_HEIGHT_M, float(height)))
@dataclass
class ScanParams:
"""Latched-at-start parameters for one ascending, rotating scan."""
start_height: float = DEFAULT_START_HEIGHT_M
top_height: float = DEFAULT_TOP_HEIGHT_M
climb_rate: float = DEFAULT_CLIMB_RATE_MPS
yaw_rate: float = DEFAULT_YAW_RATE_DPS
class State(Enum):
"""High-level mission state, surfaced to the UI."""
DISCONNECTED = auto()
CONNECTING = auto()
READY = auto() # connected, on the ground, disarmed
ARMING = auto() # transient: estimator reset + arming request
ARMED = auto() # armed and idle on the ground, ready to SCAN
SCANNING = auto() # airborne: ascending and rotating, building the map
LANDING = auto() # controlled descent in progress
EMERGENCY = auto() # motors cut; reboot required to fly again
ERROR = auto() # connection or radio failure
@dataclass
class Telemetry:
"""Latest known drone state. Plain values; safe to copy across threads.
The five ToF distances are in metres, or None when the sensor reads
out-of-range (nothing close enough to map).
"""
vbat: float = 0.0
# Estimator pose (world frame, origin at lift-off).
x: float = 0.0
y: float = 0.0
z: float = 0.0
yaw: float = 0.0 # degrees
# Multi-ranger distances, metres or None.
front: float | None = None
back: float | None = None
left: float | None = None
right: float | None = None
up: float | None = None
down: float | None = None
# Drift-correction state.
still: bool = False # estimator-independent "at rest" detection
drift_corrected: bool = True # ZUPT hold currently enabled
dry_run: bool = False # props-off dry run (no motors)
raw_x: float = 0.0 # uncorrected estimate, for comparison
raw_y: float = 0.0
# Deck presence (None = not yet probed).
multiranger: bool | None = None
flow_deck: bool | None = None
# Supervisor flags.
can_arm: bool = False
armed: bool = False
flying: bool = False
tumbled: bool = False
locked: bool = False
crashed: bool = False
deck_fault: bool = False
@dataclass
class Update:
"""Immutable snapshot handed to the UI each frame."""
state: State
message: str
telemetry: Telemetry
class RangeController:
"""Threaded Crazyflie controller that flies a scan and maps the surroundings.
Usage:
c = RangeController(uri, listener=on_update)
c.start() # init radio drivers + spawn worker
c.connect() # open link
c.arm() # reset estimator + arm; idle armed on the ground
c.scan() # lift off, ascend slowly while rotating
c.stop() # land (from anywhere airborne) or disarm on ground
c.emergency_stop() # cut motors immediately
c.shutdown() # land if needed, close link, stop worker
The accumulated map is read separately via map_snapshot().
"""
def __init__(self, uri: str = DEFAULT_URI, listener=None,
cache_dir: str = './cache'):
self._uri = uri
self._listener = listener or (lambda update: None)
self._cache_dir = cache_dir
self._cmd_q: queue.Queue[str] = queue.Queue()
self._scan_params = ScanParams()
self._state = State.DISCONNECTED
self._message = 'Disconnected'
self._telemetry = Telemetry()
self._latest = Update(State.DISCONNECTED, 'Disconnected', Telemetry())
self._scf: SyncCrazyflie | None = None
self._console_buf = ''
self._log_configs: list = []
self._proto_v7 = False
self._session_result = ''
# Map accumulation is independent of flight: whenever this is set and
# pose+range are streaming, hits are projected into the map. It defaults
# on so handheld mapping works the moment you connect (hold the drone and
# walk it around); the automated scan also turns it on. A plain bool is
# GIL-atomic, so the UI flips it directly without going through the queue.
self._recording = True
self._lock = threading.Lock() # guards _scf access from other threads
# --- drift correction (ZUPT) state, touched only on the cflib thread ---
self._drift_correction = True
self._raw_x = self._raw_y = self._raw_z = self._raw_yaw = 0.0
self._off_x = self._off_y = self._off_yaw = 0.0 # corrected = raw - off
self._hold_x = self._hold_y = self._hold_yaw = 0.0 # frozen-while-still pose
self._gyro_ema = 0.0
self._flow_ema = 0.0
self._still = False
# Raw horizontal estimate speed, used to detect a diverging/flyaway
# estimate during the scan (the drone is always commanded to hold (0,0),
# so any sustained horizontal speed means the estimate is running off).
self._est_speed = 0.0
self._last_pose_t: float | None = None
self._last_raw_xy: tuple[float, float] | None = None
# Props-off dry run: scan logic runs, no motors arm, no flight commands.
self._dry_run = False
# Accumulated obstacle point cloud (world frame), guarded by _map_lock.
# Parallel deques so it drops straight into a DearPyGui scatter series.
# This is the 2-D top-down slice (horizontal sensors only); the full 3-D
# cloud (with height + ceiling/floor) is streamed to CloudView instead.
self._map_lock = threading.Lock()
self._map_x: collections.deque = collections.deque(maxlen=20000)
self._map_y: collections.deque = collections.deque(maxlen=20000)
self._map_version = 0
self._last_acc: tuple[float, float, float, float] | None = None
# Optional 3-D point-cloud stream to a CloudView viewer (best-effort).
self._publisher = CloudPublisher(source='rangeview',
name='Crazyflie RangeView')
self._thread = threading.Thread(
target=self._run, name='range-worker', daemon=True)
# ----------------------------------------------------------------- API ---
@property
def uri(self) -> str:
return self._uri
def set_listener(self, listener) -> None:
self._listener = listener or (lambda update: None)
def set_scan_params(self, start_height: float, top_height: float,
climb_rate: float, yaw_rate: float) -> None:
"""Update the parameters used by the next scan() (ignored mid-scan)."""
self._scan_params = ScanParams(
start_height=clamp_height(start_height),
top_height=clamp_height(top_height),
climb_rate=max(0.02, float(climb_rate)),
yaw_rate=float(yaw_rate))
def snapshot(self) -> Update:
"""Latest state/telemetry snapshot. Thread-safe and cheap; poll once a
frame."""
return self._latest
def map_snapshot(self) -> tuple[int, list[float], list[float]]:
"""Return (version, xs, ys) of the accumulated obstacle points.
The version only changes when points are added or the map is cleared, so
the UI can skip re-uploading an unchanged cloud.
"""
with self._map_lock:
return self._map_version, list(self._map_x), list(self._map_y)
def clear_map(self) -> None:
with self._map_lock:
self._map_x.clear()
self._map_y.clear()
self._map_version += 1
self._last_acc = None
self._publisher.clear()
logger.info('map cleared')
@staticmethod
def scan_uris() -> list[str]:
"""URIs of Crazyflies reachable on any installed interface."""
try:
return [entry[0] for entry in cflib.crtp.scan_interfaces()]
except Exception:
logger.warning('scan failed', exc_info=True)
return []
def start(self) -> None:
_install_console_logging()
cflib.crtp.init_drivers()
self._thread.start()
def connect(self, uri: str | None = None) -> None:
if uri:
self._uri = uri
self._cmd_q.put('connect')
def disconnect(self) -> None:
self._cmd_q.put('disconnect')
def set_recording(self, on: bool) -> None:
"""Enable/disable live map accumulation (independent of flight). On while
you move the drone - by hand or under power - the map fills in."""
self._recording = bool(on)
logger.info('map recording %s', 'ON' if on else 'OFF')
@property
def recording(self) -> bool:
return self._recording
def set_drift_correction(self, on: bool) -> None:
"""Enable/disable the zero-velocity hold that pins the pose while the
drone is detected at rest (cancels stationary Flow-deck drift)."""
self._drift_correction = bool(on)
logger.info('drift correction %s', 'ON' if on else 'OFF')
@property
def drift_correction(self) -> bool:
return self._drift_correction
def set_streaming(self, enabled: bool, address: str | None = None) -> None:
"""Enable/disable streaming the 3-D cloud to a CloudView viewer at
`address` (e.g. tcp://127.0.0.1:9870 or unix:///tmp/cloud.sock)."""
self._publisher.configure(enabled, address)
@property
def streaming_connected(self) -> bool:
return self._publisher.connected
def set_dry_run(self, on: bool) -> None:
"""Enable/disable props-off dry-run mode. In dry run the scan walks its
full state machine and logs every takeoff/go-to/land/abort decision but
never arms the motors or sends a flight command, so you can validate the
sequence on the bench with zero flight risk."""
self._dry_run = bool(on)
self._telemetry.dry_run = self._dry_run
logger.info('DRY RUN %s', 'ON (no motors will arm)' if on else 'OFF')
@property
def dry_run(self) -> bool:
return self._dry_run
def reset_origin(self) -> None:
"""Reset the Kalman estimator so the world origin is the drone's current
spot, and clear the map. Use before a handheld mapping run to set 'here'
as (0, 0). Serviced on the ground (READY/ARMED), ignored while flying."""
self._cmd_q.put('reset')
def arm(self) -> None:
"""Arm the motors and hold on the ground. The drone idles armed until
scan() or stop(); firmware auto-disarms after ~30 s if it never flies."""
self._cmd_q.put('arm')
def scan(self) -> None:
"""Lift off and run the ascending, rotating scan (from ARMED)."""
self._cmd_q.put('scan')
def stop(self) -> None:
"""Context-sensitive: land if airborne, else disarm on the ground."""
self._cmd_q.put('stop')
def emergency_stop(self) -> None:
# Queue it so the worker breaks out of any flight loop and updates state,
# but ALSO fire immediately from the caller thread for least latency.
self._cmd_q.put('estop')
with self._lock:
scf = self._scf
if scf is not None:
try:
self._cut_motors(scf)
except Exception:
logger.exception('direct emergency stop failed')
def shutdown(self) -> None:
self._cmd_q.put('shutdown')
def alive(self) -> bool:
return self._thread.is_alive()
def join(self, timeout: float | None = None) -> None:
if self._thread.is_alive():
self._thread.join(timeout)
# ----------------------------------------------------------- worker loop -
def _run(self) -> None:
while True:
cmd = self._cmd_q.get()
if cmd == 'shutdown':
return
if cmd == 'connect':
self._session()
if self._session_result == 'shutdown':
return
# any other command while disconnected is ignored
def _session(self) -> None:
"""Open the link and service commands until disconnect/shutdown."""
self._session_result = ''
self._set_state(State.CONNECTING, f'Connecting to {self._uri} ...')
try:
cf = Crazyflie(rw_cache=self._cache_dir)
with SyncCrazyflie(self._uri, cf=cf) as scf:
with self._lock:
self._scf = scf
self._proto_v7 = self._detect_protocol(scf)
self._setup_logging(scf)
self._probe_decks(scf)
# The firmware estimator integrates Flow-deck drift the whole
# time it's powered, so a freshly-connected drone often reports a
# wildly wrong position (tens of metres). Zero it on connect so
# mapping starts from a clean origin; ZUPT then keeps it there.
self._set_state(State.CONNECTING, 'Resetting estimator ...')
self._reset_estimator(scf)
self._set_state(State.READY, 'Connected - origin reset, ready')
try:
self._command_loop(scf)
finally:
# Delete log blocks while the link is open so firmware stops
# streaming them (else "no LogEntry to handle" warnings).
self._teardown_logging()
except Exception as exc:
logger.exception('session error')
self._set_state(State.ERROR, f'Connection error: {exc}')
finally:
with self._lock:
self._scf = None
if self._session_result != 'shutdown':
self._set_state(State.DISCONNECTED, 'Disconnected')
def _command_loop(self, scf: SyncCrazyflie) -> None:
"""Service commands on the ground (READY / ARMED); the airborne phase is
delegated to _do_scan()."""
while True:
# The firmware may have auto-disarmed us (~30 s pre-flight timeout, a
# bump/tumble, or low battery) while idling armed on the ground.
if (self._state == State.ARMED and self._proto_v7
and not self._telemetry.armed):
self._set_state(State.READY,
'Auto-disarmed (armed too long or unsafe)')
try:
cmd = self._cmd_q.get(timeout=0.2)
except queue.Empty:
continue
if cmd == 'reset':
if self._state in (State.READY, State.ARMED, State.EMERGENCY):
self._set_state(self._state, 'Resetting estimator / origin ...')
self._reset_estimator(scf)
self._set_state(self._state, 'Origin reset to current spot')
elif cmd == 'arm':
if self._state in (State.READY, State.EMERGENCY):
self._do_arm(scf)
elif cmd == 'scan':
if self._state == State.ARMED:
follow_up = self._do_scan(scf)
if follow_up in ('disconnect', 'shutdown'):
self._session_result = follow_up
return
elif cmd == 'stop':
if self._state == State.ARMED:
self._do_disarm_ground(scf)
elif cmd == 'disconnect':
if self._state == State.ARMED:
self._do_disarm_ground(scf)
self._session_result = 'disconnect'
return
elif cmd == 'shutdown':
if self._state == State.ARMED:
self._do_disarm_ground(scf)
self._session_result = 'shutdown'
return
elif cmd == 'estop':
self._do_estop(scf)
# --------------------------------------------------------------- flight --
def _do_arm(self, scf: SyncCrazyflie) -> None:
"""Pre-flight checks + arm; on success enter ARMED (idle on the ground).
Does NOT take off - that's _do_scan()."""
cf = scf.cf
if self._telemetry.flow_deck is False:
self._set_state(State.READY, 'Arm refused: no Flow v2 deck detected')
return
ok, why = self._battery_ok()
if not ok:
self._set_state(State.READY, f'Arm refused: {why}')
return
if self._telemetry.tumbled:
self._set_state(State.READY,
'Arm refused: drone is tumbled - set it level')
return
if self._telemetry.locked:
self._set_state(State.READY,
'Arm refused: supervisor locked - reboot the drone')
return
if self._telemetry.multiranger is False:
# Not fatal to flight, but the whole point is the map - warn loudly.
logger.warning('no Multi-ranger deck detected - scan will collect no '
'range data')
self._set_state(State.ARMING, 'Resetting position estimator ...')
self._reset_estimator(scf)
if self._dry_run:
# Props-off: run the pre-flight checks and estimator reset for
# realism, but never actually arm. SCAN then walks the sequence.
self._set_state(State.ARMED,
'DRY RUN armed (simulated - motors NOT armed)')
return
if self._proto_v7:
# Don't gate on canArm: stock firmware auto-arms the instant
# pre-flight checks pass, so canArm is only ever true for one tick.
# The real signal is the armed bit; requesting arming is a harmless
# no-op if already auto-armed.
cf.supervisor.send_arming_request(True)
if not self._wait_until(lambda: self._telemetry.armed, 5.0):
self._set_state(State.READY,
f'Arm failed: {self._arm_failure_reason()}')
return
else:
# Legacy firmware: a zero setpoint unlocks the thrust-safety latch.
cf.commander.send_setpoint(0, 0, 0, 0)
self._set_state(State.ARMED, 'Armed - ready to SCAN (auto-disarms in '
'~30 s if idle)')
def _do_disarm_ground(self, scf: SyncCrazyflie) -> None:
"""Disarm without flying (back out of ARMED)."""
if self._proto_v7 and not self._dry_run:
scf.cf.supervisor.send_arming_request(False)
self._set_state(State.READY, 'Disarmed')
def _do_scan(self, scf: SyncCrazyflie) -> str | None:
"""Fly the hardened, stop-and-stare rotating scan and build the map.
Sequence: high-level-commander takeoff -> confirm a stable hover -> for
each ring height, rotate one full turn in discrete YAW_STEP_DEG steps
(pausing to let the estimate settle and the sensors take a clean static
reading at each bearing) -> climb to the next ring -> land. The drone is
always commanded to hold the takeoff point (0, 0); it never translates.
Safety: aborts to a controlled landing on estimate divergence, wall
proximity, or loss of the flying state, and to an emergency stop on a
tumble/crash. Returns 'disconnect'/'shutdown' if such a command arrived
mid-flight (caller tears down after landing), else None.
WARNING: rotation on a Flow deck is inherently unstable. This is
experimental - bench-test with props off, then tether, before free
flight near obstacles.
"""
cf = scf.cf
p = self._scan_params
dry = self._dry_run
if not self._proto_v7 and not dry:
self._set_state(State.ARMED, 'Scan needs current firmware '
'(supervisor + high-level commander)')
return None
if not self._telemetry.armed and not dry:
self._set_state(State.READY, 'Scan aborted: no longer armed')
return None
if dry:
logger.info('[DRY RUN] scan walkthrough - no motors, no flight '
'commands; live sensors still drive the safety guards')
else:
logger.warning('EXPERIMENTAL rotating scan starting - keep the area '
'clear; EMERGENCY STOP (SPACE) is always available')
try:
cf.param.set_value(HLC_PARAM, '1') # ensure HLC is enabled
except Exception:
logger.exception('could not enable the high-level commander')
self._recording = True
hlc = cf.high_level_commander
tag = '[DRY RUN] ' if dry else ''
start = clamp_height(p.start_height)
top = clamp_height(max(p.top_height, start))
climb = max(0.05, p.climb_rate)
step_rate = max(5.0, min(p.yaw_rate, MAX_YAW_RATE_DPS))
# --- takeoff (firmware holds position autonomously) ---------------
ascent = max(2.0, start / climb)
self._set_state(State.SCANNING, f'{tag}Taking off to {start:.2f} m ...')
self._fly(lambda: hlc.takeoff(start, ascent),
f'takeoff to {start:.2f} m over {ascent:.1f}s')
res = self._scan_wait(scf, ascent + TAKEOFF_SETTLE_S)
if res != 'ok':
return self._finish_scan(scf, res)
# Don't rotate until the estimate is actually settled and we're flying.
# (In dry run nothing is airborne, so skip the airborne-stability gate.)
if not dry and not self._wait_until(
lambda: self._est_speed < DIVERGE_SPEED_MPS
and self._telemetry.flying, 3.0):
logger.warning('estimate unsettled after takeoff - landing')
return self._finish_scan(scf, 'abort')
held_z = start
heading = 0.0 # degrees
# Ring heights from start up to top, RING_STEP_M apart.
rings: list[float] = []
z = start
while z < top - 1e-3:
rings.append(z)
z += RING_STEP_M
rings.append(top)
for ring_z in rings:
target = self._ceiling_capped(ring_z)
if target < held_z - 0.03:
logger.warning('ceiling within margin - not climbing past '
'%.2f m', held_z)
target = held_z
# Climb/descend to the ring height, holding heading.
if abs(target - held_z) > 0.03:
dur = max(1.5, abs(target - held_z) / climb)
self._set_state(State.SCANNING, f'{tag}Moving to {target:.2f} m ...')
self._fly(
lambda t=target, d=dur: hlc.go_to(0.0, 0.0, t,
math.radians(heading), d),
f'go_to z={target:.2f} m, yaw={heading:+.0f} deg over {dur:.1f}s')
res = self._scan_wait(scf, dur + 0.5)
if res != 'ok':
return self._finish_scan(scf, res)
held_z = target
# One full revolution, stop-and-stare.
steps = max(1, round(360.0 / YAW_STEP_DEG))
self._set_state(State.SCANNING,
f'{tag}Sweeping at {held_z:.2f} m ({len(rings)} rings)')
for _ in range(steps):
heading = _wrap_deg(heading + YAW_STEP_DEG)
dur = max(1.5, YAW_STEP_DEG / step_rate)
self._fly(
lambda h=heading, d=dur: hlc.go_to(0.0, 0.0, held_z,
math.radians(h), d),
f'go_to bearing {heading:+.0f} deg at {held_z:.2f} m '
f'over {dur:.1f}s')
# The settle pause is also when the sensors sit still and read
# cleanly at this bearing.
res = self._scan_wait(scf, dur + YAW_SETTLE_S)
if res != 'ok':
return self._finish_scan(scf, res)
return self._finish_scan(scf, 'done')
def _fly(self, action, description: str) -> None:
"""Execute a flight command, or just log it in dry-run mode."""
if self._dry_run:
logger.info('[DRY RUN] %s', description)
return
action()
# ---- scan safety helpers -----------------------------------------------
def _scan_guard(self, scf: SyncCrazyflie) -> str | None:
"""Check abort conditions once. Returns an action string or None.
'estop' -> cut motors (tumble/crash or user)
'abort' -> controlled land (divergence/proximity)
'stop'/'disconnect'/'shutdown' -> user command (controlled land)
"""
cmd = self._poll_cmd()
if cmd == 'estop':
return 'estop'
if cmd in ('stop', 'disconnect', 'shutdown'):
return cmd
t = self._telemetry
if t.tumbled or t.crashed:
logger.warning('tumble/crash detected - emergency stop')
return 'estop'
if self._est_speed > DIVERGE_SPEED_MPS:
logger.warning('estimate diverging (%.2f m/s while holding) - '
'aborting to land', self._est_speed)
return 'abort'
near = [d for d in (t.front, t.back, t.left, t.right) if d is not None]
if near and min(near) < MIN_CLEARANCE_M:
logger.warning('obstacle within %.2f m - aborting to land',
min(near))
return 'abort'
return None
def _scan_wait(self, scf: SyncCrazyflie, duration: float) -> str:
"""Sleep up to `duration` while polling the safety guard. Returns 'ok'
if the interval elapsed cleanly, else the guard's action string."""
if self._dry_run:
# No real motion to wait for; just briefly poll the guards so live
# sensor readings can still trigger (and be seen) in the walkthrough.
duration = min(duration, 0.4)
end = time.monotonic() + duration
while time.monotonic() < end:
action = self._scan_guard(scf)
if action:
return action
time.sleep(0.05)
return 'ok'
def _finish_scan(self, scf: SyncCrazyflie, reason: str) -> str | None:
"""Resolve a scan exit: emergency stop, or a controlled landing."""
if reason == 'estop':
self._do_estop(scf)
return None
self._land_hlc(scf)
if reason in ('disconnect', 'shutdown'):
return reason
if reason == 'abort':
self._set_state(State.READY, 'Scan aborted for safety - landed')
return None
def _ceiling_capped(self, target_z: float) -> float:
"""Clamp a target height to stay CEILING_MARGIN_M below the sensed
ceiling (from the up-facing range), if there is one."""
up = self._telemetry.up
if up is None:
return target_z
ceiling = self._telemetry.z + up
return min(target_z, max(MIN_HEIGHT_M, ceiling - CEILING_MARGIN_M))
def _land_hlc(self, scf: SyncCrazyflie) -> None:
"""Controlled high-level-commander landing, staying responsive to an
emergency stop and to a tumble during descent."""
cf = scf.cf
if self._dry_run:
logger.info('[DRY RUN] land and disarm')
self._set_state(State.READY, 'DRY RUN complete - landed (simulated)')
return
self._set_state(State.LANDING, 'Landing ...')
z = max(self._telemetry.z, 0.1)
dur = max(2.0, z / max(0.05, self._scan_params.climb_rate))
try:
cf.high_level_commander.land(0.0, dur, yaw=None)
except Exception:
logger.exception('high-level land failed - cutting motors')
self._do_estop(scf)
return
end = time.monotonic() + dur
while time.monotonic() < end:
cmd = self._poll_cmd()
if cmd == 'estop' or self._telemetry.tumbled or self._telemetry.crashed:
self._do_estop(scf)
return
time.sleep(0.05)
try:
cf.high_level_commander.stop()
except Exception:
pass
if self._proto_v7:
cf.supervisor.send_arming_request(False)
self._set_state(State.READY, 'Landed - on the ground, disarmed')
def _do_estop(self, scf: SyncCrazyflie) -> None:
try:
self._cut_motors(scf)
except Exception:
logger.exception('emergency stop failed')
if self._dry_run:
self._set_state(State.READY, 'DRY RUN: emergency stop (no motors)')
else:
self._set_state(State.EMERGENCY,
'EMERGENCY STOP - motors cut (reboot to fly again)')
def _cut_motors(self, scf: SyncCrazyflie) -> None:
"""Cut motors immediately. Safe to call from any thread."""
if self._dry_run:
# Nothing is ever armed in dry run; don't send a flight command.
logger.info('[DRY RUN] emergency stop (no motors to cut)')
return
cf = scf.cf
if self._proto_v7:
cf.supervisor.send_emergency_stop()
else:
cf.commander.send_stop_setpoint()
# ----------------------------------------------------- estimator helper --
def _reset_estimator(self, scf: SyncCrazyflie) -> None:
cf = scf.cf
cf.param.set_value('kalman.resetEstimation', '1')
time.sleep(0.1)
cf.param.set_value('kalman.resetEstimation', '0')
self._wait_for_position_estimator(scf)
# A fresh estimator means a fresh world origin; the old map no longer
# lines up, so start it clean and drop any accumulated drift offset.
self.clear_map()
self._reset_drift()
def _reset_drift(self) -> None:
"""Zero the ZUPT offsets and rest detector (called on an origin reset)."""
self._off_x = self._off_y = self._off_yaw = 0.0
self._hold_x = self._hold_y = self._hold_yaw = 0.0
self._gyro_ema = self._flow_ema = 0.0
self._still = False
self._last_acc = None
def _wait_for_position_estimator(self, scf: SyncCrazyflie,
threshold: float = 0.001,
timeout: float = 10.0) -> None:
"""Block until the Kalman position variance converges (or times out)."""
log_conf = LogConfig(name='kalman_var', period_in_ms=100)
log_conf.add_variable('kalman.varPX', 'float')
log_conf.add_variable('kalman.varPY', 'float')
log_conf.add_variable('kalman.varPZ', 'float')
history = {'x': [1000.0] * 10, 'y': [1000.0] * 10, 'z': [1000.0] * 10}
deadline = time.monotonic() + timeout
try:
with SyncLogger(scf, log_conf) as sync_log:
for _, data, _ in sync_log:
for key, var in (('x', 'kalman.varPX'),
('y', 'kalman.varPY'),
('z', 'kalman.varPZ')):
history[key].append(data[var])
history[key].pop(0)
spread = max(max(h) - min(h) for h in history.values())
if spread < threshold:
return
if time.monotonic() > deadline:
logger.warning('estimator did not converge in %.0fs '
'(spread=%.4f); proceeding', timeout,
spread)
return
except Exception:
logger.exception('estimator convergence wait failed')
# ---------------------------------------------------------- log / param --
def _detect_protocol(self, scf: SyncCrazyflie) -> bool:
try:
return scf.cf.platform.get_protocol_version() >= 7
except Exception:
logger.exception('could not read protocol version')
return False
def _setup_logging(self, scf: SyncCrazyflie) -> None:
self._log_configs = []
self._console_buf = ''
scf.cf.console.receivedChar.add_callback(self._on_console)
# Pose block: estimator x/y/z + heading. Kept separate from the range
# block because all ten variables together exceed one CRTP log packet.
pose = LogConfig(name='rv_pose', period_in_ms=100)
pose.add_variable('stateEstimate.x', 'float')
pose.add_variable('stateEstimate.y', 'float')
pose.add_variable('stateEstimate.z', 'float')
pose.add_variable('stabilizer.yaw', 'float')
scf.cf.log.add_config(pose)
pose.data_received_cb.add_callback(self._on_pose)
pose.start()
self._log_configs.append(pose)
# Multi-ranger block: the five ToF sensors + the Flow deck's z-ranger.
# Best-effort: without the deck these variables are absent and the whole
# block is rejected, which is fine - the map just stays empty.
try:
rng = LogConfig(name='rv_range', period_in_ms=100)
for var in ('range.front', 'range.back', 'range.left',
'range.right', 'range.up', 'range.zrange'):
rng.add_variable(var, 'uint16_t')
scf.cf.log.add_config(rng)
rng.data_received_cb.add_callback(self._on_range)
rng.start()
self._log_configs.append(rng)
except (KeyError, AttributeError) as exc:
logger.warning('range logging unavailable (Multi-ranger deck?): %s',
exc)
# Raw-motion block for at-rest detection: gyro rates + optical-flow
# counts. These come straight off the sensors, so (unlike the estimate)
# they read ~0 when the drone is genuinely still. Best-effort: flow
# counts need the Flow deck, without which ZUPT simply never engages.
try:
mot = LogConfig(name='rv_motion', period_in_ms=100)
mot.add_variable('gyro.x', 'float')
mot.add_variable('gyro.y', 'float')
mot.add_variable('gyro.z', 'float')
mot.add_variable('motion.deltaX', 'int16_t')
mot.add_variable('motion.deltaY', 'int16_t')
scf.cf.log.add_config(mot)
mot.data_received_cb.add_callback(self._on_motion)
mot.start()
self._log_configs.append(mot)
except (KeyError, AttributeError) as exc:
logger.warning('motion logging unavailable (Flow deck?): %s - drift '
'correction disabled', exc)
# Battery + supervisor status.
sup = LogConfig(name='rv_status', period_in_ms=200)
sup.add_variable('pm.vbat', 'float')
if self._proto_v7:
sup.add_variable('supervisor.info', 'uint16_t')
scf.cf.log.add_config(sup)
sup.data_received_cb.add_callback(self._on_status)
sup.start()
self._log_configs.append(sup)
def _teardown_logging(self) -> None:
for conf in getattr(self, '_log_configs', []):
try:
conf.stop()
conf.delete()
except Exception:
logger.exception('failed to delete log config %s', conf.name)
self._log_configs = []
def _on_console(self, text: str) -> None:
self._console_buf += text
while '\n' in self._console_buf:
line, self._console_buf = self._console_buf.split('\n', 1)
line = line.rstrip('\r')
if line:
_CONSOLE_LOGGER.info(line)
def _on_pose(self, _ts, data, _logconf) -> None:
self._raw_x = data['stateEstimate.x']
self._raw_y = data['stateEstimate.y']
self._raw_z = data['stateEstimate.z']
self._raw_yaw = data['stabilizer.yaw']
# Track raw horizontal speed for divergence detection (smoothed).
now = time.monotonic()
if self._last_pose_t is not None and self._last_raw_xy is not None:
dt = now - self._last_pose_t
if dt > 1e-3:
inst = math.hypot(self._raw_x - self._last_raw_xy[0],
self._raw_y - self._last_raw_xy[1]) / dt
self._est_speed = 0.5 * self._est_speed + 0.5 * inst
self._last_pose_t = now
self._last_raw_xy = (self._raw_x, self._raw_y)
self._apply_correction()
self._emit()
def _on_motion(self, _ts, data, _logconf) -> None:
"""Update the at-rest detector from raw gyro + optical-flow counts."""
gmag = max(abs(data['gyro.x']), abs(data['gyro.y']), abs(data['gyro.z']))
fmag = max(abs(data['motion.deltaX']), abs(data['motion.deltaY']))
self._gyro_ema = STILL_EMA * self._gyro_ema + (1 - STILL_EMA) * gmag
self._flow_ema = STILL_EMA * self._flow_ema + (1 - STILL_EMA) * fmag
self._still = (self._gyro_ema < GYRO_STILL_DPS
and self._flow_ema < FLOW_STILL_COUNTS)
def _apply_correction(self) -> None:
"""Map the raw estimate to a drift-corrected pose and publish it.
While at rest with correction on, the corrected pose is pinned to the
spot it last held: the raw-minus-corrected offset is updated each sample
so any estimator wander is cancelled. While moving, the offset is frozen
so real motion passes straight through, and the current corrected pose is
remembered as the point to freeze at the next rest.
"""
t = self._telemetry
rx, ry, rz, ryaw = self._raw_x, self._raw_y, self._raw_z, self._raw_yaw
pin = self._drift_correction and self._still
if pin:
self._off_x = rx - self._hold_x
self._off_y = ry - self._hold_y
self._off_yaw = _wrap_deg(ryaw - self._hold_yaw)
cx = rx - self._off_x
cy = ry - self._off_y
cyaw = _wrap_deg(ryaw - self._off_yaw)
if not pin:
self._hold_x, self._hold_y, self._hold_yaw = cx, cy, cyaw
t.x, t.y, t.z, t.yaw = cx, cy, rz, cyaw
t.raw_x, t.raw_y = rx, ry
t.still = self._still
t.drift_corrected = self._drift_correction
def _on_range(self, _ts, data, _logconf) -> None:
t = self._telemetry
t.front = _range_m(data['range.front'])
t.back = _range_m(data['range.back'])
t.left = _range_m(data['range.left'])
t.right = _range_m(data['range.right'])
t.up = _range_m(data['range.up'])
t.down = _range_m(data['range.zrange'])
# Project the horizontal hits into the world frame and add to the map.
# Uses the most recent corrected pose (a separate, equally fast block);
# the small skew is negligible against a slow ascent/rotation. Only add a
# batch once the drone has actually moved, so at-rest noise stays out.
if self._recording and self._moved_enough(t):
self._accumulate(t)
self._last_acc = (t.x, t.y, t.z, t.yaw)
self._emit()
def _moved_enough(self, t: Telemetry) -> bool:
if self._last_acc is None:
return True
lx, ly, lz, lyaw = self._last_acc
return (math.hypot(t.x - lx, t.y - ly) >= MIN_MOVE_M
or abs(t.z - lz) >= MIN_RISE_M
or abs(_wrap_deg(t.yaw - lyaw)) >= MIN_TURN_DEG)
def _accumulate(self, t: Telemetry) -> None:
yaw = math.radians(t.yaw)
cos_y, sin_y = math.cos(yaw), math.sin(yaw)
new_x: list[float] = [] # 2-D top-down slice (horizontal sensors)
new_y: list[float] = []
cloud: list[tuple[float, float, float]] = [] # full 3-D batch to stream
for name, (bx, by) in _HORIZONTAL_SENSORS:
dist = getattr(t, name)
if dist is None:
continue
# Rotate the body-frame sensor direction into the world frame. The
# horizontal beams sit at the drone's current height.
wx = cos_y * bx - sin_y * by
wy = sin_y * bx + cos_y * by
px, py = t.x + wx * dist, t.y + wy * dist
new_x.append(px)
new_y.append(py)
cloud.append((px, py, t.z))
# Up/down sensors give ceiling/floor points (3-D only; they would just
# clutter the 2-D map at the drone's own position).
if t.up is not None:
cloud.append((t.x, t.y, t.z + t.up))
if t.down is not None:
cloud.append((t.x, t.y, t.z - t.down))
if new_x:
with self._map_lock:
self._map_x.extend(new_x)
self._map_y.extend(new_y)
self._map_version += 1
if cloud:
self._publisher.add_points(cloud)
def _on_status(self, _ts, data, _logconf) -> None:
t = self._telemetry
t.vbat = data['pm.vbat']
if 'supervisor.info' in data:
info = data['supervisor.info']
t.can_arm = bool(info & SUP_CAN_ARM)
t.armed = bool(info & SUP_IS_ARMED)
t.flying = bool(info & SUP_IS_FLYING)
t.tumbled = bool(info & SUP_IS_TUMBLED)
t.locked = bool(info & SUP_IS_LOCKED)
t.crashed = bool(info & SUP_IS_CRASHED)
t.deck_fault = bool(info & SUP_DECK_FAULT)
self._emit()
def _probe_decks(self, scf: SyncCrazyflie) -> None:
for param, attr in (('deck.bcFlow2', 'flow_deck'),
('deck.bcMultiranger', 'multiranger')):
try:
value = scf.cf.param.get_value(param, timeout=5)
setattr(self._telemetry, attr, bool(int(value)))
except Exception:
logger.exception('could not read %s', param)
setattr(self._telemetry, attr, None)
self._emit()
def _arm_failure_reason(self) -> str:
t = self._telemetry
if t.locked:
return 'supervisor locked - reboot the drone'
if t.tumbled:
return 'drone is tumbled - set it level and still'
if t.deck_fault:
return 'a deck reported a hardware fault - reseat decks / reboot'
if t.crashed:
return 'supervisor in crashed state - flip level to recover'
return ('pre-flight checks did not pass - place the drone level and '
'stationary, then retry')
# --------------------------------------------------------------- utils ---
def _battery_ok(self) -> tuple[bool, str]:
end = time.monotonic() + 1.5
while self._telemetry.vbat == 0.0 and time.monotonic() < end:
time.sleep(0.05)
v = self._telemetry.vbat
if v and v < MIN_SAFE_VBAT:
return False, f'battery low ({v:.2f} V, need {MIN_SAFE_VBAT:.2f} V)'
return True, ''
def _poll_cmd(self) -> str | None:
try:
return self._cmd_q.get_nowait()
except queue.Empty:
return None
def _wait_until(self, predicate, timeout: float) -> bool:
end = time.monotonic() + timeout
while time.monotonic() < end:
if predicate():
return True
time.sleep(0.05)
return False
def _set_state(self, state: State, message: str) -> None:
self._state = state
self._message = message
logger.info('[%s] %s', state.name, message)
self._emit()
def _emit(self) -> None:
snapshot = Update(
state=self._state,
message=self._message,
telemetry=dataclasses.replace(self._telemetry),
)
self._latest = snapshot # atomic publish for snapshot() pollers
try:
self._listener(snapshot)
except Exception:
logger.exception('listener raised')
def _wrap_deg(angle: float) -> float:
"""Wrap an angle in degrees to (-180, 180]."""
return (angle + 180.0) % 360.0 - 180.0
def _range_m(raw_mm) -> float | None:
"""Convert a raw Multi-ranger reading (mm) to metres, or None if out of
range (zero, or at/over the sensor's ~4 m ceiling = nothing to map)."""
mm = float(raw_mm)
if mm <= 0.0 or mm >= RANGE_VALID_MAX_MM:
return None
return mm / 1000.0