diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 0cd7431..581e272 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -119,6 +119,18 @@ func Run(cfg *config.AgentConfig, version string) error { "runtime", cfg.Agent.ContainerRuntime, ) + // Run boot sequence before starting the gRPC server. + // On the master node, this starts foundation services (MCIAS, MCNS) + // before core services, ensuring dependencies are met. + if len(cfg.Boot.Sequence) > 0 { + bootCtx, bootCancel := context.WithCancel(context.Background()) + defer bootCancel() + if err := a.RunBootSequence(bootCtx); err != nil { + logger.Error("boot sequence failed", "err", err) + // Continue starting the gRPC server — partial boot is better than no agent. + } + } + mon.Start() ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) diff --git a/internal/agent/boot.go b/internal/agent/boot.go new file mode 100644 index 0000000..2f3c9a2 --- /dev/null +++ b/internal/agent/boot.go @@ -0,0 +1,202 @@ +package agent + +import ( + "context" + "fmt" + "net" + "time" + + "git.wntrmute.dev/mc/mcp/internal/config" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// RunBootSequence executes the boot stages defined in the agent config. +// Each stage's services must be healthy before the next stage starts. +// This is used on the master node to start foundation services (MCIAS, +// MCNS) before core services (Metacrypt, MCR) before the master itself. +// +// If no boot sequence is configured, this is a no-op. 
+func (a *Agent) RunBootSequence(ctx context.Context) error {
+	const (
+		// defaultStageTimeout applies when a stage leaves its timeout unset.
+		defaultStageTimeout = 60 * time.Second
+		// foundationRetryInterval is the pause between foundation-stage retries.
+		foundationRetryInterval = 10 * time.Second
+	)
+
+	stages := a.Config.Boot.Sequence
+	if len(stages) == 0 {
+		return nil
+	}
+
+	a.Logger.Info("boot sequence starting", "stages", len(stages))
+
+	for i, stage := range stages {
+		a.Logger.Info("boot stage starting",
+			"stage", stage.Name,
+			"services", stage.Services,
+			"timeout", stage.Timeout.Duration,
+			"health", stage.Health,
+		)
+
+		// Use the recover logic to start any services in this stage
+		// that aren't already running. A failure is only logged; the
+		// health wait below decides whether the stage came up.
+		// NOTE(review): Recover is not given the stage, so it presumably
+		// acts on every known service rather than only this stage's — confirm.
+		if err := a.Recover(ctx); err != nil {
+			a.Logger.Warn("boot stage recover failed", "stage", stage.Name, "err", err)
+		}
+
+		// Wait for all services in this stage to be healthy. A zero or
+		// negative configured timeout falls back to the default.
+		timeout := stage.Timeout.Duration
+		if timeout <= 0 {
+			timeout = defaultStageTimeout
+		}
+
+		// The first configured stage is the foundation stage.
+		isFoundation := i == 0
+
+		if err := a.waitForHealthy(ctx, stage, timeout, isFoundation); err != nil {
+			if !isFoundation {
+				// Non-foundation: log and proceed.
+				a.Logger.Warn("boot stage not fully healthy, proceeding",
+					"stage", stage.Name, "err", err)
+			} else {
+				// Foundation stage: keep retrying until it is healthy. The
+				// only way out without success is cancellation of ctx, so
+				// a cancelled context can no longer spin here forever.
+				a.Logger.Error("foundation stage failed — retrying indefinitely",
+					"stage", stage.Name, "err", err)
+				for err != nil {
+					select {
+					case <-ctx.Done():
+						return fmt.Errorf("boot stage %q: %w", stage.Name, ctx.Err())
+					case <-time.After(foundationRetryInterval):
+					}
+					err = a.waitForHealthy(ctx, stage, timeout, true)
+				}
+			}
+		}
+
+		a.Logger.Info("boot stage complete", "stage", stage.Name)
+	}
+
+	a.Logger.Info("boot sequence complete")
+	return nil
+}
+
+// waitForHealthy waits until all services in the stage pass their health check.
+func (a *Agent) waitForHealthy(ctx context.Context, stage config.BootStage, timeout time.Duration, isFoundation bool) error {
+	// pollInterval is the delay between probes of a service that is not
+	// healthy yet.
+	const pollInterval = 2 * time.Second
+
+	// A single deadline covers the whole stage, not each service in turn.
+	// isFoundation is part of the signature but does not alter polling here.
+	deadline := time.Now().Add(timeout)
+
+	for _, svc := range stage.Services {
+		var lastErr error
+		for {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+			if time.Now().After(deadline) {
+				// Surface the most recent probe error so the caller's log
+				// says why the service never became healthy.
+				if lastErr != nil {
+					return fmt.Errorf("timeout waiting for %s: %w", svc, lastErr)
+				}
+				return fmt.Errorf("timeout waiting for %s", svc)
+			}
+
+			healthy, err := a.checkServiceHealth(ctx, svc, stage.Health)
+			if err == nil && healthy {
+				a.Logger.Info("service healthy", "service", svc, "check", stage.Health)
+				break
+			}
+			lastErr = err
+
+			// Wait before the next probe, but wake immediately on cancellation.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-time.After(pollInterval):
+			}
+		}
+	}
+	return nil
+}
+
+// checkServiceHealth probes a service using the specified health check method.
+// It resolves the service's host port from the running containers, then
+// dispatches on method: "grpc" uses the gRPC health check, while "tcp", the
+// empty string and any unrecognised value use a plain TCP connect.
+func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method string) (bool, error) {
+	port, err := a.findServicePortContext(ctx, serviceName)
+	if err != nil {
+		return false, err
+	}
+
+	switch method {
+	case "grpc":
+		return a.checkGRPC(ctx, port)
+	default:
+		// "tcp", "" and unknown methods all fall back to TCP.
+		return a.checkTCP(ctx, port)
+	}
+}
+
+// findServicePort finds the first mapped host port for a service from the
+// running containers. It lists containers with a background context; use
+// findServicePortContext when a cancellable context is available.
+func (a *Agent) findServicePort(serviceName string) (int, error) {
+	return a.findServicePortContext(context.Background(), serviceName)
+}
+
+// findServicePortContext lists the running containers using ctx and returns
+// the first host port parsed from a container named exactly serviceName or
+// prefixed with serviceName followed by "-". It returns an error when the
+// listing fails or no matching container exposes a parseable host port.
+func (a *Agent) findServicePortContext(ctx context.Context, serviceName string) (int, error) {
+	containers, err := a.Runtime.List(ctx)
+	if err != nil {
+		return 0, fmt.Errorf("list containers: %w", err)
+	}
+
+	componentPrefix := serviceName + "-"
+	for _, c := range containers {
+		// Container name might be "service" or "service-component".
+		isComponent := len(c.Name) >= len(componentPrefix) &&
+			c.Name[:len(componentPrefix)] == componentPrefix
+		if c.Name != serviceName && !isComponent {
+			continue
+		}
+
+		// Port format: "127.0.0.1:28443->8443/tcp" or "8443/tcp".
+		// Entries without a host port parse to 0 and are skipped.
+		for _, p := range c.Ports {
+			if port := parseHostPort(p); port > 0 {
+				return port, nil
+			}
+		}
+	}
+
+	return 0, fmt.Errorf("no port found for service %s", serviceName)
+}
+
+// parseHostPort extracts the host port from a podman port mapping string
+// such as "127.0.0.1:28443->8443/tcp" or "0.0.0.0:53->53/tcp". When the
+// host side is a range ("8000-8010") the first port of the range is returned.
+// It returns 0 when the mapping has no "->" separator (for example
+// "8443/tcp") or when the host port is not a decimal number in 1-65535.
+func parseHostPort(mapping string) int {
+	// The host side is everything before the "->" separator. Matching the
+	// full arrow avoids mistaking a range dash for the separator.
+	arrow := -1
+	for i := 0; i+1 < len(mapping); i++ {
+		if mapping[i] == '-' && mapping[i+1] == '>' {
+			arrow = i
+			break
+		}
+	}
+	if arrow < 0 {
+		return 0
+	}
+	hostSide := mapping[:arrow]
+
+	// Drop the address part. Scanning from the right means colons inside
+	// an IPv6 address such as "[::1]:8080" are never taken for the
+	// host:port separator.
+	for i := len(hostSide) - 1; i >= 0; i-- {
+		if hostSide[i] == ':' {
+			hostSide = hostSide[i+1:]
+			break
+		}
+	}
+
+	port, digits := 0, 0
+	for i := 0; i < len(hostSide); i++ {
+		ch := hostSide[i]
+		if ch == '-' && digits > 0 {
+			// Start of a port range; the first port is already parsed.
+			break
+		}
+		if ch < '0' || ch > '9' {
+			// Reject rather than silently skip unexpected characters.
+			return 0
+		}
+		port = port*10 + int(ch-'0')
+		digits++
+		if port > 65535 {
+			return 0
+		}
+	}
+	return port
+}
+
+// checkTCP attempts a TCP connection to localhost:port. The dial is limited
+// to two seconds and is abandoned early if ctx is cancelled.
+func (a *Agent) checkTCP(ctx context.Context, port int) (bool, error) {
+	addr := fmt.Sprintf("127.0.0.1:%d", port)
+	dialer := net.Dialer{Timeout: 2 * time.Second}
+	conn, err := dialer.DialContext(ctx, "tcp", addr)
+	if err != nil {
+		return false, err
+	}
+	// The connection only proves the port accepts connections; a close
+	// error carries no health information, so it is ignored.
+	_ = conn.Close()
+	return true, nil
+}
+
+// checkGRPC calls the standard gRPC health check on localhost:port.
+func (a *Agent) checkGRPC(ctx context.Context, port int) (bool, error) {
+	addr := fmt.Sprintf("127.0.0.1:%d", port)
+	// Bound the whole probe to three seconds. The derived context shadows
+	// the caller's ctx for the rest of the function.
+	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
+	defer cancel()
+
+	// The client is created with plaintext (insecure) transport credentials.
+	// NOTE(review): a service that only speaks TLS on this port would
+	// presumably fail this probe — confirm against the deployed services.
+	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return false, err
+	}
+	defer func() { _ = conn.Close() }()
+
+	client := healthpb.NewHealthClient(conn)
+	// The request leaves the service name unset, which the gRPC health
+	// protocol treats as a query for the server's overall status.
+	resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
+	if err != nil {
+		return false, err
+	}
+
+	// Only SERVING counts as healthy; any other status yields (false, nil).
+	return resp.GetStatus() == healthpb.HealthCheckResponse_SERVING, nil
+}
diff --git a/internal/config/agent.go b/internal/config/agent.go
index 6fac1ce..ff90f9d 100644
--- a/internal/config/agent.go
+++ b/internal/config/agent.go
@@ -19,6 +19,23 @@ type AgentConfig struct {
 	MCNS    MCNSConfig    `toml:"mcns"`
 	Monitor MonitorConfig `toml:"monitor"`
 	Log     LogConfig     `toml:"log"`
+	Boot    BootConfig    `toml:"boot"`
+}
+
+// BootConfig holds the boot sequence for the master node.
+// Each stage's services must be healthy before the next stage starts.
+// Worker and edge nodes don't use this — they wait for the master.
+type BootConfig struct {
+	Sequence []BootStage `toml:"sequence"` // stages run in order; the first is the foundation stage
+}
+
+// BootStage defines a group of services that must be started and healthy
+// before the next stage begins.
+type BootStage struct {
+	Name     string   `toml:"name"`     // stage label used in boot log messages
+	Services []string `toml:"services"` // service names probed for health, in order
+	Timeout  Duration `toml:"timeout"`  // budget for the whole stage; zero means 60s
+	Health   string   `toml:"health"`   // "tcp" (default) or "grpc"; other values fall back to TCP
 }
 
 // MetacryptConfig holds the Metacrypt CA integration settings for