// Package pubsub implements a ZeroMQ subscriber that receives
// protobuf-encoded telemetry packets and converts them to topic.Packet
// values.
package pubsub

import (
	"context"
	"encoding/binary"
	"fmt"
	"log"
	"sort"

	telemetrypb "git.wntrmute.dev/kyle/sensenet/proto"
	"git.wntrmute.dev/kyle/sensenet/topic"
	"github.com/go-zeromq/zmq4"
	"google.golang.org/protobuf/proto"
)

// topicTag is the first byte of a serialised packet that starts with its
// topic: protobuf field number 1 with the length-delimited wire type.
const topicTag byte = 0x0a

// readTopic extracts the topic from the front of a serialised packet. The
// expected layout is the topic tag byte, a varint length, and then that many
// bytes of topic name.
//
// It returns the empty string when b does not start with the topic tag or is
// too short to contain the announced topic, so a malformed or topic-less
// message cannot cause an out-of-range panic.
func readTopic(b []byte) string {
	if len(b) == 0 || b[0] != topicTag {
		return ""
	}

	// The length is a varint, so topics of 128 bytes or more occupy more
	// than one length byte.
	size, n := binary.Uvarint(b[1:])
	if n <= 0 {
		return ""
	}

	start := 1 + n
	if size > uint64(len(b)-start) {
		return ""
	}

	return string(b[start : start+int(size)])
}

// protoTopic builds the byte prefix that a serialised packet carrying the
// given topic starts with: the topic tag, the topic length as a varint, and
// the topic itself. The result is used as a subscription filter.
func protoTopic(topic string) string {
	// The length must be a protobuf varint. Formatting it as a rune would
	// UTF-8 encode values of 128 and above, which is a different byte
	// sequence and would never match a real packet.
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(topic)))

	prefix := make([]byte, 0, 1+n+len(topic))
	prefix = append(prefix, topicTag)
	prefix = append(prefix, lenBuf[:n]...)
	prefix = append(prefix, topic...)

	return string(prefix)
}

// Subscriber receives telemetry packets from a single publisher over a
// ZeroMQ SUB socket.
type Subscriber struct {
	addr      string          // endpoint the socket connects to
	publisher string          // copied into the Publisher field of every received packet
	sock      zmq4.Socket     // underlying SUB socket
	topics    map[string]bool // set of topics subscribed to so far
}

// NewSubscriber creates a subscriber, connects it to addr, and subscribes it
// to each of the given topics. publisher is recorded on every packet the
// subscriber returns.
//
// If connecting or subscribing fails, the socket is closed and the error is
// returned.
func NewSubscriber(addr, publisher string, topics ...string) (*Subscriber, error) {
	sub := &Subscriber{
		addr:      addr,
		publisher: publisher,
		sock:      zmq4.NewSub(context.Background()),
		topics:    map[string]bool{},
	}

	if err := sub.connect(); err != nil {
		// The connect error is the one worth reporting; a failure to close
		// a socket that never connected adds nothing for the caller.
		_ = sub.sock.Close()
		return nil, err
	}

	sub.Conflate(1)

	for _, topicName := range topics {
		if err := sub.subscribe(topicName); err != nil {
			_ = sub.sock.Close() // see above: the subscribe error is reported instead
			return nil, err
		}
	}

	return sub, nil
}

// Conflate sets the socket's "CONFLATE" option to n. The option is applied
// on a best-effort basis: a failure is logged rather than returned.
//
// NOTE(review): confirm that the zmq4 implementation in use honours this
// option; if it rejects it, every call will log an error.
func (sub *Subscriber) Conflate(n int) {
	if err := sub.sock.SetOption("CONFLATE", n); err != nil {
		log.Printf("subscriber: setting CONFLATE=%d: %v", n, err)
	}
}

// connect dials the configured address.
func (sub *Subscriber) connect() error {
	log.Printf("subscriber dialing %s", sub.addr)
	// Dial rather than Listen: the subscriber connects out to the address it
	// was given, which is also what the log line above reports.
	return sub.sock.Dial(sub.addr)
}

// Subscribe adds a subscription for the given topic. Subscribing to a topic
// that is already subscribed is a no-op. A failure is logged, and the topic
// is not recorded so that a later call can retry.
func (sub *Subscriber) Subscribe(topic string) {
	if err := sub.subscribe(topic); err != nil {
		log.Printf("subscriber: %v", err)
	}
}

// subscribe installs the subscription filter for topic and records the topic
// once the socket has accepted it.
func (sub *Subscriber) subscribe(topic string) error {
	if sub.topics[topic] {
		return nil
	}

	if err := sub.sock.SetOption(zmq4.OptionSubscribe, protoTopic(topic)); err != nil {
		return fmt.Errorf("subscribing to topic %q: %w", topic, err)
	}

	// Record the topic only after the socket accepted the filter, so that
	// Topics never reports a subscription that is not in place.
	sub.topics[topic] = true
	return nil
}

// Topics returns the subscribed topics in sorted order.
func (sub *Subscriber) Topics() []string {
	topics := make([]string, 0, len(sub.topics))
	for k := range sub.topics {
		topics = append(topics, k)
	}

	sort.Strings(topics)
	return topics
}

// receive reads the next message from the socket and returns all of its
// frames concatenated into a single byte slice.
func (sub *Subscriber) receive() ([]byte, error) {
	msg, err := sub.sock.Recv()
	if err != nil {
		return nil, err
	}

	var data []byte
	for _, frame := range msg.Frames {
		data = append(data, frame...)
	}

	return data, nil
}

// Receive reads the next message from the socket and decodes it into a
// topic.Packet. It returns an error if the read fails or if the message is
// not a valid protobuf packet.
//
// The packet's Topic is read from the front of the raw message and is empty
// when the message does not start with a topic. Received is set from the
// timestamp carried in the packet, not from the local clock.
func (sub *Subscriber) Receive() (*topic.Packet, error) {
	data, err := sub.receive()
	if err != nil {
		return nil, err
	}

	pbPacket := &telemetrypb.Packet{}
	if err := proto.Unmarshal(data, pbPacket); err != nil {
		return nil, err
	}

	packet := &topic.Packet{
		Topic:     readTopic(data),
		Publisher: sub.publisher,
		Received:  int64(pbPacket.Timestamp),
		Payload:   pbPacket.Payload,
	}

	return packet, nil
}