Add master core: deploy, undeploy, status, placement, DNS

Master struct with Run() lifecycle following the agent pattern exactly:
open DB → bootstrap nodes → create agent pool → DNS client → TLS →
auth interceptor → gRPC server → signal handler.

RPC handlers:
- Deploy: place service (tier-aware), forward to agent, register DNS
  with Tailnet IP, detect public routes, validate against allowed
  domains, coordinate edge routing via SetupEdgeRoute, record placement
  and edge routes in master DB, return structured per-step results.
- Undeploy: undeploy on worker first, then remove edge routes, DNS,
  and DB records. Best-effort cleanup on failure.
- Status: query agents for service status, aggregate with placements
  and edge route info from master DB.
- ListNodes: return all nodes with placement counts.

Placement algorithm: fewest services, ties broken alphabetically.
DNS client: extracted from agent's DNSRegistrar with explicit nodeAddr
parameter (master registers for different nodes).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-02 15:39:46 -07:00
parent 20735e4b41
commit 6fd81cacf2
6 changed files with 910 additions and 0 deletions

211
internal/master/deploy.go Normal file
View File

@@ -0,0 +1,211 @@
package master
import (
"context"
"fmt"
"net"
"strings"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/masterdb"
)
// Deploy handles the MasterDeployRequest: places the service, forwards to
// the agent, registers DNS, and coordinates edge routing.
func (m *Master) Deploy(ctx context.Context, req *mcpv1.MasterDeployRequest) (*mcpv1.MasterDeployResponse, error) {
spec := req.GetService()
if spec == nil || spec.GetName() == "" {
return nil, fmt.Errorf("service spec with name is required")
}
serviceName := spec.GetName()
tier := spec.GetTier()
if tier == "" {
tier = "worker"
}
m.Logger.Info("Deploy", "service", serviceName, "tier", tier, "node_override", spec.GetNode())
resp := &mcpv1.MasterDeployResponse{}
// Step 1: Place service.
nodeName := spec.GetNode()
if nodeName == "" {
var err error
switch tier {
case "core":
nodeName, err = FindMasterNode(m.DB)
default:
nodeName, err = PickNode(m.DB)
}
if err != nil {
resp.Error = fmt.Sprintf("placement failed: %v", err)
return resp, nil
}
}
resp.Node = nodeName
node, err := masterdb.GetNode(m.DB, nodeName)
if err != nil || node == nil {
resp.Error = fmt.Sprintf("node %q not found", nodeName)
return resp, nil
}
// Parse the node's Tailnet IP from its address (host:port).
nodeHost, _, err := net.SplitHostPort(node.Address)
if err != nil {
resp.Error = fmt.Sprintf("invalid node address %q: %v", node.Address, err)
return resp, nil
}
// Step 2: Forward deploy to the agent.
client, err := m.Pool.Get(nodeName)
if err != nil {
resp.Error = fmt.Sprintf("agent connection: %v", err)
return resp, nil
}
deployCtx, deployCancel := context.WithTimeout(ctx, m.Config.Timeouts.Deploy.Duration)
defer deployCancel()
deployResp, err := client.Deploy(deployCtx, &mcpv1.DeployRequest{
Service: spec,
})
if err != nil {
resp.DeployResult = &mcpv1.StepResult{Step: "deploy", Error: err.Error()}
resp.Error = fmt.Sprintf("agent deploy failed: %v", err)
return resp, nil
}
resp.DeployResult = &mcpv1.StepResult{Step: "deploy", Success: true}
// Check agent-side results for failures.
for _, cr := range deployResp.GetResults() {
if !cr.GetSuccess() {
resp.DeployResult.Success = false
resp.DeployResult.Error = fmt.Sprintf("component %s: %s", cr.GetName(), cr.GetError())
resp.Error = resp.DeployResult.Error
return resp, nil
}
}
// Step 3: Register DNS — Tailnet IP from the node address.
if m.DNS != nil {
if err := m.DNS.EnsureRecord(ctx, serviceName, nodeHost); err != nil {
m.Logger.Warn("DNS registration failed", "service", serviceName, "err", err)
resp.DnsResult = &mcpv1.StepResult{Step: "dns", Error: err.Error()}
} else {
resp.DnsResult = &mcpv1.StepResult{Step: "dns", Success: true}
}
}
// Record placement.
if err := masterdb.CreatePlacement(m.DB, serviceName, nodeName, tier); err != nil {
m.Logger.Error("record placement", "service", serviceName, "err", err)
}
// Steps 4-9: Detect public routes and coordinate edge routing.
edgeResult := m.setupEdgeRoutes(ctx, spec, serviceName, nodeHost)
if edgeResult != nil {
resp.EdgeRouteResult = edgeResult
}
// Compute overall success.
resp.Success = true
if resp.DeployResult != nil && !resp.DeployResult.Success {
resp.Success = false
}
if resp.EdgeRouteResult != nil && !resp.EdgeRouteResult.Success {
resp.Success = false
}
m.Logger.Info("deploy complete", "service", serviceName, "node", nodeName, "success", resp.Success)
return resp, nil
}
// setupEdgeRoutes detects public routes and coordinates edge routing.
func (m *Master) setupEdgeRoutes(ctx context.Context, spec *mcpv1.ServiceSpec, serviceName, nodeHost string) *mcpv1.StepResult {
var publicRoutes []*mcpv1.RouteSpec
for _, comp := range spec.GetComponents() {
for _, route := range comp.GetRoutes() {
if route.GetPublic() && route.GetHostname() != "" {
publicRoutes = append(publicRoutes, route)
}
}
}
if len(publicRoutes) == 0 {
return nil
}
// Find the edge node.
edgeNodeName, err := FindEdgeNode(m.DB)
if err != nil {
return &mcpv1.StepResult{Step: "edge_route", Error: fmt.Sprintf("no edge node: %v", err)}
}
edgeClient, err := m.Pool.Get(edgeNodeName)
if err != nil {
return &mcpv1.StepResult{Step: "edge_route", Error: fmt.Sprintf("edge agent connection: %v", err)}
}
var lastErr string
for _, route := range publicRoutes {
hostname := route.GetHostname()
// Validate hostname against allowed domains.
if !m.isAllowedDomain(hostname) {
lastErr = fmt.Sprintf("hostname %q not under an allowed domain", hostname)
m.Logger.Warn("edge route rejected", "hostname", hostname, "reason", lastErr)
continue
}
// Construct the backend hostname: <component>.svc.mcp.<zone>
// For simplicity, use the service name as the component name.
zone := "metacircular.net"
if m.DNS != nil && m.DNS.Zone() != "" {
zone = m.DNS.Zone()
}
backendHostname := serviceName + "." + zone
edgeCtx, edgeCancel := context.WithTimeout(ctx, m.Config.Timeouts.EdgeRoute.Duration)
_, setupErr := edgeClient.SetupEdgeRoute(edgeCtx, &mcpv1.SetupEdgeRouteRequest{
Hostname: hostname,
BackendHostname: backendHostname,
BackendPort: route.GetPort(),
BackendTls: true,
})
edgeCancel()
if setupErr != nil {
lastErr = fmt.Sprintf("setup edge route %s: %v", hostname, setupErr)
m.Logger.Warn("edge route setup failed", "hostname", hostname, "err", setupErr)
continue
}
// Record edge route in master DB.
if dbErr := masterdb.CreateEdgeRoute(m.DB, hostname, serviceName, edgeNodeName, backendHostname, int(route.GetPort())); dbErr != nil {
m.Logger.Warn("record edge route", "hostname", hostname, "err", dbErr)
}
m.Logger.Info("edge route established", "hostname", hostname, "edge_node", edgeNodeName)
}
if lastErr != "" {
return &mcpv1.StepResult{Step: "edge_route", Error: lastErr}
}
return &mcpv1.StepResult{Step: "edge_route", Success: true}
}
// isAllowedDomain checks if hostname falls under one of the configured
// allowed domains using proper domain label matching.
func (m *Master) isAllowedDomain(hostname string) bool {
if len(m.Config.Edge.AllowedDomains) == 0 {
return true // no restrictions configured
}
for _, domain := range m.Config.Edge.AllowedDomains {
if hostname == domain || strings.HasSuffix(hostname, "."+domain) {
return true
}
}
return false
}

252
internal/master/dns.go Normal file
View File

@@ -0,0 +1,252 @@
package master
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"time"
"git.wntrmute.dev/mc/mcp/internal/auth"
"git.wntrmute.dev/mc/mcp/internal/config"
)
// DNSClient creates and removes A records in MCNS. Unlike the agent's
// DNSRegistrar, the master registers records for different node IPs
// (the nodeAddr is a per-call parameter, not a fixed config value).
type DNSClient struct {
serverURL string
token string
zone string
httpClient *http.Client
logger *slog.Logger
}
type dnsRecord struct {
ID int `json:"ID"`
Name string `json:"Name"`
Type string `json:"Type"`
Value string `json:"Value"`
TTL int `json:"TTL"`
}
// NewDNSClient creates a DNS client. Returns (nil, nil) if serverURL is empty.
func NewDNSClient(cfg config.MCNSConfig, logger *slog.Logger) (*DNSClient, error) {
if cfg.ServerURL == "" {
logger.Info("mcns not configured, DNS registration disabled")
return nil, nil
}
token, err := auth.LoadToken(cfg.TokenPath)
if err != nil {
return nil, fmt.Errorf("load mcns token: %w", err)
}
httpClient, err := newHTTPClient(cfg.CACert)
if err != nil {
return nil, fmt.Errorf("create mcns HTTP client: %w", err)
}
logger.Info("master DNS client enabled", "server", cfg.ServerURL, "zone", cfg.Zone)
return &DNSClient{
serverURL: strings.TrimRight(cfg.ServerURL, "/"),
token: token,
zone: cfg.Zone,
httpClient: httpClient,
logger: logger,
}, nil
}
// Zone returns the configured DNS zone.
func (d *DNSClient) Zone() string {
if d == nil {
return ""
}
return d.zone
}
// EnsureRecord ensures an A record exists for serviceName pointing to nodeAddr.
func (d *DNSClient) EnsureRecord(ctx context.Context, serviceName, nodeAddr string) error {
if d == nil {
return nil
}
existing, err := d.listRecords(ctx, serviceName)
if err != nil {
return fmt.Errorf("list DNS records: %w", err)
}
for _, r := range existing {
if r.Value == nodeAddr {
d.logger.Debug("DNS record exists", "service", serviceName, "value", r.Value)
return nil
}
}
if len(existing) > 0 {
d.logger.Info("updating DNS record", "service", serviceName,
"old_value", existing[0].Value, "new_value", nodeAddr)
return d.updateRecord(ctx, existing[0].ID, serviceName, nodeAddr)
}
d.logger.Info("creating DNS record", "service", serviceName,
"record", serviceName+"."+d.zone, "value", nodeAddr)
return d.createRecord(ctx, serviceName, nodeAddr)
}
// RemoveRecord removes A records for serviceName.
func (d *DNSClient) RemoveRecord(ctx context.Context, serviceName string) error {
if d == nil {
return nil
}
existing, err := d.listRecords(ctx, serviceName)
if err != nil {
return fmt.Errorf("list DNS records: %w", err)
}
for _, r := range existing {
d.logger.Info("removing DNS record", "service", serviceName, "id", r.ID)
if err := d.deleteRecord(ctx, r.ID); err != nil {
return err
}
}
return nil
}
func (d *DNSClient) listRecords(ctx context.Context, serviceName string) ([]dnsRecord, error) {
url := fmt.Sprintf("%s/v1/zones/%s/records?name=%s&type=A", d.serverURL, d.zone, serviceName)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("create list request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+d.token)
resp, err := d.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("list records: %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read list response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("list records: mcns returned %d: %s", resp.StatusCode, string(body))
}
var envelope struct {
Records []dnsRecord `json:"records"`
}
if err := json.Unmarshal(body, &envelope); err != nil {
return nil, fmt.Errorf("parse list response: %w", err)
}
return envelope.Records, nil
}
func (d *DNSClient) createRecord(ctx context.Context, serviceName, nodeAddr string) error {
reqBody, _ := json.Marshal(map[string]interface{}{
"name": serviceName, "type": "A", "value": nodeAddr, "ttl": 300,
})
url := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, d.zone)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(reqBody))
if err != nil {
return fmt.Errorf("create record request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+d.token)
resp, err := d.httpClient.Do(req)
if err != nil {
return fmt.Errorf("create record: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("create record: mcns returned %d: %s", resp.StatusCode, string(respBody))
}
return nil
}
func (d *DNSClient) updateRecord(ctx context.Context, recordID int, serviceName, nodeAddr string) error {
reqBody, _ := json.Marshal(map[string]interface{}{
"name": serviceName, "type": "A", "value": nodeAddr, "ttl": 300,
})
url := fmt.Sprintf("%s/v1/zones/%s/records/%d", d.serverURL, d.zone, recordID)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(reqBody))
if err != nil {
return fmt.Errorf("create update request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+d.token)
resp, err := d.httpClient.Do(req)
if err != nil {
return fmt.Errorf("update record: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("update record: mcns returned %d: %s", resp.StatusCode, string(respBody))
}
return nil
}
func (d *DNSClient) deleteRecord(ctx context.Context, recordID int) error {
url := fmt.Sprintf("%s/v1/zones/%s/records/%d", d.serverURL, d.zone, recordID)
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
if err != nil {
return fmt.Errorf("create delete request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+d.token)
resp, err := d.httpClient.Do(req)
if err != nil {
return fmt.Errorf("delete record: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("delete record: mcns returned %d: %s", resp.StatusCode, string(respBody))
}
return nil
}
func newHTTPClient(caCertPath string) (*http.Client, error) {
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS13,
}
if caCertPath != "" {
caCert, err := os.ReadFile(caCertPath) //nolint:gosec // path from trusted config
if err != nil {
return nil, fmt.Errorf("read CA cert %q: %w", caCertPath, err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("parse CA cert %q: no valid certificates found", caCertPath)
}
tlsConfig.RootCAs = pool
}
return &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}, nil
}

159
internal/master/master.go Normal file
View File

@@ -0,0 +1,159 @@
package master
import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/auth"
"git.wntrmute.dev/mc/mcp/internal/config"
"git.wntrmute.dev/mc/mcp/internal/masterdb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Master is the MCP cluster master. It coordinates multi-node deployments,
// manages edge routes, and stores cluster state.
type Master struct {
mcpv1.UnimplementedMcpMasterServiceServer
Config *config.MasterConfig
DB *sql.DB
Pool *AgentPool
DNS *DNSClient
Logger *slog.Logger
Version string
}
// Run starts the master: opens the database, bootstraps nodes, sets up the
// gRPC server with TLS and auth, and blocks until SIGINT/SIGTERM.
func Run(cfg *config.MasterConfig, version string) error {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: parseLogLevel(cfg.Log.Level),
}))
// Open master database.
db, err := masterdb.Open(cfg.Database.Path)
if err != nil {
return fmt.Errorf("open master database: %w", err)
}
defer func() { _ = db.Close() }()
// Bootstrap nodes from config.
for _, n := range cfg.Nodes {
if err := masterdb.UpsertNode(db, n.Name, n.Address, n.Role, "amd64"); err != nil {
return fmt.Errorf("bootstrap node %s: %w", n.Name, err)
}
logger.Info("bootstrapped node", "name", n.Name, "address", n.Address, "role", n.Role)
}
// Load service token for dialing agents.
token, err := LoadServiceToken(cfg.Master.ServiceTokenPath)
if err != nil {
return fmt.Errorf("load service token: %w", err)
}
// Create agent connection pool.
pool := NewAgentPool(cfg.Master.CACert, token)
for _, n := range cfg.Nodes {
if addErr := pool.AddNode(n.Name, n.Address); addErr != nil {
logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
// Non-fatal: the node may come up later.
}
}
// Create DNS client.
dns, err := NewDNSClient(cfg.MCNS, logger)
if err != nil {
return fmt.Errorf("create DNS client: %w", err)
}
m := &Master{
Config: cfg,
DB: db,
Pool: pool,
DNS: dns,
Logger: logger,
Version: version,
}
// TLS.
tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
if err != nil {
return fmt.Errorf("load TLS cert: %w", err)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{tlsCert},
MinVersion: tls.VersionTLS13,
}
// Auth interceptor (same as agent — validates MCIAS tokens).
validator, err := auth.NewMCIASValidator(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert)
if err != nil {
return fmt.Errorf("create MCIAS validator: %w", err)
}
// gRPC server.
server := grpc.NewServer(
grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.ChainUnaryInterceptor(
auth.AuthInterceptor(validator),
),
grpc.ChainStreamInterceptor(
auth.StreamAuthInterceptor(validator),
),
)
mcpv1.RegisterMcpMasterServiceServer(server, m)
// Listen.
lis, err := net.Listen("tcp", cfg.Server.GRPCAddr)
if err != nil {
return fmt.Errorf("listen %q: %w", cfg.Server.GRPCAddr, err)
}
logger.Info("master starting",
"addr", cfg.Server.GRPCAddr,
"version", version,
"nodes", len(cfg.Nodes),
)
// Signal handling.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
errCh := make(chan error, 1)
go func() {
errCh <- server.Serve(lis)
}()
select {
case <-ctx.Done():
logger.Info("shutting down")
server.GracefulStop()
pool.Close()
return nil
case err := <-errCh:
pool.Close()
return fmt.Errorf("serve: %w", err)
}
}
func parseLogLevel(level string) slog.Level {
switch level {
case "debug":
return slog.LevelDebug
case "warn":
return slog.LevelWarn
case "error":
return slog.LevelError
default:
return slog.LevelInfo
}
}

View File

@@ -0,0 +1,64 @@
package master
import (
"database/sql"
"fmt"
"sort"
"git.wntrmute.dev/mc/mcp/internal/masterdb"
)
// PickNode selects the best worker node for a new service deployment.
// Algorithm: fewest placed services, ties broken alphabetically.
func PickNode(db *sql.DB) (string, error) {
workers, err := masterdb.ListWorkerNodes(db)
if err != nil {
return "", fmt.Errorf("list workers: %w", err)
}
if len(workers) == 0 {
return "", fmt.Errorf("no worker nodes available")
}
counts, err := masterdb.CountPlacementsPerNode(db)
if err != nil {
return "", fmt.Errorf("count placements: %w", err)
}
// Sort: fewest placements first, then alphabetically.
sort.Slice(workers, func(i, j int) bool {
ci := counts[workers[i].Name]
cj := counts[workers[j].Name]
if ci != cj {
return ci < cj
}
return workers[i].Name < workers[j].Name
})
return workers[0].Name, nil
}
// FindMasterNode returns the name of the node with role "master".
func FindMasterNode(db *sql.DB) (string, error) {
nodes, err := masterdb.ListNodes(db)
if err != nil {
return "", fmt.Errorf("list nodes: %w", err)
}
for _, n := range nodes {
if n.Role == "master" {
return n.Name, nil
}
}
return "", fmt.Errorf("no master node found")
}
// FindEdgeNode returns the name of the first edge node.
func FindEdgeNode(db *sql.DB) (string, error) {
edges, err := masterdb.ListEdgeNodes(db)
if err != nil {
return "", fmt.Errorf("list edge nodes: %w", err)
}
if len(edges) == 0 {
return "", fmt.Errorf("no edge nodes available")
}
return edges[0].Name, nil
}

130
internal/master/status.go Normal file
View File

@@ -0,0 +1,130 @@
package master
import (
"context"
"fmt"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/masterdb"
)
// Status returns the status of services across the fleet.
func (m *Master) Status(ctx context.Context, req *mcpv1.MasterStatusRequest) (*mcpv1.MasterStatusResponse, error) {
m.Logger.Debug("Status", "service", req.GetServiceName())
resp := &mcpv1.MasterStatusResponse{}
// If a specific service is requested, look up its placement.
if name := req.GetServiceName(); name != "" {
placement, err := masterdb.GetPlacement(m.DB, name)
if err != nil {
return nil, fmt.Errorf("lookup placement: %w", err)
}
if placement == nil {
return resp, nil // empty — service not found
}
ss := m.getServiceStatus(ctx, placement)
resp.Services = append(resp.Services, ss)
return resp, nil
}
// All services.
placements, err := masterdb.ListPlacements(m.DB)
if err != nil {
return nil, fmt.Errorf("list placements: %w", err)
}
for _, p := range placements {
ss := m.getServiceStatus(ctx, p)
resp.Services = append(resp.Services, ss)
}
return resp, nil
}
func (m *Master) getServiceStatus(ctx context.Context, p *masterdb.Placement) *mcpv1.ServiceStatus {
ss := &mcpv1.ServiceStatus{
Name: p.ServiceName,
Node: p.Node,
Tier: p.Tier,
Status: "unknown",
}
// Query the agent for live status.
client, err := m.Pool.Get(p.Node)
if err != nil {
ss.Status = "unreachable"
return ss
}
statusCtx, cancel := context.WithTimeout(ctx, m.Config.Timeouts.HealthCheck.Duration)
defer cancel()
agentResp, err := client.GetServiceStatus(statusCtx, &mcpv1.GetServiceStatusRequest{
Name: p.ServiceName,
})
if err != nil {
ss.Status = "unreachable"
return ss
}
// Map agent status to master status.
for _, info := range agentResp.GetServices() {
if info.GetName() == p.ServiceName {
if info.GetActive() {
ss.Status = "running"
} else {
ss.Status = "stopped"
}
break
}
}
// Attach edge route info.
edgeRoutes, err := masterdb.ListEdgeRoutesForService(m.DB, p.ServiceName)
if err == nil {
for _, er := range edgeRoutes {
ss.EdgeRoutes = append(ss.EdgeRoutes, &mcpv1.EdgeRouteStatus{
Hostname: er.Hostname,
EdgeNode: er.EdgeNode,
})
}
}
return ss
}
// ListNodes returns all nodes in the registry with placement counts.
func (m *Master) ListNodes(_ context.Context, _ *mcpv1.ListNodesRequest) (*mcpv1.ListNodesResponse, error) {
m.Logger.Debug("ListNodes")
nodes, err := masterdb.ListNodes(m.DB)
if err != nil {
return nil, fmt.Errorf("list nodes: %w", err)
}
counts, err := masterdb.CountPlacementsPerNode(m.DB)
if err != nil {
return nil, fmt.Errorf("count placements: %w", err)
}
resp := &mcpv1.ListNodesResponse{}
for _, n := range nodes {
ni := &mcpv1.NodeInfo{
Name: n.Name,
Role: n.Role,
Address: n.Address,
Arch: n.Arch,
Status: n.Status,
Containers: int32(n.Containers), //nolint:gosec // small number
Services: int32(counts[n.Name]), //nolint:gosec // small number
}
if n.LastHeartbeat != nil {
ni.LastHeartbeat = n.LastHeartbeat.Format("2006-01-02T15:04:05Z")
}
resp.Nodes = append(resp.Nodes, ni)
}
return resp, nil
}

View File

@@ -0,0 +1,94 @@
package master
import (
"context"
"fmt"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/masterdb"
)
// Undeploy handles MasterUndeployRequest: removes edge routes, DNS, then
// forwards the undeploy to the worker agent.
func (m *Master) Undeploy(ctx context.Context, req *mcpv1.MasterUndeployRequest) (*mcpv1.MasterUndeployResponse, error) {
serviceName := req.GetServiceName()
if serviceName == "" {
return nil, fmt.Errorf("service_name is required")
}
m.Logger.Info("Undeploy", "service", serviceName)
// Look up placement.
placement, err := masterdb.GetPlacement(m.DB, serviceName)
if err != nil {
return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("lookup placement: %v", err)}, nil
}
if placement == nil {
return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("service %q not found in placements", serviceName)}, nil
}
// Step 1: Undeploy on worker first (stops the backend).
client, err := m.Pool.Get(placement.Node)
if err != nil {
return &mcpv1.MasterUndeployResponse{Error: fmt.Sprintf("agent connection: %v", err)}, nil
}
undeployCtx, undeployCancel := context.WithTimeout(ctx, m.Config.Timeouts.Undeploy.Duration)
defer undeployCancel()
_, undeployErr := client.UndeployService(undeployCtx, &mcpv1.UndeployServiceRequest{
Name: serviceName,
})
if undeployErr != nil {
m.Logger.Warn("agent undeploy failed", "service", serviceName, "node", placement.Node, "err", undeployErr)
// Continue — still clean up edge routes and records.
}
// Step 2: Remove edge routes.
edgeRoutes, err := masterdb.ListEdgeRoutesForService(m.DB, serviceName)
if err != nil {
m.Logger.Warn("list edge routes for undeploy", "service", serviceName, "err", err)
}
for _, er := range edgeRoutes {
edgeClient, getErr := m.Pool.Get(er.EdgeNode)
if getErr != nil {
m.Logger.Warn("edge agent connection", "edge_node", er.EdgeNode, "err", getErr)
continue
}
edgeCtx, edgeCancel := context.WithTimeout(ctx, m.Config.Timeouts.EdgeRoute.Duration)
_, removeErr := edgeClient.RemoveEdgeRoute(edgeCtx, &mcpv1.RemoveEdgeRouteRequest{
Hostname: er.Hostname,
})
edgeCancel()
if removeErr != nil {
m.Logger.Warn("remove edge route", "hostname", er.Hostname, "err", removeErr)
} else {
m.Logger.Info("edge route removed", "hostname", er.Hostname, "edge_node", er.EdgeNode)
}
}
// Step 3: Remove DNS.
if m.DNS != nil {
if dnsErr := m.DNS.RemoveRecord(ctx, serviceName); dnsErr != nil {
m.Logger.Warn("DNS removal failed", "service", serviceName, "err", dnsErr)
}
}
// Step 4: Clean up records.
_ = masterdb.DeleteEdgeRoutesForService(m.DB, serviceName)
_ = masterdb.DeletePlacement(m.DB, serviceName)
success := undeployErr == nil
var errMsg string
if !success {
errMsg = fmt.Sprintf("agent undeploy: %v", undeployErr)
}
m.Logger.Info("undeploy complete", "service", serviceName, "success", success)
return &mcpv1.MasterUndeployResponse{
Success: success,
Error: errMsg,
}, nil
}