3 Commits

Author SHA1 Message Date
fa4d022bc1 Add boot sequencing to agent
The agent reads [[boot.sequence]] stages from its config and starts
services in dependency order before accepting gRPC connections. Each
stage waits for its services to pass health checks before proceeding:

- tcp: TCP connect to the container's mapped port
- grpc: standard gRPC health check

Foundation stage (stage 0): blocks and retries indefinitely if health
fails — all downstream services depend on it.
Non-foundation stages: log warning and proceed on failure.

Uses the recover logic to start containers from the registry, then
health-checks to verify readiness.

Config example:
  [[boot.sequence]]
  name = "foundation"
  services = ["mcias", "mcns"]
  timeout = "120s"
  health = "tcp"

Architecture v2 Phase 4 feature.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 11:53:11 -07:00
9d543998dc Add mcp-agent recover command
Recreates containers from the agent's SQLite registry when podman's
database is lost (UID change, podman reset, reboot). For each service
with desired_state="running" that doesn't have a running container:

- Removes any stale container with the same name
- Recreates the container from the stored spec (image, ports, volumes,
  cmd, network, user, restart policy)
- Allocates route ports and injects PORT env vars
- Re-registers mc-proxy routes
- Provisions TLS certs for L7 routes

Does NOT pull images — assumes local cache.

Root cause action item from the 2026-04-03 UID incident.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 11:50:31 -07:00
f9f6f339f4 Add multi-address fallback for node connectivity
NodeConfig and MasterNodeConfig gain an optional addresses[] field
for fallback addresses tried in order after the primary address.
Provides resilience when Tailscale DNS is down or a node is only
reachable via LAN.

- dialAgentMulti: tries each address with a 3s health check, returns
  first success
- forEachNode: uses multi-address dialing
- AgentPool.AddNodeMulti: master tries all addresses when connecting
- AllAddresses(): deduplicates primary + fallback addresses

Config example:
  [[nodes]]
  name = "rift"
  address = "rift.scylla-hammerhead.ts.net:9444"
  addresses = ["100.95.252.120:9444", "192.168.88.181:9444"]

Existing configs without addresses[] work unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 09:45:50 -07:00
13 changed files with 553 additions and 22 deletions

View File

@@ -43,6 +43,7 @@ func main() {
}) })
root.AddCommand(snapshotCmd()) root.AddCommand(snapshotCmd())
root.AddCommand(recoverCmd())
if err := root.Execute(); err != nil { if err := root.Execute(); err != nil {
log.Fatal(err) log.Fatal(err)

68
cmd/mcp-agent/recover.go Normal file
View File

@@ -0,0 +1,68 @@
package main
import (
"context"
"fmt"
"log/slog"
"os"
"git.wntrmute.dev/mc/mcp/internal/agent"
"git.wntrmute.dev/mc/mcp/internal/config"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
"github.com/spf13/cobra"
)
// recoverCmd builds the "recover" subcommand, which rebuilds containers
// from the agent's SQLite registry when podman's own database has been
// lost. Wiring mirrors the agent's normal startup: config, logger,
// registry, optional proxy/cert integrations, then Agent.Recover.
func recoverCmd() *cobra.Command {
	run := func(cmd *cobra.Command, args []string) error {
		conf, err := config.LoadAgentConfig(cfgPath)
		if err != nil {
			return fmt.Errorf("load config: %w", err)
		}
		// Text logging to stderr at info level, same as the agent daemon.
		lg := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
			Level: slog.LevelInfo,
		}))
		reg, err := registry.Open(conf.Database.Path)
		if err != nil {
			return fmt.Errorf("open registry: %w", err)
		}
		defer func() { _ = reg.Close() }()
		// mc-proxy and cert provisioning are best-effort here: if either
		// is unavailable, recovery still recreates the containers.
		router, err := agent.NewProxyRouter(conf.MCProxy.Socket, conf.MCProxy.CertDir, lg)
		if err != nil {
			lg.Warn("mc-proxy not available, routes will not be registered", "err", err)
		}
		provisioner, err := agent.NewCertProvisioner(conf.Metacrypt, conf.MCProxy.CertDir, lg)
		if err != nil {
			lg.Warn("cert provisioner not available", "err", err)
		}
		ag := &agent.Agent{
			Config:    conf,
			DB:        reg,
			Runtime:   &runtime.Podman{},
			Logger:    lg,
			PortAlloc: agent.NewPortAllocator(),
			Proxy:     router,
			Certs:     provisioner,
			Version:   version,
		}
		return ag.Recover(context.Background())
	}
	return &cobra.Command{
		Use:   "recover",
		Short: "Recreate containers from the agent registry",
		Long: `Recover recreates containers from the agent's SQLite registry for all
services whose desired state is "running" but which don't have a running
container in podman.
This is the recovery path after a podman database loss (e.g., after a
UID change, podman reset, or reboot that cleared container state).
Images must be cached locally — recover does not pull from MCR.`,
		RunE: run,
	}
}

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"time"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/config" "git.wntrmute.dev/mc/mcp/internal/config"
@@ -52,6 +53,38 @@ func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClie
return mcpv1.NewMcpAgentServiceClient(conn), conn, nil return mcpv1.NewMcpAgentServiceClient(conn), conn, nil
} }
// dialAgentMulti tries each address in order and returns the first successful
// connection. Provides resilience when Tailscale DNS is down or a node is
// reachable via LAN but not Tailnet.
func dialAgentMulti(addresses []string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) {
	switch len(addresses) {
	case 0:
		return nil, nil, fmt.Errorf("no addresses to dial")
	case 1:
		// Single candidate: dial directly, no health probe needed.
		return dialAgent(addresses[0], cfg)
	}
	var lastErr error
	for _, addr := range addresses {
		client, conn, dialErr := dialAgent(addr, cfg)
		if dialErr != nil {
			lastErr = fmt.Errorf("%s: %w", addr, dialErr)
			continue
		}
		// Probe NodeStatus briefly to confirm the connection is usable
		// before handing it to the caller.
		probeCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		_, probeErr := client.NodeStatus(probeCtx, &mcpv1.NodeStatusRequest{})
		cancel()
		if probeErr != nil {
			_ = conn.Close()
			lastErr = fmt.Errorf("%s: %w", addr, probeErr)
			continue
		}
		return client, conn, nil
	}
	return nil, nil, fmt.Errorf("all addresses failed, last error: %w", lastErr)
}
// dialMaster connects to the master at the given address and returns a gRPC // dialMaster connects to the master at the given address and returns a gRPC
// client for the McpMasterService. // client for the McpMasterService.
func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) { func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) {

View File

@@ -9,7 +9,7 @@ import (
) )
// findNodeAddress looks up a node by name in the CLI config and returns // findNodeAddress looks up a node by name in the CLI config and returns
// its address. // its primary address.
func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) { func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
for _, n := range cfg.Nodes { for _, n := range cfg.Nodes {
if n.Name == nodeName { if n.Name == nodeName {
@@ -19,6 +19,16 @@ func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
return "", fmt.Errorf("node %q not found in config", nodeName) return "", fmt.Errorf("node %q not found in config", nodeName)
} }
// findNode looks up a node by name in the CLI config. It returns a pointer
// into cfg.Nodes so callers see the live entry, not a copy.
func findNode(cfg *config.CLIConfig, nodeName string) (*config.NodeConfig, error) {
	for i, n := range cfg.Nodes {
		if n.Name == nodeName {
			return &cfg.Nodes[i], nil
		}
	}
	return nil, fmt.Errorf("node %q not found in config", nodeName)
}
// printComponentResults prints the result of each component operation. // printComponentResults prints the result of each component operation.
func printComponentResults(results []*mcpv1.ComponentResult) { func printComponentResults(results []*mcpv1.ComponentResult) {
for _, r := range results { for _, r := range results {

View File

@@ -27,7 +27,7 @@ func forEachNode(fn func(node config.NodeConfig, client mcpv1.McpAgentServiceCli
} }
for _, node := range cfg.Nodes { for _, node := range cfg.Nodes {
client, conn, err := dialAgent(node.Address, cfg) client, conn, err := dialAgentMulti(node.AllAddresses(), cfg)
if err != nil { if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err) _, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
continue continue
@@ -85,7 +85,7 @@ func psCmd() *cobra.Command {
Short: "Live check: query runtime on all agents", Short: "Live check: query runtime on all agents",
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
w := newTable() w := newTable()
_, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME") _, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME\t")
now := time.Now() now := time.Now()
if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error { if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
@@ -96,19 +96,25 @@ func psCmd() *cobra.Command {
} }
for _, svc := range resp.GetServices() { for _, svc := range resp.GetServices() {
comment := svc.GetComment()
for _, comp := range svc.GetComponents() { for _, comp := range svc.GetComponents() {
uptime := "-" uptime := "-"
if comp.GetStarted() != nil { if comp.GetStarted() != nil {
d := now.Sub(comp.GetStarted().AsTime()) d := now.Sub(comp.GetStarted().AsTime())
uptime = formatDuration(d) uptime = formatDuration(d)
} }
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", col7 := ""
if comment != "" {
col7 = "# " + comment
}
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
svc.GetName(), svc.GetName(),
comp.GetName(), comp.GetName(),
node.Name, node.Name,
comp.GetObservedState(), comp.GetObservedState(),
comp.GetVersion(), comp.GetVersion(),
uptime, uptime,
col7,
) )
} }
} }

View File

@@ -119,6 +119,18 @@ func Run(cfg *config.AgentConfig, version string) error {
"runtime", cfg.Agent.ContainerRuntime, "runtime", cfg.Agent.ContainerRuntime,
) )
// Run boot sequence before starting the gRPC server.
// On the master node, this starts foundation services (MCIAS, MCNS)
// before core services, ensuring dependencies are met.
if len(cfg.Boot.Sequence) > 0 {
bootCtx, bootCancel := context.WithCancel(context.Background())
defer bootCancel()
if err := a.RunBootSequence(bootCtx); err != nil {
logger.Error("boot sequence failed", "err", err)
// Continue starting the gRPC server — partial boot is better than no agent.
}
}
mon.Start() mon.Start()
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

202
internal/agent/boot.go Normal file
View File

@@ -0,0 +1,202 @@
package agent
import (
	"context"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"git.wntrmute.dev/mc/mcp/internal/config"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// RunBootSequence executes the boot stages defined in the agent config.
// Each stage's services must be healthy before the next stage starts.
// This is used on the master node to start foundation services (MCIAS,
// MCNS) before core services (Metacrypt, MCR) before the master itself.
//
// If no boot sequence is configured, this is a no-op.
// RunBootSequence executes the boot stages defined in the agent config.
// Each stage's services must be healthy before the next stage starts.
// This is used on the master node to start foundation services (MCIAS,
// MCNS) before core services (Metacrypt, MCR) before the master itself.
//
// Stage 0 is the foundation stage: if its health wait fails, the agent
// retries indefinitely (everything downstream depends on it). Later
// stages log a warning and proceed on failure.
//
// If no boot sequence is configured, this is a no-op. Returns ctx.Err()
// if the context is cancelled during the foundation retry loop.
func (a *Agent) RunBootSequence(ctx context.Context) error {
	stages := a.Config.Boot.Sequence
	if len(stages) == 0 {
		return nil
	}
	a.Logger.Info("boot sequence starting", "stages", len(stages))
	for i, stage := range stages {
		a.Logger.Info("boot stage starting",
			"stage", stage.Name,
			"services", stage.Services,
			"timeout", stage.Timeout.Duration,
			"health", stage.Health,
		)
		// Use the recover logic to start any services in this stage
		// that aren't already running.
		if err := a.Recover(ctx); err != nil {
			a.Logger.Warn("boot stage recover failed", "stage", stage.Name, "err", err)
		}
		// Wait for all services in this stage to be healthy.
		timeout := stage.Timeout.Duration
		if timeout == 0 {
			timeout = 60 * time.Second
		}
		if err := a.waitForHealthy(ctx, stage, timeout, i == 0); err != nil {
			if i == 0 {
				// Foundation stage: block and retry indefinitely,
				// but honor context cancellation between attempts
				// (the old time.Sleep loop ignored shutdown and
				// could spin forever after the agent was stopped).
				a.Logger.Error("foundation stage failed — retrying indefinitely",
					"stage", stage.Name, "err", err)
				for {
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-time.After(10 * time.Second):
					}
					if retryErr := a.waitForHealthy(ctx, stage, timeout, true); retryErr == nil {
						break
					}
				}
			} else {
				// Non-foundation: log and proceed.
				a.Logger.Warn("boot stage not fully healthy, proceeding",
					"stage", stage.Name, "err", err)
			}
		}
		a.Logger.Info("boot stage complete", "stage", stage.Name)
	}
	a.Logger.Info("boot sequence complete")
	return nil
}
// waitForHealthy waits until all services in the stage pass their health check.
func (a *Agent) waitForHealthy(ctx context.Context, stage config.BootStage, timeout time.Duration, isFoundation bool) error {
deadline := time.Now().Add(timeout)
for _, svc := range stage.Services {
for {
if time.Now().After(deadline) {
return fmt.Errorf("timeout waiting for %s", svc)
}
healthy, err := a.checkServiceHealth(ctx, svc, stage.Health)
if err == nil && healthy {
a.Logger.Info("service healthy", "service", svc, "check", stage.Health)
break
}
if ctx.Err() != nil {
return ctx.Err()
}
time.Sleep(2 * time.Second)
}
}
return nil
}
// checkServiceHealth probes a service using the specified health check method.
func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method string) (bool, error) {
// Find the service's port from the registry.
port, err := a.findServicePort(serviceName)
if err != nil {
return false, err
}
switch method {
case "tcp", "":
return a.checkTCP(ctx, port)
case "grpc":
return a.checkGRPC(ctx, port)
default:
// Unknown method, fall back to TCP.
return a.checkTCP(ctx, port)
}
}
// findServicePort finds the first mapped port for a service from the registry
// or from the running container.
func (a *Agent) findServicePort(serviceName string) (int, error) {
// Check the running containers for a mapped port.
containers, err := a.Runtime.List(context.Background())
if err != nil {
return 0, fmt.Errorf("list containers: %w", err)
}
for _, c := range containers {
// Container name might be "service" or "service-component"
if c.Name == serviceName || len(c.Name) > len(serviceName) && c.Name[:len(serviceName)+1] == serviceName+"-" {
// Parse the first port mapping to get the host port.
for _, p := range c.Ports {
// Port format: "127.0.0.1:28443->8443/tcp" or "8443/tcp"
port := parseHostPort(p)
if port > 0 {
return port, nil
}
}
}
}
return 0, fmt.Errorf("no port found for service %s", serviceName)
}
// parseHostPort extracts the host port from a podman port mapping string.
func parseHostPort(mapping string) int {
// Format: "127.0.0.1:28443->8443/tcp" or "0.0.0.0:53->53/tcp"
for i := len(mapping) - 1; i >= 0; i-- {
if mapping[i] == ':' {
// Found the host:port separator
rest := mapping[i+1:]
// Find the -> separator
for j := 0; j < len(rest); j++ {
if rest[j] == '-' {
portStr := rest[:j]
var port int
for _, ch := range portStr {
if ch >= '0' && ch <= '9' {
port = port*10 + int(ch-'0')
}
}
if port > 0 {
return port
}
}
}
}
}
return 0
}
// checkTCP attempts a TCP connection to localhost:port.
func (a *Agent) checkTCP(ctx context.Context, port int) (bool, error) {
addr := fmt.Sprintf("127.0.0.1:%d", port)
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
if err != nil {
return false, err
}
_ = conn.Close()
return true, nil
}
// checkGRPC calls the standard gRPC health check on localhost:port.
func (a *Agent) checkGRPC(ctx context.Context, port int) (bool, error) {
addr := fmt.Sprintf("127.0.0.1:%d", port)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return false, err
}
defer func() { _ = conn.Close() }()
client := healthpb.NewHealthClient(conn)
resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
return false, err
}
return resp.GetStatus() == healthpb.HealthCheckResponse_SERVING, nil
}

139
internal/agent/recover.go Normal file
View File

@@ -0,0 +1,139 @@
package agent
import (
"context"
"fmt"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
)
// Recover recreates containers from the agent's registry for all services
// whose desired state is "running" but which don't have a running container
// in podman. This is the recovery path after a podman database loss (e.g.,
// after a UID change or podman reset).
//
// Recover does NOT pull images — it assumes the images are cached locally.
// If an image is missing, that component is skipped with a warning.
//
// Per-component failures (run, route registration, cert provisioning) are
// logged and counted but never abort the loop, so one broken component
// cannot block recovery of the rest. The function only returns an error
// if the registry itself cannot be read.
func (a *Agent) Recover(ctx context.Context) error {
services, err := registry.ListServices(a.DB)
if err != nil {
return fmt.Errorf("list services: %w", err)
}
// Get the list of currently running containers from podman.
// A list failure is treated as "nothing running": recovery then
// attempts to recreate everything, which is safe because stale
// containers are removed by name before each Run below.
running, err := a.Runtime.List(ctx)
if err != nil {
a.Logger.Warn("cannot list containers, assuming none running", "err", err)
running = nil
}
runningSet := make(map[string]bool)
for _, c := range running {
runningSet[c.Name] = true
}
// Counters for the summary log at the end.
var recovered, skipped, already int
for _, svc := range services {
if !svc.Active {
continue
}
comps, err := registry.ListComponents(a.DB, svc.Name)
if err != nil {
a.Logger.Warn("list components", "service", svc.Name, "err", err)
continue
}
for _, comp := range comps {
if comp.DesiredState != "running" {
continue
}
// Naming scheme: "service-component", collapsed to just
// "service" when the component shares the service's name.
containerName := svc.Name + "-" + comp.Name
if comp.Name == svc.Name {
containerName = svc.Name
}
// Skip if container is already running.
if runningSet[containerName] {
already++
continue
}
a.Logger.Info("recovering container",
"service", svc.Name,
"component", comp.Name,
"image", comp.Image,
)
// Remove any stale container with the same name.
// Error deliberately ignored: the container usually doesn't exist.
_ = a.Runtime.Remove(ctx, containerName)
// Build the container spec from the registry.
spec := runtime.ContainerSpec{
Name: containerName,
Image: comp.Image,
Network: comp.Network,
User: comp.UserSpec,
Restart: comp.Restart,
Volumes: comp.Volumes,
Cmd: comp.Cmd,
}
// Allocate ports from routes if the component has routes.
// On allocation failure, fall back to the stored port list so
// the container still starts (routes just won't be reachable).
if len(comp.Routes) > 0 && a.PortAlloc != nil {
ports, env, allocErr := a.allocateRoutePorts(svc.Name, comp.Name, comp.Routes)
if allocErr != nil {
a.Logger.Warn("allocate route ports", "container", containerName, "err", allocErr)
spec.Ports = comp.Ports
} else {
spec.Ports = append(comp.Ports, ports...)
spec.Env = append(spec.Env, env...)
}
} else {
spec.Ports = comp.Ports
}
if err := a.Runtime.Run(ctx, spec); err != nil {
a.Logger.Error("recover container failed",
"container", containerName,
"err", err,
)
skipped++
continue
}
// Re-register mc-proxy routes.
// Proxy may be nil when mc-proxy was unavailable at startup.
if a.Proxy != nil && len(comp.Routes) > 0 {
hostPorts, hpErr := registry.GetRouteHostPorts(a.DB, svc.Name, comp.Name)
if hpErr == nil {
if proxyErr := a.Proxy.RegisterRoutes(ctx, svc.Name, comp.Routes, hostPorts); proxyErr != nil {
a.Logger.Warn("re-register routes", "service", svc.Name, "err", proxyErr)
}
}
}
// Provision TLS certs if needed.
// Certs may be nil when the provisioner was unavailable at startup.
if a.Certs != nil && hasL7Routes(comp.Routes) {
hostnames := l7Hostnames(svc.Name, comp.Routes)
if certErr := a.Certs.EnsureCert(ctx, svc.Name, hostnames); certErr != nil {
a.Logger.Warn("cert provisioning", "service", svc.Name, "err", certErr)
}
}
recovered++
a.Logger.Info("container recovered", "container", containerName)
}
}
a.Logger.Info("recovery complete",
"recovered", recovered,
"skipped", skipped,
"already_running", already,
)
return nil
}
// hasL7Routes and l7Hostnames are defined in deploy.go.

View File

@@ -19,6 +19,23 @@ type AgentConfig struct {
MCNS MCNSConfig `toml:"mcns"` MCNS MCNSConfig `toml:"mcns"`
Monitor MonitorConfig `toml:"monitor"` Monitor MonitorConfig `toml:"monitor"`
Log LogConfig `toml:"log"` Log LogConfig `toml:"log"`
Boot BootConfig `toml:"boot"`
}
// BootConfig holds the boot sequence for the master node.
// Each stage's services must be healthy before the next stage starts.
// Worker and edge nodes don't use this — they wait for the master.
type BootConfig struct {
Sequence []BootStage `toml:"sequence"`
}
// BootStage defines a group of services that must be started and healthy
// before the next stage begins.
type BootStage struct {
Name string `toml:"name"`
Services []string `toml:"services"`
Timeout Duration `toml:"timeout"`
Health string `toml:"health"` // "tcp", "grpc", or "http"
} }
// MetacryptConfig holds the Metacrypt CA integration settings for // MetacryptConfig holds the Metacrypt CA integration settings for

View File

@@ -50,9 +50,28 @@ type AuthConfig struct {
} }
// NodeConfig defines a managed node that the CLI connects to. // NodeConfig defines a managed node that the CLI connects to.
// Address is the primary address. Addresses is an optional list of
// fallback addresses tried in order if the primary fails. This
// provides resilience when Tailscale DNS is down or a node is
// reachable via LAN but not Tailnet.
type NodeConfig struct { type NodeConfig struct {
Name string `toml:"name"` Name string `toml:"name"`
Address string `toml:"address"` Address string `toml:"address"`
Addresses []string `toml:"addresses,omitempty"`
}
// AllAddresses returns the node's primary address followed by any
// fallback addresses, deduplicated.
func (n NodeConfig) AllAddresses() []string {
seen := make(map[string]bool)
var addrs []string
for _, a := range append([]string{n.Address}, n.Addresses...) {
if a != "" && !seen[a] {
seen[a] = true
addrs = append(addrs, a)
}
}
return addrs
} }
// LoadCLIConfig reads and validates a CLI configuration file. // LoadCLIConfig reads and validates a CLI configuration file.

View File

@@ -62,9 +62,24 @@ type TimeoutsConfig struct {
// MasterNodeConfig is a bootstrap node entry in the master config. // MasterNodeConfig is a bootstrap node entry in the master config.
type MasterNodeConfig struct { type MasterNodeConfig struct {
Name string `toml:"name"` Name string `toml:"name"`
Address string `toml:"address"` Address string `toml:"address"`
Role string `toml:"role"` // "worker", "edge", or "master" Addresses []string `toml:"addresses,omitempty"`
Role string `toml:"role"` // "worker", "edge", or "master"
}
// AllAddresses returns the node's primary address followed by any
// fallback addresses, deduplicated.
func (n MasterNodeConfig) AllAddresses() []string {
seen := make(map[string]bool)
var addrs []string
for _, a := range append([]string{n.Address}, n.Addresses...) {
if a != "" && !seen[a] {
seen[a] = true
addrs = append(addrs, a)
}
}
return addrs
} }
// LoadMasterConfig reads and validates a master configuration file. // LoadMasterConfig reads and validates a master configuration file.

View File

@@ -140,21 +140,30 @@ func NewAgentPool(caCertPath, token string) *AgentPool {
// AddNode dials an agent and adds it to the pool. // AddNode dials an agent and adds it to the pool.
func (p *AgentPool) AddNode(name, address string) error { func (p *AgentPool) AddNode(name, address string) error {
client, err := DialAgent(address, p.caCert, p.token) return p.AddNodeMulti(name, []string{address})
if err != nil { }
return fmt.Errorf("add node %s: %w", name, err)
}
client.Node = name
p.mu.Lock() // AddNodeMulti tries each address in order and adds the first successful
defer p.mu.Unlock() // connection to the pool.
func (p *AgentPool) AddNodeMulti(name string, addresses []string) error {
var lastErr error
for _, addr := range addresses {
client, err := DialAgent(addr, p.caCert, p.token)
if err != nil {
lastErr = fmt.Errorf("%s: %w", addr, err)
continue
}
client.Node = name
// Close existing connection if re-adding. p.mu.Lock()
if old, ok := p.clients[name]; ok { if old, ok := p.clients[name]; ok {
_ = old.Close() _ = old.Close()
}
p.clients[name] = client
p.mu.Unlock()
return nil
} }
p.clients[name] = client return fmt.Errorf("add node %s: all addresses failed: %w", name, lastErr)
return nil
} }
// Get returns the agent client for a node. // Get returns the agent client for a node.

View File

@@ -63,7 +63,7 @@ func Run(cfg *config.MasterConfig, version string) error {
// Create agent connection pool. // Create agent connection pool.
pool := NewAgentPool(cfg.Master.CACert, token) pool := NewAgentPool(cfg.Master.CACert, token)
for _, n := range cfg.Nodes { for _, n := range cfg.Nodes {
if addErr := pool.AddNode(n.Name, n.Address); addErr != nil { if addErr := pool.AddNodeMulti(n.Name, n.AllAddresses()); addErr != nil {
logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr) logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
// Non-fatal: the node may come up later. // Non-fatal: the node may come up later.
} }