package pubsub import ( "fmt" "sort" telemetrypb "git.wntrmute.dev/kyle/sensenet/proto" "git.wntrmute.dev/kyle/sensenet/topic" "google.golang.org/protobuf/proto" "gopkg.in/zeromq/goczmq.v4" ) type Subscriber struct { addr string publisher string sock *goczmq.Sock topics map[string]bool } func NewSubscriber(addr, publisher string, topics ...string) (*Subscriber, error) { sub := &Subscriber{ addr: addr, sock: goczmq.NewSock(goczmq.Pub), } err := sub.connect() if err != nil { return nil, err } sub.Conflate(1) for _, topic := range topics { sub.Subscribe(topic) } return sub, nil } func (sub *Subscriber) Conflate(n int) { sub.sock.SetConflate(n) } func (sub *Subscriber) connect() error { return sub.sock.Connect(sub.addr) } func (sub *Subscriber) Subscribe(topic string) { if sub.topics[topic] { return } sub.topics[topic] = true topic = fmt.Sprintf("\x0a%d%s", len(topic), topic) sub.sock.SetSubscribe(topic) } func (sub *Subscriber) Topics() []string { var topics = make([]string, 0, len(sub.topics)) for k := range sub.topics { topics = append(topics, k) } sort.Strings(topics) return topics } func (sub *Subscriber) receive() ([]byte, error) { var ( data []byte err error packet []byte more = 1 ) for more > 0 { packet, more, err = sub.sock.RecvFrame() if err != nil { return nil, err } data = append(data, packet...) } return data, nil } func (sub *Subscriber) Receive() (*topic.Packet, error) { data, err := sub.receive() if err != nil { return nil, err } pbPacket := &telemetrypb.Packet{} err = proto.Unmarshal(data, pbPacket) if err != nil { return nil, err } packet := &topic.Packet{ Publisher: sub.publisher, Received: int64(pbPacket.Timestamp), Payload: pbPacket.Payload, } return packet, nil }