sensenet/pubsub/subscriber.go

118 lines
2.1 KiB
Go
Raw Permalink Normal View History

2022-02-27 06:57:13 +00:00
package pubsub
import (
2023-05-14 01:32:29 +00:00
"context"
2022-02-27 06:57:13 +00:00
"fmt"
2022-02-27 10:10:21 +00:00
"log"
2022-02-27 06:57:13 +00:00
"sort"
telemetrypb "git.wntrmute.dev/kyle/sensenet/proto"
"git.wntrmute.dev/kyle/sensenet/topic"
2023-05-14 01:32:29 +00:00
"github.com/go-zeromq/zmq4"
2022-02-27 06:57:13 +00:00
"google.golang.org/protobuf/proto"
)
2022-02-27 10:10:21 +00:00
func readTopic(b []byte) string {
size := int(b[1])
return string(b[2 : size+2])
}
func protoTopic(topic string) string {
return fmt.Sprintf("\x0a%c%s", len(topic), topic)
}
2022-02-27 06:57:13 +00:00
type Subscriber struct {
addr string
publisher string
2023-05-14 01:32:29 +00:00
sock zmq4.Socket
2022-02-27 06:57:13 +00:00
topics map[string]bool
}
func NewSubscriber(addr, publisher string, topics ...string) (*Subscriber, error) {
sub := &Subscriber{
2022-02-27 10:10:21 +00:00
addr: addr,
publisher: publisher,
2023-05-14 01:32:29 +00:00
sock: zmq4.NewSub(context.Background()),
2022-02-27 10:10:21 +00:00
topics: map[string]bool{},
2022-02-27 06:57:13 +00:00
}
err := sub.connect()
if err != nil {
return nil, err
}
sub.Conflate(1)
2022-02-27 10:10:21 +00:00
for _, topicName := range topics {
sub.Subscribe(topicName)
2022-02-27 06:57:13 +00:00
}
return sub, nil
}
func (sub *Subscriber) Conflate(n int) {
2023-05-14 01:32:29 +00:00
sub.sock.SetOption("CONFLATE", n)
2022-02-27 06:57:13 +00:00
}
func (sub *Subscriber) connect() error {
2022-02-27 10:10:21 +00:00
log.Printf("subscriber dialing %s", sub.addr)
2023-05-14 01:49:14 +00:00
return sub.sock.Dial(sub.addr)
2022-02-27 06:57:13 +00:00
}
func (sub *Subscriber) Subscribe(topic string) {
if sub.topics[topic] {
return
}
sub.topics[topic] = true
2023-05-14 01:32:29 +00:00
sub.sock.SetOption(zmq4.OptionSubscribe, protoTopic(topic))
2022-02-27 06:57:13 +00:00
}
func (sub *Subscriber) Topics() []string {
var topics = make([]string, 0, len(sub.topics))
for k := range sub.topics {
topics = append(topics, k)
}
sort.Strings(topics)
return topics
}
func (sub *Subscriber) receive() ([]byte, error) {
2023-05-14 01:32:29 +00:00
var data []byte
msg, err := sub.sock.Recv()
if err != nil {
return nil, err
}
for _, frame := range msg.Frames {
data = append(data, frame...)
2022-02-27 06:57:13 +00:00
}
return data, nil
}
func (sub *Subscriber) Receive() (*topic.Packet, error) {
data, err := sub.receive()
if err != nil {
return nil, err
}
pbPacket := &telemetrypb.Packet{}
err = proto.Unmarshal(data, pbPacket)
if err != nil {
return nil, err
}
2022-02-27 10:10:21 +00:00
t := readTopic(data)
2022-02-27 06:57:13 +00:00
packet := &topic.Packet{
2022-02-27 10:10:21 +00:00
Topic: t,
2022-02-27 06:57:13 +00:00
Publisher: sub.publisher,
Received: int64(pbPacket.Timestamp),
Payload: pbPacket.Payload,
}
return packet, nil
}