From 6fd81cacf2464100773cda0e465c1dc1953c652a Mon Sep 17 00:00:00 2001
From: Kyle Isom
Date: Thu, 2 Apr 2026 15:39:46 -0700
Subject: [PATCH] Add master core: deploy, undeploy, status, placement, DNS
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Master struct with Run() lifecycle following the agent pattern exactly:
open DB → bootstrap nodes → create agent pool → DNS client → TLS →
auth interceptor → gRPC server → signal handler.

RPC handlers:
- Deploy: place service (tier-aware), forward to agent, register DNS
  with Tailnet IP, detect public routes, validate against allowed
  domains, coordinate edge routing via SetupEdgeRoute, record placement
  and edge routes in master DB, return structured per-step results.
- Undeploy: undeploy on worker first, then remove edge routes, DNS,
  and DB records. Best-effort cleanup on failure.
- Status: query agents for service status, aggregate with placements
  and edge route info from master DB.
- ListNodes: return all nodes with placement counts.

Placement algorithm: fewest services, ties broken alphabetically.

DNS client: extracted from agent's DNSRegistrar with explicit nodeAddr
parameter (master registers for different nodes).

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 internal/master/deploy.go    | 234 +++++++++++++++++++++++++++++
 internal/master/dns.go       | 276 +++++++++++++++++++++++++++++++++++
 internal/master/master.go    | 164 +++++++++++++++++++++
 internal/master/placement.go |  64 ++++++++
 internal/master/status.go    | 139 ++++++++++++++++++
 internal/master/undeploy.go  | 100 +++++++++++++
 6 files changed, 977 insertions(+)
 create mode 100644 internal/master/deploy.go
 create mode 100644 internal/master/dns.go
 create mode 100644 internal/master/master.go
 create mode 100644 internal/master/placement.go
 create mode 100644 internal/master/status.go
 create mode 100644 internal/master/undeploy.go

diff --git a/internal/master/deploy.go b/internal/master/deploy.go
new file mode 100644
index 0000000..da6751c
--- /dev/null
+++ b/internal/master/deploy.go
@@ -0,0 +1,234 @@
+package master
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strings"
+
+	mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
+	"git.wntrmute.dev/mc/mcp/internal/masterdb"
+)
+
+// Deploy handles the MasterDeployRequest: places the service, forwards to
+// the agent, registers DNS, and coordinates edge routing.
+//
+// A request without a service name is rejected with an RPC error. Every
+// later failure is reported inside the response (Error and the per-step
+// results) alongside a nil error, so callers must inspect the response.
+func (m *Master) Deploy(ctx context.Context, req *mcpv1.MasterDeployRequest) (*mcpv1.MasterDeployResponse, error) {
+	spec := req.GetService()
+	if spec == nil || spec.GetName() == "" {
+		return nil, fmt.Errorf("service spec with name is required")
+	}
+
+	serviceName := spec.GetName()
+	tier := spec.GetTier()
+	if tier == "" {
+		tier = "worker"
+	}
+
+	m.Logger.Info("Deploy", "service", serviceName, "tier", tier, "node_override", spec.GetNode())
+
+	resp := &mcpv1.MasterDeployResponse{}
+
+	// Step 1: Place service.
+	nodeName := spec.GetNode()
+	if nodeName == "" {
+		var err error
+		switch tier {
+		case "core":
+			nodeName, err = FindMasterNode(m.DB)
+		default:
+			nodeName, err = PickNode(m.DB)
+		}
+		if err != nil {
+			resp.Error = fmt.Sprintf("placement failed: %v", err)
+			return resp, nil
+		}
+	}
+	resp.Node = nodeName
+
+	// Report lookup failures separately from a genuinely unknown node.
+	node, err := masterdb.GetNode(m.DB, nodeName)
+	if err != nil {
+		resp.Error = fmt.Sprintf("lookup node %q: %v", nodeName, err)
+		return resp, nil
+	}
+	if node == nil {
+		resp.Error = fmt.Sprintf("node %q not found", nodeName)
+		return resp, nil
+	}
+
+	// Parse the node's Tailnet IP from its address (host:port).
+	nodeHost, _, err := net.SplitHostPort(node.Address)
+	if err != nil {
+		resp.Error = fmt.Sprintf("invalid node address %q: %v", node.Address, err)
+		return resp, nil
+	}
+
+	// Step 2: Forward deploy to the agent.
+	client, err := m.Pool.Get(nodeName)
+	if err != nil {
+		resp.Error = fmt.Sprintf("agent connection: %v", err)
+		return resp, nil
+	}
+
+	deployCtx, deployCancel := context.WithTimeout(ctx, m.Config.Timeouts.Deploy.Duration)
+	defer deployCancel()
+
+	deployResp, err := client.Deploy(deployCtx, &mcpv1.DeployRequest{
+		Service: spec,
+	})
+	if err != nil {
+		resp.DeployResult = &mcpv1.StepResult{Step: "deploy", Error: err.Error()}
+		resp.Error = fmt.Sprintf("agent deploy failed: %v", err)
+		return resp, nil
+	}
+	resp.DeployResult = &mcpv1.StepResult{Step: "deploy", Success: true}
+
+	// Check agent-side results for failures.
+	for _, cr := range deployResp.GetResults() {
+		if !cr.GetSuccess() {
+			resp.DeployResult.Success = false
+			resp.DeployResult.Error = fmt.Sprintf("component %s: %s", cr.GetName(), cr.GetError())
+			resp.Error = resp.DeployResult.Error
+			return resp, nil
+		}
+	}
+
+	// Step 3: Register DNS — Tailnet IP from the node address.
+	if m.DNS != nil {
+		if err := m.DNS.EnsureRecord(ctx, serviceName, nodeHost); err != nil {
+			m.Logger.Warn("DNS registration failed", "service", serviceName, "err", err)
+			resp.DnsResult = &mcpv1.StepResult{Step: "dns", Error: err.Error()}
+		} else {
+			resp.DnsResult = &mcpv1.StepResult{Step: "dns", Success: true}
+		}
+	}
+
+	// Record placement.
+	if err := masterdb.CreatePlacement(m.DB, serviceName, nodeName, tier); err != nil {
+		m.Logger.Error("record placement", "service", serviceName, "err", err)
+	}
+
+	// Steps 4-9: Detect public routes and coordinate edge routing.
+	edgeResult := m.setupEdgeRoutes(ctx, spec, serviceName, nodeHost)
+	if edgeResult != nil {
+		resp.EdgeRouteResult = edgeResult
+	}
+
+	// Compute overall success.
+	resp.Success = true
+	if resp.DeployResult != nil && !resp.DeployResult.Success {
+		resp.Success = false
+	}
+	if resp.EdgeRouteResult != nil && !resp.EdgeRouteResult.Success {
+		resp.Success = false
+	}
+
+	m.Logger.Info("deploy complete", "service", serviceName, "node", nodeName, "success", resp.Success)
+	return resp, nil
+}
+
+// setupEdgeRoutes detects public routes and coordinates edge routing.
+// It returns nil when the spec has no public routes with a hostname.
+// Otherwise each route is validated, pushed to the edge node and recorded
+// in the master DB; the result succeeds only if every route was set up,
+// and its Error lists every route that failed.
+func (m *Master) setupEdgeRoutes(ctx context.Context, spec *mcpv1.ServiceSpec, serviceName, nodeHost string) *mcpv1.StepResult {
+	var publicRoutes []*mcpv1.RouteSpec
+	for _, comp := range spec.GetComponents() {
+		for _, route := range comp.GetRoutes() {
+			if route.GetPublic() && route.GetHostname() != "" {
+				publicRoutes = append(publicRoutes, route)
+			}
+		}
+	}
+
+	if len(publicRoutes) == 0 {
+		return nil
+	}
+
+	// Find the edge node.
+	edgeNodeName, err := FindEdgeNode(m.DB)
+	if err != nil {
+		return &mcpv1.StepResult{Step: "edge_route", Error: fmt.Sprintf("no edge node: %v", err)}
+	}
+
+	edgeClient, err := m.Pool.Get(edgeNodeName)
+	if err != nil {
+		return &mcpv1.StepResult{Step: "edge_route", Error: fmt.Sprintf("edge agent connection: %v", err)}
+	}
+
+	var failures []string
+	for _, route := range publicRoutes {
+		hostname := route.GetHostname()
+
+		// Validate hostname against allowed domains.
+		if !m.isAllowedDomain(hostname) {
+			reason := fmt.Sprintf("hostname %q not under an allowed domain", hostname)
+			failures = append(failures, reason)
+			m.Logger.Warn("edge route rejected", "hostname", hostname, "reason", reason)
+			continue
+		}
+
+		// Construct the backend hostname as <service>.<zone>, taking the
+		// zone from the DNS client when one is configured.
+		// For simplicity, use the service name as the component name.
+		zone := "metacircular.net"
+		if m.DNS != nil && m.DNS.Zone() != "" {
+			zone = m.DNS.Zone()
+		}
+		backendHostname := serviceName + "." + zone
+
+		edgeCtx, edgeCancel := context.WithTimeout(ctx, m.Config.Timeouts.EdgeRoute.Duration)
+		_, setupErr := edgeClient.SetupEdgeRoute(edgeCtx, &mcpv1.SetupEdgeRouteRequest{
+			Hostname:        hostname,
+			BackendHostname: backendHostname,
+			BackendPort:     route.GetPort(),
+			BackendTls:      true,
+		})
+		edgeCancel()
+
+		if setupErr != nil {
+			failures = append(failures, fmt.Sprintf("setup edge route %s: %v", hostname, setupErr))
+			m.Logger.Warn("edge route setup failed", "hostname", hostname, "err", setupErr)
+			continue
+		}
+
+		// Record edge route in master DB.
+		if dbErr := masterdb.CreateEdgeRoute(m.DB, hostname, serviceName, edgeNodeName, backendHostname, int(route.GetPort())); dbErr != nil {
+			m.Logger.Warn("record edge route", "hostname", hostname, "err", dbErr)
+		}
+
+		m.Logger.Info("edge route established", "hostname", hostname, "edge_node", edgeNodeName)
+	}
+
+	if len(failures) > 0 {
+		return &mcpv1.StepResult{Step: "edge_route", Error: strings.Join(failures, "; ")}
+	}
+	return &mcpv1.StepResult{Step: "edge_route", Success: true}
+}
+
+// isAllowedDomain checks if hostname falls under one of the configured
+// allowed domains using proper domain label matching. DNS names are
+// case-insensitive and may carry a trailing root dot, so both sides are
+// normalized before comparing. With no allowed domains configured, every
+// hostname is accepted.
+func (m *Master) isAllowedDomain(hostname string) bool {
+	if len(m.Config.Edge.AllowedDomains) == 0 {
+		return true // no restrictions configured
+	}
+	host := strings.ToLower(strings.TrimSuffix(hostname, "."))
+	for _, domain := range m.Config.Edge.AllowedDomains {
+		domain = strings.ToLower(strings.TrimSuffix(domain, "."))
+		if domain == "" {
+			continue // an empty entry must not match anything
+		}
+		if host == domain || strings.HasSuffix(host, "."+domain) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/internal/master/dns.go b/internal/master/dns.go
new file mode 100644
index 0000000..0f91e85
--- /dev/null
+++ b/internal/master/dns.go
@@ -0,0 +1,276 @@
+package master
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log/slog"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"git.wntrmute.dev/mc/mcp/internal/auth"
+	"git.wntrmute.dev/mc/mcp/internal/config"
+)
+
+// DNSClient creates and removes A records in MCNS. Unlike the agent's
+// DNSRegistrar, the master registers records for different node IPs
+// (the nodeAddr is a per-call parameter, not a fixed config value).
+type DNSClient struct {
+	serverURL  string
+	token      string
+	zone       string
+	httpClient *http.Client
+	logger     *slog.Logger
+}
+
+// dnsRecord is the JSON shape decoded from MCNS record listings.
+type dnsRecord struct {
+	ID    int    `json:"ID"`
+	Name  string `json:"Name"`
+	Type  string `json:"Type"`
+	Value string `json:"Value"`
+	TTL   int    `json:"TTL"`
+}
+
+// NewDNSClient creates a DNS client. Returns (nil, nil) if serverURL is empty.
+func NewDNSClient(cfg config.MCNSConfig, logger *slog.Logger) (*DNSClient, error) {
+	if cfg.ServerURL == "" {
+		logger.Info("mcns not configured, DNS registration disabled")
+		return nil, nil
+	}
+
+	token, err := auth.LoadToken(cfg.TokenPath)
+	if err != nil {
+		return nil, fmt.Errorf("load mcns token: %w", err)
+	}
+
+	httpClient, err := newHTTPClient(cfg.CACert)
+	if err != nil {
+		return nil, fmt.Errorf("create mcns HTTP client: %w", err)
+	}
+
+	logger.Info("master DNS client enabled", "server", cfg.ServerURL, "zone", cfg.Zone)
+	return &DNSClient{
+		serverURL:  strings.TrimRight(cfg.ServerURL, "/"),
+		token:      token,
+		zone:       cfg.Zone,
+		httpClient: httpClient,
+		logger:     logger,
+	}, nil
+}
+
+// Zone returns the configured DNS zone.
+func (d *DNSClient) Zone() string {
+	if d == nil {
+		return ""
+	}
+	return d.zone
+}
+
+// EnsureRecord ensures an A record exists for serviceName pointing to nodeAddr.
+// If a record with that value already exists nothing is changed; if only
+// records with other values exist, the first one is updated in place;
+// otherwise a new record is created. A nil receiver is a no-op.
+func (d *DNSClient) EnsureRecord(ctx context.Context, serviceName, nodeAddr string) error {
+	if d == nil {
+		return nil
+	}
+
+	existing, err := d.listRecords(ctx, serviceName)
+	if err != nil {
+		return fmt.Errorf("list DNS records: %w", err)
+	}
+
+	for _, r := range existing {
+		if r.Value == nodeAddr {
+			d.logger.Debug("DNS record exists", "service", serviceName, "value", r.Value)
+			return nil
+		}
+	}
+
+	if len(existing) > 0 {
+		d.logger.Info("updating DNS record", "service", serviceName,
+			"old_value", existing[0].Value, "new_value", nodeAddr)
+		return d.updateRecord(ctx, existing[0].ID, serviceName, nodeAddr)
+	}
+
+	d.logger.Info("creating DNS record", "service", serviceName,
+		"record", serviceName+"."+d.zone, "value", nodeAddr)
+	return d.createRecord(ctx, serviceName, nodeAddr)
+}
+
+// RemoveRecord removes A records for serviceName.
+// It stops at the first failed deletion. A nil receiver is a no-op.
+func (d *DNSClient) RemoveRecord(ctx context.Context, serviceName string) error {
+	if d == nil {
+		return nil
+	}
+
+	existing, err := d.listRecords(ctx, serviceName)
+	if err != nil {
+		return fmt.Errorf("list DNS records: %w", err)
+	}
+
+	for _, r := range existing {
+		d.logger.Info("removing DNS record", "service", serviceName, "id", r.ID)
+		if err := d.deleteRecord(ctx, r.ID); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// listRecords fetches the A records named serviceName in the configured
+// zone. The filter is sent as encoded query parameters.
+func (d *DNSClient) listRecords(ctx context.Context, serviceName string) ([]dnsRecord, error) {
+	endpoint := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, d.zone)
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
+	if err != nil {
+		return nil, fmt.Errorf("create list request: %w", err)
+	}
+	// Encode the filter instead of splicing it into the URL so a service
+	// name containing '&', '#' or spaces cannot alter the query.
+	query := req.URL.Query()
+	query.Set("name", serviceName)
+	query.Set("type", "A")
+	req.URL.RawQuery = query.Encode()
+	req.Header.Set("Authorization", "Bearer "+d.token)
+
+	resp, err := d.httpClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("list records: %w", err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("read list response: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("list records: mcns returned %d: %s", resp.StatusCode, string(body))
+	}
+
+	var envelope struct {
+		Records []dnsRecord `json:"records"`
+	}
+	if err := json.Unmarshal(body, &envelope); err != nil {
+		return nil, fmt.Errorf("parse list response: %w", err)
+	}
+	return envelope.Records, nil
+}
+
+// createRecord POSTs a new A record (TTL 300) for serviceName -> nodeAddr.
+func (d *DNSClient) createRecord(ctx context.Context, serviceName, nodeAddr string) error {
+	reqBody, err := json.Marshal(map[string]any{
+		"name": serviceName, "type": "A", "value": nodeAddr, "ttl": 300,
+	})
+	if err != nil {
+		return fmt.Errorf("encode create request: %w", err)
+	}
+
+	url := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, d.zone)
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(reqBody))
+	if err != nil {
+		return fmt.Errorf("create record request: %w", err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Authorization", "Bearer "+d.token)
+
+	resp, err := d.httpClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("create record: %w", err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
+		respBody, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("create record: mcns returned %d: %s", resp.StatusCode, string(respBody))
+	}
+	return nil
+}
+
+// updateRecord PUTs a replacement A record (TTL 300) over record recordID.
+func (d *DNSClient) updateRecord(ctx context.Context, recordID int, serviceName, nodeAddr string) error {
+	reqBody, err := json.Marshal(map[string]any{
+		"name": serviceName, "type": "A", "value": nodeAddr, "ttl": 300,
+	})
+	if err != nil {
+		return fmt.Errorf("encode update request: %w", err)
+	}
+
+	url := fmt.Sprintf("%s/v1/zones/%s/records/%d", d.serverURL, d.zone, recordID)
+	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(reqBody))
+	if err != nil {
+		return fmt.Errorf("create update request: %w", err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Authorization", "Bearer "+d.token)
+
+	resp, err := d.httpClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("update record: %w", err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	if resp.StatusCode != http.StatusOK {
+		respBody, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("update record: mcns returned %d: %s", resp.StatusCode, string(respBody))
+	}
+	return nil
+}
+
+// deleteRecord DELETEs record recordID; 204 and 200 both count as success.
+func (d *DNSClient) deleteRecord(ctx context.Context, recordID int) error {
+	url := fmt.Sprintf("%s/v1/zones/%s/records/%d", d.serverURL, d.zone, recordID)
+	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
+	if err != nil {
+		return fmt.Errorf("create delete request: %w", err)
+	}
+	req.Header.Set("Authorization", "Bearer "+d.token)
+
+	resp, err := d.httpClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("delete record: %w", err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
+		respBody, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("delete record: mcns returned %d: %s", resp.StatusCode, string(respBody))
+	}
+	return nil
+}
+
+// newHTTPClient builds the HTTPS client used for MCNS: TLS 1.3 minimum, a
+// 30-second overall timeout, and caCertPath (when set) as the only trust root.
+func newHTTPClient(caCertPath string) (*http.Client, error) {
+	tlsConfig := &tls.Config{
+		MinVersion: tls.VersionTLS13,
+	}
+
+	if caCertPath != "" {
+		caCert, err := os.ReadFile(caCertPath) //nolint:gosec // path from trusted config
+		if err != nil {
+			return nil, fmt.Errorf("read CA cert %q: %w", caCertPath, err)
+		}
+		pool := x509.NewCertPool()
+		if !pool.AppendCertsFromPEM(caCert) {
+			return nil, fmt.Errorf("parse CA cert %q: no valid certificates found", caCertPath)
+		}
+		tlsConfig.RootCAs = pool
+	}
+
+	return &http.Client{
+		Timeout: 30 * time.Second,
+		Transport: &http.Transport{
+			TLSClientConfig: tlsConfig,
+		},
+	}, nil
+}
diff --git a/internal/master/master.go b/internal/master/master.go
new file mode 100644
index 0000000..e25dafd
--- /dev/null
+++ b/internal/master/master.go
@@ -0,0 +1,164 @@
+package master
+
+import (
+	"context"
+	"crypto/tls"
+	"database/sql"
+	"fmt"
+	"log/slog"
+	"net"
+	"os"
+	"os/signal"
+	"syscall"
+
+	mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
+	"git.wntrmute.dev/mc/mcp/internal/auth"
+	"git.wntrmute.dev/mc/mcp/internal/config"
+	"git.wntrmute.dev/mc/mcp/internal/masterdb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+)
+
+// Master is the MCP cluster master. It coordinates multi-node deployments,
+// manages edge routes, and stores cluster state.
+type Master struct {
+	mcpv1.UnimplementedMcpMasterServiceServer
+
+	Config  *config.MasterConfig
+	DB      *sql.DB
+	Pool    *AgentPool
+	DNS     *DNSClient
+	Logger  *slog.Logger
+	Version string
+}
+
+// Run starts the master: opens the database, bootstraps nodes, sets up the
+// gRPC server with TLS and auth, and blocks until SIGINT/SIGTERM.
+// The agent pool is closed on every exit path once it has been created.
+func Run(cfg *config.MasterConfig, version string) error {
+	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
+		Level: parseLogLevel(cfg.Log.Level),
+	}))
+
+	// Open master database.
+	db, err := masterdb.Open(cfg.Database.Path)
+	if err != nil {
+		return fmt.Errorf("open master database: %w", err)
+	}
+	defer func() { _ = db.Close() }()
+
+	// Bootstrap nodes from config.
+	// NOTE(review): arch is hard-coded to "amd64" for every node — confirm.
+	for _, n := range cfg.Nodes {
+		if err := masterdb.UpsertNode(db, n.Name, n.Address, n.Role, "amd64"); err != nil {
+			return fmt.Errorf("bootstrap node %s: %w", n.Name, err)
+		}
+		logger.Info("bootstrapped node", "name", n.Name, "address", n.Address, "role", n.Role)
+	}
+
+	// Load service token for dialing agents.
+	token, err := LoadServiceToken(cfg.Master.ServiceTokenPath)
+	if err != nil {
+		return fmt.Errorf("load service token: %w", err)
+	}
+
+	// Create agent connection pool.
+	pool := NewAgentPool(cfg.Master.CACert, token)
+	// Close agent connections on every return path, including the early
+	// error returns below.
+	defer pool.Close()
+	for _, n := range cfg.Nodes {
+		if addErr := pool.AddNode(n.Name, n.Address); addErr != nil {
+			logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
+			// Non-fatal: the node may come up later.
+		}
+	}
+
+	// Create DNS client.
+	dns, err := NewDNSClient(cfg.MCNS, logger)
+	if err != nil {
+		return fmt.Errorf("create DNS client: %w", err)
+	}
+
+	m := &Master{
+		Config:  cfg,
+		DB:      db,
+		Pool:    pool,
+		DNS:     dns,
+		Logger:  logger,
+		Version: version,
+	}
+
+	// TLS.
+	tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
+	if err != nil {
+		return fmt.Errorf("load TLS cert: %w", err)
+	}
+	tlsConfig := &tls.Config{
+		Certificates: []tls.Certificate{tlsCert},
+		MinVersion:   tls.VersionTLS13,
+	}
+
+	// Auth interceptor (same as agent — validates MCIAS tokens).
+	validator, err := auth.NewMCIASValidator(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert)
+	if err != nil {
+		return fmt.Errorf("create MCIAS validator: %w", err)
+	}
+
+	// gRPC server.
+	server := grpc.NewServer(
+		grpc.Creds(credentials.NewTLS(tlsConfig)),
+		grpc.ChainUnaryInterceptor(
+			auth.AuthInterceptor(validator),
+		),
+		grpc.ChainStreamInterceptor(
+			auth.StreamAuthInterceptor(validator),
+		),
+	)
+	mcpv1.RegisterMcpMasterServiceServer(server, m)
+
+	// Listen.
+	lis, err := net.Listen("tcp", cfg.Server.GRPCAddr)
+	if err != nil {
+		return fmt.Errorf("listen %q: %w", cfg.Server.GRPCAddr, err)
+	}
+
+	logger.Info("master starting",
+		"addr", cfg.Server.GRPCAddr,
+		"version", version,
+		"nodes", len(cfg.Nodes),
+	)
+
+	// Signal handling.
+	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer stop()
+
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- server.Serve(lis)
+	}()
+
+	select {
+	case <-ctx.Done():
+		logger.Info("shutting down")
+		server.GracefulStop()
+		return nil
+	case err := <-errCh:
+		return fmt.Errorf("serve: %w", err)
+	}
+}
+
+// parseLogLevel maps the configured level name ("debug", "warn", "error")
+// to a slog.Level; anything else, including "", selects info.
+func parseLogLevel(level string) slog.Level {
+	switch level {
+	case "debug":
+		return slog.LevelDebug
+	case "warn":
+		return slog.LevelWarn
+	case "error":
+		return slog.LevelError
+	default:
+		return slog.LevelInfo
+	}
+}
diff --git a/internal/master/placement.go b/internal/master/placement.go
new file mode 100644
index 0000000..27395cf
--- /dev/null
+++ b/internal/master/placement.go
@@ -0,0 +1,64 @@
+package master
+
+import (
+	"database/sql"
+	"fmt"
+	"sort"
+
+	"git.wntrmute.dev/mc/mcp/internal/masterdb"
+)
+
+// PickNode selects the best worker node for a new service deployment.
+// Algorithm: fewest placed services, ties broken alphabetically.
+func PickNode(db *sql.DB) (string, error) {
+	workers, err := masterdb.ListWorkerNodes(db)
+	if err != nil {
+		return "", fmt.Errorf("list workers: %w", err)
+	}
+	if len(workers) == 0 {
+		return "", fmt.Errorf("no worker nodes available")
+	}
+
+	counts, err := masterdb.CountPlacementsPerNode(db)
+	if err != nil {
+		return "", fmt.Errorf("count placements: %w", err)
+	}
+
+	// Sort: fewest placements first, then alphabetically.
+	sort.Slice(workers, func(i, j int) bool {
+		ci := counts[workers[i].Name]
+		cj := counts[workers[j].Name]
+		if ci != cj {
+			return ci < cj
+		}
+		return workers[i].Name < workers[j].Name
+	})
+
+	return workers[0].Name, nil
+}
+
+// FindMasterNode returns the name of the node with role "master".
+func FindMasterNode(db *sql.DB) (string, error) {
+	nodes, err := masterdb.ListNodes(db)
+	if err != nil {
+		return "", fmt.Errorf("list nodes: %w", err)
+	}
+	for _, n := range nodes {
+		if n.Role == "master" {
+			return n.Name, nil
+		}
+	}
+	return "", fmt.Errorf("no master node found")
+}
+
+// FindEdgeNode returns the name of the first edge node.
+func FindEdgeNode(db *sql.DB) (string, error) {
+	edges, err := masterdb.ListEdgeNodes(db)
+	if err != nil {
+		return "", fmt.Errorf("list edge nodes: %w", err)
+	}
+	if len(edges) == 0 {
+		return "", fmt.Errorf("no edge nodes available")
+	}
+	return edges[0].Name, nil
+}
diff --git a/internal/master/status.go b/internal/master/status.go
new file mode 100644
index 0000000..b6b8905
--- /dev/null
+++ b/internal/master/status.go
@@ -0,0 +1,139 @@
+package master
+
+import (
+	"context"
+	"fmt"
+
+	mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
+	"git.wntrmute.dev/mc/mcp/internal/masterdb"
+)
+
+// Status returns the status of services across the fleet.
+// With a service name set, only that service is reported (an empty
+// response if it has no placement); otherwise every placement is queried.
+func (m *Master) Status(ctx context.Context, req *mcpv1.MasterStatusRequest) (*mcpv1.MasterStatusResponse, error) {
+	m.Logger.Debug("Status", "service", req.GetServiceName())
+
+	resp := &mcpv1.MasterStatusResponse{}
+
+	// If a specific service is requested, look up its placement.
+	if name := req.GetServiceName(); name != "" {
+		placement, err := masterdb.GetPlacement(m.DB, name)
+		if err != nil {
+			return nil, fmt.Errorf("lookup placement: %w", err)
+		}
+		if placement == nil {
+			return resp, nil // empty — service not found
+		}
+
+		ss := m.getServiceStatus(ctx, placement)
+		resp.Services = append(resp.Services, ss)
+		return resp, nil
+	}
+
+	// All services.
+	placements, err := masterdb.ListPlacements(m.DB)
+	if err != nil {
+		return nil, fmt.Errorf("list placements: %w", err)
+	}
+
+	for _, p := range placements {
+		ss := m.getServiceStatus(ctx, p)
+		resp.Services = append(resp.Services, ss)
+	}
+
+	return resp, nil
+}
+
+// getServiceStatus builds the status entry for one placement. Edge routes
+// come from the master DB and are attached even when the agent cannot be
+// reached; the live state comes from the agent and is "unreachable" on any
+// connection or RPC failure, "unknown" if the agent omits the service.
+func (m *Master) getServiceStatus(ctx context.Context, p *masterdb.Placement) *mcpv1.ServiceStatus {
+	ss := &mcpv1.ServiceStatus{
+		Name:   p.ServiceName,
+		Node:   p.Node,
+		Tier:   p.Tier,
+		Status: "unknown",
+	}
+
+	// Attach edge route info first: it lives in the master DB, so it is
+	// available even if the agent is down.
+	edgeRoutes, err := masterdb.ListEdgeRoutesForService(m.DB, p.ServiceName)
+	if err != nil {
+		m.Logger.Warn("list edge routes for status", "service", p.ServiceName, "err", err)
+	}
+	for _, er := range edgeRoutes {
+		ss.EdgeRoutes = append(ss.EdgeRoutes, &mcpv1.EdgeRouteStatus{
+			Hostname: er.Hostname,
+			EdgeNode: er.EdgeNode,
+		})
+	}
+
+	// Query the agent for live status.
+	client, err := m.Pool.Get(p.Node)
+	if err != nil {
+		ss.Status = "unreachable"
+		return ss
+	}
+
+	statusCtx, cancel := context.WithTimeout(ctx, m.Config.Timeouts.HealthCheck.Duration)
+	defer cancel()
+
+	agentResp, err := client.GetServiceStatus(statusCtx, &mcpv1.GetServiceStatusRequest{
+		Name: p.ServiceName,
+	})
+	if err != nil {
+		ss.Status = "unreachable"
+		return ss
+	}
+
+	// Map agent status to master status.
+	for _, info := range agentResp.GetServices() {
+		if info.GetName() == p.ServiceName {
+			if info.GetActive() {
+				ss.Status = "running"
+			} else {
+				ss.Status = "stopped"
+			}
+			break
+		}
+	}
+
+	return ss
+}
+
+// ListNodes returns all nodes in the registry with placement counts.
+func (m *Master) ListNodes(_ context.Context, _ *mcpv1.ListNodesRequest) (*mcpv1.ListNodesResponse, error) {
+	m.Logger.Debug("ListNodes")
+
+	nodes, err := masterdb.ListNodes(m.DB)
+	if err != nil {
+		return nil, fmt.Errorf("list nodes: %w", err)
+	}
+
+	counts, err := masterdb.CountPlacementsPerNode(m.DB)
+	if err != nil {
+		return nil, fmt.Errorf("count placements: %w", err)
+	}
+
+	resp := &mcpv1.ListNodesResponse{}
+	for _, n := range nodes {
+		ni := &mcpv1.NodeInfo{
+			Name:       n.Name,
+			Role:       n.Role,
+			Address:    n.Address,
+			Arch:       n.Arch,
+			Status:     n.Status,
+			Containers: int32(n.Containers),   //nolint:gosec // small number
+			Services:   int32(counts[n.Name]), //nolint:gosec // small number
+		}
+		if n.LastHeartbeat != nil {
+			// Convert to UTC first: the layout ends in a literal "Z".
+			ni.LastHeartbeat = n.LastHeartbeat.UTC().Format("2006-01-02T15:04:05Z")
+		}
+		resp.Nodes = append(resp.Nodes, ni)
+	}
+
+	return resp, nil
+}
diff --git a/internal/master/undeploy.go b/internal/master/undeploy.go
new file mode 100644
index 0000000..577d7b9
--- /dev/null
+++ b/internal/master/undeploy.go
@@ -0,0 +1,100 @@
+package master
+
+import (
+	"context"
+	"fmt"
+
+	mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
+	"git.wntrmute.dev/mc/mcp/internal/masterdb"
+)
+
+// Undeploy handles MasterUndeployRequest: undeploys the service on its
+// worker agent first, then removes edge routes, DNS, and the master DB
+// records. Cleanup is best-effort and continues past individual failures;
+// Success reflects only the agent undeploy.
+func (m *Master) Undeploy(ctx context.Context, req *mcpv1.MasterUndeployRequest) (*mcpv1.MasterUndeployResponse, error) {
+	serviceName := req.GetServiceName()
+	if serviceName == "" {
+		return nil, fmt.Errorf("service_name is required")
+	}
+
+	m.Logger.Info("Undeploy", "service", serviceName)
+
+	// Look up placement.
+	placement, err := masterdb.GetPlacement(m.DB, serviceName)
+	if err != nil {
+		return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("lookup placement: %v", err)}, nil
+	}
+	if placement == nil {
+		return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("service %q not found in placements", serviceName)}, nil
+	}
+
+	// Step 1: Undeploy on worker first (stops the backend).
+	client, err := m.Pool.Get(placement.Node)
+	if err != nil {
+		return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("agent connection: %v", err)}, nil
+	}
+
+	undeployCtx, undeployCancel := context.WithTimeout(ctx, m.Config.Timeouts.Undeploy.Duration)
+	defer undeployCancel()
+
+	_, undeployErr := client.UndeployService(undeployCtx, &mcpv1.UndeployServiceRequest{
+		Name: serviceName,
+	})
+	if undeployErr != nil {
+		m.Logger.Warn("agent undeploy failed", "service", serviceName, "node", placement.Node, "err", undeployErr)
+		// Continue — still clean up edge routes and records.
+	}
+
+	// Step 2: Remove edge routes.
+	edgeRoutes, err := masterdb.ListEdgeRoutesForService(m.DB, serviceName)
+	if err != nil {
+		m.Logger.Warn("list edge routes for undeploy", "service", serviceName, "err", err)
+	}
+	for _, er := range edgeRoutes {
+		edgeClient, getErr := m.Pool.Get(er.EdgeNode)
+		if getErr != nil {
+			m.Logger.Warn("edge agent connection", "edge_node", er.EdgeNode, "err", getErr)
+			continue
+		}
+
+		edgeCtx, edgeCancel := context.WithTimeout(ctx, m.Config.Timeouts.EdgeRoute.Duration)
+		_, removeErr := edgeClient.RemoveEdgeRoute(edgeCtx, &mcpv1.RemoveEdgeRouteRequest{
+			Hostname: er.Hostname,
+		})
+		edgeCancel()
+
+		if removeErr != nil {
+			m.Logger.Warn("remove edge route", "hostname", er.Hostname, "err", removeErr)
+		} else {
+			m.Logger.Info("edge route removed", "hostname", er.Hostname, "edge_node", er.EdgeNode)
+		}
+	}
+
+	// Step 3: Remove DNS.
+	if m.DNS != nil {
+		if dnsErr := m.DNS.RemoveRecord(ctx, serviceName); dnsErr != nil {
+			m.Logger.Warn("DNS removal failed", "service", serviceName, "err", dnsErr)
+		}
+	}
+
+	// Step 4: Clean up records.
+	if delErr := masterdb.DeleteEdgeRoutesForService(m.DB, serviceName); delErr != nil {
+		m.Logger.Warn("delete edge route records", "service", serviceName, "err", delErr)
+	}
+	if delErr := masterdb.DeletePlacement(m.DB, serviceName); delErr != nil {
+		m.Logger.Warn("delete placement record", "service", serviceName, "err", delErr)
+	}
+
+	success := undeployErr == nil
+	var errMsg string
+	if !success {
+		errMsg = fmt.Sprintf("agent undeploy: %v", undeployErr)
+	}
+
+	m.Logger.Info("undeploy complete", "service", serviceName, "success", success)
+	return &mcpv1.MasterUndeployResponse{
+		Success: success,
+		Error:   errMsg,
+	}, nil
+}