Files
mcp/internal/agent/agent.go
Kyle Isom 98b166fa7b agent: recover down components on startup when no boot sequence
A unikernel VM has no runtime restart policy, so if it exits — including
when an agent restart's cgroup kill takes it down — nothing restarts it,
and it sits in drift. Recover() already handles this (and unikernels, via
runtimeFor), but only ran inside RunBootSequence, which is gated on a
[boot] sequence that worker nodes don't define. Now the agent also runs
Recover once in the background on startup when there is no boot sequence,
so desired=running components (VMs especially) come back after an agent
or host restart without delaying registration.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 12:58:50 -07:00

222 lines
6.2 KiB
Go

package agent
import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"log/slog"
"net"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/auth"
"git.wntrmute.dev/mc/mcp/internal/config"
"git.wntrmute.dev/mc/mcp/internal/monitor"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Agent is the per-node MCP daemon. One Agent owns the local registry
// database, the container runtime and (on KVM-capable nodes) the unikernel
// runtime, the drift monitor, and the mc-proxy / Metacrypt / MCNS helpers,
// and it implements the McpAgentService gRPC API.
type Agent struct {
	// Embedded so that service methods this type does not define fall back
	// to the generated Unimplemented stubs.
	mcpv1.UnimplementedMcpAgentServiceServer

	// Node configuration and the local registry database.
	Config *config.AgentConfig
	DB     *sql.DB

	// Workload runtimes.
	Runtime   runtime.Runtime // container runtime (podman)
	Unikernel runtime.Runtime // unikernel runtime (qemu/nanos); nil if unavailable

	// Supporting services wired up by Run.
	Monitor   *monitor.Monitor
	Logger    *slog.Logger
	PortAlloc *PortAllocator
	Proxy     *ProxyRouter
	Certs     *CertProvisioner
	DNS       *DNSRegistrar

	// Version is the version string handed to Run.
	Version string
}
// Run starts the agent and blocks until SIGINT/SIGTERM or a fatal gRPC
// serve error.
//
// Startup order:
//   - open the registry database;
//   - build the podman runtime, plus the QEMU unikernel runtime when
//     unikernelSupported reports true;
//   - build the drift monitor, mc-proxy router, cert provisioner and DNS
//     registrar;
//   - load the TLS key pair, build the MCIAS-authenticated gRPC server and
//     bind the listener.
//
// If a [boot] sequence is configured, it runs synchronously before the
// server starts. Otherwise a one-shot Recover runs in the background. The
// heartbeat client and monitor are then started and the gRPC server serves.
//
// Boot and startup recovery are bound to a signal-aware context. A shutdown
// signal therefore cancels them instead of letting them outlive the agent.
// Run also waits for the background recovery to return before the registry
// database is closed.
//
// Run returns nil on a signal-initiated shutdown. It returns a wrapped error
// if setup fails or the gRPC server stops serving unexpectedly.
func Run(cfg *config.AgentConfig, version string) error {
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
		Level: parseLogLevel(cfg.Log.Level),
	}))

	db, err := registry.Open(cfg.Database.Path)
	if err != nil {
		return fmt.Errorf("open registry: %w", err)
	}
	defer func() { _ = db.Close() }()

	rt := &runtime.Podman{}

	// The unikernel runtime is enabled only on nodes with KVM. Services with
	// runtime = "unikernel" are placed by the master on KVM-capable nodes.
	var uk runtime.Runtime
	if unikernelSupported() {
		qemu := &runtime.QEMU{
			ImageDir: filepath.Join(homeDir(cfg), "images"),
			StateDir: filepath.Join(homeDir(cfg), "vm"),
			HomeDir:  homeDir(cfg),
		}
		// If the isolated host-only bridge exists, switch unikernels to
		// bridge networking (Phase 2: mandatory mediation). Otherwise they
		// use QEMU user-mode port forwards (Phase 1).
		if _, err := os.Stat("/sys/class/net/" + unikernelBridge); err == nil {
			qemu.Bridge = unikernelBridge
			qemu.Gateway = unikernelGateway
			qemu.SubnetPrefix = unikernelSubnetPrefix
			logger.Info("unikernel runtime enabled (KVM + isolated bridge)", "bridge", unikernelBridge)
		} else {
			logger.Info("unikernel runtime enabled (KVM, user-mode networking)")
		}
		uk = qemu
	}

	mon := monitor.New(db, mergedLister{primary: rt, extra: uk, logger: logger}, cfg.Monitor, cfg.Agent.NodeName, logger)

	proxy, err := NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger)
	if err != nil {
		return fmt.Errorf("connect to mc-proxy: %w", err)
	}
	// Close the mc-proxy router on every exit path: the setup failures
	// below, a fatal serve error, and a clean shutdown.
	defer func() { _ = proxy.Close() }()

	certs, err := NewCertProvisioner(cfg.Metacrypt, cfg.MCProxy.CertDir, logger)
	if err != nil {
		return fmt.Errorf("create cert provisioner: %w", err)
	}

	dns, err := NewDNSRegistrar(cfg.MCNS, logger)
	if err != nil {
		return fmt.Errorf("create DNS registrar: %w", err)
	}

	a := &Agent{
		Config:    cfg,
		DB:        db,
		Runtime:   rt,
		Unikernel: uk,
		Monitor:   mon,
		Logger:    logger,
		PortAlloc: NewPortAllocator(),
		Proxy:     proxy,
		Certs:     certs,
		DNS:       dns,
		Version:   version,
	}

	tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
	if err != nil {
		return fmt.Errorf("load TLS cert: %w", err)
	}
	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{tlsCert},
		MinVersion:   tls.VersionTLS13,
	}

	validator, err := auth.NewMCIASValidator(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert)
	if err != nil {
		return fmt.Errorf("create MCIAS validator: %w", err)
	}

	server := grpc.NewServer(
		grpc.Creds(credentials.NewTLS(tlsConfig)),
		grpc.ChainUnaryInterceptor(
			auth.AuthInterceptor(validator),
		),
		grpc.ChainStreamInterceptor(
			auth.StreamAuthInterceptor(validator),
		),
	)
	mcpv1.RegisterMcpAgentServiceServer(server, a)

	lis, err := net.Listen("tcp", cfg.Server.GRPCAddr)
	if err != nil {
		return fmt.Errorf("listen %q: %w", cfg.Server.GRPCAddr, err)
	}

	logger.Info("agent starting",
		"addr", cfg.Server.GRPCAddr,
		"node", cfg.Agent.NodeName,
		"runtime", cfg.Agent.ContainerRuntime,
	)

	// ctx is cancelled on SIGINT/SIGTERM, or when stop is called on return.
	// Boot and startup recovery derive from it so they can be interrupted.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

	// recoverDone is closed once the background startup recovery (if any)
	// has returned. It must be closed on every path below.
	recoverDone := make(chan struct{})

	// Registered after the db.Close defer, so it runs before it. It cancels
	// ctx first, then waits for the recovery goroutine, so that goroutine
	// never uses a closed database.
	defer func() {
		stop()
		<-recoverDone
	}()

	if len(cfg.Boot.Sequence) > 0 {
		// Nothing runs in the background in this mode. Close first so the
		// deferred wait can never block, even if the boot sequence panics.
		close(recoverDone)

		// Run boot sequence before starting the gRPC server.
		// On the master node, this starts foundation services (MCIAS, MCNS)
		// before core services, ensuring dependencies are met.
		if err := a.RunBootSequence(ctx); err != nil {
			logger.Error("boot sequence failed", "err", err)
			// Continue starting the gRPC server — partial boot is better than no agent.
		}
	} else {
		// No ordered boot sequence: still reconcile once, in the background, so
		// desired=running components are brought back up after an agent or host
		// restart without delaying registration. This matters most for
		// unikernel VMs: unlike podman containers (which have a restart policy),
		// a VM that exits — including when an agent restart's cgroup kill takes
		// it down — has nothing to restart it.
		go func() {
			defer close(recoverDone)
			recCtx, recCancel := context.WithTimeout(ctx, 10*time.Minute)
			defer recCancel()
			if err := a.Recover(recCtx); err != nil {
				logger.Error("startup recover failed", "err", err)
			}
		}()
	}

	// A shutdown signal during the boot sequence cancels it. In that case,
	// don't register with the master and start serving only to tear it all
	// down again immediately.
	if ctx.Err() != nil {
		logger.Info("shutting down")
		_ = lis.Close()
		return nil
	}

	// Start heartbeat client (registers with master and sends heartbeats).
	hbClient, hbErr := NewHeartbeatClient(*cfg, logger)
	if hbErr != nil {
		logger.Warn("heartbeat client failed to start", "err", hbErr)
	} else if hbClient != nil {
		hbClient.Start()
		logger.Info("heartbeat client started", "master", cfg.Master.Address)
	}

	mon.Start()

	// Buffered so Serve's final return never blocks after shutdown.
	errCh := make(chan error, 1)
	go func() {
		errCh <- server.Serve(lis)
	}()

	// stopWorkers halts the heartbeat client and the monitor. It is used on
	// both exit paths so neither path leaves them running.
	stopWorkers := func() {
		if hbClient != nil {
			hbClient.Stop()
		}
		mon.Stop()
	}

	select {
	case <-ctx.Done():
		logger.Info("shutting down")
		stopWorkers()
		server.GracefulStop()
		return nil
	case err := <-errCh:
		stopWorkers()
		// Serve has already returned; drop any connections still open.
		server.Stop()
		return fmt.Errorf("serve: %w", err)
	}
}
// parseLogLevel maps the configured log level name to an slog.Level.
//
// It accepts the names understood by slog.Level.UnmarshalText: "debug",
// "info", "warn" and "error" in any letter case, optionally followed by a
// numeric offset such as "info+2". A config value like "DEBUG" is therefore
// honoured instead of silently falling back to info. Empty or unrecognised
// values yield slog.LevelInfo, as before.
func parseLogLevel(level string) slog.Level {
	var lvl slog.Level
	if err := lvl.UnmarshalText([]byte(level)); err != nil {
		return slog.LevelInfo
	}
	return lvl
}