Implements the hypervisor design's Phase 1: a second runtime.Runtime backend (QEMU) that runs each service component as a Nanos unikernel VM instead of a podman container, selected per-component via a new runtime = "unikernel" service-def field. - internal/runtime/qemu.go: QEMURuntime. Pull extracts the ELF from the OCI image; Run does `ops build` + boots qemu-system-x86_64 with KVM, user-mode net port-forwards, QMP control socket and serial console log; Stop/Remove/Inspect/List/Logs map onto VM lifecycle + state dir. - proto/registry/servicedef: add runtime, memory_mb, vcpus fields (registry migration 5). - agent: holds both runtimes; runtimeFor() selects per component; listAllContainers() merges containers + VMs so drift/status see both. Unikernel runtime auto-enables on nodes with /dev/kvm + ops. Validated end-to-end on straylight: a test service deploys via `mcp deploy --direct`, boots as a Nanos unikernel, serves HTTP through the agent port-forward, and reports running via `mcp status`/`mcp logs`. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
394 lines
12 KiB
Go
394 lines
12 KiB
Go
package registry
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
// Route represents a route entry for a component in the registry.
|
|
type Route struct {
|
|
Name string
|
|
Port int
|
|
Mode string
|
|
Hostname string
|
|
HostPort int // agent-assigned host port (0 = not yet allocated)
|
|
}
|
|
|
|
// Component represents a component in the registry.
|
|
type Component struct {
|
|
Name string
|
|
Service string
|
|
Image string
|
|
Network string
|
|
UserSpec string
|
|
Restart string
|
|
DesiredState string
|
|
ObservedState string
|
|
Version string
|
|
Ports []string
|
|
Volumes []string
|
|
Cmd []string
|
|
Routes []Route
|
|
Runtime string // "container" (default) or "unikernel"
|
|
MemoryMB int // unikernel guest memory in MB
|
|
VCPUs int // unikernel guest vCPUs
|
|
CreatedAt time.Time
|
|
UpdatedAt time.Time
|
|
}
|
|
|
|
// defaultRuntime normalizes an empty runtime to "container" so the
|
|
// components.runtime column is never empty.
|
|
func defaultRuntime(r string) string {
|
|
if r == "" {
|
|
return "container"
|
|
}
|
|
return r
|
|
}
|
|
|
|
// CreateComponent creates a new component in the registry.
|
|
func CreateComponent(db *sql.DB, c *Component) error {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
|
|
_, err = tx.Exec(`
|
|
INSERT INTO components (name, service, image, network, user_spec, restart, desired_state, observed_state, version, runtime, memory_mb, vcpus)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
c.Name, c.Service, c.Image, c.Network, c.UserSpec, c.Restart,
|
|
c.DesiredState, c.ObservedState, c.Version, defaultRuntime(c.Runtime), c.MemoryMB, c.VCPUs,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("insert component %q/%q: %w", c.Service, c.Name, err)
|
|
}
|
|
|
|
if err := setPorts(tx, c.Service, c.Name, c.Ports); err != nil {
|
|
return err
|
|
}
|
|
if err := setVolumes(tx, c.Service, c.Name, c.Volumes); err != nil {
|
|
return err
|
|
}
|
|
if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
|
|
return err
|
|
}
|
|
if err := setRoutes(tx, c.Service, c.Name, c.Routes); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// GetComponent retrieves a component by service and name.
|
|
func GetComponent(db *sql.DB, service, name string) (*Component, error) {
|
|
c := &Component{}
|
|
var createdAt, updatedAt string
|
|
err := db.QueryRow(`
|
|
SELECT name, service, image, network, user_spec, restart,
|
|
desired_state, observed_state, version, runtime, memory_mb, vcpus, created_at, updated_at
|
|
FROM components WHERE service = ? AND name = ?`,
|
|
service, name,
|
|
).Scan(&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart,
|
|
&c.DesiredState, &c.ObservedState, &c.Version, &c.Runtime, &c.MemoryMB, &c.VCPUs, &createdAt, &updatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get component %q/%q: %w", service, name, err)
|
|
}
|
|
c.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
|
c.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
|
|
|
|
c.Ports, err = getPorts(db, service, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.Volumes, err = getVolumes(db, service, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.Cmd, err = getCmd(db, service, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.Routes, err = getRoutes(db, service, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// ListComponents returns all components for a service.
|
|
func ListComponents(db *sql.DB, service string) ([]Component, error) {
|
|
rows, err := db.Query(`
|
|
SELECT name, service, image, network, user_spec, restart,
|
|
desired_state, observed_state, version, runtime, memory_mb, vcpus, created_at, updated_at
|
|
FROM components WHERE service = ? ORDER BY name`,
|
|
service,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list components for %q: %w", service, err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
|
|
var components []Component
|
|
for rows.Next() {
|
|
var c Component
|
|
var createdAt, updatedAt string
|
|
if err := rows.Scan(&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart,
|
|
&c.DesiredState, &c.ObservedState, &c.Version, &c.Runtime, &c.MemoryMB, &c.VCPUs, &createdAt, &updatedAt); err != nil {
|
|
return nil, fmt.Errorf("scan component: %w", err)
|
|
}
|
|
c.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
|
c.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
|
|
|
|
c.Ports, _ = getPorts(db, c.Service, c.Name)
|
|
c.Volumes, _ = getVolumes(db, c.Service, c.Name)
|
|
c.Cmd, _ = getCmd(db, c.Service, c.Name)
|
|
c.Routes, _ = getRoutes(db, c.Service, c.Name)
|
|
|
|
components = append(components, c)
|
|
}
|
|
return components, rows.Err()
|
|
}
|
|
|
|
// UpdateComponentState updates desired and/or observed state.
|
|
func UpdateComponentState(db *sql.DB, service, name, desiredState, observedState string) error {
|
|
res, err := db.Exec(`
|
|
UPDATE components
|
|
SET desired_state = CASE WHEN ? = '' THEN desired_state ELSE ? END,
|
|
observed_state = CASE WHEN ? = '' THEN observed_state ELSE ? END,
|
|
updated_at = datetime('now')
|
|
WHERE service = ? AND name = ?`,
|
|
desiredState, desiredState, observedState, observedState, service, name,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update state %q/%q: %w", service, name, err)
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
if n == 0 {
|
|
return fmt.Errorf("update state %q/%q: %w", service, name, sql.ErrNoRows)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateComponentSpec updates the full spec for an existing component.
|
|
func UpdateComponentSpec(db *sql.DB, c *Component) error {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
|
|
_, err = tx.Exec(`
|
|
UPDATE components
|
|
SET image = ?, network = ?, user_spec = ?, restart = ?, version = ?,
|
|
runtime = ?, memory_mb = ?, vcpus = ?, updated_at = datetime('now')
|
|
WHERE service = ? AND name = ?`,
|
|
c.Image, c.Network, c.UserSpec, c.Restart, c.Version,
|
|
defaultRuntime(c.Runtime), c.MemoryMB, c.VCPUs, c.Service, c.Name,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update component %q/%q: %w", c.Service, c.Name, err)
|
|
}
|
|
|
|
if err := setPorts(tx, c.Service, c.Name, c.Ports); err != nil {
|
|
return err
|
|
}
|
|
if err := setVolumes(tx, c.Service, c.Name, c.Volumes); err != nil {
|
|
return err
|
|
}
|
|
if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
|
|
return err
|
|
}
|
|
if err := setRoutes(tx, c.Service, c.Name, c.Routes); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// DeleteComponent removes a component.
|
|
func DeleteComponent(db *sql.DB, service, name string) error {
|
|
res, err := db.Exec("DELETE FROM components WHERE service = ? AND name = ?", service, name)
|
|
if err != nil {
|
|
return fmt.Errorf("delete component %q/%q: %w", service, name, err)
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
if n == 0 {
|
|
return fmt.Errorf("delete component %q/%q: %w", service, name, sql.ErrNoRows)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// helper: set port mappings (delete + re-insert)
|
|
func setPorts(tx *sql.Tx, service, component string, ports []string) error {
|
|
if _, err := tx.Exec("DELETE FROM component_ports WHERE service = ? AND component = ?", service, component); err != nil {
|
|
return fmt.Errorf("clear ports %q/%q: %w", service, component, err)
|
|
}
|
|
for _, p := range ports {
|
|
if _, err := tx.Exec("INSERT INTO component_ports (service, component, mapping) VALUES (?, ?, ?)", service, component, p); err != nil {
|
|
return fmt.Errorf("insert port %q/%q %q: %w", service, component, p, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getPorts(db *sql.DB, service, component string) ([]string, error) {
|
|
rows, err := db.Query("SELECT mapping FROM component_ports WHERE service = ? AND component = ? ORDER BY mapping", service, component)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get ports %q/%q: %w", service, component, err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
var ports []string
|
|
for rows.Next() {
|
|
var p string
|
|
if err := rows.Scan(&p); err != nil {
|
|
return nil, err
|
|
}
|
|
ports = append(ports, p)
|
|
}
|
|
return ports, rows.Err()
|
|
}
|
|
|
|
// helper: set volume mappings
|
|
func setVolumes(tx *sql.Tx, service, component string, volumes []string) error {
|
|
if _, err := tx.Exec("DELETE FROM component_volumes WHERE service = ? AND component = ?", service, component); err != nil {
|
|
return fmt.Errorf("clear volumes %q/%q: %w", service, component, err)
|
|
}
|
|
for _, v := range volumes {
|
|
if _, err := tx.Exec("INSERT INTO component_volumes (service, component, mapping) VALUES (?, ?, ?)", service, component, v); err != nil {
|
|
return fmt.Errorf("insert volume %q/%q %q: %w", service, component, v, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getVolumes(db *sql.DB, service, component string) ([]string, error) {
|
|
rows, err := db.Query("SELECT mapping FROM component_volumes WHERE service = ? AND component = ? ORDER BY mapping", service, component)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get volumes %q/%q: %w", service, component, err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
var volumes []string
|
|
for rows.Next() {
|
|
var v string
|
|
if err := rows.Scan(&v); err != nil {
|
|
return nil, err
|
|
}
|
|
volumes = append(volumes, v)
|
|
}
|
|
return volumes, rows.Err()
|
|
}
|
|
|
|
// helper: set command args
|
|
func setCmd(tx *sql.Tx, service, component string, cmd []string) error {
|
|
if _, err := tx.Exec("DELETE FROM component_cmd WHERE service = ? AND component = ?", service, component); err != nil {
|
|
return fmt.Errorf("clear cmd %q/%q: %w", service, component, err)
|
|
}
|
|
for i, arg := range cmd {
|
|
if _, err := tx.Exec("INSERT INTO component_cmd (service, component, position, arg) VALUES (?, ?, ?, ?)", service, component, i, arg); err != nil {
|
|
return fmt.Errorf("insert cmd %q/%q [%d]: %w", service, component, i, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getCmd(db *sql.DB, service, component string) ([]string, error) {
|
|
rows, err := db.Query("SELECT arg FROM component_cmd WHERE service = ? AND component = ? ORDER BY position", service, component)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get cmd %q/%q: %w", service, component, err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
var cmd []string
|
|
for rows.Next() {
|
|
var arg string
|
|
if err := rows.Scan(&arg); err != nil {
|
|
return nil, err
|
|
}
|
|
cmd = append(cmd, arg)
|
|
}
|
|
return cmd, rows.Err()
|
|
}
|
|
|
|
// helper: set route definitions (delete + re-insert)
|
|
func setRoutes(tx *sql.Tx, service, component string, routes []Route) error {
|
|
if _, err := tx.Exec("DELETE FROM component_routes WHERE service = ? AND component = ?", service, component); err != nil {
|
|
return fmt.Errorf("clear routes %q/%q: %w", service, component, err)
|
|
}
|
|
for _, r := range routes {
|
|
mode := r.Mode
|
|
if mode == "" {
|
|
mode = "l4"
|
|
}
|
|
name := r.Name
|
|
if name == "" {
|
|
name = "default"
|
|
}
|
|
if _, err := tx.Exec(
|
|
"INSERT INTO component_routes (service, component, name, port, mode, hostname, host_port) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
service, component, name, r.Port, mode, r.Hostname, r.HostPort,
|
|
); err != nil {
|
|
return fmt.Errorf("insert route %q/%q %q: %w", service, component, name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getRoutes(db *sql.DB, service, component string) ([]Route, error) {
|
|
rows, err := db.Query(
|
|
"SELECT name, port, mode, hostname, host_port FROM component_routes WHERE service = ? AND component = ? ORDER BY name",
|
|
service, component,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get routes %q/%q: %w", service, component, err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
var routes []Route
|
|
for rows.Next() {
|
|
var r Route
|
|
if err := rows.Scan(&r.Name, &r.Port, &r.Mode, &r.Hostname, &r.HostPort); err != nil {
|
|
return nil, err
|
|
}
|
|
routes = append(routes, r)
|
|
}
|
|
return routes, rows.Err()
|
|
}
|
|
|
|
// UpdateRouteHostPort updates the agent-assigned host port for a specific route.
|
|
func UpdateRouteHostPort(db *sql.DB, service, component, routeName string, hostPort int) error {
|
|
res, err := db.Exec(
|
|
"UPDATE component_routes SET host_port = ? WHERE service = ? AND component = ? AND name = ?",
|
|
hostPort, service, component, routeName,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update route host_port %q/%q/%q: %w", service, component, routeName, err)
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
if n == 0 {
|
|
return fmt.Errorf("update route host_port %q/%q/%q: %w", service, component, routeName, sql.ErrNoRows)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetRouteHostPorts returns a map of route name to assigned host port for a component.
|
|
func GetRouteHostPorts(db *sql.DB, service, component string) (map[string]int, error) {
|
|
rows, err := db.Query(
|
|
"SELECT name, host_port FROM component_routes WHERE service = ? AND component = ?",
|
|
service, component,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get route host ports %q/%q: %w", service, component, err)
|
|
}
|
|
defer func() { _ = rows.Close() }()
|
|
result := make(map[string]int)
|
|
for rows.Next() {
|
|
var name string
|
|
var port int
|
|
if err := rows.Scan(&name, &port); err != nil {
|
|
return nil, err
|
|
}
|
|
result[name] = port
|
|
}
|
|
return result, rows.Err()
|
|
}
|