Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fa4d022bc1 | |||
| 9d543998dc |
@@ -43,6 +43,7 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
root.AddCommand(snapshotCmd())
|
root.AddCommand(snapshotCmd())
|
||||||
|
root.AddCommand(recoverCmd())
|
||||||
|
|
||||||
if err := root.Execute(); err != nil {
|
if err := root.Execute(); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|||||||
68
cmd/mcp-agent/recover.go
Normal file
68
cmd/mcp-agent/recover.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/agent"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func recoverCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "recover",
|
||||||
|
Short: "Recreate containers from the agent registry",
|
||||||
|
Long: `Recover recreates containers from the agent's SQLite registry for all
|
||||||
|
services whose desired state is "running" but which don't have a running
|
||||||
|
container in podman.
|
||||||
|
|
||||||
|
This is the recovery path after a podman database loss (e.g., after a
|
||||||
|
UID change, podman reset, or reboot that cleared container state).
|
||||||
|
|
||||||
|
Images must be cached locally — recover does not pull from MCR.`,
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
cfg, err := config.LoadAgentConfig(cfgPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("load config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
|
||||||
|
Level: slog.LevelInfo,
|
||||||
|
}))
|
||||||
|
|
||||||
|
db, err := registry.Open(cfg.Database.Path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open registry: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = db.Close() }()
|
||||||
|
|
||||||
|
proxy, err := agent.NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("mc-proxy not available, routes will not be registered", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
certs, err := agent.NewCertProvisioner(cfg.Metacrypt, cfg.MCProxy.CertDir, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("cert provisioner not available", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &agent.Agent{
|
||||||
|
Config: cfg,
|
||||||
|
DB: db,
|
||||||
|
Runtime: &runtime.Podman{},
|
||||||
|
Logger: logger,
|
||||||
|
PortAlloc: agent.NewPortAllocator(),
|
||||||
|
Proxy: proxy,
|
||||||
|
Certs: certs,
|
||||||
|
Version: version,
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.Recover(context.Background())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -119,6 +119,18 @@ func Run(cfg *config.AgentConfig, version string) error {
|
|||||||
"runtime", cfg.Agent.ContainerRuntime,
|
"runtime", cfg.Agent.ContainerRuntime,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Run boot sequence before starting the gRPC server.
|
||||||
|
// On the master node, this starts foundation services (MCIAS, MCNS)
|
||||||
|
// before core services, ensuring dependencies are met.
|
||||||
|
if len(cfg.Boot.Sequence) > 0 {
|
||||||
|
bootCtx, bootCancel := context.WithCancel(context.Background())
|
||||||
|
defer bootCancel()
|
||||||
|
if err := a.RunBootSequence(bootCtx); err != nil {
|
||||||
|
logger.Error("boot sequence failed", "err", err)
|
||||||
|
// Continue starting the gRPC server — partial boot is better than no agent.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mon.Start()
|
mon.Start()
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|||||||
202
internal/agent/boot.go
Normal file
202
internal/agent/boot.go
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
	"context"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"git.wntrmute.dev/mc/mcp/internal/config"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
|
||||||
|
|
||||||
|
// RunBootSequence executes the boot stages defined in the agent config.
|
||||||
|
// Each stage's services must be healthy before the next stage starts.
|
||||||
|
// This is used on the master node to start foundation services (MCIAS,
|
||||||
|
// MCNS) before core services (Metacrypt, MCR) before the master itself.
|
||||||
|
//
|
||||||
|
// If no boot sequence is configured, this is a no-op.
|
||||||
|
func (a *Agent) RunBootSequence(ctx context.Context) error {
|
||||||
|
stages := a.Config.Boot.Sequence
|
||||||
|
if len(stages) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("boot sequence starting", "stages", len(stages))
|
||||||
|
|
||||||
|
for i, stage := range stages {
|
||||||
|
a.Logger.Info("boot stage starting",
|
||||||
|
"stage", stage.Name,
|
||||||
|
"services", stage.Services,
|
||||||
|
"timeout", stage.Timeout.Duration,
|
||||||
|
"health", stage.Health,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Use the recover logic to start any services in this stage
|
||||||
|
// that aren't already running.
|
||||||
|
if err := a.Recover(ctx); err != nil {
|
||||||
|
a.Logger.Warn("boot stage recover failed", "stage", stage.Name, "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all services in this stage to be healthy.
|
||||||
|
timeout := stage.Timeout.Duration
|
||||||
|
if timeout == 0 {
|
||||||
|
timeout = 60 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.waitForHealthy(ctx, stage, timeout, i == 0); err != nil {
|
||||||
|
if i == 0 {
|
||||||
|
// Foundation stage: block and retry indefinitely.
|
||||||
|
a.Logger.Error("foundation stage failed — retrying indefinitely",
|
||||||
|
"stage", stage.Name, "err", err)
|
||||||
|
for {
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
if retryErr := a.waitForHealthy(ctx, stage, timeout, true); retryErr == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Non-foundation: log and proceed.
|
||||||
|
a.Logger.Warn("boot stage not fully healthy, proceeding",
|
||||||
|
"stage", stage.Name, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("boot stage complete", "stage", stage.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("boot sequence complete")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForHealthy waits until all services in the stage pass their health check.
|
||||||
|
func (a *Agent) waitForHealthy(ctx context.Context, stage config.BootStage, timeout time.Duration, isFoundation bool) error {
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
|
||||||
|
for _, svc := range stage.Services {
|
||||||
|
for {
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
return fmt.Errorf("timeout waiting for %s", svc)
|
||||||
|
}
|
||||||
|
|
||||||
|
healthy, err := a.checkServiceHealth(ctx, svc, stage.Health)
|
||||||
|
if err == nil && healthy {
|
||||||
|
a.Logger.Info("service healthy", "service", svc, "check", stage.Health)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkServiceHealth probes a service using the specified health check method.
|
||||||
|
func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method string) (bool, error) {
|
||||||
|
// Find the service's port from the registry.
|
||||||
|
port, err := a.findServicePort(serviceName)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch method {
|
||||||
|
case "tcp", "":
|
||||||
|
return a.checkTCP(ctx, port)
|
||||||
|
case "grpc":
|
||||||
|
return a.checkGRPC(ctx, port)
|
||||||
|
default:
|
||||||
|
// Unknown method, fall back to TCP.
|
||||||
|
return a.checkTCP(ctx, port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// findServicePort finds the first mapped port for a service from the registry
|
||||||
|
// or from the running container.
|
||||||
|
func (a *Agent) findServicePort(serviceName string) (int, error) {
|
||||||
|
// Check the running containers for a mapped port.
|
||||||
|
containers, err := a.Runtime.List(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("list containers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range containers {
|
||||||
|
// Container name might be "service" or "service-component"
|
||||||
|
if c.Name == serviceName || len(c.Name) > len(serviceName) && c.Name[:len(serviceName)+1] == serviceName+"-" {
|
||||||
|
// Parse the first port mapping to get the host port.
|
||||||
|
for _, p := range c.Ports {
|
||||||
|
// Port format: "127.0.0.1:28443->8443/tcp" or "8443/tcp"
|
||||||
|
port := parseHostPort(p)
|
||||||
|
if port > 0 {
|
||||||
|
return port, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, fmt.Errorf("no port found for service %s", serviceName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseHostPort extracts the host port from a podman port mapping string.
//
// Expected format: "127.0.0.1:28443->8443/tcp" or "0.0.0.0:53->53/tcp".
// Mappings without a host part (e.g. "8443/tcp") yield 0, as does any
// string that does not parse as a positive integer port. Unlike the
// previous hand-rolled digit scan, malformed runs such as "12a3" are
// rejected rather than silently collapsed to "123".
func parseHostPort(mapping string) int {
	// Everything before "->" is "host:port"; no arrow means no host port.
	hostPart, _, ok := strings.Cut(mapping, "->")
	if !ok {
		return 0
	}

	// The port follows the last ':' (last, to tolerate IPv6 hosts).
	idx := strings.LastIndex(hostPart, ":")
	if idx < 0 {
		return 0
	}

	port, err := strconv.Atoi(hostPart[idx+1:])
	if err != nil || port <= 0 {
		return 0
	}
	return port
}
|
||||||
|
|
||||||
|
// checkTCP attempts a TCP connection to localhost:port.
|
||||||
|
func (a *Agent) checkTCP(ctx context.Context, port int) (bool, error) {
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d", port)
|
||||||
|
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
_ = conn.Close()
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkGRPC calls the standard gRPC health check on localhost:port.
|
||||||
|
func (a *Agent) checkGRPC(ctx context.Context, port int) (bool, error) {
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d", port)
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer func() { _ = conn.Close() }()
|
||||||
|
|
||||||
|
client := healthpb.NewHealthClient(conn)
|
||||||
|
resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.GetStatus() == healthpb.HealthCheckResponse_SERVING, nil
|
||||||
|
}
|
||||||
139
internal/agent/recover.go
Normal file
139
internal/agent/recover.go
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recover recreates containers from the agent's registry for all services
// whose desired state is "running" but which don't have a running container
// in podman. This is the recovery path after a podman database loss (e.g.,
// after a UID change or podman reset).
//
// Recover does NOT pull images — it assumes the images are cached locally.
// If an image is missing, that component is skipped with a warning.
//
// Per-component failures are logged and counted but never abort the loop;
// the function only returns an error if the registry itself cannot be read.
func (a *Agent) Recover(ctx context.Context) error {
	services, err := registry.ListServices(a.DB)
	if err != nil {
		return fmt.Errorf("list services: %w", err)
	}

	// Get the list of currently running containers from podman.
	running, err := a.Runtime.List(ctx)
	if err != nil {
		// Best-effort: if the runtime can't be queried, treat everything
		// as stopped and let per-container recovery proceed.
		a.Logger.Warn("cannot list containers, assuming none running", "err", err)
		running = nil
	}
	// Set of container names that are already up; used to skip them below.
	runningSet := make(map[string]bool)
	for _, c := range running {
		runningSet[c.Name] = true
	}

	// Counters for the summary log emitted at the end.
	var recovered, skipped, already int

	for _, svc := range services {
		// Inactive services are deliberately left alone.
		if !svc.Active {
			continue
		}

		comps, err := registry.ListComponents(a.DB, svc.Name)
		if err != nil {
			a.Logger.Warn("list components", "service", svc.Name, "err", err)
			continue
		}

		for _, comp := range comps {
			if comp.DesiredState != "running" {
				continue
			}

			// Naming convention: "<service>-<component>", collapsed to just
			// "<service>" when the component shares the service's name.
			containerName := svc.Name + "-" + comp.Name
			if comp.Name == svc.Name {
				containerName = svc.Name
			}

			// Skip if container is already running.
			if runningSet[containerName] {
				already++
				continue
			}

			a.Logger.Info("recovering container",
				"service", svc.Name,
				"component", comp.Name,
				"image", comp.Image,
			)

			// Remove any stale container with the same name.
			// Error deliberately ignored: the container may simply not exist.
			_ = a.Runtime.Remove(ctx, containerName)

			// Build the container spec from the registry.
			spec := runtime.ContainerSpec{
				Name:    containerName,
				Image:   comp.Image,
				Network: comp.Network,
				User:    comp.UserSpec,
				Restart: comp.Restart,
				Volumes: comp.Volumes,
				Cmd:     comp.Cmd,
			}

			// Allocate ports from routes if the component has routes.
			if len(comp.Routes) > 0 && a.PortAlloc != nil {
				ports, env, allocErr := a.allocateRoutePorts(svc.Name, comp.Name, comp.Routes)
				if allocErr != nil {
					// Allocation failed: fall back to the statically
					// recorded ports rather than skipping the component.
					a.Logger.Warn("allocate route ports", "container", containerName, "err", allocErr)
					spec.Ports = comp.Ports
				} else {
					spec.Ports = append(comp.Ports, ports...)
					spec.Env = append(spec.Env, env...)
				}
			} else {
				spec.Ports = comp.Ports
			}

			if err := a.Runtime.Run(ctx, spec); err != nil {
				// Likely cause: image not cached locally (Recover never pulls).
				a.Logger.Error("recover container failed",
					"container", containerName,
					"err", err,
				)
				skipped++
				continue
			}

			// Re-register mc-proxy routes. Best-effort: a missing proxy or
			// host-port lookup failure only skips route registration.
			if a.Proxy != nil && len(comp.Routes) > 0 {
				hostPorts, hpErr := registry.GetRouteHostPorts(a.DB, svc.Name, comp.Name)
				if hpErr == nil {
					if proxyErr := a.Proxy.RegisterRoutes(ctx, svc.Name, comp.Routes, hostPorts); proxyErr != nil {
						a.Logger.Warn("re-register routes", "service", svc.Name, "err", proxyErr)
					}
				}
			}

			// Provision TLS certs if needed (only for components with L7 routes).
			if a.Certs != nil && hasL7Routes(comp.Routes) {
				hostnames := l7Hostnames(svc.Name, comp.Routes)
				if certErr := a.Certs.EnsureCert(ctx, svc.Name, hostnames); certErr != nil {
					a.Logger.Warn("cert provisioning", "service", svc.Name, "err", certErr)
				}
			}

			recovered++
			a.Logger.Info("container recovered", "container", containerName)
		}
	}

	a.Logger.Info("recovery complete",
		"recovered", recovered,
		"skipped", skipped,
		"already_running", already,
	)

	return nil
}
|
||||||
|
|
||||||
|
// hasL7Routes and l7Hostnames are defined in deploy.go.
|
||||||
@@ -19,6 +19,23 @@ type AgentConfig struct {
|
|||||||
MCNS MCNSConfig `toml:"mcns"`
|
MCNS MCNSConfig `toml:"mcns"`
|
||||||
Monitor MonitorConfig `toml:"monitor"`
|
Monitor MonitorConfig `toml:"monitor"`
|
||||||
Log LogConfig `toml:"log"`
|
Log LogConfig `toml:"log"`
|
||||||
|
Boot BootConfig `toml:"boot"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BootConfig holds the boot sequence for the master node.
// Each stage's services must be healthy before the next stage starts.
// Worker and edge nodes don't use this — they wait for the master.
type BootConfig struct {
	// Sequence is the ordered list of boot stages; an empty list disables
	// the boot sequence entirely.
	Sequence []BootStage `toml:"sequence"`
}

// BootStage defines a group of services that must be started and healthy
// before the next stage begins.
type BootStage struct {
	// Name is a human-readable stage label used in log output.
	Name string `toml:"name"`
	// Services lists the service names that belong to this stage.
	Services []string `toml:"services"`
	// Timeout bounds how long the agent waits for the stage to become
	// healthy; a zero value means the agent's default applies.
	Timeout Duration `toml:"timeout"`
	Health  string   `toml:"health"` // "tcp", "grpc", or "http"
	// NOTE(review): "http" is listed above, but the agent's health checker
	// visible in this change only implements tcp and grpc — confirm.
}
|
||||||
|
|
||||||
// MetacryptConfig holds the Metacrypt CA integration settings for
|
// MetacryptConfig holds the Metacrypt CA integration settings for
|
||||||
|
|||||||
Reference in New Issue
Block a user