Publish packets.

This commit is contained in:
2022-02-27 02:10:21 -08:00
parent a3966fc28b
commit 579c17bb6a
11 changed files with 624 additions and 36 deletions

View File

@@ -1,7 +1,9 @@
package receiver
import (
"context"
"log"
"strings"
"git.wntrmute.dev/kyle/sensenet/config"
"git.wntrmute.dev/kyle/sensenet/pubsub"
@@ -18,6 +20,23 @@ type Receiver struct {
errs chan error
}
func normalizeAddress(addr string) string {
if !strings.HasPrefix(addr, "tcp://") {
addr = "tcp://" + addr
}
// this should split into either
// protocol, //host, port or
// host, port
// n captures how many parts are expected
hap := strings.SplitN(addr, ":", 3)
if len(hap) != 3 {
addr += ":4000"
}
return addr
}
func receive(packets chan *topic.Packet, errs chan error, subscriber *pubsub.Subscriber) {
packet, err := subscriber.Receive()
if err != nil {
@@ -28,7 +47,8 @@ func receive(packets chan *topic.Packet, errs chan error, subscriber *pubsub.Sub
packets <- packet
}
func (rcvr *Receiver) Subscribe(addr string, publisher *config.Publisher) error {
func (rcvr *Receiver) Subscribe(publisher *config.Publisher) error {
addr := normalizeAddress(publisher.Addr)
subscriber, err := pubsub.NewSubscriber(addr, publisher.Name, publisher.Topics...)
if err != nil {
return err
@@ -43,6 +63,19 @@ func (rcvr *Receiver) Subscribe(addr string, publisher *config.Publisher) error
return nil
}
func (rcvr *Receiver) Store(ctx context.Context, db topic.Database) {
for {
select {
case packet := <-rcvr.packets:
log.Printf("publishing %dB packet [%s]", len(packet.Payload), packet.Topic)
err := topic.Publish(ctx, db, packet)
if err != nil {
rcvr.errs <- err
}
}
}
}
func (rcvr *Receiver) LogErrors() {
for {
select {