Initial commit: CloudView generic networked point-cloud viewer

Standalone Open3D viewer that listens on a TCP/Unix socket and renders NDJSON point-cloud streams from any producer. Decoupled server/protocol/store layers (no Open3D dependency, testable headless) plus a lazy Open3D render loop.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-24 14:09:11 -07:00
commit 54a01625ce
13 changed files with 3373 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
.venv/
__pycache__/
*.pyc
.cache/

88
README.md Normal file
View File

@@ -0,0 +1,88 @@
# CloudView
A small, **generic** point-cloud viewer. It listens on a socket, accepts
newline-delimited JSON (NDJSON) from any number of producers, and renders the
combined cloud in an interactive [Open3D](https://www.open3d.org/) window. It is
not tied to any one robot: anything that can write JSON lines to a socket — a
Crazyflie, a rover, a LIDAR rig, a log replay — gets live 3D visualization for
free.
RangeView is the first producer (its 2D map stays the at-a-glance live
diagnostic; the full 3D cloud is streamed here), but the wire format is the only
contract, so other systems can feed the same viewer.
```
producers (any language) viewer
┌───────────────────────┐ NDJSON over ┌──────────────────────┐
│ rangeview ───────────┼── tcp:// ────────▶ CloudView (Open3D) │
│ rover-2 ───────────┼── unix:// ───────▶ - merges all sources │
│ lidar-rig ───────────┼──────────────────▶ - colour by height │
└───────────────────────┘ └──────────────────────┘
```
## Install & run
```sh
# with uv (resolves from the committed uv.lock)
uv run main.py # listen on tcp://127.0.0.1:9870
uv run main.py -l tcp://0.0.0.0:9870 # accept from other machines
# or a plain venv
python -m venv .venv && . .venv/bin/activate
pip install -r requirements.txt # open3d + numpy
python -m cloudview
```
Listen addresses (repeatable, TCP and/or Unix):
```sh
uv run main.py -l tcp://0.0.0.0:9870 -l unix:///tmp/cloud.sock
```
Viewer keys: **C** toggle colour (by height / per-source), **R** refit the
camera, **B** cycle background, mouse to orbit/zoom/pan.
## Wire protocol (NDJSON)
One JSON object per line over a stream socket. `source` keys a distinct cloud,
so one viewer shows several robots at once. Coordinates are floats in whatever
frame the producer uses.
| message | fields | effect |
|---------|--------|--------|
| `hello` | `source`, `name` | announce/label a stream |
| `points` | `source`, `points` (`[[x,y,z],...]`), `frame`, `color` (optional `[r,g,b]` 0..1) | append a batch |
| `clear` | `source` | drop that source's cloud |
| `pose` | `source`, `position` `[x,y,z]`, `yaw` (deg) | record robot pose |
Points are batched per message to keep overhead low. If a source supplies no
`color`, the viewer colours its points by height; otherwise the given colour is
used as a solid.
### Producing from any language
It is just JSON lines — test it with `nc`:
```sh
printf '%s\n' \
'{"type":"hello","source":"demo","name":"nc test"}' \
'{"type":"points","source":"demo","points":[[0,0,0],[1,0,0.5],[0,1,1]]}' \
| nc 127.0.0.1 9870
```
Python producers can reuse RangeView's `cloud_publisher.CloudPublisher` (a
non-blocking client with auto-reconnect and full-cloud replay), or just write
`json.dumps(msg) + "\n"` to a socket.
## Layout
- `cloudview/protocol.py` - NDJSON encode/decode + address parsing.
- `cloudview/store.py` - thread-safe per-source cloud store.
- `cloudview/server.py` - TCP/Unix socket server; one reader thread per producer.
- `cloudview/viewer.py` - Open3D render loop (height/per-source colouring).
- `cloudview/app.py` - CLI entry point.
Networking and rendering are decoupled: the server fills the store from any
producer; the viewer polls the store and rebuilds geometry only when it changes.
The server/protocol layers have no Open3D dependency, so they are testable
headless.

3
cloudview/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""cloudview: a generic networked point-cloud viewer (NDJSON over a socket)."""
__version__ = "0.1.0"

6
cloudview/__main__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Allow `python -m cloudview`."""
from .app import main
if __name__ == "__main__":
main()

47
cloudview/app.py Normal file
View File

@@ -0,0 +1,47 @@
"""Entry point: start the socket server, then run the Open3D viewer.
python -m cloudview # listen on tcp://127.0.0.1:9870
python -m cloudview -l tcp://0.0.0.0:9870 # accept from other machines
python -m cloudview -l unix:///tmp/cloud.sock # local Unix socket
python -m cloudview -l tcp://0.0.0.0:9870 -l unix:///tmp/cloud.sock # both
"""
from __future__ import annotations
import argparse
import logging
from .protocol import DEFAULT_TCP
from .server import CloudServer
from .store import CloudStore
from .viewer import run_viewer
def main() -> None:
ap = argparse.ArgumentParser(
description="Generic networked point-cloud viewer (NDJSON over a "
"stream socket).")
ap.add_argument("-l", "--listen", action="append", metavar="ADDR",
help="address to listen on (tcp://host:port or "
"unix:///path); repeatable. Default tcp://127.0.0.1:9870")
ap.add_argument("--max-points", type=int, default=500_000,
help="cap per source before oldest points are dropped")
ap.add_argument("-v", "--verbose", action="store_true")
args = ap.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname).1s %(name)s: %(message)s",
datefmt="%H:%M:%S")
store = CloudStore(max_points_per_source=args.max_points)
server = CloudServer(store, args.listen or [DEFAULT_TCP])
server.start()
try:
run_viewer(store)
finally:
server.stop()
if __name__ == "__main__":
main()

76
cloudview/protocol.py Normal file
View File

@@ -0,0 +1,76 @@
"""NDJSON wire protocol for the generic point-cloud stream.
One JSON object per line over a stream socket (TCP or Unix domain). Newline
delimited so it is language-agnostic, debuggable with ``nc``, and trivial to
emit from any system. Message types (``source`` keys a distinct cloud, so one
viewer can show several robots at once):
{"type":"hello","source":"<id>","name":"<label>"}
{"type":"points","source":"<id>","frame":"world",
"points":[[x,y,z], ...], "color":[r,g,b]} # color optional, 0..1
{"type":"clear","source":"<id>"}
{"type":"pose","source":"<id>","position":[x,y,z],"yaw":<deg>}
Coordinates are floats in whatever frame the producer chooses (RangeView uses a
world frame with the origin at the estimator reset, +x forward, +y left, +z up).
Points are batched per message to keep per-line overhead low.
"""
from __future__ import annotations
import json
DEFAULT_TCP_PORT = 9870
DEFAULT_TCP = f"tcp://127.0.0.1:{DEFAULT_TCP_PORT}"
def encode(msg: dict) -> bytes:
"""Serialise one message to a single NDJSON line (bytes)."""
return (json.dumps(msg, separators=(",", ":")) + "\n").encode("utf-8")
class LineDecoder:
"""Accumulate stream bytes and yield parsed JSON objects per newline."""
def __init__(self, max_line: int = 64 * 1024 * 1024) -> None:
self._buf = bytearray()
self._max = max_line
def feed(self, data: bytes) -> list[dict]:
"""Add received bytes; return any newly-complete messages (in order).
Blank lines are skipped; a malformed line raises ValueError so the caller
can drop the connection.
"""
self._buf.extend(data)
if len(self._buf) > self._max:
raise ValueError("NDJSON line exceeded maximum length")
out: list[dict] = []
while True:
nl = self._buf.find(b"\n")
if nl < 0:
break
line = bytes(self._buf[:nl]).strip()
del self._buf[:nl + 1]
if line:
out.append(json.loads(line.decode("utf-8")))
return out
def parse_address(addr: str) -> tuple[str, str, int | None]:
"""Parse an address into ('tcp', host, port) or ('unix', path, None).
Accepts ``tcp://host:port``, ``unix:///abs/path`` (or ``unix:/path``), and a
bare ``host:port`` (treated as TCP).
"""
if addr.startswith("tcp://"):
host, _, port = addr[len("tcp://"):].partition(":")
return ("tcp", host or "127.0.0.1", int(port or DEFAULT_TCP_PORT))
if addr.startswith("unix://"):
return ("unix", addr[len("unix://"):], None)
if addr.startswith("unix:"):
return ("unix", addr[len("unix:"):], None)
if ":" in addr:
host, _, port = addr.partition(":")
return ("tcp", host or "127.0.0.1", int(port))
raise ValueError(f"unrecognised address: {addr!r}")

113
cloudview/server.py Normal file
View File

@@ -0,0 +1,113 @@
"""Stream-socket server that feeds a CloudStore from NDJSON producers.
Listens on one or more addresses (TCP and/or Unix), accepts any number of
producers concurrently, and applies their messages to a shared store. All
networking is on background threads; the render loop reads the store separately.
"""
from __future__ import annotations
import logging
import os
import socket
import threading
from .protocol import LineDecoder, parse_address
from .store import CloudStore
logger = logging.getLogger(__name__)
class CloudServer:
def __init__(self, store: CloudStore, addresses: list[str]) -> None:
self._store = store
self._addresses = addresses
self._listeners: list[socket.socket] = []
self._unix_paths: list[str] = []
self._stop = threading.Event()
# Actual bound endpoints (resolves ephemeral ":0" ports for callers/tests).
self.bound: list[tuple[str, str, int | None]] = []
def start(self) -> None:
for addr in self._addresses:
kind, a, b = parse_address(addr)
if kind == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((a, b))
host, port = sock.getsockname()[:2]
self.bound.append(("tcp", host, port))
else:
# Replace a stale socket file from a prior run.
if os.path.exists(a):
os.unlink(a)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(a)
self._unix_paths.append(a)
self.bound.append(("unix", a, None))
sock.listen(8)
sock.settimeout(0.5)
self._listeners.append(sock)
threading.Thread(target=self._accept_loop, args=(sock, addr),
name=f"accept[{addr}]", daemon=True).start()
logger.info("listening on %s", addr)
def _accept_loop(self, listener: socket.socket, addr: str) -> None:
while not self._stop.is_set():
try:
conn, _ = listener.accept()
except socket.timeout:
continue
except OSError:
break
threading.Thread(target=self._serve_client, args=(conn, addr),
name=f"client[{addr}]", daemon=True).start()
def _serve_client(self, conn: socket.socket, addr: str) -> None:
logger.info("producer connected on %s", addr)
conn.settimeout(1.0)
decoder = LineDecoder()
try:
while not self._stop.is_set():
try:
data = conn.recv(65536)
except socket.timeout:
continue
if not data:
break
for msg in decoder.feed(data):
self._apply(msg)
except Exception:
logger.exception("error serving producer on %s", addr)
finally:
conn.close()
logger.info("producer disconnected (%s)", addr)
def _apply(self, msg: dict) -> None:
kind = msg.get("type")
source = msg.get("source", "default")
if kind == "points":
self._store.add_points(source, msg.get("points", []),
msg.get("color"))
elif kind == "hello":
self._store.hello(source, msg.get("name"))
elif kind == "clear":
self._store.clear(source)
elif kind == "pose":
self._store.set_pose(source, msg.get("position", [0, 0, 0]),
msg.get("yaw"))
else:
logger.debug("ignoring unknown message type %r", kind)
def stop(self) -> None:
self._stop.set()
for sock in self._listeners:
try:
sock.close()
except OSError:
pass
for path in self._unix_paths:
try:
os.unlink(path)
except OSError:
pass

76
cloudview/store.py Normal file
View File

@@ -0,0 +1,76 @@
"""Thread-safe store of point clouds, keyed by source.
Network reader threads write into it; the render loop reads consistent
snapshots. A monotonic version counter lets the renderer cheaply detect change.
"""
from __future__ import annotations
import threading
import numpy as np
class CloudStore:
"""Holds one growing point cloud per source id, plus optional pose/colour."""
def __init__(self, max_points_per_source: int = 500_000) -> None:
self._lock = threading.Lock()
self._sources: dict[str, dict] = {}
self._version = 0
self._max = max_points_per_source
def hello(self, source: str, name: str | None = None) -> None:
with self._lock:
s = self._sources.setdefault(source, self._blank())
if name:
s["name"] = name
self._version += 1
def add_points(self, source: str, pts, color=None) -> None:
arr = np.asarray(pts, dtype=np.float32).reshape(-1, 3)
if arr.shape[0] == 0:
return
with self._lock:
s = self._sources.setdefault(source, self._blank())
s["points"] = np.vstack([s["points"], arr])[-self._max:]
if color is not None:
s["color"] = tuple(float(c) for c in color)
self._version += 1
def clear(self, source: str) -> None:
with self._lock:
if source in self._sources:
name = self._sources[source].get("name")
self._sources[source] = self._blank(name)
self._version += 1
def set_pose(self, source: str, position, yaw=None) -> None:
with self._lock:
s = self._sources.setdefault(source, self._blank())
s["pose"] = (tuple(float(v) for v in position),
None if yaw is None else float(yaw))
self._version += 1
def version(self) -> int:
with self._lock:
return self._version
def snapshot(self) -> tuple[int, dict[str, dict]]:
"""Return (version, {source: {points, color, name, pose}}) with copied
point arrays so the renderer can read them without holding the lock."""
with self._lock:
return self._version, {
src: {
"points": d["points"].copy(),
"color": d["color"],
"name": d["name"],
"pose": d["pose"],
}
for src, d in self._sources.items()
}
@staticmethod
def _blank(name: str | None = None) -> dict:
return {"points": np.empty((0, 3), np.float32), "color": None,
"name": name, "pose": None}

116
cloudview/viewer.py Normal file
View File

@@ -0,0 +1,116 @@
"""Open3D render loop for the live point cloud.
Runs on the main thread (Open3D owns a GUI/GL context) and polls the CloudStore
for changes, rebuilding the merged geometry only when the store's version moves.
Open3D is imported lazily so the server/protocol layers stay usable without it.
Keys: C toggle colour mode (by height <-> per-source)
R reset/refit the camera to the cloud
B cycle background shade
"""
from __future__ import annotations
import logging
import time
import numpy as np
from .store import CloudStore
logger = logging.getLogger(__name__)
# Distinct solid colours for per-source mode.
_PALETTE = [
(0.30, 0.80, 1.00), (1.00, 0.70, 0.30), (0.50, 1.00, 0.50),
(1.00, 0.50, 0.80), (0.85, 0.85, 0.35), (0.60, 0.60, 1.00),
]
# 5-stop blue -> cyan -> green -> yellow -> red ramp for height colouring.
_RAMP_POS = np.linspace(0.0, 1.0, 5)
_RAMP_RGB = np.array([[0.00, 0.10, 0.65], [0.00, 0.80, 0.90],
[0.15, 0.85, 0.20], [0.95, 0.90, 0.10],
[0.90, 0.20, 0.10]])
def _height_colors(z: np.ndarray) -> np.ndarray:
if z.shape[0] == 0:
return np.zeros((0, 3))
lo, hi = float(z.min()), float(z.max())
t = (z - lo) / (hi - lo) if hi > lo else np.zeros_like(z)
return np.stack([np.interp(t, _RAMP_POS, _RAMP_RGB[:, c]) for c in range(3)],
axis=1)
def _merge(sources: dict, mode: str):
"""Build merged (points, colors) arrays for the current colour mode."""
pts, cols = [], []
for i, (src, d) in enumerate(sorted(sources.items())):
p = d["points"]
if p.shape[0] == 0:
continue
pts.append(p)
if mode == "source" or d["color"] is not None:
base = d["color"] or _PALETTE[i % len(_PALETTE)]
cols.append(np.tile(base, (p.shape[0], 1)))
else:
cols.append(_height_colors(p[:, 2]))
if not pts:
return np.empty((0, 3)), np.empty((0, 3))
return np.vstack(pts), np.vstack(cols)
def run_viewer(store: CloudStore, title: str = "CloudView") -> None:
import open3d as o3d # lazy: only the viewer needs it
vis = o3d.visualization.VisualizerWithKeyCallback()
vis.create_window(title, width=1280, height=800)
opt = vis.get_render_option()
opt.point_size = 2.0
backgrounds = [np.array([0.05, 0.05, 0.07]), np.array([0.0, 0.0, 0.0]),
np.array([0.18, 0.18, 0.20])]
opt.background_color = backgrounds[0]
pcd = o3d.geometry.PointCloud()
vis.add_geometry(pcd)
vis.add_geometry(o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.5))
state = {"mode": "height", "last": -1, "fitted": False, "bg": 0}
def toggle_mode(_v):
state["mode"] = "source" if state["mode"] == "height" else "height"
state["last"] = -1 # force rebuild
return False
def refit(_v):
vis.reset_view_point(True)
return False
def cycle_bg(_v):
state["bg"] = (state["bg"] + 1) % len(backgrounds)
opt.background_color = backgrounds[state["bg"]]
return False
vis.register_key_callback(ord("C"), toggle_mode)
vis.register_key_callback(ord("R"), refit)
vis.register_key_callback(ord("B"), cycle_bg)
logger.info("viewer ready - C: colour mode, R: refit camera, B: background")
try:
while True:
version = store.version()
if version != state["last"]:
state["last"] = version
_, sources = store.snapshot()
points, colors = _merge(sources, state["mode"])
pcd.points = o3d.utility.Vector3dVector(points.astype(np.float64))
pcd.colors = o3d.utility.Vector3dVector(colors.astype(np.float64))
vis.update_geometry(pcd)
if not state["fitted"] and points.shape[0] > 0:
vis.reset_view_point(True)
state["fitted"] = True
if not vis.poll_events():
break
vis.update_renderer()
time.sleep(0.03)
finally:
vis.destroy_window()

7
main.py Normal file
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python3
"""Entry point for the CloudView point-cloud viewer."""
from cloudview.app import main
if __name__ == "__main__":
main()

21
pyproject.toml Normal file
View File

@@ -0,0 +1,21 @@
[project]
name = "cloudview"
version = "0.1.0"
description = "Generic networked point-cloud viewer (NDJSON over a stream socket)"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
# 3D point-cloud rendering with an interactive camera.
"open3d>=0.18",
"numpy>=1.23",
]
[project.scripts]
cloudview = "cloudview.app:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["cloudview"]

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
# 3D point-cloud rendering with an interactive orbit/zoom camera.
open3d>=0.18
numpy>=1.23

2813
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff