// Package pubsub publishes telemetry packets over a ZeroMQ PUB socket.
package pubsub

import (
	"context"
	"fmt"
	"time"

	telemetrypb "git.wntrmute.dev/kyle/sensenet/proto"
	"github.com/go-zeromq/zmq4"
	"google.golang.org/protobuf/proto"
)

// Publisher wraps a ZeroMQ PUB socket that listens on a single address.
type Publisher struct {
	addr string
	sock zmq4.Socket
}

// NewPublisher creates a PUB socket, sets its CONFLATE option to 1, and
// starts listening on addr. If listening fails, the socket is closed before
// the error is returned so that a failed construction does not leak it.
func NewPublisher(addr string) (*Publisher, error) {
	pub := &Publisher{
		addr: addr,
		sock: zmq4.NewPub(context.Background()),
	}
	pub.Conflate(1)

	if err := pub.connect(); err != nil {
		// The listen error is the one the caller needs to see; a failure
		// while tearing down the unused socket would only obscure it.
		_ = pub.sock.Close()
		return nil, fmt.Errorf("pubsub: listening on %q: %w", addr, err)
	}
	return pub, nil
}

// connect starts the socket listening on the publisher's address.
func (pub *Publisher) connect() error {
	return pub.sock.Listen(pub.addr)
}

// Conflate sets the socket's "CONFLATE" option to n.
//
// This is best-effort: the method has no error return, so a failure to set
// the option is deliberately discarded rather than surfaced.
// NOTE(review): confirm that the zmq4 implementation in use actually honors
// the CONFLATE option; this file cannot tell.
func (pub *Publisher) Conflate(n int) {
	_ = pub.sock.SetOption("CONFLATE", n)
}

// Close closes the underlying socket. The Publisher must not be used
// afterwards.
func (pub *Publisher) Close() error {
	return pub.sock.Close()
}

// Transmit wraps data in a telemetry Packet for topic, serializes it, and
// sends it as a single-frame message on the socket.
func (pub *Publisher) Transmit(topic string, data []byte) error {
	packet, err := genPacket(topic, data)
	if err != nil {
		return fmt.Errorf("pubsub: encoding packet for topic %q: %w", topic, err)
	}

	msg := zmq4.Msg{
		Frames: [][]byte{packet},
	}
	if err := pub.sock.Send(msg); err != nil {
		return fmt.Errorf("pubsub: sending packet for topic %q: %w", topic, err)
	}
	return nil
}

// genPacket builds a telemetry Packet carrying topic and payload, stamps it
// with the current Unix time in seconds, and returns its protobuf encoding.
func genPacket(topic string, payload []byte) ([]byte, error) {
	packet := &telemetrypb.Packet{
		Topic:     topic,
		Timestamp: uint64(time.Now().Unix()),
		Payload:   payload,
	}
	return proto.Marshal(packet)
}