overpush/server/server.go

91 lines
2.2 KiB
Go

package server
import (
"context"
"errors"
"fmt"
"net"
"time"
"git.wntrmute.dev/kyle/goutils/config"
"git.wntrmute.dev/kyle/goutils/log"
"git.wntrmute.dev/kyle/overpush/core"
pb "git.wntrmute.dev/kyle/overpush/proto"
"github.com/google/uuid"
"google.golang.org/grpc"
)
// Service is the Overpush gRPC server implementation. It routes incoming
// messages to per-stream accounts and is registered with a grpc.Server by Run.
type Service struct {
// Default is the account used when a message carries no stream name.
Default *core.Account
// Accounts maps a stream name to the account that receives its messages.
Accounts map[string]*core.Account
// address is the host:port that Run listens on; New reads it from the
// "address" config key, defaulting to 127.0.0.1:5000.
address string
// NOTE(review): Listener is not referenced anywhere in this file, and it is
// held by value rather than by pointer — confirm whether it is still needed.
Listener net.TCPListener
// Embedding the generated Unimplemented server is what grpc-go expects of
// server implementations (forward compatibility with RPCs added to the
// service definition later); it is required for pb.RegisterOverpushServer
// to accept *Service.
pb.UnimplementedOverpushServer
}
// QueueMessage handles the Overpush QueueMessage RPC: it selects the target
// account from the message's stream name, converts the wire message with
// core.NewMessage, and places the result on that account's queue.
//
// An empty stream name selects svc.Default; any other name must be a key of
// svc.Accounts. On success the response has Status_StatusQueued and the error
// is nil. On failure the response has Status_StatusFailed with a reason in
// Errors, and a non-nil error is returned as well.
//
// The enqueue honours ctx: if the RPC is cancelled or its deadline expires
// while the account queue cannot accept the message, the call returns
// ctx.Err() instead of blocking the handler goroutine indefinitely.
func (svc *Service) QueueMessage(ctx context.Context, msg *pb.Message) (*pb.Response, error) {
	// failed builds the rejection response shared by every error path.
	failed := func(reason string) *pb.Response {
		return &pb.Response{
			Timestamp: time.Now().Unix(),
			Id:        uuid.NewString(),
			Status:    pb.Status_StatusFailed,
			Errors:    []string{reason},
		}
	}

	acct := svc.Default
	if stream := msg.GetStreamName(); stream != "" {
		var ok bool
		if acct, ok = svc.Accounts[stream]; !ok {
			log.Errf("invalid stream name '%s' in message", stream)
			return failed(fmt.Sprintf("server: invalid stream name %s", stream)),
				errors.New("service: invalid stream name " + stream)
		}
	}

	qmsg := core.NewMessage(msg)
	if qmsg == nil {
		log.Errln("server: invalid message received, dropping")
		return failed("service: invalid message"),
			errors.New("server: invalid message received, dropping")
	}

	// A bare channel send would pin this goroutine for as long as the queue
	// is full or undrained, even after the client has given up; give the
	// request context a chance to abort the wait.
	select {
	case acct.Queue() <- qmsg:
	case <-ctx.Done():
		err := ctx.Err()
		log.Errf("server: message %d not queued: %v", msg.GetTimestamp(), err)
		return failed("server: message not queued: " + err.Error()), err
	}

	// Logged only once the message is actually on the queue, so the debug
	// trail does not report rejected messages as queued.
	log.Debugf("message %d queued", msg.GetTimestamp())

	return &pb.Response{
		Timestamp: time.Now().Unix(),
		Id:        uuid.NewString(),
		Status:    pb.Status_StatusQueued,
	}, nil
}
// New builds a Service from the process configuration.
//
// The "primary_token" config key is mandatory and is used to create the
// default account; New returns an error when it is empty. The listen address
// comes from the "address" key and falls back to 127.0.0.1:5000.
//
// Accounts is initialised to an empty, non-nil map so that callers can
// register additional stream accounts without first allocating it (writing to
// a nil map panics).
func New() (*Service, error) {
	defaultToken := config.Get("primary_token")
	if defaultToken == "" {
		return nil, errors.New("server: no primary token defined")
	}

	return &Service{
		Default:  core.NewAccount(defaultToken),
		Accounts: make(map[string]*core.Account),
		address:  config.GetDefault("address", "127.0.0.1:5000"),
	}, nil
}
// Run opens an IPv4 TCP listener on srv's configured address, registers srv
// as the Overpush service on a new gRPC server with default options, and
// serves requests on that listener. It returns a wrapped error if the
// listener cannot be created, and otherwise whatever Serve returns.
func Run(srv *Service) error {
	listener, err := net.Listen("tcp4", srv.address)
	if err != nil {
		return fmt.Errorf("server: failed to set up listener: %w", err)
	}

	server := grpc.NewServer()
	pb.RegisterOverpushServer(server, srv)

	return server.Serve(listener)
}