"""Best-effort point-cloud publisher (producer side of the CloudView protocol). Streams batches of ``(x, y, z)`` points to a CloudView viewer over a stream socket (TCP or Unix) as NDJSON. Design goals: * **Never block the caller.** Points are produced from the cflib log callback / flight thread; those must not stall on the network. ``add_points`` only appends to in-memory buffers and returns immediately. A dedicated socket thread does all I/O, and outgoing batches are dropped (not queued unbounded) if the viewer is slow or absent. * **Survive a missing/restarted viewer.** The socket thread reconnects with backoff, and on every (re)connect it replays the whole retained cloud, so a viewer started *after* the drone still sees the full map. Only the standard library is used, so RangeView gains no heavy dependency; the NDJSON format is the sole contract with the viewer. """ from __future__ import annotations import json import logging import socket import threading import time from collections import deque logger = logging.getLogger(__name__) DEFAULT_ADDRESS = "tcp://127.0.0.1:9870" def _encode(msg: dict) -> bytes: return (json.dumps(msg, separators=(",", ":")) + "\n").encode("utf-8") def _parse_address(addr: str): """('tcp', host, port) or ('unix', path, None). Mirrors cloudview.protocol so the two programs stay independent.""" if addr.startswith("tcp://"): host, _, port = addr[len("tcp://"):].partition(":") return ("tcp", host or "127.0.0.1", int(port or 9870)) if addr.startswith("unix://"): return ("unix", addr[len("unix://"):], None) if addr.startswith("unix:"): return ("unix", addr[len("unix:"):], None) if ":" in addr: host, _, port = addr.partition(":") return ("tcp", host or "127.0.0.1", int(port)) raise ValueError(f"unrecognised address: {addr!r}") class CloudPublisher: """Streams a retained point cloud to a CloudView viewer, best-effort.""" def __init__(self, source: str = "rangeview", name: str = "RangeView", max_points: int = 200_000, max_pending: int = 2000) -> None: self._source = source self._name = name self._lock = threading.Lock() self._points: deque = deque(maxlen=max_points) # retained full cloud self._pending: deque = deque(maxlen=max_pending) # unsent delta batches self._enabled = False self._address: str | None = None self._generation = 0 # bumped on any config change to force reconnect self._connected = False self._stop = False self._thread = threading.Thread(target=self._run, name="cloud-pub", daemon=True) self._thread.start() # ---------------------------------------------------------------- API --- def configure(self, enabled: bool, address: str | None) -> None: """Enable/disable streaming and/or change the viewer address.""" with self._lock: self._enabled = bool(enabled) if address: self._address = address self._generation += 1 # make the socket thread re-evaluate/reconnect @property def connected(self) -> bool: return self._connected def add_points(self, pts) -> None: """Append a batch of (x, y, z) to the retained cloud and queue it for sending. Cheap and non-blocking; safe to call from any thread.""" batch = [(float(x), float(y), float(z)) for x, y, z in pts] if not batch: return with self._lock: self._points.extend(batch) if self._enabled: self._pending.append(batch) def clear(self) -> None: """Drop the retained cloud and tell the viewer to clear this source.""" with self._lock: self._points.clear() if self._enabled: self._pending.append("clear") def close(self) -> None: self._stop = True # ------------------------------------------------------------ internals - def _config(self): with self._lock: return self._enabled, self._address, self._generation def _run(self) -> None: while not self._stop: enabled, address, gen = self._config() if not enabled or not address: self._connected = False time.sleep(0.2) continue try: sock = self._connect(address) except Exception as exc: logger.debug("cloud viewer connect to %s failed: %s", address, exc) time.sleep(1.0) continue self._connected = True logger.info("streaming points to %s", address) try: self._on_connect(sock) self._pump(sock, gen) except Exception as exc: logger.debug("cloud stream to %s ended: %s", address, exc) finally: self._connected = False try: sock.close() except OSError: pass time.sleep(0.3) # backoff before reconnecting @staticmethod def _connect(address: str) -> socket.socket: kind, a, b = _parse_address(address) if kind == "tcp": sock = socket.create_connection((a, b), timeout=2.0) else: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(2.0) sock.connect(a) sock.settimeout(2.0) return sock def _on_connect(self, sock: socket.socket) -> None: """Announce ourselves and replay the full retained cloud, so a freshly connected viewer is immediately consistent. Any deltas queued while we were disconnected are discarded first (the replay already covers them).""" with self._lock: self._pending.clear() points = list(self._points) sock.sendall(_encode({"type": "hello", "source": self._source, "name": self._name})) sock.sendall(_encode({"type": "clear", "source": self._source})) for i in range(0, len(points), 2000): chunk = points[i:i + 2000] sock.sendall(_encode({"type": "points", "source": self._source, "frame": "world", "points": [list(p) for p in chunk]})) def _pump(self, sock: socket.socket, gen: int) -> None: """Send queued batches until disabled, reconfigured, or disconnected.""" while not self._stop: enabled, _addr, cur_gen = self._config() if not enabled or cur_gen != gen: return items = self._drain() if not items: time.sleep(0.02) continue blob = b"".join(self._render(item) for item in items) sock.sendall(blob) def _drain(self) -> list: with self._lock: items = list(self._pending) self._pending.clear() return items def _render(self, item) -> bytes: if item == "clear": return _encode({"type": "clear", "source": self._source}) return _encode({"type": "points", "source": self._source, "frame": "world", "points": [list(p) for p in item]})