Add unikernel runtime: run services as Nanos VMs under QEMU/KVM

Implements the hypervisor design's Phase 1: a second runtime.Runtime
backend (QEMU) that runs each service component as a Nanos unikernel VM
instead of a podman container, selected per-component via a new
runtime = "unikernel" service-def field.

- internal/runtime/qemu.go: QEMURuntime. Pull extracts the ELF from the
  OCI image; Run does `ops build` + boots qemu-system-x86_64 with KVM,
  user-mode net port-forwards, QMP control socket and serial console log;
  Stop/Remove/Inspect/List/Logs map onto VM lifecycle + state dir.
- proto/registry/servicedef: add runtime, memory_mb, vcpus fields
  (registry migration 5).
- agent: holds both runtimes; runtimeFor() selects per component;
  listAllContainers() merges containers + VMs so drift/status see both.
  Unikernel runtime auto-enables on nodes with /dev/kvm + ops.

Validated end-to-end on straylight: a test service deploys via
`mcp deploy --direct`, boots as a Nanos unikernel, serves HTTP through
the agent port-forward, and reports running via `mcp status`/`mcp logs`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Kyle Isom
2026-06-11 00:54:49 -07:00
parent 3b08caaa0a
commit d56f224359
30 changed files with 949 additions and 152 deletions

View File

@@ -19,7 +19,7 @@ func (a *Agent) AdoptContainers(ctx context.Context, req *mcpv1.AdoptContainersR
return nil, fmt.Errorf("service name is required")
}
containers, err := a.Runtime.List(ctx)
containers, err := a.listAllContainers(ctx)
if err != nil {
return nil, fmt.Errorf("list containers: %w", err)
}
@@ -46,7 +46,7 @@ func (a *Agent) AdoptContainers(ctx context.Context, req *mcpv1.AdoptContainersR
}
// Ensure the service exists once, before adopting any containers.
if err := registry.CreateService(a.DB, service, true); err != nil {
if err := registry.CreateService(a.DB, service, true, ""); err != nil {
if _, getErr := registry.GetService(a.DB, service); getErr != nil {
return nil, fmt.Errorf("create service %q: %w", service, err)
}

View File

@@ -9,6 +9,7 @@ import (
"net"
"os"
"os/signal"
"path/filepath"
"syscall"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
@@ -28,7 +29,8 @@ type Agent struct {
Config *config.AgentConfig
DB *sql.DB
Runtime runtime.Runtime
Runtime runtime.Runtime // container runtime (podman)
Unikernel runtime.Runtime // unikernel runtime (qemu/nanos); nil if unavailable
Monitor *monitor.Monitor
Logger *slog.Logger
PortAlloc *PortAllocator
@@ -53,6 +55,18 @@ func Run(cfg *config.AgentConfig, version string) error {
rt := &runtime.Podman{}
// The unikernel runtime is enabled only on nodes with KVM. Services with
// runtime = "unikernel" are placed by the master on KVM-capable nodes.
var uk runtime.Runtime
if unikernelSupported() {
uk = &runtime.QEMU{
ImageDir: filepath.Join(homeDir(cfg), "images"),
StateDir: filepath.Join(homeDir(cfg), "vm"),
HomeDir: homeDir(cfg),
}
logger.Info("unikernel runtime enabled (KVM detected)")
}
mon := monitor.New(db, rt, cfg.Monitor, cfg.Agent.NodeName, logger)
proxy, err := NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger)
@@ -74,6 +88,7 @@ func Run(cfg *config.AgentConfig, version string) error {
Config: cfg,
DB: db,
Runtime: rt,
Unikernel: uk,
Monitor: mon,
Logger: logger,
PortAlloc: NewPortAllocator(),

View File

@@ -120,7 +120,7 @@ func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method stri
// or from the running container.
func (a *Agent) findServicePort(serviceName string) (int, error) {
// Check the running containers for a mapped port.
containers, err := a.Runtime.List(context.Background())
containers, err := a.listAllContainers(context.Background())
if err != nil {
return 0, fmt.Errorf("list containers: %w", err)
}

View File

@@ -22,7 +22,7 @@ func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.De
serviceName := spec.GetName()
a.Logger.Info("deploying", "service", serviceName)
if err := ensureService(a.DB, serviceName, spec.GetActive()); err != nil {
if err := ensureService(a.DB, serviceName, spec.GetActive(), spec.GetComment()); err != nil {
return nil, fmt.Errorf("deploy: ensure service %q: %w", serviceName, err)
}
@@ -94,8 +94,20 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
Routes: regRoutes,
Runtime: cs.GetRuntime(),
MemoryMB: int(cs.GetMemoryMb()),
VCPUs: int(cs.GetVcpus()),
}
// Select the runtime backend (container vs unikernel) for this component.
if cs.GetRuntime() == "unikernel" && a.Unikernel == nil {
return &mcpv1.ComponentResult{
Name: compName,
Error: "service requests unikernel runtime but this node has no KVM/ops support",
}
}
rt := a.runtimeFor(cs.GetRuntime())
if err := ensureComponent(a.DB, regComp); err != nil {
return &mcpv1.ComponentResult{
Name: compName,
@@ -103,27 +115,29 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
}
}
if err := a.Runtime.Pull(ctx, cs.GetImage()); err != nil {
if err := rt.Pull(ctx, cs.GetImage()); err != nil {
return &mcpv1.ComponentResult{
Name: compName,
Error: fmt.Sprintf("pull image: %v", err),
}
}
_ = a.Runtime.Stop(ctx, containerName) // may not exist yet
_ = a.Runtime.Remove(ctx, containerName) // may not exist yet
_ = rt.Stop(ctx, containerName) // may not exist yet
_ = rt.Remove(ctx, containerName) // may not exist yet
// Build the container spec. If the component has routes, use route-based
// port allocation and env injection. Otherwise, fall back to legacy ports.
runSpec := runtime.ContainerSpec{
Name: containerName,
Image: cs.GetImage(),
Network: cs.GetNetwork(),
User: cs.GetUser(),
Restart: cs.GetRestart(),
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
Env: cs.GetEnv(),
Name: containerName,
Image: cs.GetImage(),
Network: cs.GetNetwork(),
User: cs.GetUser(),
Restart: cs.GetRestart(),
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
Env: cs.GetEnv(),
MemoryMB: int(cs.GetMemoryMb()),
VCPUs: int(cs.GetVcpus()),
}
if len(regRoutes) > 0 && a.PortAlloc != nil {
@@ -142,7 +156,7 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
runSpec.Ports = cs.GetPorts()
}
if err := a.Runtime.Run(ctx, runSpec); err != nil {
if err := rt.Run(ctx, runSpec); err != nil {
_ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed")
return &mcpv1.ComponentResult{
Name: compName,
@@ -219,16 +233,16 @@ func (a *Agent) allocateRoutePorts(service, component string, routes []registry.
}
// ensureService creates the service if it does not exist, or updates its
// active flag if it does.
func ensureService(db *sql.DB, name string, active bool) error {
// active flag and comment if it does.
func ensureService(db *sql.DB, name string, active bool, comment string) error {
_, err := registry.GetService(db, name)
if errors.Is(err, sql.ErrNoRows) {
return registry.CreateService(db, name, active)
return registry.CreateService(db, name, active, comment)
}
if err != nil {
return err
}
return registry.UpdateServiceActive(db, name, active)
return registry.UpdateServiceActive(db, name, active, comment)
}
// hasL7Routes reports whether any route uses L7 (TLS-terminating) mode.

View File

@@ -34,7 +34,7 @@ func testAgent(t *testing.T) *Agent {
// allocateRoutePorts can store host ports for it.
func seedComponent(t *testing.T, db *sql.DB, service, component string, routes []registry.Route) {
t.Helper()
if err := registry.CreateService(db, service, true); err != nil {
if err := registry.CreateService(db, service, true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(db, &registry.Component{

View File

@@ -12,8 +12,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
mcproxy "git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/registry"
)
@@ -182,7 +182,7 @@ func (a *Agent) HealthCheck(_ context.Context, _ *mcpv1.HealthCheckRequest) (*mc
// Count running containers if the runtime is available.
if a.Runtime != nil {
if list, err := a.Runtime.List(context.Background()); err == nil {
if list, err := a.listAllContainers(context.Background()); err == nil {
containers = int32(len(list)) //nolint:gosec // container count is small
} else {
st = "degraded"

View File

@@ -21,8 +21,8 @@ import (
// MasterConfig holds the optional master connection settings for the agent.
// When configured, the agent self-registers and sends periodic heartbeats.
type MasterConfig struct {
Address string `toml:"address"` // master gRPC address
CACert string `toml:"ca_cert"` // CA cert to verify master's TLS
Address string `toml:"address"` // master gRPC address
CACert string `toml:"ca_cert"` // CA cert to verify master's TLS
TokenPath string `toml:"token_path"` // MCIAS service token for auth
}
@@ -38,12 +38,20 @@ type HeartbeatClient struct {
interval time.Duration
stop chan struct{}
wg sync.WaitGroup
logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) }
logger interface {
Info(string, ...any)
Warn(string, ...any)
Error(string, ...any)
}
}
// NewHeartbeatClient creates a client that registers with the master and
// sends periodic heartbeats. Returns nil if master address is not configured.
func NewHeartbeatClient(cfg config.AgentConfig, logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) }) (*HeartbeatClient, error) {
func NewHeartbeatClient(cfg config.AgentConfig, logger interface {
Info(string, ...any)
Warn(string, ...any)
Error(string, ...any)
}) (*HeartbeatClient, error) {
// Config takes precedence, env vars as fallback.
masterAddr := cfg.Master.Address
if masterAddr == "" {

View File

@@ -51,7 +51,7 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest)
}
}
if err := a.Runtime.Stop(ctx, containerName); err != nil {
if err := a.runtimeFor(c.Runtime).Stop(ctx, containerName); err != nil {
a.Logger.Info("stop container (ignored)", "container", containerName, "error", err)
}
@@ -132,12 +132,14 @@ func startComponent(ctx context.Context, a *Agent, service string, c *registry.C
containerName := ContainerNameFor(service, c.Name)
r := &mcpv1.ComponentResult{Name: c.Name, Success: true}
rt := a.runtimeFor(c.Runtime)
// Remove any pre-existing container; ignore errors for non-existent ones.
_ = a.Runtime.Stop(ctx, containerName)
_ = a.Runtime.Remove(ctx, containerName)
_ = rt.Stop(ctx, containerName)
_ = rt.Remove(ctx, containerName)
spec := componentToSpec(service, c)
if err := a.Runtime.Run(ctx, spec); err != nil {
if err := rt.Run(ctx, spec); err != nil {
r.Success = false
r.Error = fmt.Sprintf("run container: %v", err)
return r
@@ -156,11 +158,12 @@ func restartComponent(ctx context.Context, a *Agent, service string, c *registry
containerName := ContainerNameFor(service, c.Name)
r := &mcpv1.ComponentResult{Name: c.Name, Success: true}
_ = a.Runtime.Stop(ctx, containerName)
_ = a.Runtime.Remove(ctx, containerName)
rt := a.runtimeFor(c.Runtime)
_ = rt.Stop(ctx, containerName)
_ = rt.Remove(ctx, containerName)
spec := componentToSpec(service, c)
if err := a.Runtime.Run(ctx, spec); err != nil {
if err := rt.Run(ctx, spec); err != nil {
r.Success = false
r.Error = fmt.Sprintf("run container: %v", err)
_ = registry.UpdateComponentState(a.DB, service, c.Name, "", "stopped")
@@ -177,14 +180,16 @@ func restartComponent(ctx context.Context, a *Agent, service string, c *registry
// componentToSpec builds a runtime.ContainerSpec from a registry Component.
func componentToSpec(service string, c *registry.Component) runtime.ContainerSpec {
return runtime.ContainerSpec{
Name: ContainerNameFor(service, c.Name),
Image: c.Image,
Network: c.Network,
User: c.UserSpec,
Restart: c.Restart,
Ports: c.Ports,
Volumes: c.Volumes,
Cmd: c.Cmd,
Name: ContainerNameFor(service, c.Name),
Image: c.Image,
Network: c.Network,
User: c.UserSpec,
Restart: c.Restart,
Ports: c.Ports,
Volumes: c.Volumes,
Cmd: c.Cmd,
MemoryMB: c.MemoryMB,
VCPUs: c.VCPUs,
}
}

View File

@@ -2,15 +2,22 @@ package agent
import (
"bufio"
"context"
"io"
"os/exec"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// logStreamer is implemented by both the Podman (container logs) and QEMU
// (serial console) runtimes.
type logStreamer interface {
Logs(ctx context.Context, name string, tail int, follow, timestamps bool, since string) *exec.Cmd
}
// Logs streams container logs for a service component.
func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsServer) error {
if req.GetService() == "" {
@@ -32,14 +39,20 @@ func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsSe
containerName := ContainerNameFor(req.GetService(), component)
podman, ok := a.Runtime.(*runtime.Podman)
// Select the runtime for this component (container vs unikernel) and
// stream its logs (podman logs / journald, or the VM serial console).
var compRuntime string
if c, err := registry.GetComponent(a.DB, req.GetService(), component); err == nil {
compRuntime = c.Runtime
}
ls, ok := a.runtimeFor(compRuntime).(logStreamer)
if !ok {
return status.Error(codes.Internal, "logs requires podman runtime")
return status.Error(codes.Internal, "selected runtime does not support log streaming")
}
cmd := podman.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince())
cmd := ls.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince())
a.Logger.Info("running podman logs", "container", containerName, "args", cmd.Args)
a.Logger.Info("streaming logs", "container", containerName, "runtime", compRuntime, "args", cmd.Args)
// Podman writes container stdout to its stdout and container stderr
// to its stderr. Merge both into a single pipe.
@@ -48,7 +61,7 @@ func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsSe
cmd.Stderr = pw
if err := cmd.Start(); err != nil {
pw.Close()
_ = pw.Close()
return status.Errorf(codes.Internal, "start podman logs: %v", err)
}
@@ -58,7 +71,7 @@ func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsSe
if err != nil {
a.Logger.Warn("podman logs exited", "container", containerName, "error", err)
}
pw.Close()
_ = pw.Close()
}()
scanner := bufio.NewScanner(pr)

View File

@@ -14,7 +14,7 @@ func TestPurgeComponentRemoved(t *testing.T) {
ctx := context.Background()
// Set up a service with a stale component.
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -69,7 +69,7 @@ func TestPurgeRefusesRunning(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcr", true); err != nil {
if err := registry.CreateService(a.DB, "mcr", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -109,7 +109,7 @@ func TestPurgeRefusesStopped(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcr", true); err != nil {
if err := registry.CreateService(a.DB, "mcr", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -140,7 +140,7 @@ func TestPurgeSkipsDefinedComponent(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -176,7 +176,7 @@ func TestPurgeDryRun(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -217,7 +217,7 @@ func TestPurgeServiceFilter(t *testing.T) {
ctx := context.Background()
// Create two services.
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -226,7 +226,7 @@ func TestPurgeServiceFilter(t *testing.T) {
}); err != nil {
t.Fatalf("create component: %v", err)
}
if err := registry.CreateService(a.DB, "mcr", true); err != nil {
if err := registry.CreateService(a.DB, "mcr", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -263,7 +263,7 @@ func TestPurgeServiceDeletedWhenEmpty(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -306,7 +306,7 @@ func TestPurgeServiceKeptWhenComponentsRemain(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
// Stale component (will be purged).
@@ -359,7 +359,7 @@ func TestPurgeExitedState(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "test", true); err != nil {
if err := registry.CreateService(a.DB, "test", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -384,7 +384,7 @@ func TestPurgeUnknownState(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "test", true); err != nil {
if err := registry.CreateService(a.DB, "test", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{

View File

@@ -22,7 +22,7 @@ func (a *Agent) Recover(ctx context.Context) error {
}
// Get the list of currently running containers from podman.
running, err := a.Runtime.List(ctx)
running, err := a.listAllContainers(ctx)
if err != nil {
a.Logger.Warn("cannot list containers, assuming none running", "err", err)
running = nil
@@ -67,18 +67,22 @@ func (a *Agent) Recover(ctx context.Context) error {
"image", comp.Image,
)
rt := a.runtimeFor(comp.Runtime)
// Remove any stale container with the same name.
_ = a.Runtime.Remove(ctx, containerName)
_ = rt.Remove(ctx, containerName)
// Build the container spec from the registry.
spec := runtime.ContainerSpec{
Name: containerName,
Image: comp.Image,
Network: comp.Network,
User: comp.UserSpec,
Restart: comp.Restart,
Volumes: comp.Volumes,
Cmd: comp.Cmd,
Name: containerName,
Image: comp.Image,
Network: comp.Network,
User: comp.UserSpec,
Restart: comp.Restart,
Volumes: comp.Volumes,
Cmd: comp.Cmd,
MemoryMB: comp.MemoryMB,
VCPUs: comp.VCPUs,
}
// Allocate ports from routes if the component has routes.
@@ -95,7 +99,7 @@ func (a *Agent) Recover(ctx context.Context) error {
spec.Ports = comp.Ports
}
if err := a.Runtime.Run(ctx, spec); err != nil {
if err := rt.Run(ctx, spec); err != nil {
a.Logger.Error("recover container failed",
"container", containerName,
"err", err,

65
internal/agent/runtime.go Normal file
View File

@@ -0,0 +1,65 @@
package agent
import (
"context"
"os"
"os/exec"
"path/filepath"
"git.wntrmute.dev/mc/mcp/internal/config"
"git.wntrmute.dev/mc/mcp/internal/runtime"
)
// unikernelSupported reports whether this node can run Nanos unikernels:
// it needs KVM (/dev/kvm) and the `ops` toolchain on PATH.
func unikernelSupported() bool {
if _, err := os.Stat("/dev/kvm"); err != nil {
return false
}
if _, err := exec.LookPath("ops"); err != nil {
return false
}
return true
}
// homeDir returns the agent's working directory (where images/ and vm/ live),
// derived from the registry database path (e.g. /srv/mcp/mcp.db -> /srv/mcp).
func homeDir(cfg *config.AgentConfig) string {
if cfg != nil && cfg.Database.Path != "" {
return filepath.Dir(cfg.Database.Path)
}
if h := os.Getenv("HOME"); h != "" {
return h
}
return "/srv/mcp"
}
// runtimeFor selects the runtime backend for a component's declared runtime.
// Unknown or empty runtimes fall back to the container runtime. If a service
// requests "unikernel" but this node lacks the unikernel runtime, it falls
// back to the container runtime (the master should not place it here).
func (a *Agent) runtimeFor(rt string) runtime.Runtime {
if rt == "unikernel" && a.Unikernel != nil {
return a.Unikernel
}
return a.Runtime
}
// listAllContainers returns the observed state across every configured
// runtime (containers + unikernel VMs) so reconciliation, status, and drift
// detection see the whole picture.
func (a *Agent) listAllContainers(ctx context.Context) ([]runtime.ContainerInfo, error) {
infos, err := a.Runtime.List(ctx)
if err != nil {
return nil, err
}
if a.Unikernel != nil {
vms, vmErr := a.Unikernel.List(ctx)
if vmErr == nil {
infos = append(infos, vms...)
} else if a.Logger != nil {
a.Logger.Warn("list unikernel VMs failed", "err", vmErr)
}
}
return infos, nil
}

View File

@@ -15,8 +15,9 @@ import (
// ServiceInfo message.
func buildServiceInfo(svc registry.Service, components []registry.Component) *mcpv1.ServiceInfo {
info := &mcpv1.ServiceInfo{
Name: svc.Name,
Active: svc.Active,
Name: svc.Name,
Active: svc.Active,
Comment: svc.Comment,
}
for _, c := range components {
info.Components = append(info.Components, &mcpv1.ComponentInfo{
@@ -56,7 +57,7 @@ func (a *Agent) ListServices(ctx context.Context, req *mcpv1.ListServicesRequest
// registry. It returns a list of ServiceInfo messages with updated observed
// state. This shared logic is used by both LiveCheck and GetServiceStatus.
func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, error) {
containers, err := a.Runtime.List(ctx)
containers, err := a.listAllContainers(ctx)
if err != nil {
return nil, fmt.Errorf("runtime list: %w", err)
}
@@ -84,8 +85,9 @@ func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, er
}
info := &mcpv1.ServiceInfo{
Name: svc.Name,
Active: svc.Active,
Name: svc.Name,
Active: svc.Active,
Comment: svc.Comment,
}
for _, comp := range components {

View File

@@ -23,7 +23,7 @@ func TestListServices(t *testing.T) {
}
// Add a service with components.
if err := registry.CreateService(a.DB, "metacrypt", true); err != nil {
if err := registry.CreateService(a.DB, "metacrypt", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -71,7 +71,7 @@ func TestLiveCheck(t *testing.T) {
ctx := context.Background()
// Set up registry with one service and one component.
if err := registry.CreateService(a.DB, "metacrypt", true); err != nil {
if err := registry.CreateService(a.DB, "metacrypt", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -146,7 +146,7 @@ func TestGetServiceStatus_DriftDetection(t *testing.T) {
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "metacrypt", true); err != nil {
if err := registry.CreateService(a.DB, "metacrypt", true, ""); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
@@ -207,7 +207,7 @@ func TestGetServiceStatus_FilterByName(t *testing.T) {
ctx := context.Background()
for _, svc := range []string{"metacrypt", "mcr"} {
if err := registry.CreateService(a.DB, svc, true); err != nil {
if err := registry.CreateService(a.DB, svc, true, ""); err != nil {
t.Fatalf("create service %q: %v", svc, err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{

View File

@@ -60,15 +60,20 @@ func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1.
existing, err := registry.GetService(a.DB, spec.GetName())
if err != nil {
// Service does not exist; create it.
if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive()); err != nil {
if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive(), spec.GetComment()); err != nil {
return nil, nil, status.Errorf(codes.Internal, "create service %q: %v", spec.GetName(), err)
}
changes = append(changes, "created service")
} else if existing.Active != spec.GetActive() {
if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive()); err != nil {
} else if existing.Active != spec.GetActive() || existing.Comment != spec.GetComment() {
if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive(), spec.GetComment()); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update service %q: %v", spec.GetName(), err)
}
changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive()))
if existing.Active != spec.GetActive() {
changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive()))
}
if existing.Comment != spec.GetComment() {
changes = append(changes, fmt.Sprintf("comment: %q", spec.GetComment()))
}
}
// Create or update each component.
@@ -107,7 +112,7 @@ func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1.
// reconcileUntracked lists all containers from the runtime and adds any that
// are not already tracked in the registry with desired_state "ignore".
func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) error {
containers, err := a.Runtime.List(ctx)
containers, err := a.listAllContainers(ctx)
if err != nil {
return fmt.Errorf("list containers: %w", err)
}
@@ -127,7 +132,7 @@ func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) e
}
if _, err := registry.GetService(a.DB, service); err != nil {
if err := registry.CreateService(a.DB, service, true); err != nil {
if err := registry.CreateService(a.DB, service, true, ""); err != nil {
a.Logger.Info("reconcile: create service failed", "service", service, "error", err)
continue
}

View File

@@ -36,7 +36,7 @@ func (a *Agent) UndeployService(ctx context.Context, req *mcpv1.UndeployServiceR
}
// Mark the service as inactive.
if err := registry.UpdateServiceActive(a.DB, serviceName, false); err != nil {
if err := registry.UpdateServiceActive(a.DB, serviceName, false, ""); err != nil {
a.Logger.Warn("failed to mark service inactive", "service", serviceName, "err", err)
}
@@ -73,10 +73,11 @@ func (a *Agent) undeployComponent(ctx context.Context, serviceName string, c *re
}
// 4. Stop and remove the container.
if err := a.Runtime.Stop(ctx, containerName); err != nil {
rt := a.runtimeFor(c.Runtime)
if err := rt.Stop(ctx, containerName); err != nil {
a.Logger.Info("stop container (ignored)", "container", containerName, "error", err)
}
if err := a.Runtime.Remove(ctx, containerName); err != nil {
if err := rt.Remove(ctx, containerName); err != nil {
a.Logger.Info("remove container (ignored)", "container", containerName, "error", err)
}