Files
cf-autohover/controller.py
K. Isom eed9778e59 Initial checkpoint: Crazyflie mission client (DearPyGui)
Two-step ARM -> LAUNCH -> LAND flight flow for a Crazyflie + Flow v2 deck:

- controller.py: cflib link + flight logic on one worker thread with a
  command queue. Supervisor arming, Kalman estimator reset/convergence
  wait, high-level-commander takeoff/land, configurable hover height
  (0.2-2.0 m), emergency stop, graceful land-on-shutdown, thread-safe
  snapshot()/scan(). States: DISCONNECTED/CONNECTING/READY/ARMING/ARMED/
  FLYING/LANDING/EMERGENCY/ERROR.
- mission_client.py: DearPyGui front-end, manual render loop polling
  snapshot() each frame. Context button ARM(green)->LAUNCH(blue)->LAND
  (yellow), secondary Disarm, hover-height slider, console log, and an
  Estimator state panel (x/y/z, yaw, flow counts) for drift inspection.
- uv-managed (pyproject.toml + uv.lock); run `uv run mission_client.py`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 15:44:07 -07:00

722 lines
28 KiB
Python

"""Flight controller for the Crazyflie mission client.
Owns the Crazyflie radio link on a single dedicated worker thread and exposes a
small, thread-safe command API to the UI (connect / disconnect / arm / disarm /
emergency_stop / shutdown). Keeping every cflib call on one thread means the UI
thread never blocks on radio I/O and there is exactly one place that can command
the motors.
Flight strategy: this uses the firmware's *high-level commander* (takeoff / land
/ hover) together with the *supervisor* arming system. The high-level commander
holds position autonomously in firmware once airborne, so a stalled or slow UI
thread cannot make the drone drift or fall the way a missed stream of manual
setpoints would. Position/altitude hold is provided by the Flow v2 deck.
The API is intentionally tiny and all mutating commands go through a queue that
only the worker thread consumes, except emergency_stop which *also* fires
directly from the caller for minimum latency.
"""
from __future__ import annotations
import collections
import dataclasses
import logging
import queue
import threading
import time
from dataclasses import dataclass
from enum import Enum, auto
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.utils import uri_helper
logger = logging.getLogger(__name__)
class _StaleLogFilter(logging.Filter):
"""Drop cflib's benign "no LogEntry to handle id=N" warnings.
Firmware streams log-data for a block id the host no longer tracks when a
stale block is left over from a prior/parallel session. We already delete
our blocks on disconnect, but a handful of in-flight packets can still
arrive right after reconnecting; they're harmless (power-cycle to clear),
so keep them out of the console rather than alarming the user.
"""
def filter(self, record: logging.LogRecord) -> bool:
return 'no LogEntry to handle' not in record.getMessage()
class LogBuffer(logging.Handler):
"""A thread-safe ring buffer of formatted log lines for the UI console.
cflib logs from its own thread, so reads/writes are guarded by a lock. A
monotonic version counter lets the UI cheaply detect when there's something
new to render.
"""
def __init__(self, capacity: int = 500) -> None:
super().__init__()
self._lines: collections.deque = collections.deque(maxlen=capacity)
self._lock = threading.Lock()
self._version = 0
self.setFormatter(logging.Formatter(
'%(asctime)s %(levelname).1s %(name)s: %(message)s', '%H:%M:%S'))
def emit(self, record: logging.LogRecord) -> None:
try:
line = self.format(record)
except Exception:
return
with self._lock:
self._lines.append(line)
self._version += 1
def snapshot(self) -> tuple[int, list[str]]:
with self._lock:
return self._version, list(self._lines)
def clear(self) -> None:
with self._lock:
self._lines.clear()
self._version += 1
# Single shared console buffer, attached to the root logger once. Firmware
# console output is logged under its own name so it's easy to spot.
LOG_BUFFER = LogBuffer()
_CONSOLE_LOGGER = logging.getLogger('crazyflie.console')
_LOGGING_READY = False
def _install_console_logging() -> None:
"""Route cflib + app logs into LOG_BUFFER for the UI console (idempotent)."""
global _LOGGING_READY
if _LOGGING_READY:
return
logging.getLogger('cflib.crazyflie.log').addFilter(_StaleLogFilter())
root = logging.getLogger()
if root.level == logging.NOTSET or root.level > logging.INFO:
root.setLevel(logging.INFO)
LOG_BUFFER.setLevel(logging.INFO)
root.addHandler(LOG_BUFFER)
_LOGGING_READY = True
# Radio URI. Override with the CFLIB_URI environment variable.
DEFAULT_URI = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7')
# Mission parameters.
TARGET_HEIGHT_M = 1.0 # default hover height above the ground
MIN_HEIGHT_M = 0.2 # configurable hover-height bounds (Flow v2 range)
MAX_HEIGHT_M = 2.0
CLIMB_RATE_MPS = 0.35 # ascent/descent speed used to time takeoff & land
MIN_RAMP_S = 1.5 # floor on takeoff/land duration for low hovers
MIN_SAFE_VBAT = 3.10 # refuse to arm below this pack voltage
def clamp_height(height: float) -> float:
"""Clamp a requested hover height to the safe configurable range."""
return max(MIN_HEIGHT_M, min(MAX_HEIGHT_M, float(height)))
def ramp_duration(height: float) -> float:
"""Time to climb/descend `height` metres at a gentle, consistent rate."""
return max(MIN_RAMP_S, abs(height) / CLIMB_RATE_MPS)
# supervisor.info bitfield (see crazyflie-firmware src/modules/src/supervisor.c).
SUP_CAN_ARM = 0x0001
SUP_IS_ARMED = 0x0002
SUP_AUTO_ARM = 0x0004
SUP_CAN_FLY = 0x0008
SUP_IS_FLYING = 0x0010
SUP_IS_TUMBLED = 0x0020
SUP_IS_LOCKED = 0x0040
SUP_IS_CRASHED = 0x0080
SUP_DECK_FAULT = 0x0800 # a deck reported a hardware fault
class State(Enum):
"""High-level mission state, surfaced to the UI."""
DISCONNECTED = auto()
CONNECTING = auto()
READY = auto() # connected, on the ground, disarmed
ARMING = auto() # transient: estimator reset + arming request
ARMED = auto() # armed and idle on the ground, ready to LAUNCH
FLYING = auto() # taking off or hovering at target height
LANDING = auto() # controlled descent in progress
EMERGENCY = auto() # motors cut; reboot required to fly again
ERROR = auto() # connection or radio failure
@dataclass
class Telemetry:
"""Latest known drone state. Plain values; safe to copy across threads."""
vbat: float = 0.0
height: float = 0.0
# Estimator pose + raw optical-flow counts, for drift inspection.
x: float = 0.0
y: float = 0.0
yaw: float = 0.0
flow_dx: int = 0
flow_dy: int = 0
flow_deck: bool | None = None # None = not yet probed
can_arm: bool = False
armed: bool = False
flying: bool = False
tumbled: bool = False
locked: bool = False
crashed: bool = False
deck_fault: bool = False
@dataclass
class Update:
"""Immutable snapshot handed to the UI listener."""
state: State
message: str
telemetry: Telemetry
class DroneController:
"""Threaded Crazyflie flight controller.
Usage:
c = DroneController(uri, listener=on_update)
c.start() # init radio drivers + spawn worker
c.connect() # open link
c.arm() # reset estimator + arm; idle armed on the ground
c.launch(1.0) # take off to 1 m and hover
c.disarm() # land if flying, else disarm on the ground
c.emergency_stop() # cut motors immediately
c.shutdown() # land if needed, close link, stop worker
"""
def __init__(self, uri: str = DEFAULT_URI, listener=None,
cache_dir: str = './cache'):
self._uri = uri
self._listener = listener or (lambda update: None)
self._cache_dir = cache_dir
self._cmd_q: queue.Queue[str] = queue.Queue()
self._target_height = TARGET_HEIGHT_M # latched at arm time
self._state = State.DISCONNECTED
self._message = 'Disconnected'
self._telemetry = Telemetry()
# Latest immutable snapshot, replaced atomically in _emit() and read
# without a lock by per-frame UIs via snapshot().
self._latest = Update(State.DISCONNECTED, 'Disconnected', Telemetry())
self._scf: SyncCrazyflie | None = None
self._console_buf = '' # partial firmware console line
self._log_configs: list = [] # active log blocks, deleted on teardown
self._proto_v7 = False # supervisor/arming available?
self._session_result = '' # 'disconnect' | 'shutdown' | ''
self._lock = threading.Lock() # guards _scf access from other threads
self._thread = threading.Thread(
target=self._run, name='drone-worker', daemon=True)
# ----------------------------------------------------------------- API ---
@property
def uri(self) -> str:
return self._uri
def set_listener(self, listener) -> None:
"""Register an optional push callback. Per-frame UIs can instead poll
snapshot(); the listener is kept for event-driven consumers."""
self._listener = listener or (lambda update: None)
def snapshot(self) -> Update:
"""Return the latest state/telemetry snapshot. Thread-safe and cheap;
intended to be polled once per render frame."""
return self._latest
@staticmethod
def scan() -> list[str]:
"""URIs of Crazyflies reachable on any installed interface."""
try:
return [entry[0] for entry in cflib.crtp.scan_interfaces()]
except Exception:
logger.warning('scan failed', exc_info=True)
return []
def start(self) -> None:
_install_console_logging()
cflib.crtp.init_drivers()
self._thread.start()
def connect(self, uri: str | None = None) -> None:
if uri:
self._uri = uri
self._cmd_q.put('connect')
def disconnect(self) -> None:
self._cmd_q.put('disconnect')
def arm(self) -> None:
"""Arm the motors and hold on the ground (two-step flow). The drone
idles armed in the ARMED state until launch() or disarm(); the firmware
auto-disarms after ~30 s if it never flies."""
self._cmd_q.put('arm')
def launch(self, height: float | None = None) -> None:
"""Take off from the ARMED state to `height` metres (clamped), then
hover. Height is latched here, so the slider can be adjusted right up
until launch."""
if height is not None:
self._target_height = clamp_height(height)
self._cmd_q.put('launch')
def disarm(self) -> None:
"""Context-sensitive: land if flying, else disarm on the ground."""
self._cmd_q.put('disarm')
def emergency_stop(self) -> None:
# Queue it so the worker breaks out of any flight loop and updates
# state, but ALSO fire immediately from the caller thread so motors
# are cut with the least possible latency.
self._cmd_q.put('estop')
with self._lock:
scf = self._scf
if scf is not None:
try:
self._cut_motors(scf)
except Exception:
logger.exception('direct emergency stop failed')
def shutdown(self) -> None:
self._cmd_q.put('shutdown')
def alive(self) -> bool:
return self._thread.is_alive()
def join(self, timeout: float | None = None) -> None:
"""Wait for the worker thread to finish (after shutdown()). Ensures the
radio link is closed before the process exits so libusb is never torn
down while the worker is still using it."""
if self._thread.is_alive():
self._thread.join(timeout)
# ----------------------------------------------------------- worker loop -
def _run(self) -> None:
while True:
cmd = self._cmd_q.get()
if cmd == 'shutdown':
return
if cmd == 'connect':
self._session()
if self._session_result == 'shutdown':
return
# any other command while disconnected is ignored
def _session(self) -> None:
"""Open the link and service commands until disconnect/shutdown."""
self._session_result = ''
self._set_state(State.CONNECTING, f'Connecting to {self._uri}')
try:
cf = Crazyflie(rw_cache=self._cache_dir)
with SyncCrazyflie(self._uri, cf=cf) as scf:
with self._lock:
self._scf = scf
self._proto_v7 = self._detect_protocol(scf)
self._setup_logging(scf)
self._probe_flow_deck(scf)
self._set_state(State.READY, 'Connected — ready to arm')
try:
self._command_loop(scf)
finally:
# Delete our log blocks while the link is still open so the
# firmware stops streaming them. Skipping this leaves blocks
# registered in firmware until the next power-cycle, which
# later shows up as "no LogEntry to handle id=N" warnings.
self._teardown_logging()
except Exception as exc:
logger.exception('session error')
self._set_state(State.ERROR, f'Connection error: {exc}')
finally:
with self._lock:
self._scf = None
if self._session_result != 'shutdown':
self._set_state(State.DISCONNECTED, 'Disconnected')
def _command_loop(self, scf: SyncCrazyflie) -> None:
"""Service commands on the ground (READY / ARMED); delegates the
airborne phase to _do_launch()."""
while True:
# If we were armed on the ground but the firmware disarmed us
# (≈30 s pre-flight timeout, a bump/tumble, or low battery),
# fall back to READY so the UI reflects it.
if (self._state == State.ARMED and self._proto_v7
and not self._telemetry.armed):
self._set_state(State.READY,
'Auto-disarmed (armed too long or unsafe)')
try:
cmd = self._cmd_q.get(timeout=0.2)
except queue.Empty:
continue
if cmd == 'arm':
if self._state in (State.READY, State.EMERGENCY):
self._do_arm(scf)
elif cmd == 'launch':
if self._state == State.ARMED:
follow_up = self._do_launch(scf)
if follow_up in ('disconnect', 'shutdown'):
self._session_result = follow_up
return
elif cmd == 'disarm':
if self._state == State.ARMED:
self._do_disarm_ground(scf)
elif cmd == 'disconnect':
if self._state == State.ARMED:
self._do_disarm_ground(scf)
self._session_result = 'disconnect'
return
elif cmd == 'shutdown':
if self._state == State.ARMED:
self._do_disarm_ground(scf)
self._session_result = 'shutdown'
return
elif cmd == 'estop':
self._do_estop(scf)
# 'connect' while on the ground: nothing to do.
# --------------------------------------------------------------- flight --
def _do_arm(self, scf: SyncCrazyflie) -> None:
"""Run pre-flight checks and arm the motors; on success enter ARMED
(idle on the ground). Does NOT take off — that's _do_launch()."""
cf = scf.cf
# --- pre-flight safety checks -------------------------------------
if self._telemetry.flow_deck is False:
self._set_state(State.READY, 'Arm refused: no Flow v2 deck detected')
return
ok, why = self._battery_ok()
if not ok:
self._set_state(State.READY, f'Arm refused: {why}')
return
if self._telemetry.tumbled:
self._set_state(State.READY,
'Arm refused: drone is tumbled — set it level')
return
if self._telemetry.locked:
self._set_state(State.READY,
'Arm refused: supervisor locked — reboot the drone')
return
# --- reset the position estimator and let it settle ---------------
self._set_state(State.ARMING, 'Resetting position estimator …')
self._reset_estimator(scf)
# --- arm (firmware supervisor) ------------------------------------
if self._proto_v7:
# Don't gate on the `canArm` bit: stock firmware builds with
# AUTO_ARMING=1, where the supervisor auto-arms the instant
# pre-flight checks pass, so `canArm` is only ever true for a
# single tick and a client essentially never observes it. The
# real signal is the `armed` bit. Requesting arming is harmless
# if already auto-armed (firmware no-ops it).
cf.supervisor.send_arming_request(True)
if not self._wait_until(lambda: self._telemetry.armed, 5.0):
self._set_state(State.READY,
f'Arm failed: {self._arm_failure_reason()}')
return
else:
# Legacy firmware: unlock the thrust-safety latch with a zero
# setpoint before the high-level commander will spin the motors.
cf.commander.send_setpoint(0, 0, 0, 0)
self._set_state(State.ARMED, 'Armed — ready to LAUNCH (auto-disarms '
'in ~30 s if idle)')
def _do_disarm_ground(self, scf: SyncCrazyflie) -> None:
"""Disarm without flying (back out of the ARMED state)."""
if self._proto_v7:
scf.cf.supervisor.send_arming_request(False)
self._set_state(State.READY, 'Disarmed')
def _do_launch(self, scf: SyncCrazyflie) -> str | None:
"""Take off from ARMED to self._target_height, then hover until told to
stop. Returns 'disconnect'/'shutdown' if such a command arrived
mid-flight (so the caller tears down after landing), else None."""
cf = scf.cf
# The firmware may have auto-disarmed between ARM and LAUNCH.
if self._proto_v7 and not self._telemetry.armed:
self._set_state(State.READY, 'Launch aborted: no longer armed')
return None
height = self._target_height
ascent_s = ramp_duration(height)
self._set_state(State.FLYING, f'Taking off to {height:.2f} m …')
cf.high_level_commander.takeoff(height, ascent_s)
ascent_done_at = time.monotonic() + ascent_s
hovering_announced = False
while True:
cmd = self._poll_cmd()
if cmd == 'estop':
self._do_estop(scf)
return None
if cmd in ('disarm', 'disconnect', 'shutdown'):
self._land(scf)
return cmd if cmd in ('disconnect', 'shutdown') else None
# ignore 'arm'/'launch'/'connect' — already flying.
if not hovering_announced and time.monotonic() >= ascent_done_at:
self._set_state(State.FLYING, f'Hovering at {height:.2f} m')
hovering_announced = True
time.sleep(0.05)
def _land(self, scf: SyncCrazyflie) -> None:
"""Controlled descent, then stop the commander and disarm."""
cf = scf.cf
descent_s = ramp_duration(self._target_height)
self._set_state(State.LANDING, 'Landing …')
cf.high_level_commander.land(0.0, descent_s)
end = time.monotonic() + descent_s
while time.monotonic() < end:
# Stay responsive to an emergency stop during descent.
if self._poll_cmd() == 'estop':
self._do_estop(scf)
return
time.sleep(0.05)
cf.high_level_commander.stop()
if self._proto_v7:
cf.supervisor.send_arming_request(False)
self._set_state(State.READY, 'Landed — on the ground, disarmed')
def _do_estop(self, scf: SyncCrazyflie) -> None:
try:
self._cut_motors(scf)
except Exception:
logger.exception('emergency stop failed')
self._set_state(State.EMERGENCY,
'EMERGENCY STOP — motors cut (reboot to fly again)')
def _cut_motors(self, scf: SyncCrazyflie) -> None:
"""Cut motors immediately. Safe to call from any thread."""
cf = scf.cf
if self._proto_v7:
cf.supervisor.send_emergency_stop()
else:
cf.commander.send_stop_setpoint()
try:
cf.high_level_commander.stop()
except Exception:
pass
# ----------------------------------------------------- estimator helper --
def _reset_estimator(self, scf: SyncCrazyflie) -> None:
cf = scf.cf
cf.param.set_value('kalman.resetEstimation', '1')
time.sleep(0.1)
cf.param.set_value('kalman.resetEstimation', '0')
self._wait_for_position_estimator(scf)
def _wait_for_position_estimator(self, scf: SyncCrazyflie,
threshold: float = 0.001,
timeout: float = 10.0) -> None:
"""Block until the Kalman position variance converges (or times out)."""
log_conf = LogConfig(name='kalman_var', period_in_ms=100)
log_conf.add_variable('kalman.varPX', 'float')
log_conf.add_variable('kalman.varPY', 'float')
log_conf.add_variable('kalman.varPZ', 'float')
history = {'x': [1000.0] * 10, 'y': [1000.0] * 10, 'z': [1000.0] * 10}
deadline = time.monotonic() + timeout
try:
with SyncLogger(scf, log_conf) as sync_log:
for _, data, _ in sync_log:
for key, var in (('x', 'kalman.varPX'),
('y', 'kalman.varPY'),
('z', 'kalman.varPZ')):
history[key].append(data[var])
history[key].pop(0)
spread = max(max(h) - min(h) for h in history.values())
if spread < threshold:
return
if time.monotonic() > deadline:
logger.warning('estimator did not converge in %.0fs '
'(spread=%.4f); proceeding', timeout,
spread)
return
except Exception:
# Don't let a logging hiccup block takeoff entirely.
logger.exception('estimator convergence wait failed')
# ---------------------------------------------------------- log / param --
def _detect_protocol(self, scf: SyncCrazyflie) -> bool:
try:
return scf.cf.platform.get_protocol_version() >= 7
except Exception:
logger.exception('could not read protocol version')
return False
def _setup_logging(self, scf: SyncCrazyflie) -> None:
self._log_configs = []
# Mirror the firmware's console output into the UI console. The cf
# object is recreated each session, so callbacks don't accumulate.
self._console_buf = ''
scf.cf.console.receivedChar.add_callback(self._on_console)
tlm = LogConfig(name='mission_tlm', period_in_ms=200)
tlm.add_variable('pm.vbat', 'float')
tlm.add_variable('stateEstimate.z', 'float')
scf.cf.log.add_config(tlm)
tlm.data_received_cb.add_callback(self._on_telemetry)
tlm.start()
self._log_configs.append(tlm)
if self._proto_v7:
sup = LogConfig(name='mission_sup', period_in_ms=200)
sup.add_variable('supervisor.info', 'uint16_t')
scf.cf.log.add_config(sup)
sup.data_received_cb.add_callback(self._on_supervisor)
sup.start()
self._log_configs.append(sup)
# Drift-inspection block: estimator pose + raw flow counts. motion.*
# only exists with a Flow deck attached, so set it up best-effort.
try:
diag = LogConfig(name='mission_diag', period_in_ms=200)
diag.add_variable('stateEstimate.x', 'float')
diag.add_variable('stateEstimate.y', 'float')
diag.add_variable('stabilizer.yaw', 'float')
diag.add_variable('motion.deltaX', 'int16_t')
diag.add_variable('motion.deltaY', 'int16_t')
scf.cf.log.add_config(diag)
diag.data_received_cb.add_callback(self._on_diag)
diag.start()
self._log_configs.append(diag)
except (KeyError, AttributeError) as exc:
logger.warning('drift-inspection logging unavailable: %s', exc)
def _teardown_logging(self) -> None:
"""Stop and delete our log blocks so firmware stops streaming them."""
for conf in getattr(self, '_log_configs', []):
try:
conf.stop()
conf.delete()
except Exception:
logger.exception('failed to delete log config %s', conf.name)
self._log_configs = []
def _on_console(self, text: str) -> None:
# Firmware console arrives in chunks; emit one log line per newline.
self._console_buf += text
while '\n' in self._console_buf:
line, self._console_buf = self._console_buf.split('\n', 1)
line = line.rstrip('\r')
if line:
_CONSOLE_LOGGER.info(line)
def _on_telemetry(self, _ts, data, _logconf) -> None:
self._telemetry.vbat = data['pm.vbat']
self._telemetry.height = data['stateEstimate.z']
self._emit()
def _on_diag(self, _ts, data, _logconf) -> None:
t = self._telemetry
t.x = data['stateEstimate.x']
t.y = data['stateEstimate.y']
t.yaw = data['stabilizer.yaw']
t.flow_dx = int(data['motion.deltaX'])
t.flow_dy = int(data['motion.deltaY'])
self._emit()
def _on_supervisor(self, _ts, data, _logconf) -> None:
info = data['supervisor.info']
t = self._telemetry
t.can_arm = bool(info & SUP_CAN_ARM)
t.armed = bool(info & SUP_IS_ARMED)
t.flying = bool(info & SUP_IS_FLYING)
t.tumbled = bool(info & SUP_IS_TUMBLED)
t.locked = bool(info & SUP_IS_LOCKED)
t.crashed = bool(info & SUP_IS_CRASHED)
t.deck_fault = bool(info & SUP_DECK_FAULT)
self._emit()
def _probe_flow_deck(self, scf: SyncCrazyflie) -> None:
try:
value = scf.cf.param.get_value('deck.bcFlow2', timeout=5)
self._telemetry.flow_deck = bool(int(value))
except Exception:
logger.exception('could not read deck.bcFlow2')
self._telemetry.flow_deck = None
self._emit()
def _arm_failure_reason(self) -> str:
"""Best-effort explanation for why the drone would not arm."""
t = self._telemetry
if t.locked:
return 'supervisor locked — reboot the drone'
if t.tumbled:
return 'drone is tumbled — set it level and still'
if t.deck_fault:
return 'a deck reported a hardware fault — reseat decks / reboot'
if t.crashed:
return 'supervisor in crashed state — flip level to recover'
# Pre-flight checks never passed; usually means the drone is not level
# and stationary, or a sensor is still calibrating.
return ('pre-flight checks did not pass — place the drone level and '
'stationary, then retry')
# --------------------------------------------------------------- utils ---
def _battery_ok(self) -> tuple[bool, str]:
# Wait briefly for the first battery reading after connect.
end = time.monotonic() + 1.5
while self._telemetry.vbat == 0.0 and time.monotonic() < end:
time.sleep(0.05)
v = self._telemetry.vbat
if v and v < MIN_SAFE_VBAT:
return False, f'battery low ({v:.2f} V, need {MIN_SAFE_VBAT:.2f} V)'
return True, ''
def _poll_cmd(self) -> str | None:
try:
return self._cmd_q.get_nowait()
except queue.Empty:
return None
def _wait_until(self, predicate, timeout: float) -> bool:
end = time.monotonic() + timeout
while time.monotonic() < end:
if predicate():
return True
time.sleep(0.05)
return False
def _set_state(self, state: State, message: str) -> None:
self._state = state
self._message = message
logger.info('[%s] %s', state.name, message)
self._emit()
def _emit(self) -> None:
snapshot = Update(
state=self._state,
message=self._message,
telemetry=dataclasses.replace(self._telemetry),
)
self._latest = snapshot # atomic publish for snapshot() pollers
try:
self._listener(snapshot)
except Exception:
logger.exception('listener raised')