Files
mcp/internal/agent/deploy.go
Kyle Isom 7383b370f0 Fix mcp ps showing registry version instead of runtime, error on unknown component
mcp ps now uses the actual container image and version from the runtime
instead of the registry, which could be stale after a failed deploy.

Deploy now returns an error when the component filter matches nothing
instead of silently succeeding with zero results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 19:13:02 -07:00

277 lines
8.2 KiB
Go

package agent
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
)
// Deploy deploys a service (or a single component of it) to this node.
//
// When req names a component, only that component is deployed. A name that
// matches nothing in the service spec is rejected with an error before the
// registry is touched, so a mistyped component name cannot create the service
// or change its stored active flag.
//
// A failure while deploying an individual component does not abort the
// request: it is reported in that component's ComponentResult and the
// remaining components are still attempted. The returned error is non-nil
// only when the request itself is unusable (missing spec, unknown component)
// or the service row cannot be created/updated.
func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.DeployResponse, error) {
	spec := req.GetService()
	if spec == nil {
		return nil, fmt.Errorf("deploy: missing service spec")
	}
	serviceName := spec.GetName()
	active := spec.GetActive()
	a.Logger.Info("deploying", "service", serviceName)

	// Resolve the optional component filter before any registry write so an
	// invalid request leaves no side effects behind.
	components := spec.GetComponents()
	if target := req.GetComponent(); target != "" {
		var filtered []*mcpv1.ComponentSpec
		for _, cs := range components {
			if cs.GetName() == target {
				filtered = append(filtered, cs)
			}
		}
		if len(filtered) == 0 {
			return nil, fmt.Errorf("component %q not found in service %q", target, serviceName)
		}
		components = filtered
	}

	if err := ensureService(a.DB, serviceName, active); err != nil {
		return nil, fmt.Errorf("deploy: ensure service %q: %w", serviceName, err)
	}

	results := make([]*mcpv1.ComponentResult, 0, len(components))
	for _, cs := range components {
		results = append(results, a.deployComponent(ctx, serviceName, cs, active))
	}
	return &mcpv1.DeployResponse{Results: results}, nil
}
// deployComponent handles the full deploy lifecycle for a single component.
//
// Steps, in order:
//  1. Record the component spec (including its routes) in the registry.
//  2. Pull the image.
//  3. Stop and remove any existing container with the same name (best-effort).
//  4. Build the container spec and run it, allocating host ports per route
//     when the component has routes and a port allocator is configured.
//  5. Provision TLS certs for L7 routes, register routes with mc-proxy and
//     register a DNS record, each only when the matching collaborator is set.
//  6. Mark the component as running in the registry.
//
// A failure in steps 1-4 is returned in the result's Error field. Failures in
// steps 5-6 are only logged as warnings and the component is still reported
// as successful.
func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcpv1.ComponentSpec, active bool) *mcpv1.ComponentResult {
	compName := cs.GetName()
	containerName := ContainerNameFor(serviceName, compName)

	// NOTE(review): desiredState is stored with the component spec, but the
	// container is started regardless of it and the final state update below
	// writes "running" as the desired state — confirm this is intended for
	// inactive services.
	desiredState := "running"
	if !active {
		desiredState = "stopped"
	}
	a.Logger.Info("deploying component", "service", serviceName, "component", compName, "desired", desiredState)

	// Convert proto routes to registry routes, filling in the defaults for
	// an unset mode ("l4") and an unset name ("default").
	var regRoutes []registry.Route
	for _, r := range cs.GetRoutes() {
		mode := r.GetMode()
		if mode == "" {
			mode = "l4"
		}
		name := r.GetName()
		if name == "" {
			name = "default"
		}
		regRoutes = append(regRoutes, registry.Route{
			Name:     name,
			Port:     int(r.GetPort()),
			Mode:     mode,
			Hostname: r.GetHostname(),
		})
	}

	regComp := &registry.Component{
		Name:         compName,
		Service:      serviceName,
		Image:        cs.GetImage(),
		Network:      cs.GetNetwork(),
		UserSpec:     cs.GetUser(),
		Restart:      cs.GetRestart(),
		DesiredState: desiredState,
		Version:      runtime.ExtractVersion(cs.GetImage()),
		Ports:        cs.GetPorts(),
		Volumes:      cs.GetVolumes(),
		Cmd:          cs.GetCmd(),
		Routes:       regRoutes,
	}
	if err := ensureComponent(a.DB, regComp); err != nil {
		return &mcpv1.ComponentResult{
			Name:  compName,
			Error: fmt.Sprintf("ensure component: %v", err),
		}
	}

	if err := a.Runtime.Pull(ctx, cs.GetImage()); err != nil {
		return &mcpv1.ComponentResult{
			Name:  compName,
			Error: fmt.Sprintf("pull image: %v", err),
		}
	}

	// Best-effort cleanup of a previous container; errors are ignored because
	// the container may simply not exist on a first deploy.
	_ = a.Runtime.Stop(ctx, containerName)   // may not exist yet
	_ = a.Runtime.Remove(ctx, containerName) // may not exist yet

	// Build the container spec. If the component has routes, use route-based
	// port allocation and env injection. Otherwise, fall back to legacy ports.
	runSpec := runtime.ContainerSpec{
		Name:    containerName,
		Image:   cs.GetImage(),
		Network: cs.GetNetwork(),
		User:    cs.GetUser(),
		Restart: cs.GetRestart(),
		Volumes: cs.GetVolumes(),
		Cmd:     cs.GetCmd(),
		Env:     cs.GetEnv(),
	}
	if len(regRoutes) > 0 && a.PortAlloc != nil {
		ports, env, err := a.allocateRoutePorts(serviceName, compName, regRoutes)
		if err != nil {
			return &mcpv1.ComponentResult{
				Name:  compName,
				Error: fmt.Sprintf("allocate route ports: %v", err),
			}
		}
		runSpec.Ports = ports
		runSpec.Env = append(runSpec.Env, env...)
	} else {
		// Legacy: use ports directly from the spec.
		runSpec.Ports = cs.GetPorts()
	}

	if err := a.Runtime.Run(ctx, runSpec); err != nil {
		// Record the observed state as "removed". A failure to record it is
		// logged rather than dropped, matching the success path below; the
		// run error is still what gets reported to the caller.
		// NOTE(review): presumably an empty desired state leaves the stored
		// value unchanged — confirm against registry.UpdateComponentState.
		if stateErr := registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed"); stateErr != nil {
			a.Logger.Warn("failed to update component state", "service", serviceName, "component", compName, "err", stateErr)
		}
		return &mcpv1.ComponentResult{
			Name:  compName,
			Error: fmt.Sprintf("run container: %v", err),
		}
	}

	// Provision TLS certs for L7 routes before registering with mc-proxy.
	if a.Certs != nil && hasL7Routes(regRoutes) {
		hostnames := l7Hostnames(serviceName, regRoutes)
		if err := a.Certs.EnsureCert(ctx, serviceName, hostnames); err != nil {
			a.Logger.Warn("failed to provision TLS cert", "service", serviceName, "err", err)
		}
	}

	// Register routes with mc-proxy after the container is running.
	if len(regRoutes) > 0 && a.Proxy != nil {
		hostPorts, err := registry.GetRouteHostPorts(a.DB, serviceName, compName)
		if err != nil {
			a.Logger.Warn("failed to get host ports for route registration", "service", serviceName, "component", compName, "err", err)
		} else if err := a.Proxy.RegisterRoutes(ctx, serviceName, regRoutes, hostPorts); err != nil {
			a.Logger.Warn("failed to register routes with mc-proxy", "service", serviceName, "component", compName, "err", err)
		}
	}

	// Register DNS record for the service.
	if a.DNS != nil && len(regRoutes) > 0 {
		if err := a.DNS.EnsureRecord(ctx, serviceName); err != nil {
			a.Logger.Warn("failed to register DNS record", "service", serviceName, "err", err)
		}
	}

	if err := registry.UpdateComponentState(a.DB, serviceName, compName, "running", "running"); err != nil {
		a.Logger.Warn("failed to update component state", "service", serviceName, "component", compName, "err", err)
	}
	return &mcpv1.ComponentResult{
		Name:    compName,
		Success: true,
	}
}
// allocateRoutePorts allocates host ports for each route, stores them in
// the registry, and returns the port mappings and env vars for the container.
//
// Each mapping has the form "127.0.0.1:<hostPort>:<hostPort>". With exactly
// one route the port is exported as PORT; with several, each route gets
// PORT_<UPPERCASED ROUTE NAME>.
//
// If allocation or registry storage fails for any route, every port allocated
// during this call is released again before the error is returned, so a
// partially processed route list does not leak ports in the allocator.
// NOTE(review): registry rows written for earlier routes in the same call
// still hold the released port numbers after a rollback; presumably they are
// overwritten by the next deploy — confirm.
func (a *Agent) allocateRoutePorts(service, component string, routes []registry.Route) ([]string, []string, error) {
	ports := make([]string, 0, len(routes))
	env := make([]string, 0, len(routes))

	// One undo step per port handed out so far, run on any failure.
	var releases []func()
	rollback := func() {
		for _, release := range releases {
			release()
		}
	}

	for _, r := range routes {
		hostPort, err := a.PortAlloc.Allocate()
		if err != nil {
			rollback()
			return nil, nil, fmt.Errorf("allocate port for route %q: %w", r.Name, err)
		}
		// hostPort is declared per iteration, so each closure releases its
		// own port.
		releases = append(releases, func() { a.PortAlloc.Release(hostPort) })

		if err := registry.UpdateRouteHostPort(a.DB, service, component, r.Name, hostPort); err != nil {
			rollback()
			return nil, nil, fmt.Errorf("store host port for route %q: %w", r.Name, err)
		}

		// The container port must match hostPort (which is also set as $PORT),
		// so the app's listen address matches the podman port mapping.
		// r.Port is the mc-proxy listener port, NOT the container port.
		ports = append(ports, fmt.Sprintf("127.0.0.1:%d:%d", hostPort, hostPort))
		if len(routes) == 1 {
			env = append(env, fmt.Sprintf("PORT=%d", hostPort))
		} else {
			envName := "PORT_" + strings.ToUpper(r.Name)
			env = append(env, fmt.Sprintf("%s=%d", envName, hostPort))
		}
	}
	return ports, env, nil
}
// ensureService makes sure a registry row exists for the named service.
// A service that is already present has its active flag updated; one that is
// missing (lookup reports sql.ErrNoRows) is created with the given flag. Any
// other lookup error is returned unchanged.
func ensureService(db *sql.DB, name string, active bool) error {
	switch _, lookupErr := registry.GetService(db, name); {
	case lookupErr == nil:
		return registry.UpdateServiceActive(db, name, active)
	case errors.Is(lookupErr, sql.ErrNoRows):
		return registry.CreateService(db, name, active)
	default:
		return lookupErr
	}
}
// hasL7Routes reports whether at least one of the given routes has its mode
// set to "l7" (the TLS-terminating mode). It returns false for an empty or
// nil slice.
func hasL7Routes(routes []registry.Route) bool {
	for i := 0; i < len(routes); i++ {
		if routes[i].Mode == "l7" {
			return true
		}
	}
	return false
}
// l7Hostnames collects the distinct hostnames of all L7 routes, in order of
// first appearance. A route without an explicit hostname falls back to the
// conventional "<service>.svc.mcp.metacircular.net". Routes in any other mode
// are ignored; the result is nil when there are no L7 routes.
func l7Hostnames(serviceName string, routes []registry.Route) []string {
	var (
		unique []string
		known  = make(map[string]struct{})
	)
	for i := range routes {
		route := &routes[i]
		if route.Mode != "l7" {
			continue
		}
		host := route.Hostname
		if host == "" {
			host = serviceName + ".svc.mcp.metacircular.net"
		}
		if _, dup := known[host]; dup {
			continue
		}
		known[host] = struct{}{}
		unique = append(unique, host)
	}
	return unique
}
// ensureComponent makes sure a registry row exists for component c.
// An existing component has its spec updated from c. A missing one (lookup
// reports sql.ErrNoRows) is created after c.ObservedState is set to
// "unknown" — note that this mutates the caller's struct. Any other lookup
// error is returned unchanged.
func ensureComponent(db *sql.DB, c *registry.Component) error {
	switch _, lookupErr := registry.GetComponent(db, c.Service, c.Name); {
	case lookupErr == nil:
		return registry.UpdateComponentSpec(db, c)
	case errors.Is(lookupErr, sql.ErrNoRows):
		c.ObservedState = "unknown"
		return registry.CreateComponent(db, c)
	default:
		return lookupErr
	}
}