sensenet/pubsub/publisher.go

62 lines
1.0 KiB
Go
Raw Permalink Normal View History

2022-02-27 06:57:13 +00:00
package pubsub
import (
2023-05-14 01:32:29 +00:00
"context"
2022-02-27 06:57:13 +00:00
"time"
telemetrypb "git.wntrmute.dev/kyle/sensenet/proto"
2023-05-14 01:32:29 +00:00
"github.com/go-zeromq/zmq4"
2022-02-27 06:57:13 +00:00
"google.golang.org/protobuf/proto"
)
type Publisher struct {
addr string
2023-05-14 01:32:29 +00:00
sock zmq4.Socket
2022-02-27 06:57:13 +00:00
}
func NewPublisher(addr string) (*Publisher, error) {
pub := &Publisher{
addr: addr,
2023-05-14 01:32:29 +00:00
sock: zmq4.NewPub(context.Background()),
2022-02-27 06:57:13 +00:00
}
pub.Conflate(1)
err := pub.connect()
if err != nil {
return nil, err
}
return pub, nil
}
func (pub *Publisher) connect() error {
2023-05-14 01:49:14 +00:00
return pub.sock.Listen(pub.addr)
2022-02-27 06:57:13 +00:00
}
func (pub *Publisher) Conflate(n int) {
2023-05-14 01:32:29 +00:00
pub.sock.SetOption("CONFLATE", n)
2022-02-27 06:57:13 +00:00
}
func (pub *Publisher) Transmit(topic string, data []byte) error {
packet, err := genPacket(topic, data)
if err != nil {
return err
}
2023-05-14 01:32:29 +00:00
msg := zmq4.Msg{
Frames: [][]byte{packet},
}
return pub.sock.Send(msg)
2022-02-27 06:57:13 +00:00
}
func genPacket(topic string, payload []byte) ([]byte, error) {
packet := &telemetrypb.Packet{
Topic: topic,
Timestamp: uint64(time.Now().Unix()),
Payload: payload,
}
return proto.Marshal(packet)
}