56 lines
1.6 KiB
Python
56 lines
1.6 KiB
Python
import json
import time

import zmq

import proto.telemetry_pb2 as proto
|
|
|
|
|
|
class Subscriber:
    """Thin wrapper around a ZeroMQ SUB socket with topic bookkeeping.

    Non-empty topics are turned into a binary subscription prefix of the
    form ``<tag byte 10><varint length><topic bytes>``.  Tag 10 is the
    protobuf key for field number 1 with the length-delimited wire type.
    NOTE(review): this presumably mirrors the first bytes of a serialized
    ``proto.Packet`` whose field 1 is ``topic`` -- confirm against the
    .proto definition.
    """

    # Protobuf key byte: (field_number 1 << 3) | wire_type 2 == 10.
    _TOPIC_TAG = 10

    def __init__(self, addr, topics=None, conflate=1):
        """Create the SUB socket, connect it, and subscribe to *topics*.

        Args:
            addr: Endpoint passed to ``socket.connect``.
            topics: Optional iterable of topics; each item is handed to
                :meth:`subscribe`.  ``None`` subscribes to nothing.
            conflate: Value for the ``zmq.CONFLATE`` socket option.
        """
        # ``instance`` is a classmethod returning the shared context;
        # calling it on ``zmq.Context()`` would first build (and discard)
        # an extra context for no benefit.
        self.ctx = zmq.Context.instance()
        self.socket = self.ctx.socket(zmq.SUB)
        # Set the option before connecting, as in the original order.
        self.socket.setsockopt(zmq.CONFLATE, conflate)
        self._topics = set()
        self.socket.connect(addr)

        if topics is not None:
            for topic in topics:
                self.subscribe(topic)

    @staticmethod
    def _encode_varint(value):
        """Return *value* (a non-negative int) as a protobuf base-128 varint."""
        encoded = bytearray()
        while value > 0x7F:
            # Low seven bits first, with the continuation bit set.
            encoded.append((value & 0x7F) | 0x80)
            value >>= 7
        encoded.append(value)
        return bytes(encoded)

    def subscribe(self, topic):
        """Subscribe to *topic*, ignoring topics that are already tracked.

        Args:
            topic: ``bytes`` (or other bytes-like value) or ``str``.  A
                ``str`` is encoded as UTF-8.  An empty topic installs the
                empty filter ``''``, which ZeroMQ treats as "match every
                message".

        Non-empty topics are recorded as ``bytes`` in the tracked set, so
        ``'abc'`` and ``b'abc'`` count as the same subscription; the empty
        topic is recorded as ``''``.
        """
        if isinstance(topic, str):
            topic = topic.encode('utf-8')

        if len(topic) == 0:
            if '' not in self._topics:
                self.socket.subscribe('')
                self._topics.add('')
            return

        topic = bytes(topic)
        if topic in self._topics:
            return

        # The length must be a varint: a single raw byte is only valid for
        # topics shorter than 128 bytes.
        prefix = (
            bytes([self._TOPIC_TAG])
            + self._encode_varint(len(topic))
            + topic
        )
        self.socket.subscribe(prefix)
        # Record the topic only after the socket call succeeded so a
        # failure cannot leave a topic tracked but not subscribed.
        self._topics.add(topic)

    def topics(self):
        """Return the tracked topics as a new list (order is arbitrary)."""
        return list(self._topics)

    def receive(self):
        """Return the result of ``socket.recv()`` (one raw message)."""
        return self.socket.recv()

    def close(self):
        """Close the underlying socket and return ``socket.close()``'s result."""
        return self.socket.close()
|
|
|
|
|
|
class Publisher:
    """Thin wrapper around a ZeroMQ PUB socket that sends ``proto.Packet``s."""

    def __init__(self, addr, conflate=1):
        """Create the PUB socket and bind it to *addr*.

        Args:
            addr: Endpoint passed to ``socket.bind``.
            conflate: Value for the ``zmq.CONFLATE`` socket option.
        """
        # ``instance`` is a classmethod returning the shared context;
        # calling it on ``zmq.Context()`` would first build (and discard)
        # an extra context for no benefit.
        self.ctx = zmq.Context.instance()
        self.socket = self.ctx.socket(zmq.PUB)
        # Set the option before binding, as in the original order.
        self.socket.setsockopt(zmq.CONFLATE, conflate)
        self.socket.bind(addr)

    def publish(self, message, topic=b''):
        """Wrap *message* in a ``proto.Packet`` and send it.

        Args:
            message: Payload stored in the packet's ``payload`` field.
            topic: Value stored in the packet's ``topic`` field.

        Returns:
            Whatever ``socket.send`` returns.

        The packet's ``created`` field is the current Unix time truncated
        to whole seconds.
        """
        # Use a separate name so the caller's payload is not shadowed.
        packet = proto.Packet(
            topic=topic,
            created=int(time.time()),
            payload=message,
        )
        return self.socket.send(packet.SerializeToString())

    def publish_json(self, message, topic=b''):
        """Serialize *message* to UTF-8 encoded JSON and publish it.

        Args:
            message: Any object accepted by ``json.dumps``.
            topic: Forwarded unchanged to :meth:`publish`.

        Returns:
            The result of :meth:`publish`.
        """
        payload = json.dumps(message).encode('UTF-8')
        return self.publish(payload, topic)

    def close(self):
        """Close the underlying socket and return ``socket.close()``'s result."""
        return self.socket.close()
|