Files
mcp/internal/agent/sync.go
Kyle Isom d56f224359 Add unikernel runtime: run services as Nanos VMs under QEMU/KVM
Implements the hypervisor design's Phase 1: a second runtime.Runtime
backend (QEMU) that runs each service component as a Nanos unikernel VM
instead of a podman container, selected per-component via a new
runtime = "unikernel" service-def field.

- internal/runtime/qemu.go: QEMURuntime. Pull extracts the ELF from the
  OCI image; Run does `ops build` + boots qemu-system-x86_64 with KVM,
  user-mode net port-forwards, QMP control socket and serial console log;
  Stop/Remove/Inspect/List/Logs map onto VM lifecycle + state dir.
- proto/registry/servicedef: add runtime, memory_mb, vcpus fields
  (registry migration 5).
- agent: holds both runtimes; runtimeFor() selects per component;
  listAllContainers() merges containers + VMs so drift/status see both.
  Unikernel runtime auto-enables on nodes with /dev/kvm + ops.

Validated end-to-end on straylight: a test service deploys via
`mcp deploy --direct`, boots as a Nanos unikernel, serves HTTP through
the agent port-forward, and reports running via `mcp status`/`mcp logs`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 00:54:49 -07:00

208 lines
6.7 KiB
Go

package agent
import (
"context"
"fmt"
"strings"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// SyncDesiredState reconciles the agent's registry with the declared service
// specs. Every spec is validated before any registry write happens, so a
// request containing an invalid spec is rejected without partially applying
// the valid specs that precede it. Valid specs are then synced (service
// record plus components) in request order. After that, untracked containers
// found on the node are added to the registry with desired_state "ignore".
//
// It returns codes.InvalidArgument if any spec has an empty name, or the
// first error returned by syncService. A failure while reconciling untracked
// containers is logged and does not fail the call.
func (a *Agent) SyncDesiredState(ctx context.Context, req *mcpv1.SyncDesiredStateRequest) (*mcpv1.SyncDesiredStateResponse, error) {
	specs := req.GetServices()
	a.Logger.Info("SyncDesiredState", "services", len(specs))

	// Validate the whole request up front. Checking inside the apply loop
	// would leave earlier services already written when a later spec is
	// rejected.
	for _, spec := range specs {
		if spec.GetName() == "" {
			return nil, status.Error(codes.InvalidArgument, "service name is required")
		}
	}

	// known collects the container names claimed by this sync so that
	// reconcileUntracked does not adopt them.
	known := make(map[string]bool)
	results := make([]*mcpv1.ServiceSyncResult, 0, len(specs))
	for _, spec := range specs {
		r, names, err := a.syncService(ctx, spec)
		if err != nil {
			return nil, err
		}
		for _, n := range names {
			known[n] = true
		}
		results = append(results, r)
	}

	// Reconciliation: find containers not in the registry and add them.
	// Best-effort: a failure here does not invalidate the sync above.
	if err := a.reconcileUntracked(ctx, known); err != nil {
		a.Logger.Info("reconcile untracked containers failed", "error", err)
	}
	return &mcpv1.SyncDesiredStateResponse{Results: results}, nil
}
// syncService brings the registry in line with one service spec. It first
// creates the service record, or updates it when the active flag or comment
// differ. It then creates or updates each component declared in the spec.
// Components get desired state "running" when the spec is active and
// "stopped" otherwise.
//
// It returns a sync result for the service and the container names
// ("<service>-<component>") belonging to it. The result holds Changed plus a
// "; "-joined summary of the actions taken, or "no changes". Registry
// failures are returned as codes.Internal.
//
// NOTE(review): every pre-existing component is recorded as "updated" even
// when its spec is unchanged, so Changed is true whenever the spec declares
// components. Confirm whether callers rely on that.
// NOTE(review): any GetService error is treated as "service does not exist";
// a transient DB error would therefore fall through to CreateService.
func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1.ServiceSyncResult, []string, error) {
	svc := spec.GetName()
	active := spec.GetActive()
	comment := spec.GetComment()

	want := "stopped"
	if active {
		want = "running"
	}

	var notes []string
	var names []string

	current, lookupErr := registry.GetService(a.DB, svc)
	switch {
	case lookupErr != nil:
		// No usable record for this service; create one.
		if err := registry.CreateService(a.DB, svc, active, comment); err != nil {
			return nil, nil, status.Errorf(codes.Internal, "create service %q: %v", svc, err)
		}
		notes = append(notes, "created service")
	case current.Active != active || current.Comment != comment:
		if err := registry.UpdateServiceActive(a.DB, svc, active, comment); err != nil {
			return nil, nil, status.Errorf(codes.Internal, "update service %q: %v", svc, err)
		}
		if current.Active != active {
			notes = append(notes, fmt.Sprintf("active: %v -> %v", current.Active, active))
		}
		if current.Comment != comment {
			notes = append(notes, fmt.Sprintf("comment: %q", comment))
		}
	}

	for _, compSpec := range spec.GetComponents() {
		compName := compSpec.GetName()
		names = append(names, svc+"-"+compName)
		rec := protoToComponent(svc, compSpec, want)

		if !componentExists(a.DB, svc, compName) {
			if err := registry.CreateComponent(a.DB, rec); err != nil {
				return nil, nil, status.Errorf(codes.Internal, "create component %q/%q: %v", svc, compName, err)
			}
			notes = append(notes, fmt.Sprintf("created %s", compName))
			continue
		}

		if err := registry.UpdateComponentSpec(a.DB, rec); err != nil {
			return nil, nil, status.Errorf(codes.Internal, "update component %q/%q: %v", svc, compName, err)
		}
		if err := registry.UpdateComponentState(a.DB, svc, compName, want, ""); err != nil {
			return nil, nil, status.Errorf(codes.Internal, "update component state %q/%q: %v", svc, compName, err)
		}
		notes = append(notes, fmt.Sprintf("updated %s", compName))
	}

	res := &mcpv1.ServiceSyncResult{Name: svc, Changed: len(notes) > 0}
	if res.Changed {
		res.Summary = strings.Join(notes, "; ")
	} else {
		res.Summary = "no changes"
	}
	a.Logger.Info("sync service", "service", svc, "changed", res.Changed, "summary", res.Summary)
	return res, names, nil
}
// reconcileUntracked lists every workload reported by a.listAllContainers. It
// adds each one the registry does not already track as a component with
// desired_state "ignore".
//
// Containers are skipped when:
//   - their name is in known (claimed by the current sync),
//   - the name does not parse as "service-component", or
//   - the component already exists in the registry.
//
// A missing parent service is created (active, empty comment) before the
// component is added.
//
// Individual adoption failures are logged and skipped. An error is returned
// only when listing fails or ctx is done.
//
// NOTE(review): the adopted record copies image/network/user/restart/state
// only; no runtime, memory or vcpu information is recorded. Confirm that
// workloads adopted here that are not containers are handled correctly.
func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) error {
	containers, err := a.listAllContainers(ctx)
	if err != nil {
		return fmt.Errorf("list containers: %w", err)
	}
	for _, c := range containers {
		// Each adoption makes several registry round-trips. Stop early once
		// the caller has given up instead of finishing the whole list.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("reconcile untracked: %w", err)
		}
		if known[c.Name] {
			continue
		}
		service, component, ok := parseContainerName(c.Name)
		if !ok {
			continue
		}
		if componentExists(a.DB, service, component) {
			continue
		}
		if _, err := registry.GetService(a.DB, service); err != nil {
			if err := registry.CreateService(a.DB, service, true, ""); err != nil {
				a.Logger.Info("reconcile: create service failed", "service", service, "error", err)
				continue
			}
		}
		comp := &registry.Component{
			Name:          component,
			Service:       service,
			Image:         c.Image,
			Network:       c.Network,
			UserSpec:      c.User,
			Restart:       c.Restart,
			DesiredState:  "ignore",
			ObservedState: c.State,
			Version:       runtime.ExtractVersion(c.Image),
		}
		if err := registry.CreateComponent(a.DB, comp); err != nil {
			a.Logger.Info("reconcile: create component failed", "container", c.Name, "error", err)
			continue
		}
		a.Logger.Info("reconcile: adopted untracked container", "container", c.Name, "desired_state", "ignore")
	}
	return nil
}
// protoToComponent maps a proto ComponentSpec onto a registry Component that
// belongs to service and carries desiredState. Each route is converted
// one-to-one. An empty route mode becomes "l4" and an empty route name
// becomes "default". Version is derived from the image reference via
// runtime.ExtractVersion.
//
// NOTE(review): the runtime, memory_mb and vcpus service-def fields named in
// this file's commit header are not copied here. Confirm whether components
// synced through this path should carry them.
func protoToComponent(service string, cs *mcpv1.ComponentSpec, desiredState string) *registry.Component {
	orDefault := func(v, fallback string) string {
		if v != "" {
			return v
		}
		return fallback
	}

	var routes []registry.Route
	for _, in := range cs.GetRoutes() {
		routes = append(routes, registry.Route{
			Name:     orDefault(in.GetName(), "default"),
			Port:     int(in.GetPort()),
			Mode:     orDefault(in.GetMode(), "l4"),
			Hostname: in.GetHostname(),
		})
	}

	image := cs.GetImage()
	return &registry.Component{
		Name:         cs.GetName(),
		Service:      service,
		Image:        image,
		Network:      cs.GetNetwork(),
		UserSpec:     cs.GetUser(),
		Restart:      cs.GetRestart(),
		Ports:        cs.GetPorts(),
		Volumes:      cs.GetVolumes(),
		Cmd:          cs.GetCmd(),
		Routes:       routes,
		DesiredState: desiredState,
		Version:      runtime.ExtractVersion(image),
	}
}
// parseContainerName splits a container name of the form "service-component"
// at its first hyphen. It reports ok=false when the name has no hyphen, or
// when either side of the split would be empty (e.g. "-api" or "svc-"). This
// guarantees callers never receive an empty service or component name.
//
// Because the split happens at the first hyphen, a hyphenated service name is
// ambiguous: "my-svc-api" yields service "my" and component "svc-api".
func parseContainerName(name string) (service, component string, ok bool) {
	before, after, found := strings.Cut(name, "-")
	if !found || before == "" || after == "" {
		return "", "", false
	}
	return before, after, true
}