91 lines
2.2 KiB
Go
91 lines
2.2 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"git.wntrmute.dev/kyle/goutils/config"
|
|
"git.wntrmute.dev/kyle/goutils/log"
|
|
"git.wntrmute.dev/kyle/overpush/core"
|
|
pb "git.wntrmute.dev/kyle/overpush/proto"
|
|
"github.com/google/uuid"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type Service struct {
|
|
Default *core.Account
|
|
Accounts map[string]*core.Account
|
|
address string
|
|
Listener net.TCPListener
|
|
|
|
// omg thank you for mentioning this in the docs ever you fucking asshats
|
|
pb.UnimplementedOverpushServer
|
|
}
|
|
|
|
func (svc *Service) QueueMessage(ctx context.Context, msg *pb.Message) (*pb.Response, error) {
|
|
var acct *core.Account
|
|
var ok bool
|
|
|
|
log.Debugf("message %d queued", msg.Timestamp)
|
|
|
|
stream := msg.GetStreamName()
|
|
if stream == "" {
|
|
acct = svc.Default
|
|
} else if acct, ok = svc.Accounts[stream]; !ok {
|
|
log.Errf("invalid stream name '%s' in message", stream)
|
|
return &pb.Response{
|
|
Timestamp: time.Now().Unix(),
|
|
Id: uuid.NewString(),
|
|
Status: pb.Status_StatusFailed,
|
|
Errors: []string{fmt.Sprintf("server: invalid stream name %s", stream)},
|
|
}, errors.New("service: invalid stream name " + stream)
|
|
}
|
|
|
|
qmsg := core.NewMessage(msg)
|
|
if qmsg == nil {
|
|
log.Errln("server: invalid message received, dropping")
|
|
return &pb.Response{
|
|
Timestamp: time.Now().Unix(),
|
|
Id: uuid.NewString(),
|
|
Status: pb.Status_StatusFailed,
|
|
Errors: []string{"service: invalid message"},
|
|
}, errors.New("server: invalid message received, dropping")
|
|
}
|
|
|
|
acct.Queue() <- qmsg
|
|
return &pb.Response{
|
|
Timestamp: time.Now().Unix(),
|
|
Id: uuid.NewString(),
|
|
Status: pb.Status_StatusQueued,
|
|
}, nil
|
|
}
|
|
|
|
func New() (*Service, error) {
|
|
srv := &Service{
|
|
address: config.GetDefault("address", "127.0.0.1:5000"),
|
|
}
|
|
|
|
defaultToken := config.Get("primary_token")
|
|
if defaultToken == "" {
|
|
return nil, errors.New("server: no primary token defined")
|
|
}
|
|
srv.Default = core.NewAccount(defaultToken)
|
|
return srv, nil
|
|
}
|
|
|
|
func Run(srv *Service) error {
|
|
var opts []grpc.ServerOption
|
|
|
|
lsn, err := net.Listen("tcp4", srv.address)
|
|
if err != nil {
|
|
return fmt.Errorf("server: failed to set up listener: %w", err)
|
|
}
|
|
|
|
grpcServer := grpc.NewServer(opts...)
|
|
pb.RegisterOverpushServer(grpcServer, srv)
|
|
return grpcServer.Serve(lsn)
|
|
}
|