The agent connects to mc-proxy via Unix socket and automatically registers/removes routes during deploy and stop. This eliminates manual mcproxyctl usage or TOML editing. - New ProxyRouter abstraction wraps mc-proxy client library - Deploy: after container starts, registers routes with mc-proxy using host ports from the registry - Stop: removes routes from mc-proxy before stopping container - Config: [mcproxy] section with socket path and cert_dir - Nil-safe: if mc-proxy socket not configured, route registration is silently skipped (backward compatible) - L7 routes use certs from convention path (<cert_dir>/<service>.pem) - L4 routes use TLS passthrough (backend_tls=true) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
225 lines
6.6 KiB
Go
225 lines
6.6 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
|
|
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
|
"git.wntrmute.dev/kyle/mcp/internal/runtime"
|
|
)
|
|
|
|
// Deploy deploys a service (or a single component of it) to this node.
|
|
func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.DeployResponse, error) {
|
|
spec := req.GetService()
|
|
if spec == nil {
|
|
return nil, fmt.Errorf("deploy: missing service spec")
|
|
}
|
|
|
|
serviceName := spec.GetName()
|
|
a.Logger.Info("deploying", "service", serviceName)
|
|
|
|
if err := ensureService(a.DB, serviceName, spec.GetActive()); err != nil {
|
|
return nil, fmt.Errorf("deploy: ensure service %q: %w", serviceName, err)
|
|
}
|
|
|
|
components := spec.GetComponents()
|
|
if target := req.GetComponent(); target != "" {
|
|
var filtered []*mcpv1.ComponentSpec
|
|
for _, cs := range components {
|
|
if cs.GetName() == target {
|
|
filtered = append(filtered, cs)
|
|
}
|
|
}
|
|
components = filtered
|
|
}
|
|
|
|
var results []*mcpv1.ComponentResult
|
|
active := spec.GetActive()
|
|
for _, cs := range components {
|
|
result := a.deployComponent(ctx, serviceName, cs, active)
|
|
results = append(results, result)
|
|
}
|
|
|
|
return &mcpv1.DeployResponse{Results: results}, nil
|
|
}
|
|
|
|
// deployComponent handles the full deploy lifecycle for a single component.
|
|
func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcpv1.ComponentSpec, active bool) *mcpv1.ComponentResult {
|
|
compName := cs.GetName()
|
|
containerName := ContainerNameFor(serviceName, compName)
|
|
|
|
desiredState := "running"
|
|
if !active {
|
|
desiredState = "stopped"
|
|
}
|
|
|
|
a.Logger.Info("deploying component", "service", serviceName, "component", compName, "desired", desiredState)
|
|
|
|
// Convert proto routes to registry routes.
|
|
var regRoutes []registry.Route
|
|
for _, r := range cs.GetRoutes() {
|
|
mode := r.GetMode()
|
|
if mode == "" {
|
|
mode = "l4"
|
|
}
|
|
name := r.GetName()
|
|
if name == "" {
|
|
name = "default"
|
|
}
|
|
regRoutes = append(regRoutes, registry.Route{
|
|
Name: name,
|
|
Port: int(r.GetPort()),
|
|
Mode: mode,
|
|
Hostname: r.GetHostname(),
|
|
})
|
|
}
|
|
|
|
regComp := ®istry.Component{
|
|
Name: compName,
|
|
Service: serviceName,
|
|
Image: cs.GetImage(),
|
|
Network: cs.GetNetwork(),
|
|
UserSpec: cs.GetUser(),
|
|
Restart: cs.GetRestart(),
|
|
DesiredState: desiredState,
|
|
Version: runtime.ExtractVersion(cs.GetImage()),
|
|
Ports: cs.GetPorts(),
|
|
Volumes: cs.GetVolumes(),
|
|
Cmd: cs.GetCmd(),
|
|
Routes: regRoutes,
|
|
}
|
|
|
|
if err := ensureComponent(a.DB, regComp); err != nil {
|
|
return &mcpv1.ComponentResult{
|
|
Name: compName,
|
|
Error: fmt.Sprintf("ensure component: %v", err),
|
|
}
|
|
}
|
|
|
|
if err := a.Runtime.Pull(ctx, cs.GetImage()); err != nil {
|
|
return &mcpv1.ComponentResult{
|
|
Name: compName,
|
|
Error: fmt.Sprintf("pull image: %v", err),
|
|
}
|
|
}
|
|
|
|
_ = a.Runtime.Stop(ctx, containerName) // may not exist yet
|
|
_ = a.Runtime.Remove(ctx, containerName) // may not exist yet
|
|
|
|
// Build the container spec. If the component has routes, use route-based
|
|
// port allocation and env injection. Otherwise, fall back to legacy ports.
|
|
runSpec := runtime.ContainerSpec{
|
|
Name: containerName,
|
|
Image: cs.GetImage(),
|
|
Network: cs.GetNetwork(),
|
|
User: cs.GetUser(),
|
|
Restart: cs.GetRestart(),
|
|
Volumes: cs.GetVolumes(),
|
|
Cmd: cs.GetCmd(),
|
|
Env: cs.GetEnv(),
|
|
}
|
|
|
|
if len(regRoutes) > 0 && a.PortAlloc != nil {
|
|
ports, env, err := a.allocateRoutePorts(serviceName, compName, regRoutes)
|
|
if err != nil {
|
|
return &mcpv1.ComponentResult{
|
|
Name: compName,
|
|
Error: fmt.Sprintf("allocate route ports: %v", err),
|
|
}
|
|
}
|
|
runSpec.Ports = ports
|
|
runSpec.Env = append(runSpec.Env, env...)
|
|
} else {
|
|
// Legacy: use ports directly from the spec.
|
|
runSpec.Ports = cs.GetPorts()
|
|
}
|
|
|
|
if err := a.Runtime.Run(ctx, runSpec); err != nil {
|
|
_ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed")
|
|
return &mcpv1.ComponentResult{
|
|
Name: compName,
|
|
Error: fmt.Sprintf("run container: %v", err),
|
|
}
|
|
}
|
|
|
|
// Register routes with mc-proxy after the container is running.
|
|
if len(regRoutes) > 0 && a.Proxy != nil {
|
|
hostPorts, err := registry.GetRouteHostPorts(a.DB, serviceName, compName)
|
|
if err != nil {
|
|
a.Logger.Warn("failed to get host ports for route registration", "service", serviceName, "component", compName, "err", err)
|
|
} else if err := a.Proxy.RegisterRoutes(ctx, serviceName, regRoutes, hostPorts); err != nil {
|
|
a.Logger.Warn("failed to register routes with mc-proxy", "service", serviceName, "component", compName, "err", err)
|
|
}
|
|
}
|
|
|
|
if err := registry.UpdateComponentState(a.DB, serviceName, compName, "running", "running"); err != nil {
|
|
a.Logger.Warn("failed to update component state", "service", serviceName, "component", compName, "err", err)
|
|
}
|
|
|
|
return &mcpv1.ComponentResult{
|
|
Name: compName,
|
|
Success: true,
|
|
}
|
|
}
|
|
|
|
// allocateRoutePorts allocates host ports for each route, stores them in
|
|
// the registry, and returns the port mappings and env vars for the container.
|
|
func (a *Agent) allocateRoutePorts(service, component string, routes []registry.Route) ([]string, []string, error) {
|
|
var ports []string
|
|
var env []string
|
|
|
|
for _, r := range routes {
|
|
hostPort, err := a.PortAlloc.Allocate()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("allocate port for route %q: %w", r.Name, err)
|
|
}
|
|
|
|
if err := registry.UpdateRouteHostPort(a.DB, service, component, r.Name, hostPort); err != nil {
|
|
a.PortAlloc.Release(hostPort)
|
|
return nil, nil, fmt.Errorf("store host port for route %q: %w", r.Name, err)
|
|
}
|
|
|
|
ports = append(ports, fmt.Sprintf("127.0.0.1:%d:%d", hostPort, r.Port))
|
|
|
|
if len(routes) == 1 {
|
|
env = append(env, fmt.Sprintf("PORT=%d", hostPort))
|
|
} else {
|
|
envName := "PORT_" + strings.ToUpper(r.Name)
|
|
env = append(env, fmt.Sprintf("%s=%d", envName, hostPort))
|
|
}
|
|
}
|
|
|
|
return ports, env, nil
|
|
}
|
|
|
|
// ensureService creates the service if it does not exist, or updates its
|
|
// active flag if it does.
|
|
func ensureService(db *sql.DB, name string, active bool) error {
|
|
_, err := registry.GetService(db, name)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return registry.CreateService(db, name, active)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return registry.UpdateServiceActive(db, name, active)
|
|
}
|
|
|
|
// ensureComponent creates the component if it does not exist, or updates its
|
|
// spec if it does.
|
|
func ensureComponent(db *sql.DB, c *registry.Component) error {
|
|
_, err := registry.GetComponent(db, c.Service, c.Name)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
c.ObservedState = "unknown"
|
|
return registry.CreateComponent(db, c)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return registry.UpdateComponentSpec(db, c)
|
|
}
|