Add boot sequencing to agent
The agent reads [[boot.sequence]] stages from its config and starts services in dependency order before accepting gRPC connections. Each stage waits for its services to pass health checks before proceeding: - tcp: TCP connect to the container's mapped port - grpc: standard gRPC health check Foundation stage (stage 0): blocks and retries indefinitely if health fails — all downstream services depend on it. Non-foundation stages: log warning and proceed on failure. Uses the recover logic to start containers from the registry, then health-checks to verify readiness. Config example: [[boot.sequence]] name = "foundation" services = ["mcias", "mcns"] timeout = "120s" health = "tcp" Architecture v2 Phase 4 feature. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -119,6 +119,18 @@ func Run(cfg *config.AgentConfig, version string) error {
|
||||
"runtime", cfg.Agent.ContainerRuntime,
|
||||
)
|
||||
|
||||
// Run boot sequence before starting the gRPC server.
|
||||
// On the master node, this starts foundation services (MCIAS, MCNS)
|
||||
// before core services, ensuring dependencies are met.
|
||||
if len(cfg.Boot.Sequence) > 0 {
|
||||
bootCtx, bootCancel := context.WithCancel(context.Background())
|
||||
defer bootCancel()
|
||||
if err := a.RunBootSequence(bootCtx); err != nil {
|
||||
logger.Error("boot sequence failed", "err", err)
|
||||
// Continue starting the gRPC server — partial boot is better than no agent.
|
||||
}
|
||||
}
|
||||
|
||||
mon.Start()
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
202
internal/agent/boot.go
Normal file
202
internal/agent/boot.go
Normal file
@@ -0,0 +1,202 @@
|
||||
package agent
|
||||
|
||||
import (
	"context"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"git.wntrmute.dev/mc/mcp/internal/config"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
|
||||
|
||||
// RunBootSequence executes the boot stages defined in the agent config.
|
||||
// Each stage's services must be healthy before the next stage starts.
|
||||
// This is used on the master node to start foundation services (MCIAS,
|
||||
// MCNS) before core services (Metacrypt, MCR) before the master itself.
|
||||
//
|
||||
// If no boot sequence is configured, this is a no-op.
|
||||
func (a *Agent) RunBootSequence(ctx context.Context) error {
|
||||
stages := a.Config.Boot.Sequence
|
||||
if len(stages) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
a.Logger.Info("boot sequence starting", "stages", len(stages))
|
||||
|
||||
for i, stage := range stages {
|
||||
a.Logger.Info("boot stage starting",
|
||||
"stage", stage.Name,
|
||||
"services", stage.Services,
|
||||
"timeout", stage.Timeout.Duration,
|
||||
"health", stage.Health,
|
||||
)
|
||||
|
||||
// Use the recover logic to start any services in this stage
|
||||
// that aren't already running.
|
||||
if err := a.Recover(ctx); err != nil {
|
||||
a.Logger.Warn("boot stage recover failed", "stage", stage.Name, "err", err)
|
||||
}
|
||||
|
||||
// Wait for all services in this stage to be healthy.
|
||||
timeout := stage.Timeout.Duration
|
||||
if timeout == 0 {
|
||||
timeout = 60 * time.Second
|
||||
}
|
||||
|
||||
if err := a.waitForHealthy(ctx, stage, timeout, i == 0); err != nil {
|
||||
if i == 0 {
|
||||
// Foundation stage: block and retry indefinitely.
|
||||
a.Logger.Error("foundation stage failed — retrying indefinitely",
|
||||
"stage", stage.Name, "err", err)
|
||||
for {
|
||||
time.Sleep(10 * time.Second)
|
||||
if retryErr := a.waitForHealthy(ctx, stage, timeout, true); retryErr == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Non-foundation: log and proceed.
|
||||
a.Logger.Warn("boot stage not fully healthy, proceeding",
|
||||
"stage", stage.Name, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
a.Logger.Info("boot stage complete", "stage", stage.Name)
|
||||
}
|
||||
|
||||
a.Logger.Info("boot sequence complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForHealthy waits until all services in the stage pass their health check.
|
||||
func (a *Agent) waitForHealthy(ctx context.Context, stage config.BootStage, timeout time.Duration, isFoundation bool) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
|
||||
for _, svc := range stage.Services {
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
return fmt.Errorf("timeout waiting for %s", svc)
|
||||
}
|
||||
|
||||
healthy, err := a.checkServiceHealth(ctx, svc, stage.Health)
|
||||
if err == nil && healthy {
|
||||
a.Logger.Info("service healthy", "service", svc, "check", stage.Health)
|
||||
break
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkServiceHealth probes a service using the specified health check method.
|
||||
func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method string) (bool, error) {
|
||||
// Find the service's port from the registry.
|
||||
port, err := a.findServicePort(serviceName)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
switch method {
|
||||
case "tcp", "":
|
||||
return a.checkTCP(ctx, port)
|
||||
case "grpc":
|
||||
return a.checkGRPC(ctx, port)
|
||||
default:
|
||||
// Unknown method, fall back to TCP.
|
||||
return a.checkTCP(ctx, port)
|
||||
}
|
||||
}
|
||||
|
||||
// findServicePort finds the first mapped port for a service from the registry
|
||||
// or from the running container.
|
||||
func (a *Agent) findServicePort(serviceName string) (int, error) {
|
||||
// Check the running containers for a mapped port.
|
||||
containers, err := a.Runtime.List(context.Background())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("list containers: %w", err)
|
||||
}
|
||||
|
||||
for _, c := range containers {
|
||||
// Container name might be "service" or "service-component"
|
||||
if c.Name == serviceName || len(c.Name) > len(serviceName) && c.Name[:len(serviceName)+1] == serviceName+"-" {
|
||||
// Parse the first port mapping to get the host port.
|
||||
for _, p := range c.Ports {
|
||||
// Port format: "127.0.0.1:28443->8443/tcp" or "8443/tcp"
|
||||
port := parseHostPort(p)
|
||||
if port > 0 {
|
||||
return port, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("no port found for service %s", serviceName)
|
||||
}
|
||||
|
||||
// parseHostPort extracts the host port from a container port mapping
// string such as "127.0.0.1:28443->8443/tcp" or "0.0.0.0:53->53/tcp".
//
// It returns 0 when the mapping has no host side (e.g. "8443/tcp" for
// an unmapped container port) or the port is not a positive integer.
// Unlike the original hand-rolled digit loop — which silently skipped
// non-digit characters, turning "12x3" into 123 — this rejects any
// malformed port outright.
func parseHostPort(mapping string) int {
	// Keep only the host side ("ip:port") of the "host->container" pair.
	host, _, found := strings.Cut(mapping, "->")
	if !found {
		return 0
	}

	// The host port follows the last ':' (the host IP may itself
	// contain colons if it is IPv6).
	idx := strings.LastIndexByte(host, ':')
	if idx < 0 {
		return 0
	}

	port, err := strconv.Atoi(host[idx+1:])
	if err != nil || port <= 0 {
		return 0
	}
	return port
}
|
||||
|
||||
// checkTCP attempts a TCP connection to localhost:port.
|
||||
func (a *Agent) checkTCP(ctx context.Context, port int) (bool, error) {
|
||||
addr := fmt.Sprintf("127.0.0.1:%d", port)
|
||||
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
_ = conn.Close()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// checkGRPC calls the standard gRPC health check on localhost:port.
|
||||
func (a *Agent) checkGRPC(ctx context.Context, port int) (bool, error) {
|
||||
addr := fmt.Sprintf("127.0.0.1:%d", port)
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
client := healthpb.NewHealthClient(conn)
|
||||
resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return resp.GetStatus() == healthpb.HealthCheckResponse_SERVING, nil
|
||||
}
|
||||
@@ -19,6 +19,23 @@ type AgentConfig struct {
|
||||
MCNS MCNSConfig `toml:"mcns"`
|
||||
Monitor MonitorConfig `toml:"monitor"`
|
||||
Log LogConfig `toml:"log"`
|
||||
Boot BootConfig `toml:"boot"`
|
||||
}
|
||||
|
||||
// BootConfig holds the ordered boot sequence for the master node.
// Each stage's services must be healthy before the next stage starts.
// An empty Sequence disables boot sequencing entirely.
// Worker and edge nodes don't use this — they wait for the master.
type BootConfig struct {
	// Sequence lists the [[boot.sequence]] TOML stages in start order.
	Sequence []BootStage `toml:"sequence"`
}
|
||||
|
||||
// BootStage defines a group of services that must be started and healthy
// before the next stage begins.
type BootStage struct {
	// Name identifies the stage in log output (e.g. "foundation").
	Name string `toml:"name"`
	// Services lists the service names that must pass the stage's
	// health check before the boot sequence advances.
	Services []string `toml:"services"`
	// Timeout bounds the health-check wait for the whole stage;
	// a zero value falls back to the agent's 60-second default.
	Timeout Duration `toml:"timeout"`
	// Health selects the probe method: "tcp" (the default when empty)
	// or "grpc". Note: only tcp and grpc are implemented — any other
	// value (including "http") falls back to a TCP probe.
	Health string `toml:"health"` // "tcp" or "grpc"; unknown values use tcp
}
|
||||
|
||||
// MetacryptConfig holds the Metacrypt CA integration settings for
|
||||
|
||||
Reference in New Issue
Block a user