90 lines
2.2 KiB
Go
90 lines
2.2 KiB
Go
|
package server
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"time"
|
||
|
|
||
|
"git.wntrmute.dev/kyle/goutils/config"
|
||
|
"git.wntrmute.dev/kyle/goutils/log"
|
||
|
"git.wntrmute.dev/kyle/overpush/core"
|
||
|
pb "git.wntrmute.dev/kyle/overpush/proto"
|
||
|
"github.com/google/uuid"
|
||
|
"google.golang.org/grpc"
|
||
|
)
|
||
|
|
||
|
type Service struct {
|
||
|
Default *core.Account
|
||
|
Accounts map[string]*core.Account
|
||
|
address string
|
||
|
Listener net.TCPListener
|
||
|
|
||
|
// omg thank you for mentioning this in the docs ever you fucking asshats
|
||
|
pb.UnimplementedOverpushServer
|
||
|
}
|
||
|
|
||
|
func (svc *Service) QueueMessage(ctx context.Context, msg *pb.Message) (*pb.Response, error) {
|
||
|
var acct *core.Account
|
||
|
var ok bool
|
||
|
|
||
|
log.Debugf("message %d queued", msg.Timestamp)
|
||
|
|
||
|
stream := msg.GetStreamName()
|
||
|
if stream == "" {
|
||
|
acct = svc.Default
|
||
|
} else if acct, ok = svc.Accounts[stream]; !ok {
|
||
|
return &pb.Response{
|
||
|
Timestamp: time.Now().Unix(),
|
||
|
Id: uuid.NewString(),
|
||
|
Status: pb.Status_StatusFailed,
|
||
|
Errors: []string{fmt.Sprintf("server: invalid stream name %s", stream)},
|
||
|
}, errors.New("service: invalid stream name " + stream)
|
||
|
}
|
||
|
|
||
|
qmsg := core.NewMessage(msg)
|
||
|
if qmsg == nil {
|
||
|
log.Err("server: invalid message received, dropping")
|
||
|
return &pb.Response{
|
||
|
Timestamp: time.Now().Unix(),
|
||
|
Id: uuid.NewString(),
|
||
|
Status: pb.Status_StatusFailed,
|
||
|
Errors: []string{"service: invalid message"},
|
||
|
}, errors.New("server: invalid message received, dropping")
|
||
|
}
|
||
|
|
||
|
acct.Queue() <- qmsg
|
||
|
return &pb.Response{
|
||
|
Timestamp: time.Now().Unix(),
|
||
|
Id: uuid.NewString(),
|
||
|
Status: pb.Status_StatusQueued,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func New() (*Service, error) {
|
||
|
srv := &Service{
|
||
|
address: config.GetDefault("address", "127.0.0.1:5000"),
|
||
|
}
|
||
|
|
||
|
defaultToken := config.Get("primary_token")
|
||
|
if defaultToken == "" {
|
||
|
return nil, errors.New("server: no primary token defined")
|
||
|
}
|
||
|
srv.Default = core.NewAccount(defaultToken)
|
||
|
return srv, nil
|
||
|
}
|
||
|
|
||
|
func Run(srv *Service) error {
|
||
|
var opts []grpc.ServerOption
|
||
|
|
||
|
lsn, err := net.Listen("tcp4", srv.address)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("server: failed to set up listener: %w", err)
|
||
|
}
|
||
|
|
||
|
grpcServer := grpc.NewServer(opts...)
|
||
|
pb.RegisterOverpushServer(grpcServer, srv)
|
||
|
return grpcServer.Serve(lsn)
|
||
|
}
|