package server import ( "context" "errors" "fmt" "net" "time" "git.wntrmute.dev/kyle/goutils/config" "git.wntrmute.dev/kyle/goutils/log" "git.wntrmute.dev/kyle/overpush/core" pb "git.wntrmute.dev/kyle/overpush/proto" "github.com/google/uuid" "google.golang.org/grpc" ) type Service struct { Default *core.Account Accounts map[string]*core.Account address string Listener net.TCPListener // omg thank you for mentioning this in the docs ever you fucking asshats pb.UnimplementedOverpushServer } func (svc *Service) QueueMessage(ctx context.Context, msg *pb.Message) (*pb.Response, error) { var acct *core.Account var ok bool log.Debugf("message %d queued", msg.Timestamp) stream := msg.GetStreamName() if stream == "" { acct = svc.Default } else if acct, ok = svc.Accounts[stream]; !ok { log.Errf("invalid stream name '%s' in message", stream) return &pb.Response{ Timestamp: time.Now().Unix(), Id: uuid.NewString(), Status: pb.Status_StatusFailed, Errors: []string{fmt.Sprintf("server: invalid stream name %s", stream)}, }, errors.New("service: invalid stream name " + stream) } qmsg := core.NewMessage(msg) if qmsg == nil { log.Err("server: invalid message received, dropping") return &pb.Response{ Timestamp: time.Now().Unix(), Id: uuid.NewString(), Status: pb.Status_StatusFailed, Errors: []string{"service: invalid message"}, }, errors.New("server: invalid message received, dropping") } acct.Queue() <- qmsg return &pb.Response{ Timestamp: time.Now().Unix(), Id: uuid.NewString(), Status: pb.Status_StatusQueued, }, nil } func New() (*Service, error) { srv := &Service{ address: config.GetDefault("address", "127.0.0.1:5000"), } defaultToken := config.Get("primary_token") if defaultToken == "" { return nil, errors.New("server: no primary token defined") } srv.Default = core.NewAccount(defaultToken) return srv, nil } func Run(srv *Service) error { var opts []grpc.ServerOption lsn, err := net.Listen("tcp4", srv.address) if err != nil { return fmt.Errorf("server: failed to set up listener: %w", err) } grpcServer := grpc.NewServer(opts...) pb.RegisterOverpushServer(grpcServer, srv) return grpcServer.Serve(lsn) }