Two-step ARM -> LAUNCH -> LAND flight flow for a Crazyflie + Flow v2 deck: - controller.py: cflib link + flight logic on one worker thread with a command queue. Supervisor arming, Kalman estimator reset/convergence wait, high-level-commander takeoff/land, configurable hover height (0.2-2.0 m), emergency stop, graceful land-on-shutdown, thread-safe snapshot()/scan(). States: DISCONNECTED/CONNECTING/READY/ARMING/ARMED/ FLYING/LANDING/EMERGENCY/ERROR. - mission_client.py: DearPyGui front-end, manual render loop polling snapshot() each frame. Context button ARM(green)->LAUNCH(blue)->LAND (yellow), secondary Disarm, hover-height slider, console log, and an Estimator state panel (x/y/z, yaw, flow counts) for drift inspection. - uv-managed (pyproject.toml + uv.lock); run `uv run mission_client.py`. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
722 lines
28 KiB
Python
722 lines
28 KiB
Python
"""Flight controller for the Crazyflie mission client.
|
|
|
|
Owns the Crazyflie radio link on a single dedicated worker thread and exposes a
|
|
small, thread-safe command API to the UI (connect / disconnect / arm / disarm /
|
|
emergency_stop / shutdown). Keeping every cflib call on one thread means the UI
|
|
thread never blocks on radio I/O and there is exactly one place that can command
|
|
the motors.
|
|
|
|
Flight strategy: this uses the firmware's *high-level commander* (takeoff / land
|
|
/ hover) together with the *supervisor* arming system. The high-level commander
|
|
holds position autonomously in firmware once airborne, so a stalled or slow UI
|
|
thread cannot make the drone drift or fall the way a missed stream of manual
|
|
setpoints would. Position/altitude hold is provided by the Flow v2 deck.
|
|
|
|
The API is intentionally tiny and all mutating commands go through a queue that
|
|
only the worker thread consumes, except emergency_stop which *also* fires
|
|
directly from the caller for minimum latency.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import dataclasses
|
|
import logging
|
|
import queue
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from enum import Enum, auto
|
|
|
|
import cflib.crtp
|
|
from cflib.crazyflie import Crazyflie
|
|
from cflib.crazyflie.log import LogConfig
|
|
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
|
|
from cflib.crazyflie.syncLogger import SyncLogger
|
|
from cflib.utils import uri_helper
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _StaleLogFilter(logging.Filter):
|
|
"""Drop cflib's benign "no LogEntry to handle id=N" warnings.
|
|
|
|
Firmware streams log-data for a block id the host no longer tracks when a
|
|
stale block is left over from a prior/parallel session. We already delete
|
|
our blocks on disconnect, but a handful of in-flight packets can still
|
|
arrive right after reconnecting; they're harmless (power-cycle to clear),
|
|
so keep them out of the console rather than alarming the user.
|
|
"""
|
|
|
|
def filter(self, record: logging.LogRecord) -> bool:
|
|
return 'no LogEntry to handle' not in record.getMessage()
|
|
|
|
|
|
class LogBuffer(logging.Handler):
|
|
"""A thread-safe ring buffer of formatted log lines for the UI console.
|
|
|
|
cflib logs from its own thread, so reads/writes are guarded by a lock. A
|
|
monotonic version counter lets the UI cheaply detect when there's something
|
|
new to render.
|
|
"""
|
|
|
|
def __init__(self, capacity: int = 500) -> None:
|
|
super().__init__()
|
|
self._lines: collections.deque = collections.deque(maxlen=capacity)
|
|
self._lock = threading.Lock()
|
|
self._version = 0
|
|
self.setFormatter(logging.Formatter(
|
|
'%(asctime)s %(levelname).1s %(name)s: %(message)s', '%H:%M:%S'))
|
|
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
try:
|
|
line = self.format(record)
|
|
except Exception:
|
|
return
|
|
with self._lock:
|
|
self._lines.append(line)
|
|
self._version += 1
|
|
|
|
def snapshot(self) -> tuple[int, list[str]]:
|
|
with self._lock:
|
|
return self._version, list(self._lines)
|
|
|
|
def clear(self) -> None:
|
|
with self._lock:
|
|
self._lines.clear()
|
|
self._version += 1
|
|
|
|
|
|
# Single shared console buffer, attached to the root logger once. Firmware
|
|
# console output is logged under its own name so it's easy to spot.
|
|
LOG_BUFFER = LogBuffer()
|
|
_CONSOLE_LOGGER = logging.getLogger('crazyflie.console')
|
|
_LOGGING_READY = False
|
|
|
|
|
|
def _install_console_logging() -> None:
|
|
"""Route cflib + app logs into LOG_BUFFER for the UI console (idempotent)."""
|
|
global _LOGGING_READY
|
|
if _LOGGING_READY:
|
|
return
|
|
logging.getLogger('cflib.crazyflie.log').addFilter(_StaleLogFilter())
|
|
root = logging.getLogger()
|
|
if root.level == logging.NOTSET or root.level > logging.INFO:
|
|
root.setLevel(logging.INFO)
|
|
LOG_BUFFER.setLevel(logging.INFO)
|
|
root.addHandler(LOG_BUFFER)
|
|
_LOGGING_READY = True
|
|
|
|
|
|
# Radio URI. Override with the CFLIB_URI environment variable.
|
|
DEFAULT_URI = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7')
|
|
|
|
# Mission parameters.
|
|
TARGET_HEIGHT_M = 1.0 # default hover height above the ground
|
|
MIN_HEIGHT_M = 0.2 # configurable hover-height bounds (Flow v2 range)
|
|
MAX_HEIGHT_M = 2.0
|
|
CLIMB_RATE_MPS = 0.35 # ascent/descent speed used to time takeoff & land
|
|
MIN_RAMP_S = 1.5 # floor on takeoff/land duration for low hovers
|
|
MIN_SAFE_VBAT = 3.10 # refuse to arm below this pack voltage
|
|
|
|
|
|
def clamp_height(height: float) -> float:
|
|
"""Clamp a requested hover height to the safe configurable range."""
|
|
return max(MIN_HEIGHT_M, min(MAX_HEIGHT_M, float(height)))
|
|
|
|
|
|
def ramp_duration(height: float) -> float:
|
|
"""Time to climb/descend `height` metres at a gentle, consistent rate."""
|
|
return max(MIN_RAMP_S, abs(height) / CLIMB_RATE_MPS)
|
|
|
|
# supervisor.info bitfield (see crazyflie-firmware src/modules/src/supervisor.c).
|
|
SUP_CAN_ARM = 0x0001
|
|
SUP_IS_ARMED = 0x0002
|
|
SUP_AUTO_ARM = 0x0004
|
|
SUP_CAN_FLY = 0x0008
|
|
SUP_IS_FLYING = 0x0010
|
|
SUP_IS_TUMBLED = 0x0020
|
|
SUP_IS_LOCKED = 0x0040
|
|
SUP_IS_CRASHED = 0x0080
|
|
SUP_DECK_FAULT = 0x0800 # a deck reported a hardware fault
|
|
|
|
|
|
class State(Enum):
|
|
"""High-level mission state, surfaced to the UI."""
|
|
DISCONNECTED = auto()
|
|
CONNECTING = auto()
|
|
READY = auto() # connected, on the ground, disarmed
|
|
ARMING = auto() # transient: estimator reset + arming request
|
|
ARMED = auto() # armed and idle on the ground, ready to LAUNCH
|
|
FLYING = auto() # taking off or hovering at target height
|
|
LANDING = auto() # controlled descent in progress
|
|
EMERGENCY = auto() # motors cut; reboot required to fly again
|
|
ERROR = auto() # connection or radio failure
|
|
|
|
|
|
@dataclass
|
|
class Telemetry:
|
|
"""Latest known drone state. Plain values; safe to copy across threads."""
|
|
vbat: float = 0.0
|
|
height: float = 0.0
|
|
# Estimator pose + raw optical-flow counts, for drift inspection.
|
|
x: float = 0.0
|
|
y: float = 0.0
|
|
yaw: float = 0.0
|
|
flow_dx: int = 0
|
|
flow_dy: int = 0
|
|
flow_deck: bool | None = None # None = not yet probed
|
|
can_arm: bool = False
|
|
armed: bool = False
|
|
flying: bool = False
|
|
tumbled: bool = False
|
|
locked: bool = False
|
|
crashed: bool = False
|
|
deck_fault: bool = False
|
|
|
|
|
|
@dataclass
|
|
class Update:
|
|
"""Immutable snapshot handed to the UI listener."""
|
|
state: State
|
|
message: str
|
|
telemetry: Telemetry
|
|
|
|
|
|
class DroneController:
|
|
"""Threaded Crazyflie flight controller.
|
|
|
|
Usage:
|
|
c = DroneController(uri, listener=on_update)
|
|
c.start() # init radio drivers + spawn worker
|
|
c.connect() # open link
|
|
c.arm() # reset estimator + arm; idle armed on the ground
|
|
c.launch(1.0) # take off to 1 m and hover
|
|
c.disarm() # land if flying, else disarm on the ground
|
|
c.emergency_stop() # cut motors immediately
|
|
c.shutdown() # land if needed, close link, stop worker
|
|
"""
|
|
|
|
def __init__(self, uri: str = DEFAULT_URI, listener=None,
|
|
cache_dir: str = './cache'):
|
|
self._uri = uri
|
|
self._listener = listener or (lambda update: None)
|
|
self._cache_dir = cache_dir
|
|
|
|
self._cmd_q: queue.Queue[str] = queue.Queue()
|
|
self._target_height = TARGET_HEIGHT_M # latched at arm time
|
|
self._state = State.DISCONNECTED
|
|
self._message = 'Disconnected'
|
|
self._telemetry = Telemetry()
|
|
# Latest immutable snapshot, replaced atomically in _emit() and read
|
|
# without a lock by per-frame UIs via snapshot().
|
|
self._latest = Update(State.DISCONNECTED, 'Disconnected', Telemetry())
|
|
|
|
self._scf: SyncCrazyflie | None = None
|
|
self._console_buf = '' # partial firmware console line
|
|
self._log_configs: list = [] # active log blocks, deleted on teardown
|
|
self._proto_v7 = False # supervisor/arming available?
|
|
self._session_result = '' # 'disconnect' | 'shutdown' | ''
|
|
self._lock = threading.Lock() # guards _scf access from other threads
|
|
|
|
self._thread = threading.Thread(
|
|
target=self._run, name='drone-worker', daemon=True)
|
|
|
|
# ----------------------------------------------------------------- API ---
|
|
|
|
@property
|
|
def uri(self) -> str:
|
|
return self._uri
|
|
|
|
def set_listener(self, listener) -> None:
|
|
"""Register an optional push callback. Per-frame UIs can instead poll
|
|
snapshot(); the listener is kept for event-driven consumers."""
|
|
self._listener = listener or (lambda update: None)
|
|
|
|
def snapshot(self) -> Update:
|
|
"""Return the latest state/telemetry snapshot. Thread-safe and cheap;
|
|
intended to be polled once per render frame."""
|
|
return self._latest
|
|
|
|
@staticmethod
|
|
def scan() -> list[str]:
|
|
"""URIs of Crazyflies reachable on any installed interface."""
|
|
try:
|
|
return [entry[0] for entry in cflib.crtp.scan_interfaces()]
|
|
except Exception:
|
|
logger.warning('scan failed', exc_info=True)
|
|
return []
|
|
|
|
def start(self) -> None:
|
|
_install_console_logging()
|
|
cflib.crtp.init_drivers()
|
|
self._thread.start()
|
|
|
|
def connect(self, uri: str | None = None) -> None:
|
|
if uri:
|
|
self._uri = uri
|
|
self._cmd_q.put('connect')
|
|
|
|
def disconnect(self) -> None:
|
|
self._cmd_q.put('disconnect')
|
|
|
|
def arm(self) -> None:
|
|
"""Arm the motors and hold on the ground (two-step flow). The drone
|
|
idles armed in the ARMED state until launch() or disarm(); the firmware
|
|
auto-disarms after ~30 s if it never flies."""
|
|
self._cmd_q.put('arm')
|
|
|
|
def launch(self, height: float | None = None) -> None:
|
|
"""Take off from the ARMED state to `height` metres (clamped), then
|
|
hover. Height is latched here, so the slider can be adjusted right up
|
|
until launch."""
|
|
if height is not None:
|
|
self._target_height = clamp_height(height)
|
|
self._cmd_q.put('launch')
|
|
|
|
def disarm(self) -> None:
|
|
"""Context-sensitive: land if flying, else disarm on the ground."""
|
|
self._cmd_q.put('disarm')
|
|
|
|
def emergency_stop(self) -> None:
|
|
# Queue it so the worker breaks out of any flight loop and updates
|
|
# state, but ALSO fire immediately from the caller thread so motors
|
|
# are cut with the least possible latency.
|
|
self._cmd_q.put('estop')
|
|
with self._lock:
|
|
scf = self._scf
|
|
if scf is not None:
|
|
try:
|
|
self._cut_motors(scf)
|
|
except Exception:
|
|
logger.exception('direct emergency stop failed')
|
|
|
|
def shutdown(self) -> None:
|
|
self._cmd_q.put('shutdown')
|
|
|
|
def alive(self) -> bool:
|
|
return self._thread.is_alive()
|
|
|
|
def join(self, timeout: float | None = None) -> None:
|
|
"""Wait for the worker thread to finish (after shutdown()). Ensures the
|
|
radio link is closed before the process exits so libusb is never torn
|
|
down while the worker is still using it."""
|
|
if self._thread.is_alive():
|
|
self._thread.join(timeout)
|
|
|
|
# ----------------------------------------------------------- worker loop -
|
|
|
|
def _run(self) -> None:
|
|
while True:
|
|
cmd = self._cmd_q.get()
|
|
if cmd == 'shutdown':
|
|
return
|
|
if cmd == 'connect':
|
|
self._session()
|
|
if self._session_result == 'shutdown':
|
|
return
|
|
# any other command while disconnected is ignored
|
|
|
|
def _session(self) -> None:
|
|
"""Open the link and service commands until disconnect/shutdown."""
|
|
self._session_result = ''
|
|
self._set_state(State.CONNECTING, f'Connecting to {self._uri} …')
|
|
try:
|
|
cf = Crazyflie(rw_cache=self._cache_dir)
|
|
with SyncCrazyflie(self._uri, cf=cf) as scf:
|
|
with self._lock:
|
|
self._scf = scf
|
|
self._proto_v7 = self._detect_protocol(scf)
|
|
self._setup_logging(scf)
|
|
self._probe_flow_deck(scf)
|
|
self._set_state(State.READY, 'Connected — ready to arm')
|
|
try:
|
|
self._command_loop(scf)
|
|
finally:
|
|
# Delete our log blocks while the link is still open so the
|
|
# firmware stops streaming them. Skipping this leaves blocks
|
|
# registered in firmware until the next power-cycle, which
|
|
# later shows up as "no LogEntry to handle id=N" warnings.
|
|
self._teardown_logging()
|
|
except Exception as exc:
|
|
logger.exception('session error')
|
|
self._set_state(State.ERROR, f'Connection error: {exc}')
|
|
finally:
|
|
with self._lock:
|
|
self._scf = None
|
|
if self._session_result != 'shutdown':
|
|
self._set_state(State.DISCONNECTED, 'Disconnected')
|
|
|
|
def _command_loop(self, scf: SyncCrazyflie) -> None:
|
|
"""Service commands on the ground (READY / ARMED); delegates the
|
|
airborne phase to _do_launch()."""
|
|
while True:
|
|
# If we were armed on the ground but the firmware disarmed us
|
|
# (≈30 s pre-flight timeout, a bump/tumble, or low battery),
|
|
# fall back to READY so the UI reflects it.
|
|
if (self._state == State.ARMED and self._proto_v7
|
|
and not self._telemetry.armed):
|
|
self._set_state(State.READY,
|
|
'Auto-disarmed (armed too long or unsafe)')
|
|
try:
|
|
cmd = self._cmd_q.get(timeout=0.2)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
if cmd == 'arm':
|
|
if self._state in (State.READY, State.EMERGENCY):
|
|
self._do_arm(scf)
|
|
elif cmd == 'launch':
|
|
if self._state == State.ARMED:
|
|
follow_up = self._do_launch(scf)
|
|
if follow_up in ('disconnect', 'shutdown'):
|
|
self._session_result = follow_up
|
|
return
|
|
elif cmd == 'disarm':
|
|
if self._state == State.ARMED:
|
|
self._do_disarm_ground(scf)
|
|
elif cmd == 'disconnect':
|
|
if self._state == State.ARMED:
|
|
self._do_disarm_ground(scf)
|
|
self._session_result = 'disconnect'
|
|
return
|
|
elif cmd == 'shutdown':
|
|
if self._state == State.ARMED:
|
|
self._do_disarm_ground(scf)
|
|
self._session_result = 'shutdown'
|
|
return
|
|
elif cmd == 'estop':
|
|
self._do_estop(scf)
|
|
# 'connect' while on the ground: nothing to do.
|
|
|
|
# --------------------------------------------------------------- flight --
|
|
|
|
def _do_arm(self, scf: SyncCrazyflie) -> None:
|
|
"""Run pre-flight checks and arm the motors; on success enter ARMED
|
|
(idle on the ground). Does NOT take off — that's _do_launch()."""
|
|
cf = scf.cf
|
|
|
|
# --- pre-flight safety checks -------------------------------------
|
|
if self._telemetry.flow_deck is False:
|
|
self._set_state(State.READY, 'Arm refused: no Flow v2 deck detected')
|
|
return
|
|
ok, why = self._battery_ok()
|
|
if not ok:
|
|
self._set_state(State.READY, f'Arm refused: {why}')
|
|
return
|
|
if self._telemetry.tumbled:
|
|
self._set_state(State.READY,
|
|
'Arm refused: drone is tumbled — set it level')
|
|
return
|
|
if self._telemetry.locked:
|
|
self._set_state(State.READY,
|
|
'Arm refused: supervisor locked — reboot the drone')
|
|
return
|
|
|
|
# --- reset the position estimator and let it settle ---------------
|
|
self._set_state(State.ARMING, 'Resetting position estimator …')
|
|
self._reset_estimator(scf)
|
|
|
|
# --- arm (firmware supervisor) ------------------------------------
|
|
if self._proto_v7:
|
|
# Don't gate on the `canArm` bit: stock firmware builds with
|
|
# AUTO_ARMING=1, where the supervisor auto-arms the instant
|
|
# pre-flight checks pass, so `canArm` is only ever true for a
|
|
# single tick and a client essentially never observes it. The
|
|
# real signal is the `armed` bit. Requesting arming is harmless
|
|
# if already auto-armed (firmware no-ops it).
|
|
cf.supervisor.send_arming_request(True)
|
|
if not self._wait_until(lambda: self._telemetry.armed, 5.0):
|
|
self._set_state(State.READY,
|
|
f'Arm failed: {self._arm_failure_reason()}')
|
|
return
|
|
else:
|
|
# Legacy firmware: unlock the thrust-safety latch with a zero
|
|
# setpoint before the high-level commander will spin the motors.
|
|
cf.commander.send_setpoint(0, 0, 0, 0)
|
|
|
|
self._set_state(State.ARMED, 'Armed — ready to LAUNCH (auto-disarms '
|
|
'in ~30 s if idle)')
|
|
|
|
def _do_disarm_ground(self, scf: SyncCrazyflie) -> None:
|
|
"""Disarm without flying (back out of the ARMED state)."""
|
|
if self._proto_v7:
|
|
scf.cf.supervisor.send_arming_request(False)
|
|
self._set_state(State.READY, 'Disarmed')
|
|
|
|
def _do_launch(self, scf: SyncCrazyflie) -> str | None:
|
|
"""Take off from ARMED to self._target_height, then hover until told to
|
|
stop. Returns 'disconnect'/'shutdown' if such a command arrived
|
|
mid-flight (so the caller tears down after landing), else None."""
|
|
cf = scf.cf
|
|
|
|
# The firmware may have auto-disarmed between ARM and LAUNCH.
|
|
if self._proto_v7 and not self._telemetry.armed:
|
|
self._set_state(State.READY, 'Launch aborted: no longer armed')
|
|
return None
|
|
|
|
height = self._target_height
|
|
ascent_s = ramp_duration(height)
|
|
self._set_state(State.FLYING, f'Taking off to {height:.2f} m …')
|
|
cf.high_level_commander.takeoff(height, ascent_s)
|
|
|
|
ascent_done_at = time.monotonic() + ascent_s
|
|
hovering_announced = False
|
|
while True:
|
|
cmd = self._poll_cmd()
|
|
if cmd == 'estop':
|
|
self._do_estop(scf)
|
|
return None
|
|
if cmd in ('disarm', 'disconnect', 'shutdown'):
|
|
self._land(scf)
|
|
return cmd if cmd in ('disconnect', 'shutdown') else None
|
|
# ignore 'arm'/'launch'/'connect' — already flying.
|
|
|
|
if not hovering_announced and time.monotonic() >= ascent_done_at:
|
|
self._set_state(State.FLYING, f'Hovering at {height:.2f} m')
|
|
hovering_announced = True
|
|
time.sleep(0.05)
|
|
|
|
def _land(self, scf: SyncCrazyflie) -> None:
|
|
"""Controlled descent, then stop the commander and disarm."""
|
|
cf = scf.cf
|
|
descent_s = ramp_duration(self._target_height)
|
|
self._set_state(State.LANDING, 'Landing …')
|
|
cf.high_level_commander.land(0.0, descent_s)
|
|
|
|
end = time.monotonic() + descent_s
|
|
while time.monotonic() < end:
|
|
# Stay responsive to an emergency stop during descent.
|
|
if self._poll_cmd() == 'estop':
|
|
self._do_estop(scf)
|
|
return
|
|
time.sleep(0.05)
|
|
|
|
cf.high_level_commander.stop()
|
|
if self._proto_v7:
|
|
cf.supervisor.send_arming_request(False)
|
|
self._set_state(State.READY, 'Landed — on the ground, disarmed')
|
|
|
|
def _do_estop(self, scf: SyncCrazyflie) -> None:
|
|
try:
|
|
self._cut_motors(scf)
|
|
except Exception:
|
|
logger.exception('emergency stop failed')
|
|
self._set_state(State.EMERGENCY,
|
|
'EMERGENCY STOP — motors cut (reboot to fly again)')
|
|
|
|
def _cut_motors(self, scf: SyncCrazyflie) -> None:
|
|
"""Cut motors immediately. Safe to call from any thread."""
|
|
cf = scf.cf
|
|
if self._proto_v7:
|
|
cf.supervisor.send_emergency_stop()
|
|
else:
|
|
cf.commander.send_stop_setpoint()
|
|
try:
|
|
cf.high_level_commander.stop()
|
|
except Exception:
|
|
pass
|
|
|
|
# ----------------------------------------------------- estimator helper --
|
|
|
|
def _reset_estimator(self, scf: SyncCrazyflie) -> None:
|
|
cf = scf.cf
|
|
cf.param.set_value('kalman.resetEstimation', '1')
|
|
time.sleep(0.1)
|
|
cf.param.set_value('kalman.resetEstimation', '0')
|
|
self._wait_for_position_estimator(scf)
|
|
|
|
def _wait_for_position_estimator(self, scf: SyncCrazyflie,
|
|
threshold: float = 0.001,
|
|
timeout: float = 10.0) -> None:
|
|
"""Block until the Kalman position variance converges (or times out)."""
|
|
log_conf = LogConfig(name='kalman_var', period_in_ms=100)
|
|
log_conf.add_variable('kalman.varPX', 'float')
|
|
log_conf.add_variable('kalman.varPY', 'float')
|
|
log_conf.add_variable('kalman.varPZ', 'float')
|
|
|
|
history = {'x': [1000.0] * 10, 'y': [1000.0] * 10, 'z': [1000.0] * 10}
|
|
deadline = time.monotonic() + timeout
|
|
try:
|
|
with SyncLogger(scf, log_conf) as sync_log:
|
|
for _, data, _ in sync_log:
|
|
for key, var in (('x', 'kalman.varPX'),
|
|
('y', 'kalman.varPY'),
|
|
('z', 'kalman.varPZ')):
|
|
history[key].append(data[var])
|
|
history[key].pop(0)
|
|
spread = max(max(h) - min(h) for h in history.values())
|
|
if spread < threshold:
|
|
return
|
|
if time.monotonic() > deadline:
|
|
logger.warning('estimator did not converge in %.0fs '
|
|
'(spread=%.4f); proceeding', timeout,
|
|
spread)
|
|
return
|
|
except Exception:
|
|
# Don't let a logging hiccup block takeoff entirely.
|
|
logger.exception('estimator convergence wait failed')
|
|
|
|
# ---------------------------------------------------------- log / param --
|
|
|
|
def _detect_protocol(self, scf: SyncCrazyflie) -> bool:
|
|
try:
|
|
return scf.cf.platform.get_protocol_version() >= 7
|
|
except Exception:
|
|
logger.exception('could not read protocol version')
|
|
return False
|
|
|
|
def _setup_logging(self, scf: SyncCrazyflie) -> None:
|
|
self._log_configs = []
|
|
|
|
# Mirror the firmware's console output into the UI console. The cf
|
|
# object is recreated each session, so callbacks don't accumulate.
|
|
self._console_buf = ''
|
|
scf.cf.console.receivedChar.add_callback(self._on_console)
|
|
|
|
tlm = LogConfig(name='mission_tlm', period_in_ms=200)
|
|
tlm.add_variable('pm.vbat', 'float')
|
|
tlm.add_variable('stateEstimate.z', 'float')
|
|
scf.cf.log.add_config(tlm)
|
|
tlm.data_received_cb.add_callback(self._on_telemetry)
|
|
tlm.start()
|
|
self._log_configs.append(tlm)
|
|
|
|
if self._proto_v7:
|
|
sup = LogConfig(name='mission_sup', period_in_ms=200)
|
|
sup.add_variable('supervisor.info', 'uint16_t')
|
|
scf.cf.log.add_config(sup)
|
|
sup.data_received_cb.add_callback(self._on_supervisor)
|
|
sup.start()
|
|
self._log_configs.append(sup)
|
|
|
|
# Drift-inspection block: estimator pose + raw flow counts. motion.*
|
|
# only exists with a Flow deck attached, so set it up best-effort.
|
|
try:
|
|
diag = LogConfig(name='mission_diag', period_in_ms=200)
|
|
diag.add_variable('stateEstimate.x', 'float')
|
|
diag.add_variable('stateEstimate.y', 'float')
|
|
diag.add_variable('stabilizer.yaw', 'float')
|
|
diag.add_variable('motion.deltaX', 'int16_t')
|
|
diag.add_variable('motion.deltaY', 'int16_t')
|
|
scf.cf.log.add_config(diag)
|
|
diag.data_received_cb.add_callback(self._on_diag)
|
|
diag.start()
|
|
self._log_configs.append(diag)
|
|
except (KeyError, AttributeError) as exc:
|
|
logger.warning('drift-inspection logging unavailable: %s', exc)
|
|
|
|
def _teardown_logging(self) -> None:
|
|
"""Stop and delete our log blocks so firmware stops streaming them."""
|
|
for conf in getattr(self, '_log_configs', []):
|
|
try:
|
|
conf.stop()
|
|
conf.delete()
|
|
except Exception:
|
|
logger.exception('failed to delete log config %s', conf.name)
|
|
self._log_configs = []
|
|
|
|
def _on_console(self, text: str) -> None:
|
|
# Firmware console arrives in chunks; emit one log line per newline.
|
|
self._console_buf += text
|
|
while '\n' in self._console_buf:
|
|
line, self._console_buf = self._console_buf.split('\n', 1)
|
|
line = line.rstrip('\r')
|
|
if line:
|
|
_CONSOLE_LOGGER.info(line)
|
|
|
|
def _on_telemetry(self, _ts, data, _logconf) -> None:
|
|
self._telemetry.vbat = data['pm.vbat']
|
|
self._telemetry.height = data['stateEstimate.z']
|
|
self._emit()
|
|
|
|
def _on_diag(self, _ts, data, _logconf) -> None:
|
|
t = self._telemetry
|
|
t.x = data['stateEstimate.x']
|
|
t.y = data['stateEstimate.y']
|
|
t.yaw = data['stabilizer.yaw']
|
|
t.flow_dx = int(data['motion.deltaX'])
|
|
t.flow_dy = int(data['motion.deltaY'])
|
|
self._emit()
|
|
|
|
def _on_supervisor(self, _ts, data, _logconf) -> None:
|
|
info = data['supervisor.info']
|
|
t = self._telemetry
|
|
t.can_arm = bool(info & SUP_CAN_ARM)
|
|
t.armed = bool(info & SUP_IS_ARMED)
|
|
t.flying = bool(info & SUP_IS_FLYING)
|
|
t.tumbled = bool(info & SUP_IS_TUMBLED)
|
|
t.locked = bool(info & SUP_IS_LOCKED)
|
|
t.crashed = bool(info & SUP_IS_CRASHED)
|
|
t.deck_fault = bool(info & SUP_DECK_FAULT)
|
|
self._emit()
|
|
|
|
def _probe_flow_deck(self, scf: SyncCrazyflie) -> None:
|
|
try:
|
|
value = scf.cf.param.get_value('deck.bcFlow2', timeout=5)
|
|
self._telemetry.flow_deck = bool(int(value))
|
|
except Exception:
|
|
logger.exception('could not read deck.bcFlow2')
|
|
self._telemetry.flow_deck = None
|
|
self._emit()
|
|
|
|
def _arm_failure_reason(self) -> str:
|
|
"""Best-effort explanation for why the drone would not arm."""
|
|
t = self._telemetry
|
|
if t.locked:
|
|
return 'supervisor locked — reboot the drone'
|
|
if t.tumbled:
|
|
return 'drone is tumbled — set it level and still'
|
|
if t.deck_fault:
|
|
return 'a deck reported a hardware fault — reseat decks / reboot'
|
|
if t.crashed:
|
|
return 'supervisor in crashed state — flip level to recover'
|
|
# Pre-flight checks never passed; usually means the drone is not level
|
|
# and stationary, or a sensor is still calibrating.
|
|
return ('pre-flight checks did not pass — place the drone level and '
|
|
'stationary, then retry')
|
|
|
|
# --------------------------------------------------------------- utils ---
|
|
|
|
def _battery_ok(self) -> tuple[bool, str]:
|
|
# Wait briefly for the first battery reading after connect.
|
|
end = time.monotonic() + 1.5
|
|
while self._telemetry.vbat == 0.0 and time.monotonic() < end:
|
|
time.sleep(0.05)
|
|
v = self._telemetry.vbat
|
|
if v and v < MIN_SAFE_VBAT:
|
|
return False, f'battery low ({v:.2f} V, need {MIN_SAFE_VBAT:.2f} V)'
|
|
return True, ''
|
|
|
|
def _poll_cmd(self) -> str | None:
|
|
try:
|
|
return self._cmd_q.get_nowait()
|
|
except queue.Empty:
|
|
return None
|
|
|
|
def _wait_until(self, predicate, timeout: float) -> bool:
|
|
end = time.monotonic() + timeout
|
|
while time.monotonic() < end:
|
|
if predicate():
|
|
return True
|
|
time.sleep(0.05)
|
|
return False
|
|
|
|
def _set_state(self, state: State, message: str) -> None:
|
|
self._state = state
|
|
self._message = message
|
|
logger.info('[%s] %s', state.name, message)
|
|
self._emit()
|
|
|
|
def _emit(self) -> None:
|
|
snapshot = Update(
|
|
state=self._state,
|
|
message=self._message,
|
|
telemetry=dataclasses.replace(self._telemetry),
|
|
)
|
|
self._latest = snapshot # atomic publish for snapshot() pollers
|
|
try:
|
|
self._listener(snapshot)
|
|
except Exception:
|
|
logger.exception('listener raised')
|