sensenet/topic/topic.go

89 lines
1.8 KiB
Go

package topic
import (
"context"
"fmt"
"log"
"sync"
"git.wntrmute.dev/kyle/sensenet/config"
"github.com/jackc/pgconn"
)
type Database interface {
Exec(ctx context.Context, query string, args ...interface{}) (pgconn.CommandTag, error)
}
type Topic struct {
CreateTable func(ctx context.Context, db Database) error
Store func(ctx context.Context, db Database, packet *Packet) error
}
var registry = struct {
topics map[string]*Topic
lock *sync.Mutex
}{
topics: map[string]*Topic{},
lock: new(sync.Mutex),
}
func Register(topic string, handler *Topic) {
registry.lock.Lock()
defer registry.lock.Unlock()
if _, ok := registry.topics[topic]; ok {
panic(fmt.Sprintf("attempt to register pht '%s' that has already been registered", topic))
}
registry.topics[topic] = handler
log.Println("registered handler for topic", topic)
}
type Packet struct {
Topic string
Publisher string
Received int64 // unix timestamp
Payload []byte
}
// Setup ensures all the required tables are created.
func Setup(ctx context.Context, db Database) error {
registry.lock.Lock()
defer registry.lock.Unlock()
log.Println("creating tables")
for _, topic := range registry.topics {
err := topic.CreateTable(ctx, db)
if err != nil {
return err
}
}
return nil
}
func Publish(ctx context.Context, db Database, packet *Packet) error {
topic, ok := registry.topics[packet.Topic]
if !ok {
return fmt.Errorf("topic: no handler for topic '%s' [%x]", packet.Topic, packet.Payload)
}
return topic.Store(ctx, db, packet)
}
func ValidateTopics(publishers map[string]*config.Publisher) bool {
valid := true
for _, pub := range publishers {
for _, topic := range pub.Topics {
if _, ok := registry.topics[topic]; !ok {
log.Printf("missing handler for topic %s", topic)
valid = valid && ok
}
}
}
return valid
}