2022-02-27 06:57:13 +00:00
|
|
|
package receiver
|
|
|
|
|
|
|
|
import (
|
2022-02-27 10:10:21 +00:00
|
|
|
"context"
|
2022-02-27 06:57:13 +00:00
|
|
|
"log"
|
2022-02-27 10:10:21 +00:00
|
|
|
"strings"
|
2022-02-27 06:57:13 +00:00
|
|
|
|
|
|
|
"git.wntrmute.dev/kyle/sensenet/config"
|
|
|
|
"git.wntrmute.dev/kyle/sensenet/pubsub"
|
|
|
|
"git.wntrmute.dev/kyle/sensenet/topic"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
packetBuffer = 64
|
|
|
|
errBuffer = 64
|
|
|
|
)
|
|
|
|
|
|
|
|
type Receiver struct {
|
|
|
|
packets chan *topic.Packet
|
|
|
|
errs chan error
|
|
|
|
}
|
|
|
|
|
2022-02-27 10:10:21 +00:00
|
|
|
func normalizeAddress(addr string) string {
|
|
|
|
if !strings.HasPrefix(addr, "tcp://") {
|
|
|
|
addr = "tcp://" + addr
|
|
|
|
}
|
|
|
|
|
|
|
|
// this should split into either
|
|
|
|
// protocol, //host, port or
|
|
|
|
// host, port
|
|
|
|
// n captures how many parts are expected
|
|
|
|
hap := strings.SplitN(addr, ":", 3)
|
|
|
|
if len(hap) != 3 {
|
|
|
|
addr += ":4000"
|
|
|
|
}
|
|
|
|
|
|
|
|
return addr
|
|
|
|
}
|
|
|
|
|
2022-02-27 06:57:13 +00:00
|
|
|
func receive(packets chan *topic.Packet, errs chan error, subscriber *pubsub.Subscriber) {
|
|
|
|
packet, err := subscriber.Receive()
|
|
|
|
if err != nil {
|
|
|
|
errs <- err
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
packets <- packet
|
|
|
|
}
|
|
|
|
|
2022-02-27 10:10:21 +00:00
|
|
|
func (rcvr *Receiver) Subscribe(publisher *config.Publisher) error {
|
|
|
|
addr := normalizeAddress(publisher.Addr)
|
2022-02-27 06:57:13 +00:00
|
|
|
subscriber, err := pubsub.NewSubscriber(addr, publisher.Name, publisher.Topics...)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
receive(rcvr.packets, rcvr.errs, subscriber)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-02-27 10:10:21 +00:00
|
|
|
func (rcvr *Receiver) Store(ctx context.Context, db topic.Database) {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case packet := <-rcvr.packets:
|
|
|
|
log.Printf("publishing %dB packet [%s]", len(packet.Payload), packet.Topic)
|
|
|
|
err := topic.Publish(ctx, db, packet)
|
|
|
|
if err != nil {
|
|
|
|
rcvr.errs <- err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-27 06:57:13 +00:00
|
|
|
func (rcvr *Receiver) LogErrors() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case err := <-rcvr.errs:
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func New() *Receiver {
|
|
|
|
return &Receiver{
|
|
|
|
packets: make(chan *topic.Packet, packetBuffer),
|
|
|
|
errs: make(chan error, errBuffer),
|
|
|
|
}
|
|
|
|
}
|