DearPyGui app that visualizes the Multi-ranger ToF sensors and builds a top-down map of the drone's surroundings. Handheld and experimental hardened-scan capture, Flow-deck drift correction (ZUPT), props-off dry-run mode, and optional 3D point-cloud streaming to CloudView. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1226 lines
51 KiB
Python
1226 lines
51 KiB
Python
"""Flight + sensing controller for the Crazyflie range-mapping client.
|
|
|
|
Owns the Crazyflie radio link on a single dedicated worker thread and exposes a
|
|
small, thread-safe API to the UI (connect / disconnect / arm / scan / stop /
|
|
emergency_stop / shutdown). Keeping every cflib call on one thread means the UI
|
|
thread never blocks on radio I/O and there is exactly one place that can command
|
|
the motors.
|
|
|
|
The mission this controller flies is a *scan*: arm, lift off to a start height,
|
|
then ascend slowly while rotating in place so the Multi-ranger deck's five ToF
|
|
sensors sweep the drone's surroundings. Each range sample is combined with the
|
|
estimator pose to project an obstacle point into a world frame, building a
|
|
top-down point-cloud map of the immediate environment.
|
|
|
|
Flight uses the firmware's *hover setpoint* stream (velocity-x/y, yaw-rate,
|
|
absolute-z) together with the *supervisor* arming system. Position/altitude hold
|
|
comes from the Flow v2 deck; the worker thread streams setpoints at 10 Hz so a
|
|
slow or stalled UI cannot make the drone drift or fall.
|
|
|
|
Sensor geometry (Crazyflie body frame: +x forward, +y left, +z up):
|
|
|
|
front +x back -x left +y right -y
|
|
up +z (ceiling) down -z (floor, from the Flow deck's z-ranger)
|
|
|
|
The four horizontal sensors are what build the 2-D map; up/down are surfaced as
|
|
ceiling/floor distances.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import dataclasses
|
|
import logging
|
|
import math
|
|
import queue
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from enum import Enum, auto
|
|
|
|
import cflib.crtp
|
|
from cflib.crazyflie import Crazyflie
|
|
from cflib.crazyflie.log import LogConfig
|
|
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
|
|
from cflib.crazyflie.syncLogger import SyncLogger
|
|
from cflib.utils import uri_helper
|
|
|
|
from .cloud_publisher import CloudPublisher, DEFAULT_ADDRESS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _StaleLogFilter(logging.Filter):
|
|
"""Drop cflib's benign "no LogEntry to handle id=N" warnings.
|
|
|
|
Firmware streams log-data for a block id the host no longer tracks when a
|
|
stale block is left over from a prior/parallel session. We delete our blocks
|
|
on disconnect, but a handful of in-flight packets can still arrive right
|
|
after reconnecting; they're harmless (power-cycle to clear), so keep them out
|
|
of the console rather than alarming the user.
|
|
"""
|
|
|
|
def filter(self, record: logging.LogRecord) -> bool:
|
|
return 'no LogEntry to handle' not in record.getMessage()
|
|
|
|
|
|
class LogBuffer(logging.Handler):
|
|
"""A thread-safe ring buffer of formatted log lines for the UI console.
|
|
|
|
cflib logs from its own thread, so reads/writes are guarded by a lock. A
|
|
monotonic version counter lets the UI cheaply detect when there's something
|
|
new to render.
|
|
"""
|
|
|
|
def __init__(self, capacity: int = 500) -> None:
|
|
super().__init__()
|
|
self._lines: collections.deque = collections.deque(maxlen=capacity)
|
|
self._lock = threading.Lock()
|
|
self._version = 0
|
|
self.setFormatter(logging.Formatter(
|
|
'%(asctime)s %(levelname).1s %(name)s: %(message)s', '%H:%M:%S'))
|
|
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
try:
|
|
line = self.format(record)
|
|
except Exception:
|
|
return
|
|
with self._lock:
|
|
self._lines.append(line)
|
|
self._version += 1
|
|
|
|
def snapshot(self) -> tuple[int, list[str]]:
|
|
with self._lock:
|
|
return self._version, list(self._lines)
|
|
|
|
def clear(self) -> None:
|
|
with self._lock:
|
|
self._lines.clear()
|
|
self._version += 1
|
|
|
|
|
|
# Single shared console buffer, attached to the root logger once. Firmware
|
|
# console output is logged under its own name so it's easy to spot.
|
|
LOG_BUFFER = LogBuffer()
|
|
_CONSOLE_LOGGER = logging.getLogger('crazyflie.console')
|
|
_LOGGING_READY = False
|
|
|
|
|
|
def _install_console_logging() -> None:
|
|
"""Route cflib + app logs into LOG_BUFFER for the UI console (idempotent)."""
|
|
global _LOGGING_READY
|
|
if _LOGGING_READY:
|
|
return
|
|
logging.getLogger('cflib.crazyflie.log').addFilter(_StaleLogFilter())
|
|
root = logging.getLogger()
|
|
if root.level == logging.NOTSET or root.level > logging.INFO:
|
|
root.setLevel(logging.INFO)
|
|
LOG_BUFFER.setLevel(logging.INFO)
|
|
root.addHandler(LOG_BUFFER)
|
|
_LOGGING_READY = True
|
|
|
|
|
|
# Radio URI. Override with the CFLIB_URI environment variable.
|
|
DEFAULT_URI = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7')
|
|
|
|
# ---------------------------------------------------------------- scan params -
|
|
# Defaults; all adjustable from the UI before a scan starts. Kept conservative
|
|
# because the rotating scan is the part that previously caused a flyaway.
|
|
DEFAULT_START_HEIGHT_M = 0.4 # lift-off / first sweep height
|
|
DEFAULT_TOP_HEIGHT_M = 1.2 # final height after the ringed ascent
|
|
DEFAULT_CLIMB_RATE_MPS = 0.15 # how slowly to move between ring heights
|
|
DEFAULT_YAW_RATE_DPS = 20.0 # rotation speed during each discrete yaw step
|
|
MIN_HEIGHT_M = 0.2 # Flow v2 deck altitude-hold bounds
|
|
MAX_HEIGHT_M = 2.5
|
|
MIN_SAFE_VBAT = 3.10 # refuse to arm below this pack voltage
|
|
|
|
# --- hardened rotating-scan safety envelope ------------------------------------
|
|
# The scan never spins continuously. It uses the firmware high-level commander
|
|
# (which holds position autonomously) to take off, then rotates in discrete
|
|
# "stop-and-stare" steps so the Flow-deck estimate re-converges between moves,
|
|
# climbing in rings. Every parameter here errs toward caution.
|
|
TAKEOFF_SETTLE_S = 2.5 # stabilise after takeoff before rotating
|
|
YAW_STEP_DEG = 30.0 # rotation increment per stop-and-stare step
|
|
YAW_SETTLE_S = 1.2 # pause at each bearing (also a clean static read)
|
|
MAX_YAW_RATE_DPS = 30.0 # hard cap on per-step rotation speed
|
|
RING_STEP_M = 0.3 # vertical spacing between sweep rings
|
|
CEILING_MARGIN_M = 0.4 # stay at least this far below the sensed ceiling
|
|
MIN_CLEARANCE_M = 0.25 # abort if any horizontal wall is closer than this
|
|
DIVERGE_SPEED_MPS = 0.5 # raw horizontal estimate speed above this = abort
|
|
HLC_PARAM = 'commander.enHighLevel'
|
|
|
|
# Multi-ranger ToF validity. The VL53L1x sensors read out to ~4 m; a reading at
|
|
# or beyond this (or zero) means "nothing in range", not an obstacle.
|
|
RANGE_VALID_MAX_MM = 4000.0
|
|
|
|
# --- drift correction (zero-velocity update / ZUPT) ----------------------------
|
|
# A Flow v2 deck never produces a perfectly stable estimate: optical-flow noise
|
|
# integrates into position and gyro bias into yaw, so a stationary drone slowly
|
|
# "drifts". We detect rest from the *raw* sensors (gyro rate + optical-flow
|
|
# counts, which don't depend on the drifting estimate) and, while at rest, pin
|
|
# the corrected pose - folding the wander into an offset instead of the map.
|
|
GYRO_STILL_DPS = 3.0 # max |gyro| (deg/s) to count as not rotating
|
|
FLOW_STILL_COUNTS = 8.0 # max |flow delta| (counts) to count as not translating
|
|
STILL_EMA = 0.6 # smoothing applied to the motion magnitudes
|
|
# Minimum motion before another batch of map points is recorded - de-dupes the
|
|
# cloud and keeps at-rest sensor noise out of it entirely. Height change is
|
|
# included so a pure vertical sweep (ascending without rotating) still records.
|
|
MIN_MOVE_M = 0.02
|
|
MIN_RISE_M = 0.05
|
|
MIN_TURN_DEG = 2.0
|
|
|
|
# Body-frame unit directions of the four horizontal ToF sensors (x fwd, y left).
|
|
_HORIZONTAL_SENSORS = (
|
|
('front', (1.0, 0.0)),
|
|
('back', (-1.0, 0.0)),
|
|
('left', (0.0, 1.0)),
|
|
('right', (0.0, -1.0)),
|
|
)
|
|
|
|
# supervisor.info bitfield (see crazyflie-firmware src/modules/src/supervisor.c).
|
|
SUP_CAN_ARM = 0x0001
|
|
SUP_IS_ARMED = 0x0002
|
|
SUP_AUTO_ARM = 0x0004
|
|
SUP_CAN_FLY = 0x0008
|
|
SUP_IS_FLYING = 0x0010
|
|
SUP_IS_TUMBLED = 0x0020
|
|
SUP_IS_LOCKED = 0x0040
|
|
SUP_IS_CRASHED = 0x0080
|
|
SUP_DECK_FAULT = 0x0800 # a deck reported a hardware fault
|
|
|
|
|
|
def clamp_height(height: float) -> float:
|
|
"""Clamp a requested height to the safe Flow-deck range."""
|
|
return max(MIN_HEIGHT_M, min(MAX_HEIGHT_M, float(height)))
|
|
|
|
|
|
@dataclass
|
|
class ScanParams:
|
|
"""Latched-at-start parameters for one ascending, rotating scan."""
|
|
start_height: float = DEFAULT_START_HEIGHT_M
|
|
top_height: float = DEFAULT_TOP_HEIGHT_M
|
|
climb_rate: float = DEFAULT_CLIMB_RATE_MPS
|
|
yaw_rate: float = DEFAULT_YAW_RATE_DPS
|
|
|
|
|
|
class State(Enum):
|
|
"""High-level mission state, surfaced to the UI."""
|
|
DISCONNECTED = auto()
|
|
CONNECTING = auto()
|
|
READY = auto() # connected, on the ground, disarmed
|
|
ARMING = auto() # transient: estimator reset + arming request
|
|
ARMED = auto() # armed and idle on the ground, ready to SCAN
|
|
SCANNING = auto() # airborne: ascending and rotating, building the map
|
|
LANDING = auto() # controlled descent in progress
|
|
EMERGENCY = auto() # motors cut; reboot required to fly again
|
|
ERROR = auto() # connection or radio failure
|
|
|
|
|
|
@dataclass
|
|
class Telemetry:
|
|
"""Latest known drone state. Plain values; safe to copy across threads.
|
|
|
|
The five ToF distances are in metres, or None when the sensor reads
|
|
out-of-range (nothing close enough to map).
|
|
"""
|
|
vbat: float = 0.0
|
|
# Estimator pose (world frame, origin at lift-off).
|
|
x: float = 0.0
|
|
y: float = 0.0
|
|
z: float = 0.0
|
|
yaw: float = 0.0 # degrees
|
|
# Multi-ranger distances, metres or None.
|
|
front: float | None = None
|
|
back: float | None = None
|
|
left: float | None = None
|
|
right: float | None = None
|
|
up: float | None = None
|
|
down: float | None = None
|
|
# Drift-correction state.
|
|
still: bool = False # estimator-independent "at rest" detection
|
|
drift_corrected: bool = True # ZUPT hold currently enabled
|
|
dry_run: bool = False # props-off dry run (no motors)
|
|
raw_x: float = 0.0 # uncorrected estimate, for comparison
|
|
raw_y: float = 0.0
|
|
# Deck presence (None = not yet probed).
|
|
multiranger: bool | None = None
|
|
flow_deck: bool | None = None
|
|
# Supervisor flags.
|
|
can_arm: bool = False
|
|
armed: bool = False
|
|
flying: bool = False
|
|
tumbled: bool = False
|
|
locked: bool = False
|
|
crashed: bool = False
|
|
deck_fault: bool = False
|
|
|
|
|
|
@dataclass
|
|
class Update:
|
|
"""Immutable snapshot handed to the UI each frame."""
|
|
state: State
|
|
message: str
|
|
telemetry: Telemetry
|
|
|
|
|
|
class RangeController:
|
|
"""Threaded Crazyflie controller that flies a scan and maps the surroundings.
|
|
|
|
Usage:
|
|
c = RangeController(uri, listener=on_update)
|
|
c.start() # init radio drivers + spawn worker
|
|
c.connect() # open link
|
|
c.arm() # reset estimator + arm; idle armed on the ground
|
|
c.scan() # lift off, ascend slowly while rotating
|
|
c.stop() # land (from anywhere airborne) or disarm on ground
|
|
c.emergency_stop() # cut motors immediately
|
|
c.shutdown() # land if needed, close link, stop worker
|
|
|
|
The accumulated map is read separately via map_snapshot().
|
|
"""
|
|
|
|
def __init__(self, uri: str = DEFAULT_URI, listener=None,
|
|
cache_dir: str = './cache'):
|
|
self._uri = uri
|
|
self._listener = listener or (lambda update: None)
|
|
self._cache_dir = cache_dir
|
|
|
|
self._cmd_q: queue.Queue[str] = queue.Queue()
|
|
self._scan_params = ScanParams()
|
|
self._state = State.DISCONNECTED
|
|
self._message = 'Disconnected'
|
|
self._telemetry = Telemetry()
|
|
self._latest = Update(State.DISCONNECTED, 'Disconnected', Telemetry())
|
|
|
|
self._scf: SyncCrazyflie | None = None
|
|
self._console_buf = ''
|
|
self._log_configs: list = []
|
|
self._proto_v7 = False
|
|
self._session_result = ''
|
|
# Map accumulation is independent of flight: whenever this is set and
|
|
# pose+range are streaming, hits are projected into the map. It defaults
|
|
# on so handheld mapping works the moment you connect (hold the drone and
|
|
# walk it around); the automated scan also turns it on. A plain bool is
|
|
# GIL-atomic, so the UI flips it directly without going through the queue.
|
|
self._recording = True
|
|
self._lock = threading.Lock() # guards _scf access from other threads
|
|
|
|
# --- drift correction (ZUPT) state, touched only on the cflib thread ---
|
|
self._drift_correction = True
|
|
self._raw_x = self._raw_y = self._raw_z = self._raw_yaw = 0.0
|
|
self._off_x = self._off_y = self._off_yaw = 0.0 # corrected = raw - off
|
|
self._hold_x = self._hold_y = self._hold_yaw = 0.0 # frozen-while-still pose
|
|
self._gyro_ema = 0.0
|
|
self._flow_ema = 0.0
|
|
self._still = False
|
|
# Raw horizontal estimate speed, used to detect a diverging/flyaway
|
|
# estimate during the scan (the drone is always commanded to hold (0,0),
|
|
# so any sustained horizontal speed means the estimate is running off).
|
|
self._est_speed = 0.0
|
|
self._last_pose_t: float | None = None
|
|
self._last_raw_xy: tuple[float, float] | None = None
|
|
# Props-off dry run: scan logic runs, no motors arm, no flight commands.
|
|
self._dry_run = False
|
|
|
|
# Accumulated obstacle point cloud (world frame), guarded by _map_lock.
|
|
# Parallel deques so it drops straight into a DearPyGui scatter series.
|
|
# This is the 2-D top-down slice (horizontal sensors only); the full 3-D
|
|
# cloud (with height + ceiling/floor) is streamed to CloudView instead.
|
|
self._map_lock = threading.Lock()
|
|
self._map_x: collections.deque = collections.deque(maxlen=20000)
|
|
self._map_y: collections.deque = collections.deque(maxlen=20000)
|
|
self._map_version = 0
|
|
self._last_acc: tuple[float, float, float, float] | None = None
|
|
|
|
# Optional 3-D point-cloud stream to a CloudView viewer (best-effort).
|
|
self._publisher = CloudPublisher(source='rangeview',
|
|
name='Crazyflie RangeView')
|
|
|
|
self._thread = threading.Thread(
|
|
target=self._run, name='range-worker', daemon=True)
|
|
|
|
# ----------------------------------------------------------------- API ---
|
|
|
|
@property
|
|
def uri(self) -> str:
|
|
return self._uri
|
|
|
|
def set_listener(self, listener) -> None:
|
|
self._listener = listener or (lambda update: None)
|
|
|
|
def set_scan_params(self, start_height: float, top_height: float,
|
|
climb_rate: float, yaw_rate: float) -> None:
|
|
"""Update the parameters used by the next scan() (ignored mid-scan)."""
|
|
self._scan_params = ScanParams(
|
|
start_height=clamp_height(start_height),
|
|
top_height=clamp_height(top_height),
|
|
climb_rate=max(0.02, float(climb_rate)),
|
|
yaw_rate=float(yaw_rate))
|
|
|
|
def snapshot(self) -> Update:
|
|
"""Latest state/telemetry snapshot. Thread-safe and cheap; poll once a
|
|
frame."""
|
|
return self._latest
|
|
|
|
def map_snapshot(self) -> tuple[int, list[float], list[float]]:
|
|
"""Return (version, xs, ys) of the accumulated obstacle points.
|
|
|
|
The version only changes when points are added or the map is cleared, so
|
|
the UI can skip re-uploading an unchanged cloud.
|
|
"""
|
|
with self._map_lock:
|
|
return self._map_version, list(self._map_x), list(self._map_y)
|
|
|
|
def clear_map(self) -> None:
|
|
with self._map_lock:
|
|
self._map_x.clear()
|
|
self._map_y.clear()
|
|
self._map_version += 1
|
|
self._last_acc = None
|
|
self._publisher.clear()
|
|
logger.info('map cleared')
|
|
|
|
@staticmethod
|
|
def scan_uris() -> list[str]:
|
|
"""URIs of Crazyflies reachable on any installed interface."""
|
|
try:
|
|
return [entry[0] for entry in cflib.crtp.scan_interfaces()]
|
|
except Exception:
|
|
logger.warning('scan failed', exc_info=True)
|
|
return []
|
|
|
|
def start(self) -> None:
|
|
_install_console_logging()
|
|
cflib.crtp.init_drivers()
|
|
self._thread.start()
|
|
|
|
def connect(self, uri: str | None = None) -> None:
|
|
if uri:
|
|
self._uri = uri
|
|
self._cmd_q.put('connect')
|
|
|
|
def disconnect(self) -> None:
|
|
self._cmd_q.put('disconnect')
|
|
|
|
def set_recording(self, on: bool) -> None:
|
|
"""Enable/disable live map accumulation (independent of flight). On while
|
|
you move the drone - by hand or under power - the map fills in."""
|
|
self._recording = bool(on)
|
|
logger.info('map recording %s', 'ON' if on else 'OFF')
|
|
|
|
@property
|
|
def recording(self) -> bool:
|
|
return self._recording
|
|
|
|
def set_drift_correction(self, on: bool) -> None:
|
|
"""Enable/disable the zero-velocity hold that pins the pose while the
|
|
drone is detected at rest (cancels stationary Flow-deck drift)."""
|
|
self._drift_correction = bool(on)
|
|
logger.info('drift correction %s', 'ON' if on else 'OFF')
|
|
|
|
@property
|
|
def drift_correction(self) -> bool:
|
|
return self._drift_correction
|
|
|
|
def set_streaming(self, enabled: bool, address: str | None = None) -> None:
|
|
"""Enable/disable streaming the 3-D cloud to a CloudView viewer at
|
|
`address` (e.g. tcp://127.0.0.1:9870 or unix:///tmp/cloud.sock)."""
|
|
self._publisher.configure(enabled, address)
|
|
|
|
@property
|
|
def streaming_connected(self) -> bool:
|
|
return self._publisher.connected
|
|
|
|
def set_dry_run(self, on: bool) -> None:
|
|
"""Enable/disable props-off dry-run mode. In dry run the scan walks its
|
|
full state machine and logs every takeoff/go-to/land/abort decision but
|
|
never arms the motors or sends a flight command, so you can validate the
|
|
sequence on the bench with zero flight risk."""
|
|
self._dry_run = bool(on)
|
|
self._telemetry.dry_run = self._dry_run
|
|
logger.info('DRY RUN %s', 'ON (no motors will arm)' if on else 'OFF')
|
|
|
|
@property
|
|
def dry_run(self) -> bool:
|
|
return self._dry_run
|
|
|
|
def reset_origin(self) -> None:
|
|
"""Reset the Kalman estimator so the world origin is the drone's current
|
|
spot, and clear the map. Use before a handheld mapping run to set 'here'
|
|
as (0, 0). Serviced on the ground (READY/ARMED), ignored while flying."""
|
|
self._cmd_q.put('reset')
|
|
|
|
def arm(self) -> None:
|
|
"""Arm the motors and hold on the ground. The drone idles armed until
|
|
scan() or stop(); firmware auto-disarms after ~30 s if it never flies."""
|
|
self._cmd_q.put('arm')
|
|
|
|
def scan(self) -> None:
|
|
"""Lift off and run the ascending, rotating scan (from ARMED)."""
|
|
self._cmd_q.put('scan')
|
|
|
|
def stop(self) -> None:
|
|
"""Context-sensitive: land if airborne, else disarm on the ground."""
|
|
self._cmd_q.put('stop')
|
|
|
|
def emergency_stop(self) -> None:
|
|
# Queue it so the worker breaks out of any flight loop and updates state,
|
|
# but ALSO fire immediately from the caller thread for least latency.
|
|
self._cmd_q.put('estop')
|
|
with self._lock:
|
|
scf = self._scf
|
|
if scf is not None:
|
|
try:
|
|
self._cut_motors(scf)
|
|
except Exception:
|
|
logger.exception('direct emergency stop failed')
|
|
|
|
def shutdown(self) -> None:
|
|
self._cmd_q.put('shutdown')
|
|
|
|
def alive(self) -> bool:
|
|
return self._thread.is_alive()
|
|
|
|
def join(self, timeout: float | None = None) -> None:
|
|
if self._thread.is_alive():
|
|
self._thread.join(timeout)
|
|
|
|
# ----------------------------------------------------------- worker loop -
|
|
|
|
def _run(self) -> None:
|
|
while True:
|
|
cmd = self._cmd_q.get()
|
|
if cmd == 'shutdown':
|
|
return
|
|
if cmd == 'connect':
|
|
self._session()
|
|
if self._session_result == 'shutdown':
|
|
return
|
|
# any other command while disconnected is ignored
|
|
|
|
def _session(self) -> None:
|
|
"""Open the link and service commands until disconnect/shutdown."""
|
|
self._session_result = ''
|
|
self._set_state(State.CONNECTING, f'Connecting to {self._uri} ...')
|
|
try:
|
|
cf = Crazyflie(rw_cache=self._cache_dir)
|
|
with SyncCrazyflie(self._uri, cf=cf) as scf:
|
|
with self._lock:
|
|
self._scf = scf
|
|
self._proto_v7 = self._detect_protocol(scf)
|
|
self._setup_logging(scf)
|
|
self._probe_decks(scf)
|
|
# The firmware estimator integrates Flow-deck drift the whole
|
|
# time it's powered, so a freshly-connected drone often reports a
|
|
# wildly wrong position (tens of metres). Zero it on connect so
|
|
# mapping starts from a clean origin; ZUPT then keeps it there.
|
|
self._set_state(State.CONNECTING, 'Resetting estimator ...')
|
|
self._reset_estimator(scf)
|
|
self._set_state(State.READY, 'Connected - origin reset, ready')
|
|
try:
|
|
self._command_loop(scf)
|
|
finally:
|
|
# Delete log blocks while the link is open so firmware stops
|
|
# streaming them (else "no LogEntry to handle" warnings).
|
|
self._teardown_logging()
|
|
except Exception as exc:
|
|
logger.exception('session error')
|
|
self._set_state(State.ERROR, f'Connection error: {exc}')
|
|
finally:
|
|
with self._lock:
|
|
self._scf = None
|
|
if self._session_result != 'shutdown':
|
|
self._set_state(State.DISCONNECTED, 'Disconnected')
|
|
|
|
def _command_loop(self, scf: SyncCrazyflie) -> None:
|
|
"""Service commands on the ground (READY / ARMED); the airborne phase is
|
|
delegated to _do_scan()."""
|
|
while True:
|
|
# The firmware may have auto-disarmed us (~30 s pre-flight timeout, a
|
|
# bump/tumble, or low battery) while idling armed on the ground.
|
|
if (self._state == State.ARMED and self._proto_v7
|
|
and not self._telemetry.armed):
|
|
self._set_state(State.READY,
|
|
'Auto-disarmed (armed too long or unsafe)')
|
|
try:
|
|
cmd = self._cmd_q.get(timeout=0.2)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
if cmd == 'reset':
|
|
if self._state in (State.READY, State.ARMED, State.EMERGENCY):
|
|
self._set_state(self._state, 'Resetting estimator / origin ...')
|
|
self._reset_estimator(scf)
|
|
self._set_state(self._state, 'Origin reset to current spot')
|
|
elif cmd == 'arm':
|
|
if self._state in (State.READY, State.EMERGENCY):
|
|
self._do_arm(scf)
|
|
elif cmd == 'scan':
|
|
if self._state == State.ARMED:
|
|
follow_up = self._do_scan(scf)
|
|
if follow_up in ('disconnect', 'shutdown'):
|
|
self._session_result = follow_up
|
|
return
|
|
elif cmd == 'stop':
|
|
if self._state == State.ARMED:
|
|
self._do_disarm_ground(scf)
|
|
elif cmd == 'disconnect':
|
|
if self._state == State.ARMED:
|
|
self._do_disarm_ground(scf)
|
|
self._session_result = 'disconnect'
|
|
return
|
|
elif cmd == 'shutdown':
|
|
if self._state == State.ARMED:
|
|
self._do_disarm_ground(scf)
|
|
self._session_result = 'shutdown'
|
|
return
|
|
elif cmd == 'estop':
|
|
self._do_estop(scf)
|
|
|
|
# --------------------------------------------------------------- flight --
|
|
|
|
def _do_arm(self, scf: SyncCrazyflie) -> None:
|
|
"""Pre-flight checks + arm; on success enter ARMED (idle on the ground).
|
|
Does NOT take off - that's _do_scan()."""
|
|
cf = scf.cf
|
|
|
|
if self._telemetry.flow_deck is False:
|
|
self._set_state(State.READY, 'Arm refused: no Flow v2 deck detected')
|
|
return
|
|
ok, why = self._battery_ok()
|
|
if not ok:
|
|
self._set_state(State.READY, f'Arm refused: {why}')
|
|
return
|
|
if self._telemetry.tumbled:
|
|
self._set_state(State.READY,
|
|
'Arm refused: drone is tumbled - set it level')
|
|
return
|
|
if self._telemetry.locked:
|
|
self._set_state(State.READY,
|
|
'Arm refused: supervisor locked - reboot the drone')
|
|
return
|
|
if self._telemetry.multiranger is False:
|
|
# Not fatal to flight, but the whole point is the map - warn loudly.
|
|
logger.warning('no Multi-ranger deck detected - scan will collect no '
|
|
'range data')
|
|
|
|
self._set_state(State.ARMING, 'Resetting position estimator ...')
|
|
self._reset_estimator(scf)
|
|
|
|
if self._dry_run:
|
|
# Props-off: run the pre-flight checks and estimator reset for
|
|
# realism, but never actually arm. SCAN then walks the sequence.
|
|
self._set_state(State.ARMED,
|
|
'DRY RUN armed (simulated - motors NOT armed)')
|
|
return
|
|
|
|
if self._proto_v7:
|
|
# Don't gate on canArm: stock firmware auto-arms the instant
|
|
# pre-flight checks pass, so canArm is only ever true for one tick.
|
|
# The real signal is the armed bit; requesting arming is a harmless
|
|
# no-op if already auto-armed.
|
|
cf.supervisor.send_arming_request(True)
|
|
if not self._wait_until(lambda: self._telemetry.armed, 5.0):
|
|
self._set_state(State.READY,
|
|
f'Arm failed: {self._arm_failure_reason()}')
|
|
return
|
|
else:
|
|
# Legacy firmware: a zero setpoint unlocks the thrust-safety latch.
|
|
cf.commander.send_setpoint(0, 0, 0, 0)
|
|
|
|
self._set_state(State.ARMED, 'Armed - ready to SCAN (auto-disarms in '
|
|
'~30 s if idle)')
|
|
|
|
def _do_disarm_ground(self, scf: SyncCrazyflie) -> None:
|
|
"""Disarm without flying (back out of ARMED)."""
|
|
if self._proto_v7 and not self._dry_run:
|
|
scf.cf.supervisor.send_arming_request(False)
|
|
self._set_state(State.READY, 'Disarmed')
|
|
|
|
def _do_scan(self, scf: SyncCrazyflie) -> str | None:
|
|
"""Fly the hardened, stop-and-stare rotating scan and build the map.
|
|
|
|
Sequence: high-level-commander takeoff -> confirm a stable hover -> for
|
|
each ring height, rotate one full turn in discrete YAW_STEP_DEG steps
|
|
(pausing to let the estimate settle and the sensors take a clean static
|
|
reading at each bearing) -> climb to the next ring -> land. The drone is
|
|
always commanded to hold the takeoff point (0, 0); it never translates.
|
|
|
|
Safety: aborts to a controlled landing on estimate divergence, wall
|
|
proximity, or loss of the flying state, and to an emergency stop on a
|
|
tumble/crash. Returns 'disconnect'/'shutdown' if such a command arrived
|
|
mid-flight (caller tears down after landing), else None.
|
|
|
|
WARNING: rotation on a Flow deck is inherently unstable. This is
|
|
experimental - bench-test with props off, then tether, before free
|
|
flight near obstacles.
|
|
"""
|
|
cf = scf.cf
|
|
p = self._scan_params
|
|
dry = self._dry_run
|
|
|
|
if not self._proto_v7 and not dry:
|
|
self._set_state(State.ARMED, 'Scan needs current firmware '
|
|
'(supervisor + high-level commander)')
|
|
return None
|
|
if not self._telemetry.armed and not dry:
|
|
self._set_state(State.READY, 'Scan aborted: no longer armed')
|
|
return None
|
|
|
|
if dry:
|
|
logger.info('[DRY RUN] scan walkthrough - no motors, no flight '
|
|
'commands; live sensors still drive the safety guards')
|
|
else:
|
|
logger.warning('EXPERIMENTAL rotating scan starting - keep the area '
|
|
'clear; EMERGENCY STOP (SPACE) is always available')
|
|
try:
|
|
cf.param.set_value(HLC_PARAM, '1') # ensure HLC is enabled
|
|
except Exception:
|
|
logger.exception('could not enable the high-level commander')
|
|
self._recording = True
|
|
hlc = cf.high_level_commander
|
|
tag = '[DRY RUN] ' if dry else ''
|
|
|
|
start = clamp_height(p.start_height)
|
|
top = clamp_height(max(p.top_height, start))
|
|
climb = max(0.05, p.climb_rate)
|
|
step_rate = max(5.0, min(p.yaw_rate, MAX_YAW_RATE_DPS))
|
|
|
|
# --- takeoff (firmware holds position autonomously) ---------------
|
|
ascent = max(2.0, start / climb)
|
|
self._set_state(State.SCANNING, f'{tag}Taking off to {start:.2f} m ...')
|
|
self._fly(lambda: hlc.takeoff(start, ascent),
|
|
f'takeoff to {start:.2f} m over {ascent:.1f}s')
|
|
res = self._scan_wait(scf, ascent + TAKEOFF_SETTLE_S)
|
|
if res != 'ok':
|
|
return self._finish_scan(scf, res)
|
|
# Don't rotate until the estimate is actually settled and we're flying.
|
|
# (In dry run nothing is airborne, so skip the airborne-stability gate.)
|
|
if not dry and not self._wait_until(
|
|
lambda: self._est_speed < DIVERGE_SPEED_MPS
|
|
and self._telemetry.flying, 3.0):
|
|
logger.warning('estimate unsettled after takeoff - landing')
|
|
return self._finish_scan(scf, 'abort')
|
|
|
|
held_z = start
|
|
heading = 0.0 # degrees
|
|
|
|
# Ring heights from start up to top, RING_STEP_M apart.
|
|
rings: list[float] = []
|
|
z = start
|
|
while z < top - 1e-3:
|
|
rings.append(z)
|
|
z += RING_STEP_M
|
|
rings.append(top)
|
|
|
|
for ring_z in rings:
|
|
target = self._ceiling_capped(ring_z)
|
|
if target < held_z - 0.03:
|
|
logger.warning('ceiling within margin - not climbing past '
|
|
'%.2f m', held_z)
|
|
target = held_z
|
|
# Climb/descend to the ring height, holding heading.
|
|
if abs(target - held_z) > 0.03:
|
|
dur = max(1.5, abs(target - held_z) / climb)
|
|
self._set_state(State.SCANNING, f'{tag}Moving to {target:.2f} m ...')
|
|
self._fly(
|
|
lambda t=target, d=dur: hlc.go_to(0.0, 0.0, t,
|
|
math.radians(heading), d),
|
|
f'go_to z={target:.2f} m, yaw={heading:+.0f} deg over {dur:.1f}s')
|
|
res = self._scan_wait(scf, dur + 0.5)
|
|
if res != 'ok':
|
|
return self._finish_scan(scf, res)
|
|
held_z = target
|
|
|
|
# One full revolution, stop-and-stare.
|
|
steps = max(1, round(360.0 / YAW_STEP_DEG))
|
|
self._set_state(State.SCANNING,
|
|
f'{tag}Sweeping at {held_z:.2f} m ({len(rings)} rings)')
|
|
for _ in range(steps):
|
|
heading = _wrap_deg(heading + YAW_STEP_DEG)
|
|
dur = max(1.5, YAW_STEP_DEG / step_rate)
|
|
self._fly(
|
|
lambda h=heading, d=dur: hlc.go_to(0.0, 0.0, held_z,
|
|
math.radians(h), d),
|
|
f'go_to bearing {heading:+.0f} deg at {held_z:.2f} m '
|
|
f'over {dur:.1f}s')
|
|
# The settle pause is also when the sensors sit still and read
|
|
# cleanly at this bearing.
|
|
res = self._scan_wait(scf, dur + YAW_SETTLE_S)
|
|
if res != 'ok':
|
|
return self._finish_scan(scf, res)
|
|
|
|
return self._finish_scan(scf, 'done')
|
|
|
|
def _fly(self, action, description: str) -> None:
|
|
"""Execute a flight command, or just log it in dry-run mode."""
|
|
if self._dry_run:
|
|
logger.info('[DRY RUN] %s', description)
|
|
return
|
|
action()
|
|
|
|
# ---- scan safety helpers -----------------------------------------------
|
|
|
|
def _scan_guard(self, scf: SyncCrazyflie) -> str | None:
|
|
"""Check abort conditions once. Returns an action string or None.
|
|
|
|
'estop' -> cut motors (tumble/crash or user)
|
|
'abort' -> controlled land (divergence/proximity)
|
|
'stop'/'disconnect'/'shutdown' -> user command (controlled land)
|
|
"""
|
|
cmd = self._poll_cmd()
|
|
if cmd == 'estop':
|
|
return 'estop'
|
|
if cmd in ('stop', 'disconnect', 'shutdown'):
|
|
return cmd
|
|
t = self._telemetry
|
|
if t.tumbled or t.crashed:
|
|
logger.warning('tumble/crash detected - emergency stop')
|
|
return 'estop'
|
|
if self._est_speed > DIVERGE_SPEED_MPS:
|
|
logger.warning('estimate diverging (%.2f m/s while holding) - '
|
|
'aborting to land', self._est_speed)
|
|
return 'abort'
|
|
near = [d for d in (t.front, t.back, t.left, t.right) if d is not None]
|
|
if near and min(near) < MIN_CLEARANCE_M:
|
|
logger.warning('obstacle within %.2f m - aborting to land',
|
|
min(near))
|
|
return 'abort'
|
|
return None
|
|
|
|
def _scan_wait(self, scf: SyncCrazyflie, duration: float) -> str:
|
|
"""Sleep up to `duration` while polling the safety guard. Returns 'ok'
|
|
if the interval elapsed cleanly, else the guard's action string."""
|
|
if self._dry_run:
|
|
# No real motion to wait for; just briefly poll the guards so live
|
|
# sensor readings can still trigger (and be seen) in the walkthrough.
|
|
duration = min(duration, 0.4)
|
|
end = time.monotonic() + duration
|
|
while time.monotonic() < end:
|
|
action = self._scan_guard(scf)
|
|
if action:
|
|
return action
|
|
time.sleep(0.05)
|
|
return 'ok'
|
|
|
|
def _finish_scan(self, scf: SyncCrazyflie, reason: str) -> str | None:
|
|
"""Resolve a scan exit: emergency stop, or a controlled landing."""
|
|
if reason == 'estop':
|
|
self._do_estop(scf)
|
|
return None
|
|
self._land_hlc(scf)
|
|
if reason in ('disconnect', 'shutdown'):
|
|
return reason
|
|
if reason == 'abort':
|
|
self._set_state(State.READY, 'Scan aborted for safety - landed')
|
|
return None
|
|
|
|
def _ceiling_capped(self, target_z: float) -> float:
|
|
"""Clamp a target height to stay CEILING_MARGIN_M below the sensed
|
|
ceiling (from the up-facing range), if there is one."""
|
|
up = self._telemetry.up
|
|
if up is None:
|
|
return target_z
|
|
ceiling = self._telemetry.z + up
|
|
return min(target_z, max(MIN_HEIGHT_M, ceiling - CEILING_MARGIN_M))
|
|
|
|
def _land_hlc(self, scf: SyncCrazyflie) -> None:
|
|
"""Controlled high-level-commander landing, staying responsive to an
|
|
emergency stop and to a tumble during descent."""
|
|
cf = scf.cf
|
|
if self._dry_run:
|
|
logger.info('[DRY RUN] land and disarm')
|
|
self._set_state(State.READY, 'DRY RUN complete - landed (simulated)')
|
|
return
|
|
self._set_state(State.LANDING, 'Landing ...')
|
|
z = max(self._telemetry.z, 0.1)
|
|
dur = max(2.0, z / max(0.05, self._scan_params.climb_rate))
|
|
try:
|
|
cf.high_level_commander.land(0.0, dur, yaw=None)
|
|
except Exception:
|
|
logger.exception('high-level land failed - cutting motors')
|
|
self._do_estop(scf)
|
|
return
|
|
end = time.monotonic() + dur
|
|
while time.monotonic() < end:
|
|
cmd = self._poll_cmd()
|
|
if cmd == 'estop' or self._telemetry.tumbled or self._telemetry.crashed:
|
|
self._do_estop(scf)
|
|
return
|
|
time.sleep(0.05)
|
|
try:
|
|
cf.high_level_commander.stop()
|
|
except Exception:
|
|
pass
|
|
if self._proto_v7:
|
|
cf.supervisor.send_arming_request(False)
|
|
self._set_state(State.READY, 'Landed - on the ground, disarmed')
|
|
|
|
def _do_estop(self, scf: SyncCrazyflie) -> None:
|
|
try:
|
|
self._cut_motors(scf)
|
|
except Exception:
|
|
logger.exception('emergency stop failed')
|
|
if self._dry_run:
|
|
self._set_state(State.READY, 'DRY RUN: emergency stop (no motors)')
|
|
else:
|
|
self._set_state(State.EMERGENCY,
|
|
'EMERGENCY STOP - motors cut (reboot to fly again)')
|
|
|
|
def _cut_motors(self, scf: SyncCrazyflie) -> None:
|
|
"""Cut motors immediately. Safe to call from any thread."""
|
|
if self._dry_run:
|
|
# Nothing is ever armed in dry run; don't send a flight command.
|
|
logger.info('[DRY RUN] emergency stop (no motors to cut)')
|
|
return
|
|
cf = scf.cf
|
|
if self._proto_v7:
|
|
cf.supervisor.send_emergency_stop()
|
|
else:
|
|
cf.commander.send_stop_setpoint()
|
|
|
|
# ----------------------------------------------------- estimator helper --
|
|
|
|
def _reset_estimator(self, scf: SyncCrazyflie) -> None:
|
|
cf = scf.cf
|
|
cf.param.set_value('kalman.resetEstimation', '1')
|
|
time.sleep(0.1)
|
|
cf.param.set_value('kalman.resetEstimation', '0')
|
|
self._wait_for_position_estimator(scf)
|
|
# A fresh estimator means a fresh world origin; the old map no longer
|
|
# lines up, so start it clean and drop any accumulated drift offset.
|
|
self.clear_map()
|
|
self._reset_drift()
|
|
|
|
def _reset_drift(self) -> None:
|
|
"""Zero the ZUPT offsets and rest detector (called on an origin reset)."""
|
|
self._off_x = self._off_y = self._off_yaw = 0.0
|
|
self._hold_x = self._hold_y = self._hold_yaw = 0.0
|
|
self._gyro_ema = self._flow_ema = 0.0
|
|
self._still = False
|
|
self._last_acc = None
|
|
|
|
def _wait_for_position_estimator(self, scf: SyncCrazyflie,
|
|
threshold: float = 0.001,
|
|
timeout: float = 10.0) -> None:
|
|
"""Block until the Kalman position variance converges (or times out)."""
|
|
log_conf = LogConfig(name='kalman_var', period_in_ms=100)
|
|
log_conf.add_variable('kalman.varPX', 'float')
|
|
log_conf.add_variable('kalman.varPY', 'float')
|
|
log_conf.add_variable('kalman.varPZ', 'float')
|
|
|
|
history = {'x': [1000.0] * 10, 'y': [1000.0] * 10, 'z': [1000.0] * 10}
|
|
deadline = time.monotonic() + timeout
|
|
try:
|
|
with SyncLogger(scf, log_conf) as sync_log:
|
|
for _, data, _ in sync_log:
|
|
for key, var in (('x', 'kalman.varPX'),
|
|
('y', 'kalman.varPY'),
|
|
('z', 'kalman.varPZ')):
|
|
history[key].append(data[var])
|
|
history[key].pop(0)
|
|
spread = max(max(h) - min(h) for h in history.values())
|
|
if spread < threshold:
|
|
return
|
|
if time.monotonic() > deadline:
|
|
logger.warning('estimator did not converge in %.0fs '
|
|
'(spread=%.4f); proceeding', timeout,
|
|
spread)
|
|
return
|
|
except Exception:
|
|
logger.exception('estimator convergence wait failed')
|
|
|
|
# ---------------------------------------------------------- log / param --
|
|
|
|
def _detect_protocol(self, scf: SyncCrazyflie) -> bool:
|
|
try:
|
|
return scf.cf.platform.get_protocol_version() >= 7
|
|
except Exception:
|
|
logger.exception('could not read protocol version')
|
|
return False
|
|
|
|
def _setup_logging(self, scf: SyncCrazyflie) -> None:
|
|
self._log_configs = []
|
|
self._console_buf = ''
|
|
scf.cf.console.receivedChar.add_callback(self._on_console)
|
|
|
|
# Pose block: estimator x/y/z + heading. Kept separate from the range
|
|
# block because all ten variables together exceed one CRTP log packet.
|
|
pose = LogConfig(name='rv_pose', period_in_ms=100)
|
|
pose.add_variable('stateEstimate.x', 'float')
|
|
pose.add_variable('stateEstimate.y', 'float')
|
|
pose.add_variable('stateEstimate.z', 'float')
|
|
pose.add_variable('stabilizer.yaw', 'float')
|
|
scf.cf.log.add_config(pose)
|
|
pose.data_received_cb.add_callback(self._on_pose)
|
|
pose.start()
|
|
self._log_configs.append(pose)
|
|
|
|
# Multi-ranger block: the five ToF sensors + the Flow deck's z-ranger.
|
|
# Best-effort: without the deck these variables are absent and the whole
|
|
# block is rejected, which is fine - the map just stays empty.
|
|
try:
|
|
rng = LogConfig(name='rv_range', period_in_ms=100)
|
|
for var in ('range.front', 'range.back', 'range.left',
|
|
'range.right', 'range.up', 'range.zrange'):
|
|
rng.add_variable(var, 'uint16_t')
|
|
scf.cf.log.add_config(rng)
|
|
rng.data_received_cb.add_callback(self._on_range)
|
|
rng.start()
|
|
self._log_configs.append(rng)
|
|
except (KeyError, AttributeError) as exc:
|
|
logger.warning('range logging unavailable (Multi-ranger deck?): %s',
|
|
exc)
|
|
|
|
# Raw-motion block for at-rest detection: gyro rates + optical-flow
|
|
# counts. These come straight off the sensors, so (unlike the estimate)
|
|
# they read ~0 when the drone is genuinely still. Best-effort: flow
|
|
# counts need the Flow deck, without which ZUPT simply never engages.
|
|
try:
|
|
mot = LogConfig(name='rv_motion', period_in_ms=100)
|
|
mot.add_variable('gyro.x', 'float')
|
|
mot.add_variable('gyro.y', 'float')
|
|
mot.add_variable('gyro.z', 'float')
|
|
mot.add_variable('motion.deltaX', 'int16_t')
|
|
mot.add_variable('motion.deltaY', 'int16_t')
|
|
scf.cf.log.add_config(mot)
|
|
mot.data_received_cb.add_callback(self._on_motion)
|
|
mot.start()
|
|
self._log_configs.append(mot)
|
|
except (KeyError, AttributeError) as exc:
|
|
logger.warning('motion logging unavailable (Flow deck?): %s - drift '
|
|
'correction disabled', exc)
|
|
|
|
# Battery + supervisor status.
|
|
sup = LogConfig(name='rv_status', period_in_ms=200)
|
|
sup.add_variable('pm.vbat', 'float')
|
|
if self._proto_v7:
|
|
sup.add_variable('supervisor.info', 'uint16_t')
|
|
scf.cf.log.add_config(sup)
|
|
sup.data_received_cb.add_callback(self._on_status)
|
|
sup.start()
|
|
self._log_configs.append(sup)
|
|
|
|
def _teardown_logging(self) -> None:
|
|
for conf in getattr(self, '_log_configs', []):
|
|
try:
|
|
conf.stop()
|
|
conf.delete()
|
|
except Exception:
|
|
logger.exception('failed to delete log config %s', conf.name)
|
|
self._log_configs = []
|
|
|
|
def _on_console(self, text: str) -> None:
|
|
self._console_buf += text
|
|
while '\n' in self._console_buf:
|
|
line, self._console_buf = self._console_buf.split('\n', 1)
|
|
line = line.rstrip('\r')
|
|
if line:
|
|
_CONSOLE_LOGGER.info(line)
|
|
|
|
def _on_pose(self, _ts, data, _logconf) -> None:
|
|
self._raw_x = data['stateEstimate.x']
|
|
self._raw_y = data['stateEstimate.y']
|
|
self._raw_z = data['stateEstimate.z']
|
|
self._raw_yaw = data['stabilizer.yaw']
|
|
# Track raw horizontal speed for divergence detection (smoothed).
|
|
now = time.monotonic()
|
|
if self._last_pose_t is not None and self._last_raw_xy is not None:
|
|
dt = now - self._last_pose_t
|
|
if dt > 1e-3:
|
|
inst = math.hypot(self._raw_x - self._last_raw_xy[0],
|
|
self._raw_y - self._last_raw_xy[1]) / dt
|
|
self._est_speed = 0.5 * self._est_speed + 0.5 * inst
|
|
self._last_pose_t = now
|
|
self._last_raw_xy = (self._raw_x, self._raw_y)
|
|
self._apply_correction()
|
|
self._emit()
|
|
|
|
def _on_motion(self, _ts, data, _logconf) -> None:
|
|
"""Update the at-rest detector from raw gyro + optical-flow counts."""
|
|
gmag = max(abs(data['gyro.x']), abs(data['gyro.y']), abs(data['gyro.z']))
|
|
fmag = max(abs(data['motion.deltaX']), abs(data['motion.deltaY']))
|
|
self._gyro_ema = STILL_EMA * self._gyro_ema + (1 - STILL_EMA) * gmag
|
|
self._flow_ema = STILL_EMA * self._flow_ema + (1 - STILL_EMA) * fmag
|
|
self._still = (self._gyro_ema < GYRO_STILL_DPS
|
|
and self._flow_ema < FLOW_STILL_COUNTS)
|
|
|
|
def _apply_correction(self) -> None:
|
|
"""Map the raw estimate to a drift-corrected pose and publish it.
|
|
|
|
While at rest with correction on, the corrected pose is pinned to the
|
|
spot it last held: the raw-minus-corrected offset is updated each sample
|
|
so any estimator wander is cancelled. While moving, the offset is frozen
|
|
so real motion passes straight through, and the current corrected pose is
|
|
remembered as the point to freeze at the next rest.
|
|
"""
|
|
t = self._telemetry
|
|
rx, ry, rz, ryaw = self._raw_x, self._raw_y, self._raw_z, self._raw_yaw
|
|
pin = self._drift_correction and self._still
|
|
if pin:
|
|
self._off_x = rx - self._hold_x
|
|
self._off_y = ry - self._hold_y
|
|
self._off_yaw = _wrap_deg(ryaw - self._hold_yaw)
|
|
cx = rx - self._off_x
|
|
cy = ry - self._off_y
|
|
cyaw = _wrap_deg(ryaw - self._off_yaw)
|
|
if not pin:
|
|
self._hold_x, self._hold_y, self._hold_yaw = cx, cy, cyaw
|
|
t.x, t.y, t.z, t.yaw = cx, cy, rz, cyaw
|
|
t.raw_x, t.raw_y = rx, ry
|
|
t.still = self._still
|
|
t.drift_corrected = self._drift_correction
|
|
|
|
def _on_range(self, _ts, data, _logconf) -> None:
|
|
t = self._telemetry
|
|
t.front = _range_m(data['range.front'])
|
|
t.back = _range_m(data['range.back'])
|
|
t.left = _range_m(data['range.left'])
|
|
t.right = _range_m(data['range.right'])
|
|
t.up = _range_m(data['range.up'])
|
|
t.down = _range_m(data['range.zrange'])
|
|
# Project the horizontal hits into the world frame and add to the map.
|
|
# Uses the most recent corrected pose (a separate, equally fast block);
|
|
# the small skew is negligible against a slow ascent/rotation. Only add a
|
|
# batch once the drone has actually moved, so at-rest noise stays out.
|
|
if self._recording and self._moved_enough(t):
|
|
self._accumulate(t)
|
|
self._last_acc = (t.x, t.y, t.z, t.yaw)
|
|
self._emit()
|
|
|
|
def _moved_enough(self, t: Telemetry) -> bool:
|
|
if self._last_acc is None:
|
|
return True
|
|
lx, ly, lz, lyaw = self._last_acc
|
|
return (math.hypot(t.x - lx, t.y - ly) >= MIN_MOVE_M
|
|
or abs(t.z - lz) >= MIN_RISE_M
|
|
or abs(_wrap_deg(t.yaw - lyaw)) >= MIN_TURN_DEG)
|
|
|
|
def _accumulate(self, t: Telemetry) -> None:
|
|
yaw = math.radians(t.yaw)
|
|
cos_y, sin_y = math.cos(yaw), math.sin(yaw)
|
|
new_x: list[float] = [] # 2-D top-down slice (horizontal sensors)
|
|
new_y: list[float] = []
|
|
cloud: list[tuple[float, float, float]] = [] # full 3-D batch to stream
|
|
for name, (bx, by) in _HORIZONTAL_SENSORS:
|
|
dist = getattr(t, name)
|
|
if dist is None:
|
|
continue
|
|
# Rotate the body-frame sensor direction into the world frame. The
|
|
# horizontal beams sit at the drone's current height.
|
|
wx = cos_y * bx - sin_y * by
|
|
wy = sin_y * bx + cos_y * by
|
|
px, py = t.x + wx * dist, t.y + wy * dist
|
|
new_x.append(px)
|
|
new_y.append(py)
|
|
cloud.append((px, py, t.z))
|
|
# Up/down sensors give ceiling/floor points (3-D only; they would just
|
|
# clutter the 2-D map at the drone's own position).
|
|
if t.up is not None:
|
|
cloud.append((t.x, t.y, t.z + t.up))
|
|
if t.down is not None:
|
|
cloud.append((t.x, t.y, t.z - t.down))
|
|
|
|
if new_x:
|
|
with self._map_lock:
|
|
self._map_x.extend(new_x)
|
|
self._map_y.extend(new_y)
|
|
self._map_version += 1
|
|
if cloud:
|
|
self._publisher.add_points(cloud)
|
|
|
|
def _on_status(self, _ts, data, _logconf) -> None:
|
|
t = self._telemetry
|
|
t.vbat = data['pm.vbat']
|
|
if 'supervisor.info' in data:
|
|
info = data['supervisor.info']
|
|
t.can_arm = bool(info & SUP_CAN_ARM)
|
|
t.armed = bool(info & SUP_IS_ARMED)
|
|
t.flying = bool(info & SUP_IS_FLYING)
|
|
t.tumbled = bool(info & SUP_IS_TUMBLED)
|
|
t.locked = bool(info & SUP_IS_LOCKED)
|
|
t.crashed = bool(info & SUP_IS_CRASHED)
|
|
t.deck_fault = bool(info & SUP_DECK_FAULT)
|
|
self._emit()
|
|
|
|
def _probe_decks(self, scf: SyncCrazyflie) -> None:
|
|
for param, attr in (('deck.bcFlow2', 'flow_deck'),
|
|
('deck.bcMultiranger', 'multiranger')):
|
|
try:
|
|
value = scf.cf.param.get_value(param, timeout=5)
|
|
setattr(self._telemetry, attr, bool(int(value)))
|
|
except Exception:
|
|
logger.exception('could not read %s', param)
|
|
setattr(self._telemetry, attr, None)
|
|
self._emit()
|
|
|
|
def _arm_failure_reason(self) -> str:
|
|
t = self._telemetry
|
|
if t.locked:
|
|
return 'supervisor locked - reboot the drone'
|
|
if t.tumbled:
|
|
return 'drone is tumbled - set it level and still'
|
|
if t.deck_fault:
|
|
return 'a deck reported a hardware fault - reseat decks / reboot'
|
|
if t.crashed:
|
|
return 'supervisor in crashed state - flip level to recover'
|
|
return ('pre-flight checks did not pass - place the drone level and '
|
|
'stationary, then retry')
|
|
|
|
# --------------------------------------------------------------- utils ---
|
|
|
|
def _battery_ok(self) -> tuple[bool, str]:
|
|
end = time.monotonic() + 1.5
|
|
while self._telemetry.vbat == 0.0 and time.monotonic() < end:
|
|
time.sleep(0.05)
|
|
v = self._telemetry.vbat
|
|
if v and v < MIN_SAFE_VBAT:
|
|
return False, f'battery low ({v:.2f} V, need {MIN_SAFE_VBAT:.2f} V)'
|
|
return True, ''
|
|
|
|
def _poll_cmd(self) -> str | None:
|
|
try:
|
|
return self._cmd_q.get_nowait()
|
|
except queue.Empty:
|
|
return None
|
|
|
|
def _wait_until(self, predicate, timeout: float) -> bool:
|
|
end = time.monotonic() + timeout
|
|
while time.monotonic() < end:
|
|
if predicate():
|
|
return True
|
|
time.sleep(0.05)
|
|
return False
|
|
|
|
def _set_state(self, state: State, message: str) -> None:
|
|
self._state = state
|
|
self._message = message
|
|
logger.info('[%s] %s', state.name, message)
|
|
self._emit()
|
|
|
|
def _emit(self) -> None:
|
|
snapshot = Update(
|
|
state=self._state,
|
|
message=self._message,
|
|
telemetry=dataclasses.replace(self._telemetry),
|
|
)
|
|
self._latest = snapshot # atomic publish for snapshot() pollers
|
|
try:
|
|
self._listener(snapshot)
|
|
except Exception:
|
|
logger.exception('listener raised')
|
|
|
|
|
|
def _wrap_deg(angle: float) -> float:
|
|
"""Wrap an angle in degrees to (-180, 180]."""
|
|
return (angle + 180.0) % 360.0 - 180.0
|
|
|
|
|
|
def _range_m(raw_mm) -> float | None:
|
|
"""Convert a raw Multi-ranger reading (mm) to metres, or None if out of
|
|
range (zero, or at/over the sensor's ~4 m ceiling = nothing to map)."""
|
|
mm = float(raw_mm)
|
|
if mm <= 0.0 or mm >= RANGE_VALID_MAX_MM:
|
|
return None
|
|
return mm / 1000.0
|