package receiver import ( "context" "log" "strings" "git.wntrmute.dev/kyle/sensenet/config" "git.wntrmute.dev/kyle/sensenet/pubsub" "git.wntrmute.dev/kyle/sensenet/topic" ) const ( packetBuffer = 64 errBuffer = 64 ) type Receiver struct { packets chan *topic.Packet errs chan error } func normalizeAddress(addr string) string { if !strings.HasPrefix(addr, "tcp://") { addr = "tcp://" + addr } // this should split into either // protocol, //host, port or // host, port // n captures how many parts are expected hap := strings.SplitN(addr, ":", 3) if len(hap) != 3 { addr += ":4000" } return addr } func receive(packets chan *topic.Packet, errs chan error, subscriber *pubsub.Subscriber) { packet, err := subscriber.Receive() if err != nil { errs <- err return } packets <- packet } func (rcvr *Receiver) Subscribe(publisher *config.Publisher) error { addr := normalizeAddress(publisher.Addr) subscriber, err := pubsub.NewSubscriber(addr, publisher.Name, publisher.Topics...) if err != nil { return err } go func() { for { receive(rcvr.packets, rcvr.errs, subscriber) } }() return nil } func (rcvr *Receiver) Store(ctx context.Context, db topic.Database) { for { select { case packet := <-rcvr.packets: log.Printf("publishing %dB packet [%s]", len(packet.Payload), packet.Topic) err := topic.Publish(ctx, db, packet) if err != nil { rcvr.errs <- err } } } } func (rcvr *Receiver) LogErrors() { for { select { case err := <-rcvr.errs: log.Println(err) } } } func New() *Receiver { return &Receiver{ packets: make(chan *topic.Packet, packetBuffer), errs: make(chan error, errBuffer), } }