Add route declarations and automatic port allocation to MCP agent
Service definitions can now declare routes per component instead of manual port mappings, e.g.: [[components.routes]] name = "rest", port = 8443, mode = "l4". The agent allocates free host ports at deploy time and injects $PORT / $PORT_<NAME> env vars into containers. Backward compatible: components using old-style ports= work unchanged. Changes: Proto — RouteSpec message, routes + env fields on ComponentSpec; Servicedef — RouteDef parsing and validation from TOML; Registry — component_routes table with host_port tracking; Runtime — Env field on ContainerSpec, -e flag in BuildRunArgs; Agent — PortAllocator (random ports in 10000–60000 with availability check), deploy wiring for route→port mapping and env injection. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -26,11 +26,12 @@ import (
|
||||
type Agent struct {
|
||||
mcpv1.UnimplementedMcpAgentServiceServer
|
||||
|
||||
Config *config.AgentConfig
|
||||
DB *sql.DB
|
||||
Runtime runtime.Runtime
|
||||
Monitor *monitor.Monitor
|
||||
Logger *slog.Logger
|
||||
Config *config.AgentConfig
|
||||
DB *sql.DB
|
||||
Runtime runtime.Runtime
|
||||
Monitor *monitor.Monitor
|
||||
Logger *slog.Logger
|
||||
PortAlloc *PortAllocator
|
||||
}
|
||||
|
||||
// Run starts the agent: opens the database, sets up the gRPC server with
|
||||
@@ -51,11 +52,12 @@ func Run(cfg *config.AgentConfig) error {
|
||||
mon := monitor.New(db, rt, cfg.Monitor, cfg.Agent.NodeName, logger)
|
||||
|
||||
a := &Agent{
|
||||
Config: cfg,
|
||||
DB: db,
|
||||
Runtime: rt,
|
||||
Monitor: mon,
|
||||
Logger: logger,
|
||||
Config: cfg,
|
||||
DB: db,
|
||||
Runtime: rt,
|
||||
Monitor: mon,
|
||||
Logger: logger,
|
||||
PortAlloc: NewPortAllocator(),
|
||||
}
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
||||
@@ -58,6 +59,25 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
|
||||
a.Logger.Info("deploying component", "service", serviceName, "component", compName, "desired", desiredState)
|
||||
|
||||
// Convert proto routes to registry routes.
|
||||
var regRoutes []registry.Route
|
||||
for _, r := range cs.GetRoutes() {
|
||||
mode := r.GetMode()
|
||||
if mode == "" {
|
||||
mode = "l4"
|
||||
}
|
||||
name := r.GetName()
|
||||
if name == "" {
|
||||
name = "default"
|
||||
}
|
||||
regRoutes = append(regRoutes, registry.Route{
|
||||
Name: name,
|
||||
Port: int(r.GetPort()),
|
||||
Mode: mode,
|
||||
Hostname: r.GetHostname(),
|
||||
})
|
||||
}
|
||||
|
||||
regComp := ®istry.Component{
|
||||
Name: compName,
|
||||
Service: serviceName,
|
||||
@@ -70,6 +90,7 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
Ports: cs.GetPorts(),
|
||||
Volumes: cs.GetVolumes(),
|
||||
Cmd: cs.GetCmd(),
|
||||
Routes: regRoutes,
|
||||
}
|
||||
|
||||
if err := ensureComponent(a.DB, regComp); err != nil {
|
||||
@@ -89,16 +110,34 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
_ = a.Runtime.Stop(ctx, containerName) // may not exist yet
|
||||
_ = a.Runtime.Remove(ctx, containerName) // may not exist yet
|
||||
|
||||
// Build the container spec. If the component has routes, use route-based
|
||||
// port allocation and env injection. Otherwise, fall back to legacy ports.
|
||||
runSpec := runtime.ContainerSpec{
|
||||
Name: containerName,
|
||||
Image: cs.GetImage(),
|
||||
Network: cs.GetNetwork(),
|
||||
User: cs.GetUser(),
|
||||
Restart: cs.GetRestart(),
|
||||
Ports: cs.GetPorts(),
|
||||
Volumes: cs.GetVolumes(),
|
||||
Cmd: cs.GetCmd(),
|
||||
Env: cs.GetEnv(),
|
||||
}
|
||||
|
||||
if len(regRoutes) > 0 && a.PortAlloc != nil {
|
||||
ports, env, err := a.allocateRoutePorts(serviceName, compName, regRoutes)
|
||||
if err != nil {
|
||||
return &mcpv1.ComponentResult{
|
||||
Name: compName,
|
||||
Error: fmt.Sprintf("allocate route ports: %v", err),
|
||||
}
|
||||
}
|
||||
runSpec.Ports = ports
|
||||
runSpec.Env = append(runSpec.Env, env...)
|
||||
} else {
|
||||
// Legacy: use ports directly from the spec.
|
||||
runSpec.Ports = cs.GetPorts()
|
||||
}
|
||||
|
||||
if err := a.Runtime.Run(ctx, runSpec); err != nil {
|
||||
_ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed")
|
||||
return &mcpv1.ComponentResult{
|
||||
@@ -117,6 +156,36 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
}
|
||||
}
|
||||
|
||||
// allocateRoutePorts allocates host ports for each route, stores them in
|
||||
// the registry, and returns the port mappings and env vars for the container.
|
||||
func (a *Agent) allocateRoutePorts(service, component string, routes []registry.Route) ([]string, []string, error) {
|
||||
var ports []string
|
||||
var env []string
|
||||
|
||||
for _, r := range routes {
|
||||
hostPort, err := a.PortAlloc.Allocate()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("allocate port for route %q: %w", r.Name, err)
|
||||
}
|
||||
|
||||
if err := registry.UpdateRouteHostPort(a.DB, service, component, r.Name, hostPort); err != nil {
|
||||
a.PortAlloc.Release(hostPort)
|
||||
return nil, nil, fmt.Errorf("store host port for route %q: %w", r.Name, err)
|
||||
}
|
||||
|
||||
ports = append(ports, fmt.Sprintf("127.0.0.1:%d:%d", hostPort, r.Port))
|
||||
|
||||
if len(routes) == 1 {
|
||||
env = append(env, fmt.Sprintf("PORT=%d", hostPort))
|
||||
} else {
|
||||
envName := "PORT_" + strings.ToUpper(r.Name)
|
||||
env = append(env, fmt.Sprintf("%s=%d", envName, hostPort))
|
||||
}
|
||||
}
|
||||
|
||||
return ports, env, nil
|
||||
}
|
||||
|
||||
// ensureService creates the service if it does not exist, or updates its
|
||||
// active flag if it does.
|
||||
func ensureService(db *sql.DB, name string, active bool) error {
|
||||
|
||||
69
internal/agent/portalloc.go
Normal file
69
internal/agent/portalloc.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
portRangeMin = 10000
|
||||
portRangeMax = 60000
|
||||
maxRetries = 10
|
||||
)
|
||||
|
||||
// PortAllocator manages host port allocation for route-based deployments.
|
||||
// It tracks allocated ports within the agent session to avoid double-allocation.
|
||||
type PortAllocator struct {
|
||||
mu sync.Mutex
|
||||
allocated map[int]bool
|
||||
}
|
||||
|
||||
// NewPortAllocator creates a new PortAllocator.
|
||||
func NewPortAllocator() *PortAllocator {
|
||||
return &PortAllocator{
|
||||
allocated: make(map[int]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// Allocate picks a free port in range [10000, 60000).
|
||||
// It tries random ports, checks availability with net.Listen, and retries up to 10 times.
|
||||
func (pa *PortAllocator) Allocate() (int, error) {
|
||||
pa.mu.Lock()
|
||||
defer pa.mu.Unlock()
|
||||
|
||||
for i := range maxRetries {
|
||||
port := portRangeMin + rand.IntN(portRangeMax-portRangeMin)
|
||||
if pa.allocated[port] {
|
||||
continue
|
||||
}
|
||||
|
||||
if !isPortFree(port) {
|
||||
continue
|
||||
}
|
||||
|
||||
pa.allocated[port] = true
|
||||
return port, nil
|
||||
_ = i
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("failed to allocate port after %d attempts", maxRetries)
|
||||
}
|
||||
|
||||
// Release marks a port as available again.
|
||||
func (pa *PortAllocator) Release(port int) {
|
||||
pa.mu.Lock()
|
||||
defer pa.mu.Unlock()
|
||||
delete(pa.allocated, port)
|
||||
}
|
||||
|
||||
// isPortFree checks if a TCP port is available by attempting to listen on it.
|
||||
func isPortFree(port int) bool {
|
||||
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_ = ln.Close()
|
||||
return true
|
||||
}
|
||||
65
internal/agent/portalloc_test.go
Normal file
65
internal/agent/portalloc_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPortAllocator_Allocate(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
if port < portRangeMin || port >= portRangeMax {
|
||||
t.Fatalf("port %d out of range [%d, %d)", port, portRangeMin, portRangeMax)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPortAllocator_NoDuplicates(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
ports := make(map[int]bool)
|
||||
for range 20 {
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
if ports[port] {
|
||||
t.Fatalf("duplicate port allocated: %d", port)
|
||||
}
|
||||
ports[port] = true
|
||||
}
|
||||
}
|
||||
|
||||
func TestPortAllocator_Release(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
|
||||
pa.Release(port)
|
||||
|
||||
// After release, the port should no longer be tracked as allocated.
|
||||
pa.mu.Lock()
|
||||
if pa.allocated[port] {
|
||||
t.Fatal("port should not be tracked after release")
|
||||
}
|
||||
pa.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestPortAllocator_PortIsFree(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
|
||||
// The port should be free (we only track it, we don't hold the listener).
|
||||
if !isPortFree(port) {
|
||||
t.Fatalf("allocated port %d should be free on the system", port)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user