diff --git a/cmd/mcp/node.go b/cmd/mcp/node.go index 25113d3..bea14d7 100644 --- a/cmd/mcp/node.go +++ b/cmd/mcp/node.go @@ -53,15 +53,15 @@ func runNodeList(_ *cobra.Command, _ []string) error { w := tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0) _, _ = fmt.Fprintln(w, "NAME\tADDRESS\tVERSION") for _, n := range cfg.Nodes { - ver := queryAgentVersion(cfg, n.Address) + ver := queryAgentVersion(cfg, n.AllAddresses()) _, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", n.Name, n.Address, ver) } return w.Flush() } -// queryAgentVersion dials the agent and returns its version, or an error indicator. -func queryAgentVersion(cfg *config.CLIConfig, address string) string { - client, conn, err := dialAgent(address, cfg) +// queryAgentVersion dials the agent (trying all addresses) and returns its version. +func queryAgentVersion(cfg *config.CLIConfig, addresses []string) string { + client, conn, err := dialAgentMulti(addresses, cfg) if err != nil { return "error" } diff --git a/gen/mcp/v1/mcp.pb.go b/gen/mcp/v1/mcp.pb.go index 0b01f1d..d0cd2a6 100644 --- a/gen/mcp/v1/mcp.pb.go +++ b/gen/mcp/v1/mcp.pb.go @@ -110,6 +110,9 @@ type ComponentSpec struct { Cmd []string `protobuf:"bytes,8,rep,name=cmd,proto3" json:"cmd,omitempty"` Routes []*RouteSpec `protobuf:"bytes,9,rep,name=routes,proto3" json:"routes,omitempty"` Env []string `protobuf:"bytes,10,rep,name=env,proto3" json:"env,omitempty"` + Runtime string `protobuf:"bytes,11,opt,name=runtime,proto3" json:"runtime,omitempty"` // "container" (default) or "unikernel" + MemoryMb int32 `protobuf:"varint,12,opt,name=memory_mb,json=memoryMb,proto3" json:"memory_mb,omitempty"` // unikernel guest memory in MB (default 256) + Vcpus int32 `protobuf:"varint,13,opt,name=vcpus,proto3" json:"vcpus,omitempty"` // unikernel guest vCPUs (default 1) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -214,6 +217,27 @@ func (x *ComponentSpec) GetEnv() []string { return nil } +func (x *ComponentSpec) GetRuntime() string { + if x != nil { + 
return x.Runtime + } + return "" +} + +func (x *ComponentSpec) GetMemoryMb() int32 { + if x != nil { + return x.MemoryMb + } + return 0 +} + +func (x *ComponentSpec) GetVcpus() int32 { + if x != nil { + return x.Vcpus + } + return 0 +} + type SnapshotConfig struct { state protoimpl.MessageState `protogen:"open.v1"` Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` // "grpc", "cli", "exec: ", "full", or "" (default) @@ -3586,7 +3610,7 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" + "\x04port\x18\x02 \x01(\x05R\x04port\x12\x12\n" + "\x04mode\x18\x03 \x01(\tR\x04mode\x12\x1a\n" + "\bhostname\x18\x04 \x01(\tR\bhostname\x12\x16\n" + - "\x06public\x18\x05 \x01(\bR\x06public\"\x80\x02\n" + + "\x06public\x18\x05 \x01(\bR\x06public\"\xcd\x02\n" + "\rComponentSpec\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n" + "\x05image\x18\x02 \x01(\tR\x05image\x12\x18\n" + @@ -3598,7 +3622,10 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" + "\x03cmd\x18\b \x03(\tR\x03cmd\x12)\n" + "\x06routes\x18\t \x03(\v2\x11.mcp.v1.RouteSpecR\x06routes\x12\x10\n" + "\x03env\x18\n" + - " \x03(\tR\x03env\"D\n" + + " \x03(\tR\x03env\x12\x18\n" + + "\aruntime\x18\v \x01(\tR\aruntime\x12\x1b\n" + + "\tmemory_mb\x18\f \x01(\x05R\bmemoryMb\x12\x14\n" + + "\x05vcpus\x18\r \x01(\x05R\x05vcpus\"D\n" + "\x0eSnapshotConfig\x12\x16\n" + "\x06method\x18\x01 \x01(\tR\x06method\x12\x1a\n" + "\bexcludes\x18\x02 \x03(\tR\bexcludes\"\xe6\x01\n" + diff --git a/internal/agent/adopt.go b/internal/agent/adopt.go index efac5cd..17631cb 100644 --- a/internal/agent/adopt.go +++ b/internal/agent/adopt.go @@ -19,7 +19,7 @@ func (a *Agent) AdoptContainers(ctx context.Context, req *mcpv1.AdoptContainersR return nil, fmt.Errorf("service name is required") } - containers, err := a.Runtime.List(ctx) + containers, err := a.listAllContainers(ctx) if err != nil { return nil, fmt.Errorf("list containers: %w", err) } @@ -46,7 +46,7 @@ func (a *Agent) AdoptContainers(ctx context.Context, 
req *mcpv1.AdoptContainersR } // Ensure the service exists once, before adopting any containers. - if err := registry.CreateService(a.DB, service, true); err != nil { + if err := registry.CreateService(a.DB, service, true, ""); err != nil { if _, getErr := registry.GetService(a.DB, service); getErr != nil { return nil, fmt.Errorf("create service %q: %w", service, err) } diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 374e822..b13e70a 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -9,6 +9,7 @@ import ( "net" "os" "os/signal" + "path/filepath" "syscall" mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" @@ -28,7 +29,8 @@ type Agent struct { Config *config.AgentConfig DB *sql.DB - Runtime runtime.Runtime + Runtime runtime.Runtime // container runtime (podman) + Unikernel runtime.Runtime // unikernel runtime (qemu/nanos); nil if unavailable Monitor *monitor.Monitor Logger *slog.Logger PortAlloc *PortAllocator @@ -53,6 +55,18 @@ func Run(cfg *config.AgentConfig, version string) error { rt := &runtime.Podman{} + // The unikernel runtime is enabled only on nodes with KVM. Services with + // runtime = "unikernel" are placed by the master on KVM-capable nodes. 
+ var uk runtime.Runtime + if unikernelSupported() { + uk = &runtime.QEMU{ + ImageDir: filepath.Join(homeDir(cfg), "images"), + StateDir: filepath.Join(homeDir(cfg), "vm"), + HomeDir: homeDir(cfg), + } + logger.Info("unikernel runtime enabled (KVM detected)") + } + mon := monitor.New(db, rt, cfg.Monitor, cfg.Agent.NodeName, logger) proxy, err := NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger) @@ -74,6 +88,7 @@ func Run(cfg *config.AgentConfig, version string) error { Config: cfg, DB: db, Runtime: rt, + Unikernel: uk, Monitor: mon, Logger: logger, PortAlloc: NewPortAllocator(), diff --git a/internal/agent/boot.go b/internal/agent/boot.go index 2f3c9a2..189a573 100644 --- a/internal/agent/boot.go +++ b/internal/agent/boot.go @@ -120,7 +120,7 @@ func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method stri // or from the running container. func (a *Agent) findServicePort(serviceName string) (int, error) { // Check the running containers for a mapped port. 
- containers, err := a.Runtime.List(context.Background()) + containers, err := a.listAllContainers(context.Background()) if err != nil { return 0, fmt.Errorf("list containers: %w", err) } diff --git a/internal/agent/deploy.go b/internal/agent/deploy.go index a2faafe..9cca5bc 100644 --- a/internal/agent/deploy.go +++ b/internal/agent/deploy.go @@ -22,7 +22,7 @@ func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.De serviceName := spec.GetName() a.Logger.Info("deploying", "service", serviceName) - if err := ensureService(a.DB, serviceName, spec.GetActive()); err != nil { + if err := ensureService(a.DB, serviceName, spec.GetActive(), spec.GetComment()); err != nil { return nil, fmt.Errorf("deploy: ensure service %q: %w", serviceName, err) } @@ -94,8 +94,20 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp Volumes: cs.GetVolumes(), Cmd: cs.GetCmd(), Routes: regRoutes, + Runtime: cs.GetRuntime(), + MemoryMB: int(cs.GetMemoryMb()), + VCPUs: int(cs.GetVcpus()), } + // Select the runtime backend (container vs unikernel) for this component. 
+ if cs.GetRuntime() == "unikernel" && a.Unikernel == nil { + return &mcpv1.ComponentResult{ + Name: compName, + Error: "service requests unikernel runtime but this node has no KVM/ops support", + } + } + rt := a.runtimeFor(cs.GetRuntime()) + if err := ensureComponent(a.DB, regComp); err != nil { return &mcpv1.ComponentResult{ Name: compName, @@ -103,27 +115,29 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp } } - if err := a.Runtime.Pull(ctx, cs.GetImage()); err != nil { + if err := rt.Pull(ctx, cs.GetImage()); err != nil { return &mcpv1.ComponentResult{ Name: compName, Error: fmt.Sprintf("pull image: %v", err), } } - _ = a.Runtime.Stop(ctx, containerName) // may not exist yet - _ = a.Runtime.Remove(ctx, containerName) // may not exist yet + _ = rt.Stop(ctx, containerName) // may not exist yet + _ = rt.Remove(ctx, containerName) // may not exist yet // Build the container spec. If the component has routes, use route-based // port allocation and env injection. Otherwise, fall back to legacy ports. 
runSpec := runtime.ContainerSpec{ - Name: containerName, - Image: cs.GetImage(), - Network: cs.GetNetwork(), - User: cs.GetUser(), - Restart: cs.GetRestart(), - Volumes: cs.GetVolumes(), - Cmd: cs.GetCmd(), - Env: cs.GetEnv(), + Name: containerName, + Image: cs.GetImage(), + Network: cs.GetNetwork(), + User: cs.GetUser(), + Restart: cs.GetRestart(), + Volumes: cs.GetVolumes(), + Cmd: cs.GetCmd(), + Env: cs.GetEnv(), + MemoryMB: int(cs.GetMemoryMb()), + VCPUs: int(cs.GetVcpus()), } if len(regRoutes) > 0 && a.PortAlloc != nil { @@ -142,7 +156,7 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp runSpec.Ports = cs.GetPorts() } - if err := a.Runtime.Run(ctx, runSpec); err != nil { + if err := rt.Run(ctx, runSpec); err != nil { _ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed") return &mcpv1.ComponentResult{ Name: compName, @@ -219,16 +233,16 @@ func (a *Agent) allocateRoutePorts(service, component string, routes []registry. } // ensureService creates the service if it does not exist, or updates its -// active flag if it does. -func ensureService(db *sql.DB, name string, active bool) error { +// active flag and comment if it does. +func ensureService(db *sql.DB, name string, active bool, comment string) error { _, err := registry.GetService(db, name) if errors.Is(err, sql.ErrNoRows) { - return registry.CreateService(db, name, active) + return registry.CreateService(db, name, active, comment) } if err != nil { return err } - return registry.UpdateServiceActive(db, name, active) + return registry.UpdateServiceActive(db, name, active, comment) } // hasL7Routes reports whether any route uses L7 (TLS-terminating) mode. diff --git a/internal/agent/deploy_test.go b/internal/agent/deploy_test.go index 946a208..86fea42 100644 --- a/internal/agent/deploy_test.go +++ b/internal/agent/deploy_test.go @@ -34,7 +34,7 @@ func testAgent(t *testing.T) *Agent { // allocateRoutePorts can store host ports for it. 
func seedComponent(t *testing.T, db *sql.DB, service, component string, routes []registry.Route) { t.Helper() - if err := registry.CreateService(db, service, true); err != nil { + if err := registry.CreateService(db, service, true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(db, &registry.Component{ diff --git a/internal/agent/edge_rpc.go b/internal/agent/edge_rpc.go index bea595c..118bd1e 100644 --- a/internal/agent/edge_rpc.go +++ b/internal/agent/edge_rpc.go @@ -12,8 +12,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" mcproxy "git.wntrmute.dev/mc/mc-proxy/client/mcproxy" + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" "git.wntrmute.dev/mc/mcp/internal/registry" ) @@ -182,7 +182,7 @@ func (a *Agent) HealthCheck(_ context.Context, _ *mcpv1.HealthCheckRequest) (*mc // Count running containers if the runtime is available. if a.Runtime != nil { - if list, err := a.Runtime.List(context.Background()); err == nil { + if list, err := a.listAllContainers(context.Background()); err == nil { containers = int32(len(list)) //nolint:gosec // container count is small } else { st = "degraded" diff --git a/internal/agent/heartbeat.go b/internal/agent/heartbeat.go index da9f94d..2655453 100644 --- a/internal/agent/heartbeat.go +++ b/internal/agent/heartbeat.go @@ -21,8 +21,8 @@ import ( // MasterConfig holds the optional master connection settings for the agent. // When configured, the agent self-registers and sends periodic heartbeats. 
type MasterConfig struct { - Address string `toml:"address"` // master gRPC address - CACert string `toml:"ca_cert"` // CA cert to verify master's TLS + Address string `toml:"address"` // master gRPC address + CACert string `toml:"ca_cert"` // CA cert to verify master's TLS TokenPath string `toml:"token_path"` // MCIAS service token for auth } @@ -38,12 +38,20 @@ type HeartbeatClient struct { interval time.Duration stop chan struct{} wg sync.WaitGroup - logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) } + logger interface { + Info(string, ...any) + Warn(string, ...any) + Error(string, ...any) + } } // NewHeartbeatClient creates a client that registers with the master and // sends periodic heartbeats. Returns nil if master address is not configured. -func NewHeartbeatClient(cfg config.AgentConfig, logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) }) (*HeartbeatClient, error) { +func NewHeartbeatClient(cfg config.AgentConfig, logger interface { + Info(string, ...any) + Warn(string, ...any) + Error(string, ...any) +}) (*HeartbeatClient, error) { // Config takes precedence, env vars as fallback. 
masterAddr := cfg.Master.Address if masterAddr == "" { diff --git a/internal/agent/lifecycle.go b/internal/agent/lifecycle.go index 9facc94..d0c05bc 100644 --- a/internal/agent/lifecycle.go +++ b/internal/agent/lifecycle.go @@ -51,7 +51,7 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest) } } - if err := a.Runtime.Stop(ctx, containerName); err != nil { + if err := a.runtimeFor(c.Runtime).Stop(ctx, containerName); err != nil { a.Logger.Info("stop container (ignored)", "container", containerName, "error", err) } @@ -132,12 +132,14 @@ func startComponent(ctx context.Context, a *Agent, service string, c *registry.C containerName := ContainerNameFor(service, c.Name) r := &mcpv1.ComponentResult{Name: c.Name, Success: true} + rt := a.runtimeFor(c.Runtime) + // Remove any pre-existing container; ignore errors for non-existent ones. - _ = a.Runtime.Stop(ctx, containerName) - _ = a.Runtime.Remove(ctx, containerName) + _ = rt.Stop(ctx, containerName) + _ = rt.Remove(ctx, containerName) spec := componentToSpec(service, c) - if err := a.Runtime.Run(ctx, spec); err != nil { + if err := rt.Run(ctx, spec); err != nil { r.Success = false r.Error = fmt.Sprintf("run container: %v", err) return r @@ -156,11 +158,12 @@ func restartComponent(ctx context.Context, a *Agent, service string, c *registry containerName := ContainerNameFor(service, c.Name) r := &mcpv1.ComponentResult{Name: c.Name, Success: true} - _ = a.Runtime.Stop(ctx, containerName) - _ = a.Runtime.Remove(ctx, containerName) + rt := a.runtimeFor(c.Runtime) + _ = rt.Stop(ctx, containerName) + _ = rt.Remove(ctx, containerName) spec := componentToSpec(service, c) - if err := a.Runtime.Run(ctx, spec); err != nil { + if err := rt.Run(ctx, spec); err != nil { r.Success = false r.Error = fmt.Sprintf("run container: %v", err) _ = registry.UpdateComponentState(a.DB, service, c.Name, "", "stopped") @@ -177,14 +180,16 @@ func restartComponent(ctx context.Context, a *Agent, service string, c *registry // 
componentToSpec builds a runtime.ContainerSpec from a registry Component. func componentToSpec(service string, c *registry.Component) runtime.ContainerSpec { return runtime.ContainerSpec{ - Name: ContainerNameFor(service, c.Name), - Image: c.Image, - Network: c.Network, - User: c.UserSpec, - Restart: c.Restart, - Ports: c.Ports, - Volumes: c.Volumes, - Cmd: c.Cmd, + Name: ContainerNameFor(service, c.Name), + Image: c.Image, + Network: c.Network, + User: c.UserSpec, + Restart: c.Restart, + Ports: c.Ports, + Volumes: c.Volumes, + Cmd: c.Cmd, + MemoryMB: c.MemoryMB, + VCPUs: c.VCPUs, } } diff --git a/internal/agent/logs.go b/internal/agent/logs.go index 714a3e6..e511eb8 100644 --- a/internal/agent/logs.go +++ b/internal/agent/logs.go @@ -2,15 +2,22 @@ package agent import ( "bufio" + "context" "io" + "os/exec" mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" "git.wntrmute.dev/mc/mcp/internal/registry" - "git.wntrmute.dev/mc/mcp/internal/runtime" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +// logStreamer is implemented by both the Podman (container logs) and QEMU +// (serial console) runtimes. +type logStreamer interface { + Logs(ctx context.Context, name string, tail int, follow, timestamps bool, since string) *exec.Cmd +} + // Logs streams container logs for a service component. func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsServer) error { if req.GetService() == "" { @@ -32,14 +39,20 @@ func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsSe containerName := ContainerNameFor(req.GetService(), component) - podman, ok := a.Runtime.(*runtime.Podman) + // Select the runtime for this component (container vs unikernel) and + // stream its logs (podman logs / journald, or the VM serial console). 
+ var compRuntime string + if c, err := registry.GetComponent(a.DB, req.GetService(), component); err == nil { + compRuntime = c.Runtime + } + ls, ok := a.runtimeFor(compRuntime).(logStreamer) if !ok { - return status.Error(codes.Internal, "logs requires podman runtime") + return status.Error(codes.Internal, "selected runtime does not support log streaming") } - cmd := podman.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince()) + cmd := ls.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince()) - a.Logger.Info("running podman logs", "container", containerName, "args", cmd.Args) + a.Logger.Info("streaming logs", "container", containerName, "runtime", compRuntime, "args", cmd.Args) // Podman writes container stdout to its stdout and container stderr // to its stderr. Merge both into a single pipe. @@ -48,7 +61,7 @@ func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsSe cmd.Stderr = pw if err := cmd.Start(); err != nil { - pw.Close() + _ = pw.Close() return status.Errorf(codes.Internal, "start podman logs: %v", err) } @@ -58,7 +71,7 @@ func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsSe if err != nil { a.Logger.Warn("podman logs exited", "container", containerName, "error", err) } - pw.Close() + _ = pw.Close() }() scanner := bufio.NewScanner(pr) diff --git a/internal/agent/purge_test.go b/internal/agent/purge_test.go index 3a25089..7470c16 100644 --- a/internal/agent/purge_test.go +++ b/internal/agent/purge_test.go @@ -14,7 +14,7 @@ func TestPurgeComponentRemoved(t *testing.T) { ctx := context.Background() // Set up a service with a stale component. 
- if err := registry.CreateService(a.DB, "mcns", true); err != nil { + if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -69,7 +69,7 @@ func TestPurgeRefusesRunning(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "mcr", true); err != nil { + if err := registry.CreateService(a.DB, "mcr", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -109,7 +109,7 @@ func TestPurgeRefusesStopped(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "mcr", true); err != nil { + if err := registry.CreateService(a.DB, "mcr", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -140,7 +140,7 @@ func TestPurgeSkipsDefinedComponent(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "mcns", true); err != nil { + if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -176,7 +176,7 @@ func TestPurgeDryRun(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "mcns", true); err != nil { + if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -217,7 +217,7 @@ func TestPurgeServiceFilter(t *testing.T) { ctx := context.Background() // Create two services. 
- if err := registry.CreateService(a.DB, "mcns", true); err != nil { + if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -226,7 +226,7 @@ func TestPurgeServiceFilter(t *testing.T) { }); err != nil { t.Fatalf("create component: %v", err) } - if err := registry.CreateService(a.DB, "mcr", true); err != nil { + if err := registry.CreateService(a.DB, "mcr", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -263,7 +263,7 @@ func TestPurgeServiceDeletedWhenEmpty(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "mcns", true); err != nil { + if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -306,7 +306,7 @@ func TestPurgeServiceKeptWhenComponentsRemain(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "mcns", true); err != nil { + if err := registry.CreateService(a.DB, "mcns", true, ""); err != nil { t.Fatalf("create service: %v", err) } // Stale component (will be purged). 
@@ -359,7 +359,7 @@ func TestPurgeExitedState(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "test", true); err != nil { + if err := registry.CreateService(a.DB, "test", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -384,7 +384,7 @@ func TestPurgeUnknownState(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "test", true); err != nil { + if err := registry.CreateService(a.DB, "test", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ diff --git a/internal/agent/recover.go b/internal/agent/recover.go index a0aaa1e..f7d08d8 100644 --- a/internal/agent/recover.go +++ b/internal/agent/recover.go @@ -22,7 +22,7 @@ func (a *Agent) Recover(ctx context.Context) error { } // Get the list of currently running containers from podman. - running, err := a.Runtime.List(ctx) + running, err := a.listAllContainers(ctx) if err != nil { a.Logger.Warn("cannot list containers, assuming none running", "err", err) running = nil @@ -67,18 +67,22 @@ func (a *Agent) Recover(ctx context.Context) error { "image", comp.Image, ) + rt := a.runtimeFor(comp.Runtime) + // Remove any stale container with the same name. - _ = a.Runtime.Remove(ctx, containerName) + _ = rt.Remove(ctx, containerName) // Build the container spec from the registry. spec := runtime.ContainerSpec{ - Name: containerName, - Image: comp.Image, - Network: comp.Network, - User: comp.UserSpec, - Restart: comp.Restart, - Volumes: comp.Volumes, - Cmd: comp.Cmd, + Name: containerName, + Image: comp.Image, + Network: comp.Network, + User: comp.UserSpec, + Restart: comp.Restart, + Volumes: comp.Volumes, + Cmd: comp.Cmd, + MemoryMB: comp.MemoryMB, + VCPUs: comp.VCPUs, } // Allocate ports from routes if the component has routes. 
@@ -95,7 +99,7 @@ func (a *Agent) Recover(ctx context.Context) error { spec.Ports = comp.Ports } - if err := a.Runtime.Run(ctx, spec); err != nil { + if err := rt.Run(ctx, spec); err != nil { a.Logger.Error("recover container failed", "container", containerName, "err", err, diff --git a/internal/agent/runtime.go b/internal/agent/runtime.go new file mode 100644 index 0000000..00996b0 --- /dev/null +++ b/internal/agent/runtime.go @@ -0,0 +1,65 @@ +package agent + +import ( + "context" + "os" + "os/exec" + "path/filepath" + + "git.wntrmute.dev/mc/mcp/internal/config" + "git.wntrmute.dev/mc/mcp/internal/runtime" +) + +// unikernelSupported reports whether this node can run Nanos unikernels: +// it needs KVM (/dev/kvm) and the `ops` toolchain on PATH. +func unikernelSupported() bool { + if _, err := os.Stat("/dev/kvm"); err != nil { + return false + } + if _, err := exec.LookPath("ops"); err != nil { + return false + } + return true +} + +// homeDir returns the agent's working directory (where images/ and vm/ live), +// derived from the registry database path (e.g. /srv/mcp/mcp.db -> /srv/mcp). +func homeDir(cfg *config.AgentConfig) string { + if cfg != nil && cfg.Database.Path != "" { + return filepath.Dir(cfg.Database.Path) + } + if h := os.Getenv("HOME"); h != "" { + return h + } + return "/srv/mcp" +} + +// runtimeFor selects the runtime backend for a component's declared runtime. +// Unknown or empty runtimes fall back to the container runtime. If a service +// requests "unikernel" but this node lacks the unikernel runtime, it falls +// back to the container runtime (the master should not place it here). +func (a *Agent) runtimeFor(rt string) runtime.Runtime { + if rt == "unikernel" && a.Unikernel != nil { + return a.Unikernel + } + return a.Runtime +} + +// listAllContainers returns the observed state across every configured +// runtime (containers + unikernel VMs) so reconciliation, status, and drift +// detection see the whole picture. 
+func (a *Agent) listAllContainers(ctx context.Context) ([]runtime.ContainerInfo, error) { + infos, err := a.Runtime.List(ctx) + if err != nil { + return nil, err + } + if a.Unikernel != nil { + vms, vmErr := a.Unikernel.List(ctx) + if vmErr == nil { + infos = append(infos, vms...) + } else if a.Logger != nil { + a.Logger.Warn("list unikernel VMs failed", "err", vmErr) + } + } + return infos, nil +} diff --git a/internal/agent/status.go b/internal/agent/status.go index 0a93b8c..cd1938c 100644 --- a/internal/agent/status.go +++ b/internal/agent/status.go @@ -15,8 +15,9 @@ import ( // ServiceInfo message. func buildServiceInfo(svc registry.Service, components []registry.Component) *mcpv1.ServiceInfo { info := &mcpv1.ServiceInfo{ - Name: svc.Name, - Active: svc.Active, + Name: svc.Name, + Active: svc.Active, + Comment: svc.Comment, } for _, c := range components { info.Components = append(info.Components, &mcpv1.ComponentInfo{ @@ -56,7 +57,7 @@ func (a *Agent) ListServices(ctx context.Context, req *mcpv1.ListServicesRequest // registry. It returns a list of ServiceInfo messages with updated observed // state. This shared logic is used by both LiveCheck and GetServiceStatus. func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, error) { - containers, err := a.Runtime.List(ctx) + containers, err := a.listAllContainers(ctx) if err != nil { return nil, fmt.Errorf("runtime list: %w", err) } @@ -84,8 +85,9 @@ func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, er } info := &mcpv1.ServiceInfo{ - Name: svc.Name, - Active: svc.Active, + Name: svc.Name, + Active: svc.Active, + Comment: svc.Comment, } for _, comp := range components { diff --git a/internal/agent/status_test.go b/internal/agent/status_test.go index 17f2733..c3fac9a 100644 --- a/internal/agent/status_test.go +++ b/internal/agent/status_test.go @@ -23,7 +23,7 @@ func TestListServices(t *testing.T) { } // Add a service with components. 
- if err := registry.CreateService(a.DB, "metacrypt", true); err != nil { + if err := registry.CreateService(a.DB, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -71,7 +71,7 @@ func TestLiveCheck(t *testing.T) { ctx := context.Background() // Set up registry with one service and one component. - if err := registry.CreateService(a.DB, "metacrypt", true); err != nil { + if err := registry.CreateService(a.DB, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -146,7 +146,7 @@ func TestGetServiceStatus_DriftDetection(t *testing.T) { a := newTestAgent(t, rt) ctx := context.Background() - if err := registry.CreateService(a.DB, "metacrypt", true); err != nil { + if err := registry.CreateService(a.DB, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(a.DB, &registry.Component{ @@ -207,7 +207,7 @@ func TestGetServiceStatus_FilterByName(t *testing.T) { ctx := context.Background() for _, svc := range []string{"metacrypt", "mcr"} { - if err := registry.CreateService(a.DB, svc, true); err != nil { + if err := registry.CreateService(a.DB, svc, true, ""); err != nil { t.Fatalf("create service %q: %v", svc, err) } if err := registry.CreateComponent(a.DB, &registry.Component{ diff --git a/internal/agent/sync.go b/internal/agent/sync.go index 443bd63..2b09a4c 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -60,15 +60,20 @@ func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1. existing, err := registry.GetService(a.DB, spec.GetName()) if err != nil { // Service does not exist; create it. 
- if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive()); err != nil { + if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive(), spec.GetComment()); err != nil { return nil, nil, status.Errorf(codes.Internal, "create service %q: %v", spec.GetName(), err) } changes = append(changes, "created service") - } else if existing.Active != spec.GetActive() { - if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive()); err != nil { + } else if existing.Active != spec.GetActive() || existing.Comment != spec.GetComment() { + if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive(), spec.GetComment()); err != nil { return nil, nil, status.Errorf(codes.Internal, "update service %q: %v", spec.GetName(), err) } - changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive())) + if existing.Active != spec.GetActive() { + changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive())) + } + if existing.Comment != spec.GetComment() { + changes = append(changes, fmt.Sprintf("comment: %q", spec.GetComment())) + } } // Create or update each component. @@ -107,7 +112,7 @@ func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1. // reconcileUntracked lists all containers from the runtime and adds any that // are not already tracked in the registry with desired_state "ignore". 
func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) error { - containers, err := a.Runtime.List(ctx) + containers, err := a.listAllContainers(ctx) if err != nil { return fmt.Errorf("list containers: %w", err) } @@ -127,7 +132,7 @@ func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) e } if _, err := registry.GetService(a.DB, service); err != nil { - if err := registry.CreateService(a.DB, service, true); err != nil { + if err := registry.CreateService(a.DB, service, true, ""); err != nil { a.Logger.Info("reconcile: create service failed", "service", service, "error", err) continue } diff --git a/internal/agent/undeploy.go b/internal/agent/undeploy.go index 8f14834..991dc06 100644 --- a/internal/agent/undeploy.go +++ b/internal/agent/undeploy.go @@ -36,7 +36,7 @@ func (a *Agent) UndeployService(ctx context.Context, req *mcpv1.UndeployServiceR } // Mark the service as inactive. - if err := registry.UpdateServiceActive(a.DB, serviceName, false); err != nil { + if err := registry.UpdateServiceActive(a.DB, serviceName, false, ""); err != nil { a.Logger.Warn("failed to mark service inactive", "service", serviceName, "err", err) } @@ -73,10 +73,11 @@ func (a *Agent) undeployComponent(ctx context.Context, serviceName string, c *re } // 4. Stop and remove the container. 
- if err := a.Runtime.Stop(ctx, containerName); err != nil { + rt := a.runtimeFor(c.Runtime) + if err := rt.Stop(ctx, containerName); err != nil { a.Logger.Info("stop container (ignored)", "container", containerName, "error", err) } - if err := a.Runtime.Remove(ctx, containerName); err != nil { + if err := rt.Remove(ctx, containerName); err != nil { a.Logger.Info("remove container (ignored)", "container", containerName, "error", err) } diff --git a/internal/config/agent.go b/internal/config/agent.go index 4b4809d..92e431b 100644 --- a/internal/config/agent.go +++ b/internal/config/agent.go @@ -10,17 +10,17 @@ import ( // AgentConfig is the configuration for the mcp-agent daemon. type AgentConfig struct { - Server ServerConfig `toml:"server"` - Database DatabaseConfig `toml:"database"` - MCIAS MCIASConfig `toml:"mcias"` - Agent AgentSettings `toml:"agent"` - MCProxy MCProxyConfig `toml:"mcproxy"` - Metacrypt MetacryptConfig `toml:"metacrypt"` - MCNS MCNSConfig `toml:"mcns"` - Monitor MonitorConfig `toml:"monitor"` - Log LogConfig `toml:"log"` - Boot BootConfig `toml:"boot"` - Master AgentMasterConfig `toml:"master"` + Server ServerConfig `toml:"server"` + Database DatabaseConfig `toml:"database"` + MCIAS MCIASConfig `toml:"mcias"` + Agent AgentSettings `toml:"agent"` + MCProxy MCProxyConfig `toml:"mcproxy"` + Metacrypt MetacryptConfig `toml:"metacrypt"` + MCNS MCNSConfig `toml:"mcns"` + Monitor MonitorConfig `toml:"monitor"` + Log LogConfig `toml:"log"` + Boot BootConfig `toml:"boot"` + Master AgentMasterConfig `toml:"master"` } // AgentMasterConfig holds the optional master connection settings. 
diff --git a/internal/master/status.go b/internal/master/status.go index b6b8905..481e975 100644 --- a/internal/master/status.go +++ b/internal/master/status.go @@ -117,7 +117,7 @@ func (m *Master) ListNodes(_ context.Context, _ *mcpv1.ListNodesRequest) (*mcpv1 Address: n.Address, Arch: n.Arch, Status: n.Status, - Containers: int32(n.Containers), //nolint:gosec // small number + Containers: int32(n.Containers), //nolint:gosec // small number Services: int32(counts[n.Name]), //nolint:gosec // small number } if n.LastHeartbeat != nil { diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go index ac336b0..310220c 100644 --- a/internal/monitor/monitor_test.go +++ b/internal/monitor/monitor_test.go @@ -68,7 +68,7 @@ func TestAlerterDriftDetection(t *testing.T) { al := NewAlerter(cfg, "test-node", db, logger) // Set up a service and component so CountEvents works. - if err := registry.CreateService(db, "metacrypt", true); err != nil { + if err := registry.CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(db, ®istry.Component{ @@ -112,7 +112,7 @@ func TestAlerterCooldownSuppression(t *testing.T) { al := NewAlerter(cfg, "test-node", db, logger) - if err := registry.CreateService(db, "metacrypt", true); err != nil { + if err := registry.CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(db, ®istry.Component{ @@ -148,7 +148,7 @@ func TestAlerterFlapDetection(t *testing.T) { al := NewAlerter(cfg, "test-node", db, logger) - if err := registry.CreateService(db, "metacrypt", true); err != nil { + if err := registry.CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(db, ®istry.Component{ @@ -179,7 +179,7 @@ func TestMonitorTickStateChange(t *testing.T) { logger := testLogger() cfg := testMonitorConfig() - if err := 
registry.CreateService(db, "metacrypt", true); err != nil { + if err := registry.CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(db, ®istry.Component{ @@ -251,7 +251,7 @@ func TestMonitorNoChangeNoEvent(t *testing.T) { logger := testLogger() cfg := testMonitorConfig() - if err := registry.CreateService(db, "metacrypt", true); err != nil { + if err := registry.CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } if err := registry.CreateComponent(db, ®istry.Component{ diff --git a/internal/registry/components.go b/internal/registry/components.go index b5ba364..f4b8a71 100644 --- a/internal/registry/components.go +++ b/internal/registry/components.go @@ -30,10 +30,22 @@ type Component struct { Volumes []string Cmd []string Routes []Route + Runtime string // "container" (default) or "unikernel" + MemoryMB int // unikernel guest memory in MB + VCPUs int // unikernel guest vCPUs CreatedAt time.Time UpdatedAt time.Time } +// defaultRuntime normalizes an empty runtime to "container" so the +// components.runtime column is never empty. +func defaultRuntime(r string) string { + if r == "" { + return "container" + } + return r +} + // CreateComponent creates a new component in the registry. 
func CreateComponent(db *sql.DB, c *Component) error { tx, err := db.Begin() @@ -43,10 +55,10 @@ func CreateComponent(db *sql.DB, c *Component) error { defer tx.Rollback() //nolint:errcheck _, err = tx.Exec(` - INSERT INTO components (name, service, image, network, user_spec, restart, desired_state, observed_state, version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + INSERT INTO components (name, service, image, network, user_spec, restart, desired_state, observed_state, version, runtime, memory_mb, vcpus) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, c.Name, c.Service, c.Image, c.Network, c.UserSpec, c.Restart, - c.DesiredState, c.ObservedState, c.Version, + c.DesiredState, c.ObservedState, c.Version, defaultRuntime(c.Runtime), c.MemoryMB, c.VCPUs, ) if err != nil { return fmt.Errorf("insert component %q/%q: %w", c.Service, c.Name, err) @@ -74,11 +86,11 @@ func GetComponent(db *sql.DB, service, name string) (*Component, error) { var createdAt, updatedAt string err := db.QueryRow(` SELECT name, service, image, network, user_spec, restart, - desired_state, observed_state, version, created_at, updated_at + desired_state, observed_state, version, runtime, memory_mb, vcpus, created_at, updated_at FROM components WHERE service = ? 
AND name = ?`, service, name, ).Scan(&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart, - &c.DesiredState, &c.ObservedState, &c.Version, &createdAt, &updatedAt) + &c.DesiredState, &c.ObservedState, &c.Version, &c.Runtime, &c.MemoryMB, &c.VCPUs, &createdAt, &updatedAt) if err != nil { return nil, fmt.Errorf("get component %q/%q: %w", service, name, err) } @@ -109,7 +121,7 @@ func GetComponent(db *sql.DB, service, name string) (*Component, error) { func ListComponents(db *sql.DB, service string) ([]Component, error) { rows, err := db.Query(` SELECT name, service, image, network, user_spec, restart, - desired_state, observed_state, version, created_at, updated_at + desired_state, observed_state, version, runtime, memory_mb, vcpus, created_at, updated_at FROM components WHERE service = ? ORDER BY name`, service, ) @@ -123,7 +135,7 @@ func ListComponents(db *sql.DB, service string) ([]Component, error) { var c Component var createdAt, updatedAt string if err := rows.Scan(&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart, - &c.DesiredState, &c.ObservedState, &c.Version, &createdAt, &updatedAt); err != nil { + &c.DesiredState, &c.ObservedState, &c.Version, &c.Runtime, &c.MemoryMB, &c.VCPUs, &createdAt, &updatedAt); err != nil { return nil, fmt.Errorf("scan component: %w", err) } c.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt) @@ -169,9 +181,11 @@ func UpdateComponentSpec(db *sql.DB, c *Component) error { _, err = tx.Exec(` UPDATE components - SET image = ?, network = ?, user_spec = ?, restart = ?, version = ?, updated_at = datetime('now') + SET image = ?, network = ?, user_spec = ?, restart = ?, version = ?, + runtime = ?, memory_mb = ?, vcpus = ?, updated_at = datetime('now') WHERE service = ? 
AND name = ?`, - c.Image, c.Network, c.UserSpec, c.Restart, c.Version, c.Service, c.Name, + c.Image, c.Network, c.UserSpec, c.Restart, c.Version, + defaultRuntime(c.Runtime), c.MemoryMB, c.VCPUs, c.Service, c.Name, ) if err != nil { return fmt.Errorf("update component %q/%q: %w", c.Service, c.Name, err) diff --git a/internal/registry/db.go b/internal/registry/db.go index db3d367..7c911eb 100644 --- a/internal/registry/db.go +++ b/internal/registry/db.go @@ -156,4 +156,9 @@ var migrations = []string{ created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) );`, + + // Migration 5: unikernel runtime support (per-component runtime + VM resources) + `ALTER TABLE components ADD COLUMN runtime TEXT NOT NULL DEFAULT 'container'; + ALTER TABLE components ADD COLUMN memory_mb INTEGER NOT NULL DEFAULT 0; + ALTER TABLE components ADD COLUMN vcpus INTEGER NOT NULL DEFAULT 0;`, } diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index 3d9b32d..bf9135b 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -37,7 +37,7 @@ func TestServiceCRUD(t *testing.T) { db := openTestDB(t) // Create - if err := CreateService(db, "metacrypt", true); err != nil { + if err := CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create: %v", err) } @@ -60,7 +60,7 @@ func TestServiceCRUD(t *testing.T) { } // Update active - if err := UpdateServiceActive(db, "metacrypt", false); err != nil { + if err := UpdateServiceActive(db, "metacrypt", false, ""); err != nil { t.Fatalf("update: %v", err) } s, _ = GetService(db, "metacrypt") @@ -80,17 +80,17 @@ func TestServiceCRUD(t *testing.T) { func TestServiceDuplicateName(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "metacrypt", true); err != nil { + if err := CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("first create: %v", err) } - if err := CreateService(db, "metacrypt", true); err == 
nil { + if err := CreateService(db, "metacrypt", true, ""); err == nil { t.Fatal("expected error on duplicate name") } } func TestComponentCRUD(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "metacrypt", true); err != nil { + if err := CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } @@ -198,7 +198,7 @@ func TestComponentCRUD(t *testing.T) { func TestComponentCompositePK(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "metacrypt", true); err != nil { + if err := CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } @@ -213,7 +213,7 @@ func TestComponentCompositePK(t *testing.T) { func TestCascadeDelete(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "metacrypt", true); err != nil { + if err := CreateService(db, "metacrypt", true, ""); err != nil { t.Fatalf("create service: %v", err) } @@ -239,7 +239,7 @@ func TestCascadeDelete(t *testing.T) { func TestComponentRoutes(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "svc", true); err != nil { + if err := CreateService(db, "svc", true, ""); err != nil { t.Fatalf("create service: %v", err) } @@ -298,7 +298,7 @@ func TestComponentRoutes(t *testing.T) { func TestRouteHostPort(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "svc", true); err != nil { + if err := CreateService(db, "svc", true, ""); err != nil { t.Fatalf("create service: %v", err) } @@ -363,7 +363,7 @@ func TestRouteHostPort(t *testing.T) { func TestRouteCascadeDelete(t *testing.T) { db := openTestDB(t) - if err := CreateService(db, "svc", true); err != nil { + if err := CreateService(db, "svc", true, ""); err != nil { t.Fatalf("create service: %v", err) } diff --git a/internal/registry/services.go b/internal/registry/services.go index f1d5b01..4fb5fec 100644 --- a/internal/registry/services.go +++ b/internal/registry/services.go @@ -10,15 +10,16 @@ import ( type Service struct { Name 
string Active bool + Comment string CreatedAt time.Time UpdatedAt time.Time } // CreateService creates a new service in the registry. -func CreateService(db *sql.DB, name string, active bool) error { +func CreateService(db *sql.DB, name string, active bool, comment string) error { _, err := db.Exec( - "INSERT INTO services (name, active) VALUES (?, ?)", - name, active, + "INSERT INTO services (name, active, comment) VALUES (?, ?, ?)", + name, active, comment, ) if err != nil { return fmt.Errorf("create service %q: %w", name, err) @@ -31,9 +32,9 @@ func GetService(db *sql.DB, name string) (*Service, error) { s := &Service{} var createdAt, updatedAt string err := db.QueryRow( - "SELECT name, active, created_at, updated_at FROM services WHERE name = ?", + "SELECT name, active, comment, created_at, updated_at FROM services WHERE name = ?", name, - ).Scan(&s.Name, &s.Active, &createdAt, &updatedAt) + ).Scan(&s.Name, &s.Active, &s.Comment, &createdAt, &updatedAt) if err != nil { return nil, fmt.Errorf("get service %q: %w", name, err) } @@ -44,7 +45,7 @@ func GetService(db *sql.DB, name string) (*Service, error) { // ListServices returns all services. 
func ListServices(db *sql.DB) ([]Service, error) { - rows, err := db.Query("SELECT name, active, created_at, updated_at FROM services ORDER BY name") + rows, err := db.Query("SELECT name, active, comment, created_at, updated_at FROM services ORDER BY name") if err != nil { return nil, fmt.Errorf("list services: %w", err) } @@ -54,7 +55,7 @@ func ListServices(db *sql.DB) ([]Service, error) { for rows.Next() { var s Service var createdAt, updatedAt string - if err := rows.Scan(&s.Name, &s.Active, &createdAt, &updatedAt); err != nil { + if err := rows.Scan(&s.Name, &s.Active, &s.Comment, &createdAt, &updatedAt); err != nil { return nil, fmt.Errorf("scan service: %w", err) } s.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt) @@ -64,11 +65,15 @@ func ListServices(db *sql.DB) ([]Service, error) { return services, rows.Err() } -// UpdateServiceActive updates a service's active flag. -func UpdateServiceActive(db *sql.DB, name string, active bool) error { - res, err := db.Exec( - "UPDATE services SET active = ?, updated_at = datetime('now') WHERE name = ?", - active, name, +// UpdateServiceActive updates a service's active flag and comment. If comment +// is empty, the existing comment is preserved. +func UpdateServiceActive(db *sql.DB, name string, active bool, comment string) error { + res, err := db.Exec(` + UPDATE services SET active = ?, + comment = CASE WHEN ? = '' THEN comment ELSE ? 
END, + updated_at = datetime('now') + WHERE name = ?`, + active, comment, comment, name, ) if err != nil { return fmt.Errorf("update service %q: %w", name, err) diff --git a/internal/runtime/qemu.go b/internal/runtime/qemu.go new file mode 100644 index 0000000..861a596 --- /dev/null +++ b/internal/runtime/qemu.go @@ -0,0 +1,558 @@ +package runtime + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "net" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" +) + +// dialUnix connects to a unix-domain socket with a timeout. +func dialUnix(path string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", path, timeout) +} + +// QEMU implements the Runtime interface by running services as Nanos +// unikernel virtual machines under QEMU/KVM instead of containers. +// +// Each service component becomes a single-process VM with its own kernel. +// The lifecycle maps onto the same Runtime interface as Podman so the agent +// can treat unikernels and containers uniformly: +// +// Pull -> pull the OCI image, extract the ELF binary, cache it +// Run -> `ops build` the binary into a Nanos image, boot it under QEMU +// Stop -> graceful QMP powerdown, then SIGTERM/SIGKILL the QEMU process +// Remove -> stop and delete the VM state directory +// Inspect -> read persisted metadata + check process liveness +// List -> enumerate VM state directories +// Logs -> stream the serial console log file +// +// Phase 1 uses QEMU user-mode networking with host port forwards, which is +// functionally equivalent to rootless podman's localhost port mappings: +// mc-proxy routes to 127.0.0.1: exactly as it does for containers. +// Isolated bridge networking is a later phase. +type QEMU struct { + // ImageDir holds built Nanos images and extracted binaries. + // Default: /srv/mcp/images + ImageDir string + // StateDir holds per-VM runtime state (pidfile, QMP socket, console log, + // metadata). 
Default: /srv/mcp/vm + StateDir string + // OpsPath is the path to the `ops` Nanos toolchain binary. Default: "ops". + OpsPath string + // QemuPath is the path to qemu-system-x86_64. Default: "qemu-system-x86_64". + QemuPath string + // Memory is the default guest memory in MB when a spec does not set one. + Memory int + // HomeDir is set as $HOME for `ops` so it uses a stable ~/.ops directory. + HomeDir string +} + +func (q *QEMU) imageDir() string { + if q.ImageDir != "" { + return q.ImageDir + } + return "/srv/mcp/images" +} + +func (q *QEMU) stateDir() string { + if q.StateDir != "" { + return q.StateDir + } + return "/srv/mcp/vm" +} + +func (q *QEMU) opsPath() string { + if q.OpsPath != "" { + return q.OpsPath + } + return "ops" +} + +func (q *QEMU) qemuPath() string { + if q.QemuPath != "" { + return q.QemuPath + } + return "qemu-system-x86_64" +} + +func (q *QEMU) memory() int { + if q.Memory > 0 { + return q.Memory + } + return 256 +} + +// opsEnv returns the environment for invoking `ops`, pinning $HOME so its +// cache and image directory are stable across invocations. +func (q *QEMU) opsEnv() []string { + env := os.Environ() + if q.HomeDir != "" { + env = append(env, "HOME="+q.HomeDir) + } + return env +} + +// sanitizeImage turns an image reference into a filesystem-safe stem. +// +// "mcr.example:8443/mcdoc:v0.1.0" -> "mcr.example_8443_mcdoc_v0.1.0" +func sanitizeImage(image string) string { + r := strings.NewReplacer("/", "_", ":", "_") + return r.Replace(image) +} + +// binaryName derives the in-image ELF binary name from an image reference by +// taking the repository basename. "host:8443/mcdoc:v0.1.0" -> "mcdoc". 
+func binaryName(image string) string { + name := image + if i := strings.LastIndex(name, "/"); i >= 0 { + name = name[i+1:] + } + if i := strings.Index(name, ":"); i >= 0 { + name = name[:i] + } + return name +} + +func (q *QEMU) binPath(image string) string { + return filepath.Join(q.imageDir(), sanitizeImage(image)+".bin") +} + +func (q *QEMU) imgPath(name string) string { + return filepath.Join(q.imageDir(), name+".img") +} + +func (q *QEMU) vmDir(name string) string { + return filepath.Join(q.stateDir(), name) +} + +// vmMeta is the persisted per-VM metadata written at Run time so that +// Inspect/List can report accurate information after an agent restart. +type vmMeta struct { + Name string `json:"name"` + Image string `json:"image"` + User string `json:"user"` + Restart string `json:"restart"` + Ports []string `json:"ports"` + Volumes []string `json:"volumes"` + Cmd []string `json:"cmd"` + MemoryMB int `json:"memory_mb"` + VCPUs int `json:"vcpus"` + ImageHash string `json:"image_hash"` + Started time.Time `json:"started"` +} + +// Pull pulls the OCI image and extracts its ELF binary into the image cache. +// The binary is the input to `ops build`; the Nanos image itself is built at +// Run time so the service's command arguments can be baked in. +func (q *QEMU) Pull(ctx context.Context, image string) error { + if err := os.MkdirAll(q.imageDir(), 0o750); err != nil { + return fmt.Errorf("create image dir: %w", err) + } + + // Pull the OCI image via podman (reuses the agent's registry auth). + if out, err := exec.CommandContext(ctx, "podman", "pull", image).CombinedOutput(); err != nil { //nolint:gosec // args built programmatically + return fmt.Errorf("podman pull %q: %w: %s", image, err, out) + } + + // Create (do not start) a container to copy the binary out of. 
+ tmp := "ukextract-" + sanitizeImage(image) + _ = exec.CommandContext(ctx, "podman", "rm", "-f", tmp).Run() //nolint:gosec + if out, err := exec.CommandContext(ctx, "podman", "create", "--name", tmp, image).CombinedOutput(); err != nil { //nolint:gosec + return fmt.Errorf("podman create %q: %w: %s", image, err, out) + } + defer func() { _ = exec.Command("podman", "rm", "-f", tmp).Run() }() //nolint:gosec + + bin := binaryName(image) + src := tmp + ":/usr/local/bin/" + bin + dst := q.binPath(image) + if out, err := exec.CommandContext(ctx, "podman", "cp", src, dst).CombinedOutput(); err != nil { //nolint:gosec + return fmt.Errorf("extract binary %q from %q: %w: %s", bin, image, err, out) + } + if err := os.Chmod(dst, 0o755); err != nil { //nolint:gosec // unikernel ELF must be executable + return fmt.Errorf("chmod extracted binary: %w", err) + } + return nil +} + +// opsConfig is the subset of the `ops` build configuration we generate. +type opsConfig struct { + Args []string `json:"Args,omitempty"` + Env map[string]string `json:"Env,omitempty"` + RunConfig opsRunConfig `json:"RunConfig"` +} + +type opsRunConfig struct { + Ports []string `json:"Ports,omitempty"` + Memory string `json:"Memory,omitempty"` + CPUs int `json:"CPUs,omitempty"` + Klibs []string `json:"Klibs,omitempty"` + Mounts any `json:"Mounts,omitempty"` + NoTrace []string `json:"NoTrace,omitempty"` + GDBPort int `json:"GDBPort,omitempty"` + Nanos string `json:"Nanos,omitempty"` + Hostname string `json:"Hostname,omitempty"` +} + +// guestPorts extracts the guest (container) port from each spec port mapping. +// Accepts "host:container", "ip:host:container", or a bare "port". +func guestPorts(ports []string) []string { + var gp []string + for _, p := range ports { + parts := strings.Split(p, ":") + gp = append(gp, parts[len(parts)-1]) + } + return gp +} + +// hostForward builds the QEMU hostfwd value for a spec port mapping. 
+//   "ip:host:container" -> "tcp:ip:host-:container"
+//   "host:container"    -> "tcp:127.0.0.1:host-:container"
+//   "port"              -> "tcp:127.0.0.1:port-:port"
+//
+// Input with any other number of colon-separated fields falls through to the
+// single-port form and uses only the first field.
+func hostForward(p string) string {
+	parts := strings.Split(p, ":")
+	switch len(parts) {
+	case 3:
+		return fmt.Sprintf("tcp:%s:%s-:%s", parts[0], parts[1], parts[2])
+	case 2:
+		return fmt.Sprintf("tcp:127.0.0.1:%s-:%s", parts[0], parts[1])
+	default:
+		return fmt.Sprintf("tcp:127.0.0.1:%s-:%s", parts[0], parts[0])
+	}
+}
+
+// Run builds the Nanos image for spec and boots it under QEMU/KVM.
+//
+// It creates the VM state directory, writes the ops build config (ops.json),
+// runs `ops build` against the already-extracted binary, relocates the built
+// image into the image directory, launches a daemonized QEMU and finally
+// records meta.json. A non-positive spec.MemoryMB falls back to q.memory() and
+// a non-positive spec.VCPUs falls back to 1.
+//
+// An error is returned when any step fails, including when the image cannot
+// be read for hashing (which also catches a build that produced no image).
+func (q *QEMU) Run(ctx context.Context, spec ContainerSpec) error {
+	vmDir := q.vmDir(spec.Name)
+	if err := os.MkdirAll(vmDir, 0o750); err != nil {
+		return fmt.Errorf("create vm state dir: %w", err)
+	}
+
+	mem := spec.MemoryMB
+	if mem <= 0 {
+		mem = q.memory()
+	}
+	cpus := spec.VCPUs
+	if cpus <= 0 {
+		cpus = 1
+	}
+
+	// Build the Nanos image from the extracted binary, baking in command args.
+	cfg := opsConfig{
+		Args: spec.Cmd,
+		RunConfig: opsRunConfig{
+			Ports:  guestPorts(spec.Ports),
+			Memory: strconv.Itoa(mem) + "m",
+			CPUs:   cpus,
+		},
+	}
+	if len(spec.Env) > 0 {
+		cfg.Env = make(map[string]string, len(spec.Env))
+		for _, e := range spec.Env {
+			// Entries without "=" are not KEY=VALUE pairs and are skipped.
+			if key, val, ok := strings.Cut(e, "="); ok {
+				cfg.Env[key] = val
+			}
+		}
+	}
+	cfgPath := filepath.Join(vmDir, "ops.json")
+	cfgBytes, err := json.MarshalIndent(cfg, "", " ")
+	if err != nil {
+		return fmt.Errorf("marshal ops config: %w", err)
+	}
+	if err := os.WriteFile(cfgPath, cfgBytes, 0o640); err != nil { //nolint:gosec // mcp-group-readable config
+		return fmt.Errorf("write ops config: %w", err)
+	}
+
+	img := q.imgPath(spec.Name)
+	bin := q.binPath(spec.Image)
+	if _, err := os.Stat(bin); err != nil {
+		return fmt.Errorf("binary not found for %q (Pull first): %w", spec.Image, err)
+	}
+	build := exec.CommandContext(ctx, q.opsPath(), "build", bin, "-c", cfgPath, "-i", spec.Name) //nolint:gosec
+	build.Env = q.opsEnv()
+	if out, err := build.CombinedOutput(); err != nil {
+		return fmt.Errorf("ops build %q: %w: %s", spec.Name, err, out)
+	}
+	// ops writes to ~/.ops/images/{name}.img; move it into our image dir.
+	opsImg := filepath.Join(q.opsImagesDir(), spec.Name+".img")
+	if _, err := os.Stat(opsImg); err == nil {
+		if err := os.Rename(opsImg, img); err != nil {
+			// Cross-device fallback: copy. Report both failures so the copy
+			// error is not lost behind the rename error.
+			if cpErr := copyFile(opsImg, img); cpErr != nil {
+				return fmt.Errorf("relocate built image: %w (copy fallback: %v)", err, cpErr)
+			}
+			_ = os.Remove(opsImg)
+		}
+	}
+
+	// Hashing doubles as a check that the image exists and is readable before
+	// QEMU is started against it.
+	hash, err := fileSHA256(img)
+	if err != nil {
+		return fmt.Errorf("hash image %q: %w", img, err)
+	}
+
+	// Assemble QEMU invocation: KVM-accelerated, headless, serial console to a
+	// file, QMP control socket, virtio disk + NIC with user-mode port forwards.
+	netdev := "user,id=n0"
+	for _, p := range spec.Ports {
+		netdev += ",hostfwd=" + hostForward(p)
+	}
+	args := []string{
+		"-enable-kvm",
+		"-m", strconv.Itoa(mem),
+		"-smp", strconv.Itoa(cpus),
+		"-display", "none",
+		"-no-reboot",
+		"-daemonize",
+		"-pidfile", filepath.Join(vmDir, "qemu.pid"),
+		"-serial", "file:" + filepath.Join(vmDir, "console.log"),
+		"-qmp", "unix:" + filepath.Join(vmDir, "qmp.sock") + ",server,nowait",
+		"-drive", "file=" + img + ",format=raw,if=virtio",
+		"-device", "virtio-net-pci,netdev=n0",
+		"-netdev", netdev,
+	}
+	// 9p passthrough for host /srv/ volumes (best-effort; Nanos must
+	// support the 9p client for the guest to mount it).
+	for i, v := range spec.Volumes {
+		// Only the host side of "host:container" is passed to QEMU.
+		host, _, _ := strings.Cut(v, ":")
+		tag := fmt.Sprintf("srv%d", i)
+		args = append(args,
+			"-virtfs", fmt.Sprintf("local,path=%s,mount_tag=%s,security_model=none,id=%s", host, tag, tag),
+		)
+	}
+
+	cmd := exec.CommandContext(ctx, q.qemuPath(), args...) //nolint:gosec
+	if out, err := cmd.CombinedOutput(); err != nil {
+		return fmt.Errorf("qemu launch %q: %w: %s", spec.Name, err, out)
+	}
+
+	meta := vmMeta{
+		Name:      spec.Name,
+		Image:     spec.Image,
+		User:      spec.User,
+		Restart:   spec.Restart,
+		Ports:     spec.Ports,
+		Volumes:   spec.Volumes,
+		Cmd:       spec.Cmd,
+		MemoryMB:  mem,
+		VCPUs:     cpus,
+		ImageHash: hash,
+		Started:   time.Now().UTC(),
+	}
+	return q.writeMeta(spec.Name, meta)
+}
+
+// opsImagesDir returns the "images" directory under ".ops" in q.HomeDir, or
+// in the current user's home directory when q.HomeDir is empty.
+func (q *QEMU) opsImagesDir() string {
+	home := q.HomeDir
+	if home == "" {
+		// A lookup failure leaves home empty, yielding a relative path.
+		home, _ = os.UserHomeDir()
+	}
+	return filepath.Join(home, ".ops", "images")
+}
+
+// writeMeta serializes m as indented JSON into meta.json in the VM's state
+// directory.
+func (q *QEMU) writeMeta(name string, m vmMeta) error {
+	b, err := json.MarshalIndent(m, "", " ")
+	if err != nil {
+		return fmt.Errorf("marshal vm meta: %w", err)
+	}
+	return os.WriteFile(filepath.Join(q.vmDir(name), "meta.json"), b, 0o640) //nolint:gosec // mcp-group-readable metadata
+}
+
+// readMeta loads meta.json from the VM's state directory. The read or decode
+// error is returned unwrapped.
+func (q *QEMU) readMeta(name string) (vmMeta, error) {
+	var m vmMeta
+	b, err := os.ReadFile(filepath.Join(q.vmDir(name), "meta.json"))
+	if err != nil {
+		return m, err
+	}
+	return m, json.Unmarshal(b, &m)
+}
+
+// pidOf returns the running QEMU pid for a VM, or 0 if not running.
+//
+// The pid comes from the qemu.pid file and is probed with signal 0; any probe
+// error is treated as "not running".
+func (q *QEMU) pidOf(name string) int {
+	b, err := os.ReadFile(filepath.Join(q.vmDir(name), "qemu.pid"))
+	if err != nil {
+		return 0
+	}
+	pid, err := strconv.Atoi(strings.TrimSpace(string(b)))
+	if err != nil || pid <= 0 {
+		return 0
+	}
+	if err := syscall.Kill(pid, 0); err != nil {
+		return 0
+	}
+	return pid
+}
+
+// Stop gracefully powers down the VM, falling back to SIGTERM/SIGKILL.
+//
+// It waits up to 10s after the QMP system_powerdown request and up to 2s
+// after SIGTERM. Cancelling ctx ends either wait early, so a cancelled
+// context escalates straight to SIGKILL instead of blocking. Stop returns nil
+// whether or not the signals could be delivered.
+func (q *QEMU) Stop(ctx context.Context, name string) error {
+	pid := q.pidOf(name)
+	if pid == 0 {
+		return nil
+	}
+
+	// waitExit polls until the VM process is gone (true), or until timeout
+	// elapses or ctx is cancelled (false).
+	waitExit := func(timeout time.Duration) bool {
+		deadline := time.Now().Add(timeout)
+		for {
+			if q.pidOf(name) == 0 {
+				return true
+			}
+			if !time.Now().Before(deadline) {
+				return false
+			}
+			select {
+			case <-ctx.Done():
+				return false
+			case <-time.After(300 * time.Millisecond):
+			}
+		}
+	}
+
+	// Try a graceful QMP system_powerdown; failure just means we escalate.
+	_ = q.qmpCommand(name, "system_powerdown")
+	if waitExit(10 * time.Second) {
+		return nil
+	}
+	// Escalate.
+	_ = syscall.Kill(pid, syscall.SIGTERM)
+	if waitExit(2 * time.Second) {
+		return nil
+	}
+	_ = syscall.Kill(pid, syscall.SIGKILL)
+	return nil
+}
+
+// qmpCommand sends a single QMP command over the VM's control socket.
+//
+// It performs the QMP handshake (greeting, then qmp_capabilities), sends the
+// command and waits for its reply, so the connection is not closed before
+// QEMU has answered. Handshake, transport and QMP-level errors are returned.
+// NOTE(review): no read deadline is set on the connection here; confirm
+// whether dialUnix's timeout also bounds reads.
+func (q *QEMU) qmpCommand(name, command string) error {
+	sock := filepath.Join(q.vmDir(name), "qmp.sock")
+	conn, err := dialUnix(sock, 3*time.Second)
+	if err != nil {
+		return err
+	}
+	defer func() { _ = conn.Close() }()
+
+	dec := json.NewDecoder(conn)
+	var greeting map[string]any
+	if err := dec.Decode(&greeting); err != nil {
+		return fmt.Errorf("qmp greeting: %w", err)
+	}
+
+	// execute sends one command and reads messages until its reply arrives.
+	execute := func(cmd string) error {
+		req, err := json.Marshal(map[string]string{"execute": cmd})
+		if err != nil {
+			return fmt.Errorf("qmp encode %q: %w", cmd, err)
+		}
+		if _, err := conn.Write(req); err != nil {
+			return fmt.Errorf("qmp send %q: %w", cmd, err)
+		}
+		for {
+			var msg map[string]any
+			if err := dec.Decode(&msg); err != nil {
+				return fmt.Errorf("qmp reply to %q: %w", cmd, err)
+			}
+			if _, ok := msg["return"]; ok {
+				return nil
+			}
+			if e, ok := msg["error"]; ok {
+				return fmt.Errorf("qmp %q failed: %v", cmd, e)
+			}
+			// Neither "return" nor "error": an asynchronous event; keep reading.
+		}
+	}
+
+	if err := execute("qmp_capabilities"); err != nil {
+		return err
+	}
+	return execute(command)
+}
+
+// Remove stops the VM and deletes its state directory.
+func (q *QEMU) Remove(ctx context.Context, name string) error {
+	_ = q.Stop(ctx, name)
+	// Make sure nothing is left running before its state is deleted.
+	if pid := q.pidOf(name); pid != 0 {
+		_ = syscall.Kill(pid, syscall.SIGKILL)
+	}
+	return os.RemoveAll(q.vmDir(name))
+}
+
+// Inspect reports the observed state of a VM.
+//
+// The description comes from meta.json; the state is "running" when pidOf
+// finds a live process and "stopped" otherwise.
+func (q *QEMU) Inspect(ctx context.Context, name string) (ContainerInfo, error) {
+	m, err := q.readMeta(name)
+	if err != nil {
+		return ContainerInfo{}, fmt.Errorf("qemu inspect %q: %w", name, err)
+	}
+	state := "stopped"
+	if q.pidOf(name) != 0 {
+		state = "running"
+	}
+	return q.infoFromMeta(m, state), nil
+}
+
+// infoFromMeta converts stored VM metadata plus an observed state into a
+// ContainerInfo. Network is always reported as "user".
+func (q *QEMU) infoFromMeta(m vmMeta, state string) ContainerInfo {
+	return ContainerInfo{
+		Name:    m.Name,
+		Image:   m.Image,
+		State:   state,
+		Network: "user",
+		User:    m.User,
+		Restart: m.Restart,
+		Ports:   m.Ports,
+		Volumes: m.Volumes,
+		Cmd:     m.Cmd,
+		Version: ExtractVersion(m.Image),
+		Started: m.Started,
+	}
+}
+
+// List enumerates all VMs known from the state directory.
+func (q *QEMU) List(ctx context.Context) ([]ContainerInfo, error) {
+	entries, err := os.ReadDir(q.stateDir())
+	if err != nil {
+		// A missing state directory simply means no VM was ever created.
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, fmt.Errorf("read vm state dir: %w", err)
+	}
+	var infos []ContainerInfo
+	for _, e := range entries {
+		if !e.IsDir() {
+			continue
+		}
+		// Directories without readable metadata are skipped.
+		m, err := q.readMeta(e.Name())
+		if err != nil {
+			continue
+		}
+		state := "stopped"
+		if q.pidOf(e.Name()) != 0 {
+			state = "running"
+		}
+		infos = append(infos, q.infoFromMeta(m, state))
+	}
+	return infos, nil
+}
+
+// Logs streams the VM's serial console log.
+//
+// It returns an unstarted `tail` command over console.log: follow adds -f, a
+// positive tail limits output to the last tail lines, otherwise the whole
+// file is printed (-n +1). The timestamps and since arguments are not used by
+// this runtime.
+func (q *QEMU) Logs(ctx context.Context, name string, tail int, follow, timestamps bool, since string) *exec.Cmd {
+	console := filepath.Join(q.vmDir(name), "console.log")
+	var args []string
+	if follow {
+		args = append(args, "-f")
+	}
+	if tail > 0 {
+		args = append(args, "-n", strconv.Itoa(tail))
+	} else {
+		args = append(args, "-n", "+1")
+	}
+	args = append(args, console)
+	return exec.CommandContext(ctx, "tail", args...) //nolint:gosec
+}
+
+// Build builds a Nanos image from a context directory's binary. Used by the
+// `mcp build --unikernel` path. Not the primary deploy path.
+//
+// It is not implemented and always returns an error.
+func (q *QEMU) Build(ctx context.Context, image, contextDir, dockerfile string) error {
+	return fmt.Errorf("qemu build: not implemented; build OCI image then Pull")
+}
+
+// Push is not implemented for the QEMU runtime.
+func (q *QEMU) Push(ctx context.Context, image string) error {
+	return fmt.Errorf("qemu push: not implemented")
+}
+
+// ImageExists reports whether the extracted binary for the image is cached.
+//
+// A missing file yields (false, nil). Any other stat failure (for example a
+// permission error) is returned rather than being reported as "not cached".
+func (q *QEMU) ImageExists(ctx context.Context, image string) (bool, error) {
+	_, err := os.Stat(q.binPath(image))
+	switch {
+	case err == nil:
+		return true, nil
+	case os.IsNotExist(err):
+		return false, nil
+	default:
+		return false, fmt.Errorf("stat image binary for %q: %w", image, err)
+	}
+}
+
+// Login delegates registry auth to podman (shared credential store).
+func (q *QEMU) Login(ctx context.Context, registry, username, token string) error {
+	// The token is fed to podman on stdin (--password-stdin) rather than on
+	// the command line.
+	cmd := exec.CommandContext(ctx, "podman", "login", "--username", username, "--password-stdin", registry) //nolint:gosec
+	cmd.Stdin = strings.NewReader(token)
+	if out, err := cmd.CombinedOutput(); err != nil {
+		return fmt.Errorf("podman login %q: %w: %s", registry, err, out)
+	}
+	return nil
+}
+
+// fileSHA256 returns the hex-encoded SHA-256 digest of the file at path.
+// The whole file is read into memory before hashing.
+func fileSHA256(path string) (string, error) {
+	b, err := os.ReadFile(path) //nolint:gosec // hashing a known image path
+	if err != nil {
+		return "", err
+	}
+	sum := sha256.Sum256(b)
+	return hex.EncodeToString(sum[:]), nil
+}
+
+// copyFile copies src to dst by reading src fully into memory and writing it
+// out with mode 0o640.
+func copyFile(src, dst string) error {
+	in, err := os.ReadFile(src) //nolint:gosec // relocating a built image
+	if err != nil {
+		return err
+	}
+	return os.WriteFile(dst, in, 0o640) //nolint:gosec // relocating a built image
+}
diff --git a/internal/runtime/qemu_test.go b/internal/runtime/qemu_test.go
new file mode 100644
index 0000000..99c862d
--- /dev/null
+++ b/internal/runtime/qemu_test.go
@@ -0,0 +1,43 @@
+package runtime
+
+import "testing"
+
+func TestSanitizeImage(t *testing.T) {
+	got := sanitizeImage("mcr.example:8443/mcdoc:v0.1.0")
+	if got != "mcr.example_8443_mcdoc_v0.1.0" {
+		t.Errorf("sanitizeImage = %q", got)
+	}
+}
+
+func TestBinaryName(t *testing.T) {
+	cases := map[string]string{
+		"host:8443/mcdoc:v0.1.0": "mcdoc",
+		"mcdoc:v1":               "mcdoc",
+		"reg/uktest":             "uktest",
+	}
+	for in, want := range cases {
+		if got := binaryName(in); got != want {
+			t.Errorf("binaryName(%q) = %q, want %q", in, got, want)
+		}
+	}
+}
+
+func TestHostForward(t *testing.T) {
+	cases := map[string]string{
+		"100.88.197.9:18080:8080": "tcp:100.88.197.9:18080-:8080",
+		"18080:8080":              "tcp:127.0.0.1:18080-:8080",
+		"8080":                    "tcp:127.0.0.1:8080-:8080",
+	}
+	for in, want := range cases {
+		if got := hostForward(in); got != want {
+			t.Errorf("hostForward(%q) = %q, want %q", in, got, want)
+		}
+	}
+}
+
+func TestGuestPorts(t *testing.T) {
+	got := guestPorts([]string{"100.88.197.9:18080:8080", "9090"})
+	if len(got) != 2 || got[0] != "8080" || got[1] != "9090" {
+		t.Errorf("guestPorts = %v", got)
+	}
+}
diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go
index 146567b..2dad21f 100644
--- a/internal/runtime/runtime.go
+++ b/internal/runtime/runtime.go
@@ -17,6 +17,10 @@ type ContainerSpec struct {
 	Volumes []string // "host:container" volume mounts
 	Cmd     []string // command and arguments
 	Env     []string // environment variables (KEY=VALUE)
+
+	// Unikernel-only fields (ignored by the container runtime).
+	MemoryMB int // guest memory in MB (default 256)
+	VCPUs    int // guest vCPUs (default 1)
 }
 
 // ContainerInfo describes the observed state of a running or stopped container.
diff --git a/internal/servicedef/servicedef.go b/internal/servicedef/servicedef.go
index fae858d..3ef5316 100644
--- a/internal/servicedef/servicedef.go
+++ b/internal/servicedef/servicedef.go
@@ -53,6 +53,9 @@ type ComponentDef struct {
 	Cmd     []string   `toml:"cmd,omitempty"`
 	Routes  []RouteDef `toml:"routes,omitempty"`
 	Env     []string   `toml:"env,omitempty"`
+	Runtime string     `toml:"runtime,omitempty"` // "container" (default) or "unikernel"
+	Memory  int        `toml:"memory,omitempty"`  // unikernel guest memory in MB
+	VCPUs   int        `toml:"vcpus,omitempty"`   // unikernel guest vCPUs
 }
 
 // Load reads and parses a TOML service definition file. If the active field
@@ -204,15 +207,18 @@ func ToProto(def *ServiceDef) *mcpv1.ServiceSpec {
 
 	for _, c := range def.Components {
 		cs := &mcpv1.ComponentSpec{
-			Name:    c.Name,
-			Image:   c.Image,
-			Network: c.Network,
-			User:    c.User,
-			Restart: c.Restart,
-			Ports:   c.Ports,
-			Volumes: c.Volumes,
-			Cmd:     c.Cmd,
-			Env:     c.Env,
+			Name:     c.Name,
+			Image:    c.Image,
+			Network:  c.Network,
+			User:     c.User,
+			Restart:  c.Restart,
+			Ports:    c.Ports,
+			Volumes:  c.Volumes,
+			Cmd:      c.Cmd,
+			Env:      c.Env,
+			Runtime:  c.Runtime,
+			MemoryMb: int32(c.Memory), //nolint:gosec // small config value
+			Vcpus:    int32(c.VCPUs),  //nolint:gosec // small config value
 		}
 		for _, r := range c.Routes {
 			cs.Routes = append(cs.Routes, &mcpv1.RouteSpec{
diff --git a/proto/mcp/v1/mcp.proto b/proto/mcp/v1/mcp.proto
index 748b360..78bc81a 100644
--- a/proto/mcp/v1/mcp.proto
+++ b/proto/mcp/v1/mcp.proto
@@ -75,6 +75,9 @@ message ComponentSpec {
   repeated string cmd = 8;
   repeated RouteSpec routes = 9;
   repeated string env = 10;
+  string runtime = 11; // "container" (default) or "unikernel"
+  int32 memory_mb = 12; // unikernel guest memory in MB (default 256)
+  int32 vcpus = 13; // unikernel guest vCPUs (default 1)
 }
 
 message SnapshotConfig {