Master struct with Run() lifecycle following the agent pattern exactly: open DB → bootstrap nodes → create agent pool → DNS client → TLS → auth interceptor → gRPC server → signal handler.

RPC handlers:

- Deploy: place the service (tier-aware), forward to the agent, register DNS with the Tailnet IP, detect public routes, validate them against allowed domains, coordinate edge routing via SetupEdgeRoute, record the placement and edge routes in the master DB, and return structured per-step results.
- Undeploy: undeploy on the worker first, then remove edge routes, DNS records, and DB records. Cleanup is best-effort on failure.
- Status: query agents for service status and aggregate with placements and edge route info from the master DB.
- ListNodes: return all nodes with their placement counts.

Placement algorithm: fewest services, ties broken alphabetically (sketched below).

DNS client: extracted from the agent's DNSRegistrar with an explicit nodeAddr parameter, since the master registers records on behalf of other nodes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
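The placement rule is compact enough to sketch in isolation. The following is a minimal illustration, not code from this file; NodeLoad and pickNode are hypothetical names standing in for whatever aggregate the master DB query actually returns:

package placement

import "errors"

// NodeLoad pairs a node name with its current placement count,
// as the master DB would report it. (Hypothetical type for illustration.)
type NodeLoad struct {
    Name       string
    Placements int
}

// pickNode applies the placement rule from the commit message: choose the
// node with the fewest placed services, breaking ties alphabetically by name.
func pickNode(nodes []NodeLoad) (string, error) {
    if len(nodes) == 0 {
        return "", errors.New("no nodes available")
    }
    best := nodes[0]
    for _, n := range nodes[1:] {
        if n.Placements < best.Placements ||
            (n.Placements == best.Placements && n.Name < best.Name) {
            best = n
        }
    }
    return best.Name, nil
}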
package master

import (
    "context"
    "crypto/tls"
    "database/sql"
    "fmt"
    "log/slog"
    "net"
    "os"
    "os/signal"
    "syscall"

    mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
    "git.wntrmute.dev/mc/mcp/internal/auth"
    "git.wntrmute.dev/mc/mcp/internal/config"
    "git.wntrmute.dev/mc/mcp/internal/masterdb"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// Master is the MCP cluster master. It coordinates multi-node deployments,
// manages edge routes, and stores cluster state.
type Master struct {
    mcpv1.UnimplementedMcpMasterServiceServer

    Config  *config.MasterConfig
    DB      *sql.DB
    Pool    *AgentPool
    DNS     *DNSClient
    Logger  *slog.Logger
    Version string
}

// Run starts the master: opens the database, bootstraps nodes, sets up the
// gRPC server with TLS and auth, and blocks until SIGINT/SIGTERM.
func Run(cfg *config.MasterConfig, version string) error {
    logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
        Level: parseLogLevel(cfg.Log.Level),
    }))

    // Open master database.
    db, err := masterdb.Open(cfg.Database.Path)
    if err != nil {
        return fmt.Errorf("open master database: %w", err)
    }
    defer func() { _ = db.Close() }()

    // Bootstrap nodes from config.
    for _, n := range cfg.Nodes {
        if err := masterdb.UpsertNode(db, n.Name, n.Address, n.Role, "amd64"); err != nil {
            return fmt.Errorf("bootstrap node %s: %w", n.Name, err)
        }
        logger.Info("bootstrapped node", "name", n.Name, "address", n.Address, "role", n.Role)
    }

    // Load service token for dialing agents.
    token, err := LoadServiceToken(cfg.Master.ServiceTokenPath)
    if err != nil {
        return fmt.Errorf("load service token: %w", err)
    }

    // Create agent connection pool.
    pool := NewAgentPool(cfg.Master.CACert, token)
    for _, n := range cfg.Nodes {
        if addErr := pool.AddNode(n.Name, n.Address); addErr != nil {
            logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
            // Non-fatal: the node may come up later.
        }
    }

    // Create DNS client.
    dns, err := NewDNSClient(cfg.MCNS, logger)
    if err != nil {
        return fmt.Errorf("create DNS client: %w", err)
    }

    m := &Master{
        Config:  cfg,
        DB:      db,
        Pool:    pool,
        DNS:     dns,
        Logger:  logger,
        Version: version,
    }

    // TLS.
    tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
    if err != nil {
        return fmt.Errorf("load TLS cert: %w", err)
    }
    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{tlsCert},
        MinVersion:   tls.VersionTLS13,
    }

    // Auth interceptor (same as the agent: validates MCIAS tokens).
    validator, err := auth.NewMCIASValidator(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert)
    if err != nil {
        return fmt.Errorf("create MCIAS validator: %w", err)
    }

    // gRPC server.
    server := grpc.NewServer(
        grpc.Creds(credentials.NewTLS(tlsConfig)),
        grpc.ChainUnaryInterceptor(
            auth.AuthInterceptor(validator),
        ),
        grpc.ChainStreamInterceptor(
            auth.StreamAuthInterceptor(validator),
        ),
    )
    mcpv1.RegisterMcpMasterServiceServer(server, m)

    // Listen.
    lis, err := net.Listen("tcp", cfg.Server.GRPCAddr)
    if err != nil {
        return fmt.Errorf("listen %q: %w", cfg.Server.GRPCAddr, err)
    }

    logger.Info("master starting",
        "addr", cfg.Server.GRPCAddr,
        "version", version,
        "nodes", len(cfg.Nodes),
    )

    // Signal handling.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    errCh := make(chan error, 1)
    go func() {
        errCh <- server.Serve(lis)
    }()

    select {
    case <-ctx.Done():
        logger.Info("shutting down")
        server.GracefulStop()
        pool.Close()
        return nil
    case err := <-errCh:
        pool.Close()
        return fmt.Errorf("serve: %w", err)
    }
}

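// parseLogLevel maps the configured log level string to its slog.Level.
// Unrecognized values (including the empty string) fall back to Info.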
func parseLogLevel(level string) slog.Level {
    switch level {
    case "debug":
        return slog.LevelDebug
    case "warn":
        return slog.LevelWarn
    case "error":
        return slog.LevelError
    default:
        return slog.LevelInfo
    }
}
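For context, a minimal sketch of an entrypoint that drives Run. The cmd layout and the config.LoadMasterConfig helper are assumptions for illustration, not part of this package:

package main

import (
    "log"
    "os"

    "git.wntrmute.dev/mc/mcp/internal/config"
    "git.wntrmute.dev/mc/mcp/internal/master"
)

// version is typically injected at build time via -ldflags.
var version = "dev"

func main() {
    if len(os.Args) < 2 {
        log.Fatal("usage: mcp-master <config>")
    }
    // config.LoadMasterConfig is an assumed loader; the real one may differ.
    cfg, err := config.LoadMasterConfig(os.Args[1])
    if err != nil {
        log.Fatalf("load config: %v", err)
    }
    if err := master.Run(cfg, version); err != nil {
        log.Fatalf("master: %v", err)
    }
}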