diff --git a/internal/master/deploy.go b/internal/master/deploy.go new file mode 100644 index 0000000..da6751c --- /dev/null +++ b/internal/master/deploy.go @@ -0,0 +1,211 @@ +package master + +import ( + "context" + "fmt" + "net" + "strings" + + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" + "git.wntrmute.dev/mc/mcp/internal/masterdb" +) + +// Deploy handles the MasterDeployRequest: places the service, forwards to +// the agent, registers DNS, and coordinates edge routing. +func (m *Master) Deploy(ctx context.Context, req *mcpv1.MasterDeployRequest) (*mcpv1.MasterDeployResponse, error) { + spec := req.GetService() + if spec == nil || spec.GetName() == "" { + return nil, fmt.Errorf("service spec with name is required") + } + + serviceName := spec.GetName() + tier := spec.GetTier() + if tier == "" { + tier = "worker" + } + + m.Logger.Info("Deploy", "service", serviceName, "tier", tier, "node_override", spec.GetNode()) + + resp := &mcpv1.MasterDeployResponse{} + + // Step 1: Place service. + nodeName := spec.GetNode() + if nodeName == "" { + var err error + switch tier { + case "core": + nodeName, err = FindMasterNode(m.DB) + default: + nodeName, err = PickNode(m.DB) + } + if err != nil { + resp.Error = fmt.Sprintf("placement failed: %v", err) + return resp, nil + } + } + resp.Node = nodeName + + node, err := masterdb.GetNode(m.DB, nodeName) + if err != nil || node == nil { + resp.Error = fmt.Sprintf("node %q not found", nodeName) + return resp, nil + } + + // Parse the node's Tailnet IP from its address (host:port). + nodeHost, _, err := net.SplitHostPort(node.Address) + if err != nil { + resp.Error = fmt.Sprintf("invalid node address %q: %v", node.Address, err) + return resp, nil + } + + // Step 2: Forward deploy to the agent. + client, err := m.Pool.Get(nodeName) + if err != nil { + resp.Error = fmt.Sprintf("agent connection: %v", err) + return resp, nil + } + + deployCtx, deployCancel := context.WithTimeout(ctx, m.Config.Timeouts.Deploy.Duration) + defer deployCancel() + + deployResp, err := client.Deploy(deployCtx, &mcpv1.DeployRequest{ + Service: spec, + }) + if err != nil { + resp.DeployResult = &mcpv1.StepResult{Step: "deploy", Error: err.Error()} + resp.Error = fmt.Sprintf("agent deploy failed: %v", err) + return resp, nil + } + resp.DeployResult = &mcpv1.StepResult{Step: "deploy", Success: true} + + // Check agent-side results for failures. + for _, cr := range deployResp.GetResults() { + if !cr.GetSuccess() { + resp.DeployResult.Success = false + resp.DeployResult.Error = fmt.Sprintf("component %s: %s", cr.GetName(), cr.GetError()) + resp.Error = resp.DeployResult.Error + return resp, nil + } + } + + // Step 3: Register DNS — Tailnet IP from the node address. + if m.DNS != nil { + if err := m.DNS.EnsureRecord(ctx, serviceName, nodeHost); err != nil { + m.Logger.Warn("DNS registration failed", "service", serviceName, "err", err) + resp.DnsResult = &mcpv1.StepResult{Step: "dns", Error: err.Error()} + } else { + resp.DnsResult = &mcpv1.StepResult{Step: "dns", Success: true} + } + } + + // Record placement. + if err := masterdb.CreatePlacement(m.DB, serviceName, nodeName, tier); err != nil { + m.Logger.Error("record placement", "service", serviceName, "err", err) + } + + // Steps 4-9: Detect public routes and coordinate edge routing. + edgeResult := m.setupEdgeRoutes(ctx, spec, serviceName, nodeHost) + if edgeResult != nil { + resp.EdgeRouteResult = edgeResult + } + + // Compute overall success. + resp.Success = true + if resp.DeployResult != nil && !resp.DeployResult.Success { + resp.Success = false + } + if resp.EdgeRouteResult != nil && !resp.EdgeRouteResult.Success { + resp.Success = false + } + + m.Logger.Info("deploy complete", "service", serviceName, "node", nodeName, "success", resp.Success) + return resp, nil +} + +// setupEdgeRoutes detects public routes and coordinates edge routing. +func (m *Master) setupEdgeRoutes(ctx context.Context, spec *mcpv1.ServiceSpec, serviceName, nodeHost string) *mcpv1.StepResult { + var publicRoutes []*mcpv1.RouteSpec + for _, comp := range spec.GetComponents() { + for _, route := range comp.GetRoutes() { + if route.GetPublic() && route.GetHostname() != "" { + publicRoutes = append(publicRoutes, route) + } + } + } + + if len(publicRoutes) == 0 { + return nil + } + + // Find the edge node. + edgeNodeName, err := FindEdgeNode(m.DB) + if err != nil { + return &mcpv1.StepResult{Step: "edge_route", Error: fmt.Sprintf("no edge node: %v", err)} + } + + edgeClient, err := m.Pool.Get(edgeNodeName) + if err != nil { + return &mcpv1.StepResult{Step: "edge_route", Error: fmt.Sprintf("edge agent connection: %v", err)} + } + + var lastErr string + for _, route := range publicRoutes { + hostname := route.GetHostname() + + // Validate hostname against allowed domains. + if !m.isAllowedDomain(hostname) { + lastErr = fmt.Sprintf("hostname %q not under an allowed domain", hostname) + m.Logger.Warn("edge route rejected", "hostname", hostname, "reason", lastErr) + continue + } + + // Construct the backend hostname: .svc.mcp. + // For simplicity, use the service name as the component name. + zone := "metacircular.net" + if m.DNS != nil && m.DNS.Zone() != "" { + zone = m.DNS.Zone() + } + backendHostname := serviceName + "." + zone + + edgeCtx, edgeCancel := context.WithTimeout(ctx, m.Config.Timeouts.EdgeRoute.Duration) + _, setupErr := edgeClient.SetupEdgeRoute(edgeCtx, &mcpv1.SetupEdgeRouteRequest{ + Hostname: hostname, + BackendHostname: backendHostname, + BackendPort: route.GetPort(), + BackendTls: true, + }) + edgeCancel() + + if setupErr != nil { + lastErr = fmt.Sprintf("setup edge route %s: %v", hostname, setupErr) + m.Logger.Warn("edge route setup failed", "hostname", hostname, "err", setupErr) + continue + } + + // Record edge route in master DB. + if dbErr := masterdb.CreateEdgeRoute(m.DB, hostname, serviceName, edgeNodeName, backendHostname, int(route.GetPort())); dbErr != nil { + m.Logger.Warn("record edge route", "hostname", hostname, "err", dbErr) + } + + m.Logger.Info("edge route established", "hostname", hostname, "edge_node", edgeNodeName) + } + + if lastErr != "" { + return &mcpv1.StepResult{Step: "edge_route", Error: lastErr} + } + return &mcpv1.StepResult{Step: "edge_route", Success: true} +} + +// isAllowedDomain checks if hostname falls under one of the configured +// allowed domains using proper domain label matching. +func (m *Master) isAllowedDomain(hostname string) bool { + if len(m.Config.Edge.AllowedDomains) == 0 { + return true // no restrictions configured + } + for _, domain := range m.Config.Edge.AllowedDomains { + if hostname == domain || strings.HasSuffix(hostname, "."+domain) { + return true + } + } + return false +} diff --git a/internal/master/dns.go b/internal/master/dns.go new file mode 100644 index 0000000..0f91e85 --- /dev/null +++ b/internal/master/dns.go @@ -0,0 +1,252 @@ +package master + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "strings" + "time" + + "git.wntrmute.dev/mc/mcp/internal/auth" + "git.wntrmute.dev/mc/mcp/internal/config" +) + +// DNSClient creates and removes A records in MCNS. Unlike the agent's +// DNSRegistrar, the master registers records for different node IPs +// (the nodeAddr is a per-call parameter, not a fixed config value). +type DNSClient struct { + serverURL string + token string + zone string + httpClient *http.Client + logger *slog.Logger +} + +type dnsRecord struct { + ID int `json:"ID"` + Name string `json:"Name"` + Type string `json:"Type"` + Value string `json:"Value"` + TTL int `json:"TTL"` +} + +// NewDNSClient creates a DNS client. Returns (nil, nil) if serverURL is empty. +func NewDNSClient(cfg config.MCNSConfig, logger *slog.Logger) (*DNSClient, error) { + if cfg.ServerURL == "" { + logger.Info("mcns not configured, DNS registration disabled") + return nil, nil + } + + token, err := auth.LoadToken(cfg.TokenPath) + if err != nil { + return nil, fmt.Errorf("load mcns token: %w", err) + } + + httpClient, err := newHTTPClient(cfg.CACert) + if err != nil { + return nil, fmt.Errorf("create mcns HTTP client: %w", err) + } + + logger.Info("master DNS client enabled", "server", cfg.ServerURL, "zone", cfg.Zone) + return &DNSClient{ + serverURL: strings.TrimRight(cfg.ServerURL, "/"), + token: token, + zone: cfg.Zone, + httpClient: httpClient, + logger: logger, + }, nil +} + +// Zone returns the configured DNS zone. +func (d *DNSClient) Zone() string { + if d == nil { + return "" + } + return d.zone +} + +// EnsureRecord ensures an A record exists for serviceName pointing to nodeAddr. +func (d *DNSClient) EnsureRecord(ctx context.Context, serviceName, nodeAddr string) error { + if d == nil { + return nil + } + + existing, err := d.listRecords(ctx, serviceName) + if err != nil { + return fmt.Errorf("list DNS records: %w", err) + } + + for _, r := range existing { + if r.Value == nodeAddr { + d.logger.Debug("DNS record exists", "service", serviceName, "value", r.Value) + return nil + } + } + + if len(existing) > 0 { + d.logger.Info("updating DNS record", "service", serviceName, + "old_value", existing[0].Value, "new_value", nodeAddr) + return d.updateRecord(ctx, existing[0].ID, serviceName, nodeAddr) + } + + d.logger.Info("creating DNS record", "service", serviceName, + "record", serviceName+"."+d.zone, "value", nodeAddr) + return d.createRecord(ctx, serviceName, nodeAddr) +} + +// RemoveRecord removes A records for serviceName. +func (d *DNSClient) RemoveRecord(ctx context.Context, serviceName string) error { + if d == nil { + return nil + } + + existing, err := d.listRecords(ctx, serviceName) + if err != nil { + return fmt.Errorf("list DNS records: %w", err) + } + + for _, r := range existing { + d.logger.Info("removing DNS record", "service", serviceName, "id", r.ID) + if err := d.deleteRecord(ctx, r.ID); err != nil { + return err + } + } + return nil +} + +func (d *DNSClient) listRecords(ctx context.Context, serviceName string) ([]dnsRecord, error) { + url := fmt.Sprintf("%s/v1/zones/%s/records?name=%s&type=A", d.serverURL, d.zone, serviceName) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("create list request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+d.token) + + resp, err := d.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("list records: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read list response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("list records: mcns returned %d: %s", resp.StatusCode, string(body)) + } + + var envelope struct { + Records []dnsRecord `json:"records"` + } + if err := json.Unmarshal(body, &envelope); err != nil { + return nil, fmt.Errorf("parse list response: %w", err) + } + return envelope.Records, nil +} + +func (d *DNSClient) createRecord(ctx context.Context, serviceName, nodeAddr string) error { + reqBody, _ := json.Marshal(map[string]interface{}{ + "name": serviceName, "type": "A", "value": nodeAddr, "ttl": 300, + }) + + url := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, d.zone) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(reqBody)) + if err != nil { + return fmt.Errorf("create record request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+d.token) + + resp, err := d.httpClient.Do(req) + if err != nil { + return fmt.Errorf("create record: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("create record: mcns returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +func (d *DNSClient) updateRecord(ctx context.Context, recordID int, serviceName, nodeAddr string) error { + reqBody, _ := json.Marshal(map[string]interface{}{ + "name": serviceName, "type": "A", "value": nodeAddr, "ttl": 300, + }) + + url := fmt.Sprintf("%s/v1/zones/%s/records/%d", d.serverURL, d.zone, recordID) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(reqBody)) + if err != nil { + return fmt.Errorf("create update request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+d.token) + + resp, err := d.httpClient.Do(req) + if err != nil { + return fmt.Errorf("update record: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("update record: mcns returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +func (d *DNSClient) deleteRecord(ctx context.Context, recordID int) error { + url := fmt.Sprintf("%s/v1/zones/%s/records/%d", d.serverURL, d.zone, recordID) + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + if err != nil { + return fmt.Errorf("create delete request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+d.token) + + resp, err := d.httpClient.Do(req) + if err != nil { + return fmt.Errorf("delete record: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("delete record: mcns returned %d: %s", resp.StatusCode, string(respBody)) + } + return nil +} + +func newHTTPClient(caCertPath string) (*http.Client, error) { + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS13, + } + + if caCertPath != "" { + caCert, err := os.ReadFile(caCertPath) //nolint:gosec // path from trusted config + if err != nil { + return nil, fmt.Errorf("read CA cert %q: %w", caCertPath, err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("parse CA cert %q: no valid certificates found", caCertPath) + } + tlsConfig.RootCAs = pool + } + + return &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + }, nil +} diff --git a/internal/master/master.go b/internal/master/master.go new file mode 100644 index 0000000..e25dafd --- /dev/null +++ b/internal/master/master.go @@ -0,0 +1,159 @@ +package master + +import ( + "context" + "crypto/tls" + "database/sql" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "syscall" + + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" + "git.wntrmute.dev/mc/mcp/internal/auth" + "git.wntrmute.dev/mc/mcp/internal/config" + "git.wntrmute.dev/mc/mcp/internal/masterdb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// Master is the MCP cluster master. It coordinates multi-node deployments, +// manages edge routes, and stores cluster state. +type Master struct { + mcpv1.UnimplementedMcpMasterServiceServer + + Config *config.MasterConfig + DB *sql.DB + Pool *AgentPool + DNS *DNSClient + Logger *slog.Logger + Version string +} + +// Run starts the master: opens the database, bootstraps nodes, sets up the +// gRPC server with TLS and auth, and blocks until SIGINT/SIGTERM. +func Run(cfg *config.MasterConfig, version string) error { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: parseLogLevel(cfg.Log.Level), + })) + + // Open master database. + db, err := masterdb.Open(cfg.Database.Path) + if err != nil { + return fmt.Errorf("open master database: %w", err) + } + defer func() { _ = db.Close() }() + + // Bootstrap nodes from config. + for _, n := range cfg.Nodes { + if err := masterdb.UpsertNode(db, n.Name, n.Address, n.Role, "amd64"); err != nil { + return fmt.Errorf("bootstrap node %s: %w", n.Name, err) + } + logger.Info("bootstrapped node", "name", n.Name, "address", n.Address, "role", n.Role) + } + + // Load service token for dialing agents. + token, err := LoadServiceToken(cfg.Master.ServiceTokenPath) + if err != nil { + return fmt.Errorf("load service token: %w", err) + } + + // Create agent connection pool. + pool := NewAgentPool(cfg.Master.CACert, token) + for _, n := range cfg.Nodes { + if addErr := pool.AddNode(n.Name, n.Address); addErr != nil { + logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr) + // Non-fatal: the node may come up later. + } + } + + // Create DNS client. + dns, err := NewDNSClient(cfg.MCNS, logger) + if err != nil { + return fmt.Errorf("create DNS client: %w", err) + } + + m := &Master{ + Config: cfg, + DB: db, + Pool: pool, + DNS: dns, + Logger: logger, + Version: version, + } + + // TLS. + tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey) + if err != nil { + return fmt.Errorf("load TLS cert: %w", err) + } + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + MinVersion: tls.VersionTLS13, + } + + // Auth interceptor (same as agent — validates MCIAS tokens). + validator, err := auth.NewMCIASValidator(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert) + if err != nil { + return fmt.Errorf("create MCIAS validator: %w", err) + } + + // gRPC server. + server := grpc.NewServer( + grpc.Creds(credentials.NewTLS(tlsConfig)), + grpc.ChainUnaryInterceptor( + auth.AuthInterceptor(validator), + ), + grpc.ChainStreamInterceptor( + auth.StreamAuthInterceptor(validator), + ), + ) + mcpv1.RegisterMcpMasterServiceServer(server, m) + + // Listen. + lis, err := net.Listen("tcp", cfg.Server.GRPCAddr) + if err != nil { + return fmt.Errorf("listen %q: %w", cfg.Server.GRPCAddr, err) + } + + logger.Info("master starting", + "addr", cfg.Server.GRPCAddr, + "version", version, + "nodes", len(cfg.Nodes), + ) + + // Signal handling. + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + errCh := make(chan error, 1) + go func() { + errCh <- server.Serve(lis) + }() + + select { + case <-ctx.Done(): + logger.Info("shutting down") + server.GracefulStop() + pool.Close() + return nil + case err := <-errCh: + pool.Close() + return fmt.Errorf("serve: %w", err) + } +} + +func parseLogLevel(level string) slog.Level { + switch level { + case "debug": + return slog.LevelDebug + case "warn": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +} diff --git a/internal/master/placement.go b/internal/master/placement.go new file mode 100644 index 0000000..27395cf --- /dev/null +++ b/internal/master/placement.go @@ -0,0 +1,64 @@ +package master + +import ( + "database/sql" + "fmt" + "sort" + + "git.wntrmute.dev/mc/mcp/internal/masterdb" +) + +// PickNode selects the best worker node for a new service deployment. +// Algorithm: fewest placed services, ties broken alphabetically. +func PickNode(db *sql.DB) (string, error) { + workers, err := masterdb.ListWorkerNodes(db) + if err != nil { + return "", fmt.Errorf("list workers: %w", err) + } + if len(workers) == 0 { + return "", fmt.Errorf("no worker nodes available") + } + + counts, err := masterdb.CountPlacementsPerNode(db) + if err != nil { + return "", fmt.Errorf("count placements: %w", err) + } + + // Sort: fewest placements first, then alphabetically. + sort.Slice(workers, func(i, j int) bool { + ci := counts[workers[i].Name] + cj := counts[workers[j].Name] + if ci != cj { + return ci < cj + } + return workers[i].Name < workers[j].Name + }) + + return workers[0].Name, nil +} + +// FindMasterNode returns the name of the node with role "master". +func FindMasterNode(db *sql.DB) (string, error) { + nodes, err := masterdb.ListNodes(db) + if err != nil { + return "", fmt.Errorf("list nodes: %w", err) + } + for _, n := range nodes { + if n.Role == "master" { + return n.Name, nil + } + } + return "", fmt.Errorf("no master node found") +} + +// FindEdgeNode returns the name of the first edge node. +func FindEdgeNode(db *sql.DB) (string, error) { + edges, err := masterdb.ListEdgeNodes(db) + if err != nil { + return "", fmt.Errorf("list edge nodes: %w", err) + } + if len(edges) == 0 { + return "", fmt.Errorf("no edge nodes available") + } + return edges[0].Name, nil +} diff --git a/internal/master/status.go b/internal/master/status.go new file mode 100644 index 0000000..b6b8905 --- /dev/null +++ b/internal/master/status.go @@ -0,0 +1,130 @@ +package master + +import ( + "context" + "fmt" + + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" + "git.wntrmute.dev/mc/mcp/internal/masterdb" +) + +// Status returns the status of services across the fleet. +func (m *Master) Status(ctx context.Context, req *mcpv1.MasterStatusRequest) (*mcpv1.MasterStatusResponse, error) { + m.Logger.Debug("Status", "service", req.GetServiceName()) + + resp := &mcpv1.MasterStatusResponse{} + + // If a specific service is requested, look up its placement. + if name := req.GetServiceName(); name != "" { + placement, err := masterdb.GetPlacement(m.DB, name) + if err != nil { + return nil, fmt.Errorf("lookup placement: %w", err) + } + if placement == nil { + return resp, nil // empty — service not found + } + + ss := m.getServiceStatus(ctx, placement) + resp.Services = append(resp.Services, ss) + return resp, nil + } + + // All services. + placements, err := masterdb.ListPlacements(m.DB) + if err != nil { + return nil, fmt.Errorf("list placements: %w", err) + } + + for _, p := range placements { + ss := m.getServiceStatus(ctx, p) + resp.Services = append(resp.Services, ss) + } + + return resp, nil +} + +func (m *Master) getServiceStatus(ctx context.Context, p *masterdb.Placement) *mcpv1.ServiceStatus { + ss := &mcpv1.ServiceStatus{ + Name: p.ServiceName, + Node: p.Node, + Tier: p.Tier, + Status: "unknown", + } + + // Query the agent for live status. + client, err := m.Pool.Get(p.Node) + if err != nil { + ss.Status = "unreachable" + return ss + } + + statusCtx, cancel := context.WithTimeout(ctx, m.Config.Timeouts.HealthCheck.Duration) + defer cancel() + + agentResp, err := client.GetServiceStatus(statusCtx, &mcpv1.GetServiceStatusRequest{ + Name: p.ServiceName, + }) + if err != nil { + ss.Status = "unreachable" + return ss + } + + // Map agent status to master status. + for _, info := range agentResp.GetServices() { + if info.GetName() == p.ServiceName { + if info.GetActive() { + ss.Status = "running" + } else { + ss.Status = "stopped" + } + break + } + } + + // Attach edge route info. + edgeRoutes, err := masterdb.ListEdgeRoutesForService(m.DB, p.ServiceName) + if err == nil { + for _, er := range edgeRoutes { + ss.EdgeRoutes = append(ss.EdgeRoutes, &mcpv1.EdgeRouteStatus{ + Hostname: er.Hostname, + EdgeNode: er.EdgeNode, + }) + } + } + + return ss +} + +// ListNodes returns all nodes in the registry with placement counts. +func (m *Master) ListNodes(_ context.Context, _ *mcpv1.ListNodesRequest) (*mcpv1.ListNodesResponse, error) { + m.Logger.Debug("ListNodes") + + nodes, err := masterdb.ListNodes(m.DB) + if err != nil { + return nil, fmt.Errorf("list nodes: %w", err) + } + + counts, err := masterdb.CountPlacementsPerNode(m.DB) + if err != nil { + return nil, fmt.Errorf("count placements: %w", err) + } + + resp := &mcpv1.ListNodesResponse{} + for _, n := range nodes { + ni := &mcpv1.NodeInfo{ + Name: n.Name, + Role: n.Role, + Address: n.Address, + Arch: n.Arch, + Status: n.Status, + Containers: int32(n.Containers), //nolint:gosec // small number + Services: int32(counts[n.Name]), //nolint:gosec // small number + } + if n.LastHeartbeat != nil { + ni.LastHeartbeat = n.LastHeartbeat.Format("2006-01-02T15:04:05Z") + } + resp.Nodes = append(resp.Nodes, ni) + } + + return resp, nil +} diff --git a/internal/master/undeploy.go b/internal/master/undeploy.go new file mode 100644 index 0000000..577d7b9 --- /dev/null +++ b/internal/master/undeploy.go @@ -0,0 +1,94 @@ +package master + +import ( + "context" + "fmt" + + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" + "git.wntrmute.dev/mc/mcp/internal/masterdb" +) + +// Undeploy handles MasterUndeployRequest: removes edge routes, DNS, then +// forwards the undeploy to the worker agent. +func (m *Master) Undeploy(ctx context.Context, req *mcpv1.MasterUndeployRequest) (*mcpv1.MasterUndeployResponse, error) { + serviceName := req.GetServiceName() + if serviceName == "" { + return nil, fmt.Errorf("service_name is required") + } + + m.Logger.Info("Undeploy", "service", serviceName) + + // Look up placement. + placement, err := masterdb.GetPlacement(m.DB, serviceName) + if err != nil { + return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("lookup placement: %v", err)}, nil + } + if placement == nil { + return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("service %q not found in placements", serviceName)}, nil + } + + // Step 1: Undeploy on worker first (stops the backend). + client, err := m.Pool.Get(placement.Node) + if err != nil { + return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("agent connection: %v", err)}, nil + } + + undeployCtx, undeployCancel := context.WithTimeout(ctx, m.Config.Timeouts.Undeploy.Duration) + defer undeployCancel() + + _, undeployErr := client.UndeployService(undeployCtx, &mcpv1.UndeployServiceRequest{ + Name: serviceName, + }) + if undeployErr != nil { + m.Logger.Warn("agent undeploy failed", "service", serviceName, "node", placement.Node, "err", undeployErr) + // Continue — still clean up edge routes and records. + } + + // Step 2: Remove edge routes. + edgeRoutes, err := masterdb.ListEdgeRoutesForService(m.DB, serviceName) + if err != nil { + m.Logger.Warn("list edge routes for undeploy", "service", serviceName, "err", err) + } + for _, er := range edgeRoutes { + edgeClient, getErr := m.Pool.Get(er.EdgeNode) + if getErr != nil { + m.Logger.Warn("edge agent connection", "edge_node", er.EdgeNode, "err", getErr) + continue + } + + edgeCtx, edgeCancel := context.WithTimeout(ctx, m.Config.Timeouts.EdgeRoute.Duration) + _, removeErr := edgeClient.RemoveEdgeRoute(edgeCtx, &mcpv1.RemoveEdgeRouteRequest{ + Hostname: er.Hostname, + }) + edgeCancel() + + if removeErr != nil { + m.Logger.Warn("remove edge route", "hostname", er.Hostname, "err", removeErr) + } else { + m.Logger.Info("edge route removed", "hostname", er.Hostname, "edge_node", er.EdgeNode) + } + } + + // Step 3: Remove DNS. + if m.DNS != nil { + if dnsErr := m.DNS.RemoveRecord(ctx, serviceName); dnsErr != nil { + m.Logger.Warn("DNS removal failed", "service", serviceName, "err", dnsErr) + } + } + + // Step 4: Clean up records. + _ = masterdb.DeleteEdgeRoutesForService(m.DB, serviceName) + _ = masterdb.DeletePlacement(m.DB, serviceName) + + success := undeployErr == nil + var errMsg string + if !success { + errMsg = fmt.Sprintf("agent undeploy: %v", undeployErr) + } + + m.Logger.Info("undeploy complete", "service", serviceName, "success", success) + return &mcpv1.MasterUndeployResponse{ + Success: success, + Error: errMsg, + }, nil +}