Add route declarations and automatic port allocation #1
File diff suppressed because it is too large
Load Diff
@@ -31,6 +31,7 @@ type Agent struct {
|
||||
Runtime runtime.Runtime
|
||||
Monitor *monitor.Monitor
|
||||
Logger *slog.Logger
|
||||
PortAlloc *PortAllocator
|
||||
}
|
||||
|
||||
// Run starts the agent: opens the database, sets up the gRPC server with
|
||||
@@ -56,6 +57,7 @@ func Run(cfg *config.AgentConfig) error {
|
||||
Runtime: rt,
|
||||
Monitor: mon,
|
||||
Logger: logger,
|
||||
PortAlloc: NewPortAllocator(),
|
||||
}
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
||||
@@ -58,6 +59,25 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
|
||||
a.Logger.Info("deploying component", "service", serviceName, "component", compName, "desired", desiredState)
|
||||
|
||||
// Convert proto routes to registry routes.
|
||||
var regRoutes []registry.Route
|
||||
for _, r := range cs.GetRoutes() {
|
||||
mode := r.GetMode()
|
||||
if mode == "" {
|
||||
mode = "l4"
|
||||
}
|
||||
name := r.GetName()
|
||||
if name == "" {
|
||||
name = "default"
|
||||
}
|
||||
regRoutes = append(regRoutes, registry.Route{
|
||||
Name: name,
|
||||
Port: int(r.GetPort()),
|
||||
Mode: mode,
|
||||
Hostname: r.GetHostname(),
|
||||
})
|
||||
}
|
||||
|
||||
regComp := ®istry.Component{
|
||||
Name: compName,
|
||||
Service: serviceName,
|
||||
@@ -70,6 +90,7 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
Ports: cs.GetPorts(),
|
||||
Volumes: cs.GetVolumes(),
|
||||
Cmd: cs.GetCmd(),
|
||||
Routes: regRoutes,
|
||||
}
|
||||
|
||||
if err := ensureComponent(a.DB, regComp); err != nil {
|
||||
@@ -89,16 +110,34 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
_ = a.Runtime.Stop(ctx, containerName) // may not exist yet
|
||||
_ = a.Runtime.Remove(ctx, containerName) // may not exist yet
|
||||
|
||||
// Build the container spec. If the component has routes, use route-based
|
||||
// port allocation and env injection. Otherwise, fall back to legacy ports.
|
||||
runSpec := runtime.ContainerSpec{
|
||||
Name: containerName,
|
||||
Image: cs.GetImage(),
|
||||
Network: cs.GetNetwork(),
|
||||
User: cs.GetUser(),
|
||||
Restart: cs.GetRestart(),
|
||||
Ports: cs.GetPorts(),
|
||||
Volumes: cs.GetVolumes(),
|
||||
Cmd: cs.GetCmd(),
|
||||
Env: cs.GetEnv(),
|
||||
}
|
||||
|
||||
if len(regRoutes) > 0 && a.PortAlloc != nil {
|
||||
ports, env, err := a.allocateRoutePorts(serviceName, compName, regRoutes)
|
||||
if err != nil {
|
||||
return &mcpv1.ComponentResult{
|
||||
Name: compName,
|
||||
Error: fmt.Sprintf("allocate route ports: %v", err),
|
||||
}
|
||||
}
|
||||
runSpec.Ports = ports
|
||||
runSpec.Env = append(runSpec.Env, env...)
|
||||
} else {
|
||||
// Legacy: use ports directly from the spec.
|
||||
runSpec.Ports = cs.GetPorts()
|
||||
}
|
||||
|
||||
if err := a.Runtime.Run(ctx, runSpec); err != nil {
|
||||
_ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed")
|
||||
return &mcpv1.ComponentResult{
|
||||
@@ -117,6 +156,36 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
}
|
||||
}
|
||||
|
||||
// allocateRoutePorts allocates host ports for each route, stores them in
|
||||
// the registry, and returns the port mappings and env vars for the container.
|
||||
func (a *Agent) allocateRoutePorts(service, component string, routes []registry.Route) ([]string, []string, error) {
|
||||
var ports []string
|
||||
var env []string
|
||||
|
||||
for _, r := range routes {
|
||||
hostPort, err := a.PortAlloc.Allocate()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("allocate port for route %q: %w", r.Name, err)
|
||||
}
|
||||
|
||||
if err := registry.UpdateRouteHostPort(a.DB, service, component, r.Name, hostPort); err != nil {
|
||||
a.PortAlloc.Release(hostPort)
|
||||
return nil, nil, fmt.Errorf("store host port for route %q: %w", r.Name, err)
|
||||
}
|
||||
|
||||
ports = append(ports, fmt.Sprintf("127.0.0.1:%d:%d", hostPort, r.Port))
|
||||
|
||||
if len(routes) == 1 {
|
||||
env = append(env, fmt.Sprintf("PORT=%d", hostPort))
|
||||
} else {
|
||||
envName := "PORT_" + strings.ToUpper(r.Name)
|
||||
env = append(env, fmt.Sprintf("%s=%d", envName, hostPort))
|
||||
}
|
||||
}
|
||||
|
||||
return ports, env, nil
|
||||
}
|
||||
|
||||
// ensureService creates the service if it does not exist, or updates its
|
||||
// active flag if it does.
|
||||
func ensureService(db *sql.DB, name string, active bool) error {
|
||||
|
||||
69
internal/agent/portalloc.go
Normal file
69
internal/agent/portalloc.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
portRangeMin = 10000
|
||||
portRangeMax = 60000
|
||||
maxRetries = 10
|
||||
)
|
||||
|
||||
// PortAllocator manages host port allocation for route-based deployments.
|
||||
// It tracks allocated ports within the agent session to avoid double-allocation.
|
||||
type PortAllocator struct {
|
||||
mu sync.Mutex
|
||||
allocated map[int]bool
|
||||
}
|
||||
|
||||
// NewPortAllocator creates a new PortAllocator.
|
||||
func NewPortAllocator() *PortAllocator {
|
||||
return &PortAllocator{
|
||||
allocated: make(map[int]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// Allocate picks a free port in range [10000, 60000).
|
||||
// It tries random ports, checks availability with net.Listen, and retries up to 10 times.
|
||||
func (pa *PortAllocator) Allocate() (int, error) {
|
||||
pa.mu.Lock()
|
||||
defer pa.mu.Unlock()
|
||||
|
||||
for i := range maxRetries {
|
||||
port := portRangeMin + rand.IntN(portRangeMax-portRangeMin)
|
||||
if pa.allocated[port] {
|
||||
continue
|
||||
}
|
||||
|
||||
if !isPortFree(port) {
|
||||
continue
|
||||
}
|
||||
|
||||
pa.allocated[port] = true
|
||||
return port, nil
|
||||
_ = i
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("failed to allocate port after %d attempts", maxRetries)
|
||||
}
|
||||
|
||||
// Release marks a port as available again.
|
||||
func (pa *PortAllocator) Release(port int) {
|
||||
pa.mu.Lock()
|
||||
defer pa.mu.Unlock()
|
||||
delete(pa.allocated, port)
|
||||
}
|
||||
|
||||
// isPortFree checks if a TCP port is available by attempting to listen on it.
|
||||
func isPortFree(port int) bool {
|
||||
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_ = ln.Close()
|
||||
return true
|
||||
}
|
||||
65
internal/agent/portalloc_test.go
Normal file
65
internal/agent/portalloc_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPortAllocator_Allocate(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
if port < portRangeMin || port >= portRangeMax {
|
||||
t.Fatalf("port %d out of range [%d, %d)", port, portRangeMin, portRangeMax)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPortAllocator_NoDuplicates(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
ports := make(map[int]bool)
|
||||
for range 20 {
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
if ports[port] {
|
||||
t.Fatalf("duplicate port allocated: %d", port)
|
||||
}
|
||||
ports[port] = true
|
||||
}
|
||||
}
|
||||
|
||||
func TestPortAllocator_Release(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
|
||||
pa.Release(port)
|
||||
|
||||
// After release, the port should no longer be tracked as allocated.
|
||||
pa.mu.Lock()
|
||||
if pa.allocated[port] {
|
||||
t.Fatal("port should not be tracked after release")
|
||||
}
|
||||
pa.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestPortAllocator_PortIsFree(t *testing.T) {
|
||||
pa := NewPortAllocator()
|
||||
|
||||
port, err := pa.Allocate()
|
||||
if err != nil {
|
||||
t.Fatalf("allocate: %v", err)
|
||||
}
|
||||
|
||||
// The port should be free (we only track it, we don't hold the listener).
|
||||
if !isPortFree(port) {
|
||||
t.Fatalf("allocated port %d should be free on the system", port)
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,15 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Route represents a route entry for a component in the registry.
|
||||
type Route struct {
|
||||
Name string
|
||||
Port int
|
||||
Mode string
|
||||
Hostname string
|
||||
HostPort int // agent-assigned host port (0 = not yet allocated)
|
||||
}
|
||||
|
||||
// Component represents a component in the registry.
|
||||
type Component struct {
|
||||
Name string
|
||||
@@ -20,6 +29,7 @@ type Component struct {
|
||||
Ports []string
|
||||
Volumes []string
|
||||
Cmd []string
|
||||
Routes []Route
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
@@ -51,6 +61,9 @@ func CreateComponent(db *sql.DB, c *Component) error {
|
||||
if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := setRoutes(tx, c.Service, c.Name, c.Routes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
@@ -84,6 +97,10 @@ func GetComponent(db *sql.DB, service, name string) (*Component, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Routes, err = getRoutes(db, service, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
@@ -115,6 +132,7 @@ func ListComponents(db *sql.DB, service string) ([]Component, error) {
|
||||
c.Ports, _ = getPorts(db, c.Service, c.Name)
|
||||
c.Volumes, _ = getVolumes(db, c.Service, c.Name)
|
||||
c.Cmd, _ = getCmd(db, c.Service, c.Name)
|
||||
c.Routes, _ = getRoutes(db, c.Service, c.Name)
|
||||
|
||||
components = append(components, c)
|
||||
}
|
||||
@@ -168,6 +186,9 @@ func UpdateComponentSpec(db *sql.DB, c *Component) error {
|
||||
if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := setRoutes(tx, c.Service, c.Name, c.Routes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
@@ -274,3 +295,85 @@ func getCmd(db *sql.DB, service, component string) ([]string, error) {
|
||||
}
|
||||
return cmd, rows.Err()
|
||||
}
|
||||
|
||||
// helper: set route definitions (delete + re-insert)
|
||||
func setRoutes(tx *sql.Tx, service, component string, routes []Route) error {
|
||||
if _, err := tx.Exec("DELETE FROM component_routes WHERE service = ? AND component = ?", service, component); err != nil {
|
||||
return fmt.Errorf("clear routes %q/%q: %w", service, component, err)
|
||||
}
|
||||
for _, r := range routes {
|
||||
mode := r.Mode
|
||||
if mode == "" {
|
||||
mode = "l4"
|
||||
}
|
||||
name := r.Name
|
||||
if name == "" {
|
||||
name = "default"
|
||||
}
|
||||
if _, err := tx.Exec(
|
||||
"INSERT INTO component_routes (service, component, name, port, mode, hostname, host_port) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
service, component, name, r.Port, mode, r.Hostname, r.HostPort,
|
||||
); err != nil {
|
||||
return fmt.Errorf("insert route %q/%q %q: %w", service, component, name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRoutes(db *sql.DB, service, component string) ([]Route, error) {
|
||||
rows, err := db.Query(
|
||||
"SELECT name, port, mode, hostname, host_port FROM component_routes WHERE service = ? AND component = ? ORDER BY name",
|
||||
service, component,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get routes %q/%q: %w", service, component, err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
var routes []Route
|
||||
for rows.Next() {
|
||||
var r Route
|
||||
if err := rows.Scan(&r.Name, &r.Port, &r.Mode, &r.Hostname, &r.HostPort); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
routes = append(routes, r)
|
||||
}
|
||||
return routes, rows.Err()
|
||||
}
|
||||
|
||||
// UpdateRouteHostPort updates the agent-assigned host port for a specific route.
|
||||
func UpdateRouteHostPort(db *sql.DB, service, component, routeName string, hostPort int) error {
|
||||
res, err := db.Exec(
|
||||
"UPDATE component_routes SET host_port = ? WHERE service = ? AND component = ? AND name = ?",
|
||||
hostPort, service, component, routeName,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update route host_port %q/%q/%q: %w", service, component, routeName, err)
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("update route host_port %q/%q/%q: %w", service, component, routeName, sql.ErrNoRows)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRouteHostPorts returns a map of route name to assigned host port for a component.
|
||||
func GetRouteHostPorts(db *sql.DB, service, component string) (map[string]int, error) {
|
||||
rows, err := db.Query(
|
||||
"SELECT name, host_port FROM component_routes WHERE service = ? AND component = ?",
|
||||
service, component,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get route host ports %q/%q: %w", service, component, err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
result := make(map[string]int)
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var port int
|
||||
if err := rows.Scan(&name, &port); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[name] = port
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
@@ -127,4 +127,19 @@ var migrations = []string{
|
||||
CREATE INDEX IF NOT EXISTS idx_events_component_time
|
||||
ON events(service, component, timestamp);
|
||||
`,
|
||||
|
||||
// Migration 2: component routes
|
||||
`
|
||||
CREATE TABLE IF NOT EXISTS component_routes (
|
||||
service TEXT NOT NULL,
|
||||
component TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
port INTEGER NOT NULL,
|
||||
mode TEXT NOT NULL DEFAULT 'l4',
|
||||
hostname TEXT NOT NULL DEFAULT '',
|
||||
host_port INTEGER NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (service, component, name),
|
||||
FOREIGN KEY (service, component) REFERENCES components(service, name) ON DELETE CASCADE
|
||||
);
|
||||
`,
|
||||
}
|
||||
|
||||
@@ -237,6 +237,160 @@ func TestCascadeDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestComponentRoutes(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
if err := CreateService(db, "svc", true); err != nil {
|
||||
t.Fatalf("create service: %v", err)
|
||||
}
|
||||
|
||||
// Create component with routes
|
||||
c := &Component{
|
||||
Name: "api",
|
||||
Service: "svc",
|
||||
Image: "img:v1",
|
||||
Restart: "unless-stopped",
|
||||
DesiredState: "running",
|
||||
ObservedState: "unknown",
|
||||
Routes: []Route{
|
||||
{Name: "rest", Port: 8443, Mode: "l7", Hostname: "api.example.com"},
|
||||
{Name: "grpc", Port: 9443, Mode: "l4"},
|
||||
},
|
||||
}
|
||||
if err := CreateComponent(db, c); err != nil {
|
||||
t.Fatalf("create component: %v", err)
|
||||
}
|
||||
|
||||
// Get and verify routes
|
||||
got, err := GetComponent(db, "svc", "api")
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if len(got.Routes) != 2 {
|
||||
t.Fatalf("routes: got %d, want 2", len(got.Routes))
|
||||
}
|
||||
// Routes are ordered by name: grpc, rest
|
||||
if got.Routes[0].Name != "grpc" || got.Routes[0].Port != 9443 || got.Routes[0].Mode != "l4" {
|
||||
t.Fatalf("route[0]: got %+v", got.Routes[0])
|
||||
}
|
||||
if got.Routes[1].Name != "rest" || got.Routes[1].Port != 8443 || got.Routes[1].Mode != "l7" || got.Routes[1].Hostname != "api.example.com" {
|
||||
t.Fatalf("route[1]: got %+v", got.Routes[1])
|
||||
}
|
||||
|
||||
// Update routes via UpdateComponentSpec
|
||||
c.Routes = []Route{{Name: "http", Port: 8080, Mode: "l7"}}
|
||||
if err := UpdateComponentSpec(db, c); err != nil {
|
||||
t.Fatalf("update spec: %v", err)
|
||||
}
|
||||
got, _ = GetComponent(db, "svc", "api")
|
||||
if len(got.Routes) != 1 || got.Routes[0].Name != "http" {
|
||||
t.Fatalf("updated routes: got %+v", got.Routes)
|
||||
}
|
||||
|
||||
// List components includes routes
|
||||
comps, err := ListComponents(db, "svc")
|
||||
if err != nil {
|
||||
t.Fatalf("list: %v", err)
|
||||
}
|
||||
if len(comps) != 1 || len(comps[0].Routes) != 1 {
|
||||
t.Fatalf("list routes: got %d components, %d routes", len(comps), len(comps[0].Routes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteHostPort(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
if err := CreateService(db, "svc", true); err != nil {
|
||||
t.Fatalf("create service: %v", err)
|
||||
}
|
||||
|
||||
c := &Component{
|
||||
Name: "api",
|
||||
Service: "svc",
|
||||
Image: "img:v1",
|
||||
Restart: "unless-stopped",
|
||||
DesiredState: "running",
|
||||
ObservedState: "unknown",
|
||||
Routes: []Route{
|
||||
{Name: "rest", Port: 8443, Mode: "l7"},
|
||||
{Name: "grpc", Port: 9443, Mode: "l4"},
|
||||
},
|
||||
}
|
||||
if err := CreateComponent(db, c); err != nil {
|
||||
t.Fatalf("create component: %v", err)
|
||||
}
|
||||
|
||||
// Initially host_port is 0
|
||||
ports, err := GetRouteHostPorts(db, "svc", "api")
|
||||
if err != nil {
|
||||
t.Fatalf("get host ports: %v", err)
|
||||
}
|
||||
if ports["rest"] != 0 || ports["grpc"] != 0 {
|
||||
t.Fatalf("initial host ports should be 0: %+v", ports)
|
||||
}
|
||||
|
||||
// Update host ports
|
||||
if err := UpdateRouteHostPort(db, "svc", "api", "rest", 12345); err != nil {
|
||||
t.Fatalf("update rest: %v", err)
|
||||
}
|
||||
if err := UpdateRouteHostPort(db, "svc", "api", "grpc", 12346); err != nil {
|
||||
t.Fatalf("update grpc: %v", err)
|
||||
}
|
||||
|
||||
ports, _ = GetRouteHostPorts(db, "svc", "api")
|
||||
if ports["rest"] != 12345 {
|
||||
t.Fatalf("rest host_port: got %d, want 12345", ports["rest"])
|
||||
}
|
||||
if ports["grpc"] != 12346 {
|
||||
t.Fatalf("grpc host_port: got %d, want 12346", ports["grpc"])
|
||||
}
|
||||
|
||||
// Verify host_port is visible via GetComponent
|
||||
got, _ := GetComponent(db, "svc", "api")
|
||||
for _, r := range got.Routes {
|
||||
if r.Name == "rest" && r.HostPort != 12345 {
|
||||
t.Fatalf("GetComponent rest host_port: got %d", r.HostPort)
|
||||
}
|
||||
if r.Name == "grpc" && r.HostPort != 12346 {
|
||||
t.Fatalf("GetComponent grpc host_port: got %d", r.HostPort)
|
||||
}
|
||||
}
|
||||
|
||||
// Update nonexistent route should fail
|
||||
err = UpdateRouteHostPort(db, "svc", "api", "nonexistent", 99999)
|
||||
if err == nil {
|
||||
t.Fatal("expected error updating nonexistent route")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteCascadeDelete(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
if err := CreateService(db, "svc", true); err != nil {
|
||||
t.Fatalf("create service: %v", err)
|
||||
}
|
||||
|
||||
c := &Component{
|
||||
Name: "api", Service: "svc", Image: "img:v1",
|
||||
Restart: "unless-stopped", DesiredState: "running", ObservedState: "unknown",
|
||||
Routes: []Route{{Name: "rest", Port: 8443, Mode: "l4"}},
|
||||
}
|
||||
if err := CreateComponent(db, c); err != nil {
|
||||
t.Fatalf("create component: %v", err)
|
||||
}
|
||||
|
||||
// Delete service cascades to routes
|
||||
if err := DeleteService(db, "svc"); err != nil {
|
||||
t.Fatalf("delete service: %v", err)
|
||||
}
|
||||
|
||||
// Routes table should be empty
|
||||
ports, err := GetRouteHostPorts(db, "svc", "api")
|
||||
if err != nil {
|
||||
t.Fatalf("get routes after cascade: %v", err)
|
||||
}
|
||||
if len(ports) != 0 {
|
||||
t.Fatalf("routes should be empty after cascade, got %d", len(ports))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvents(t *testing.T) {
|
||||
db := openTestDB(t)
|
||||
|
||||
|
||||
@@ -49,6 +49,9 @@ func (p *Podman) BuildRunArgs(spec ContainerSpec) []string {
|
||||
for _, vol := range spec.Volumes {
|
||||
args = append(args, "-v", vol)
|
||||
}
|
||||
for _, env := range spec.Env {
|
||||
args = append(args, "-e", env)
|
||||
}
|
||||
|
||||
args = append(args, spec.Image)
|
||||
args = append(args, spec.Cmd...)
|
||||
|
||||
@@ -16,6 +16,7 @@ type ContainerSpec struct {
|
||||
Ports []string // "host:container" port mappings
|
||||
Volumes []string // "host:container" volume mounts
|
||||
Cmd []string // command and arguments
|
||||
Env []string // environment variables (KEY=VALUE)
|
||||
}
|
||||
|
||||
// ContainerInfo describes the observed state of a running or stopped container.
|
||||
|
||||
@@ -76,6 +76,38 @@ func TestBuildRunArgs(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("env vars", func(t *testing.T) {
|
||||
spec := ContainerSpec{
|
||||
Name: "test-app",
|
||||
Image: "img:latest",
|
||||
Env: []string{"PORT=12345", "PORT_GRPC=12346"},
|
||||
}
|
||||
requireEqualArgs(t, p.BuildRunArgs(spec), []string{
|
||||
"run", "-d", "--name", "test-app",
|
||||
"-e", "PORT=12345", "-e", "PORT_GRPC=12346",
|
||||
"img:latest",
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("full spec with env", func(t *testing.T) {
|
||||
spec := ContainerSpec{
|
||||
Name: "svc-api",
|
||||
Image: "img:latest",
|
||||
Network: "net",
|
||||
Ports: []string{"127.0.0.1:12345:8443"},
|
||||
Volumes: []string{"/srv:/srv"},
|
||||
Env: []string{"PORT=12345"},
|
||||
}
|
||||
requireEqualArgs(t, p.BuildRunArgs(spec), []string{
|
||||
"run", "-d", "--name", "svc-api",
|
||||
"--network", "net",
|
||||
"-p", "127.0.0.1:12345:8443",
|
||||
"-v", "/srv:/srv",
|
||||
"-e", "PORT=12345",
|
||||
"img:latest",
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("cmd after image", func(t *testing.T) {
|
||||
spec := ContainerSpec{
|
||||
Name: "test-app",
|
||||
|
||||
@@ -21,6 +21,15 @@ type ServiceDef struct {
|
||||
Components []ComponentDef `toml:"components"`
|
||||
}
|
||||
|
||||
// RouteDef describes a route for a component, used for automatic port
|
||||
// allocation and mc-proxy integration.
|
||||
type RouteDef struct {
|
||||
Name string `toml:"name,omitempty"`
|
||||
Port int `toml:"port"`
|
||||
Mode string `toml:"mode,omitempty"`
|
||||
Hostname string `toml:"hostname,omitempty"`
|
||||
}
|
||||
|
||||
// ComponentDef describes a single container component within a service.
|
||||
type ComponentDef struct {
|
||||
Name string `toml:"name"`
|
||||
@@ -31,6 +40,8 @@ type ComponentDef struct {
|
||||
Ports []string `toml:"ports,omitempty"`
|
||||
Volumes []string `toml:"volumes,omitempty"`
|
||||
Cmd []string `toml:"cmd,omitempty"`
|
||||
Routes []RouteDef `toml:"routes,omitempty"`
|
||||
Env []string `toml:"env,omitempty"`
|
||||
}
|
||||
|
||||
// Load reads and parses a TOML service definition file. If the active field
|
||||
@@ -129,11 +140,46 @@ func validate(def *ServiceDef) error {
|
||||
return fmt.Errorf("duplicate component name %q in service %q", c.Name, def.Name)
|
||||
}
|
||||
seen[c.Name] = true
|
||||
|
||||
if err := validateRoutes(c.Name, def.Name, c.Routes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateRoutes checks that routes within a component are valid.
|
||||
func validateRoutes(compName, svcName string, routes []RouteDef) error {
|
||||
if len(routes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
routeNames := make(map[string]bool)
|
||||
for i, r := range routes {
|
||||
if r.Port <= 0 {
|
||||
return fmt.Errorf("route port must be > 0 in component %q of service %q", compName, svcName)
|
||||
}
|
||||
if r.Mode != "" && r.Mode != "l4" && r.Mode != "l7" {
|
||||
return fmt.Errorf("route mode must be \"l4\" or \"l7\" in component %q of service %q", compName, svcName)
|
||||
}
|
||||
if len(routes) > 1 && r.Name == "" {
|
||||
return fmt.Errorf("route name is required when component has multiple routes in component %q of service %q", compName, svcName)
|
||||
}
|
||||
|
||||
// Use index-based key for unnamed single routes.
|
||||
key := r.Name
|
||||
if key == "" {
|
||||
key = fmt.Sprintf("_route_%d", i)
|
||||
}
|
||||
if routeNames[key] {
|
||||
return fmt.Errorf("duplicate route name %q in component %q of service %q", r.Name, compName, svcName)
|
||||
}
|
||||
routeNames[key] = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ToProto converts a ServiceDef to a proto ServiceSpec.
|
||||
func ToProto(def *ServiceDef) *mcpv1.ServiceSpec {
|
||||
spec := &mcpv1.ServiceSpec{
|
||||
@@ -142,7 +188,7 @@ func ToProto(def *ServiceDef) *mcpv1.ServiceSpec {
|
||||
}
|
||||
|
||||
for _, c := range def.Components {
|
||||
spec.Components = append(spec.Components, &mcpv1.ComponentSpec{
|
||||
cs := &mcpv1.ComponentSpec{
|
||||
Name: c.Name,
|
||||
Image: c.Image,
|
||||
Network: c.Network,
|
||||
@@ -151,8 +197,18 @@ func ToProto(def *ServiceDef) *mcpv1.ServiceSpec {
|
||||
Ports: c.Ports,
|
||||
Volumes: c.Volumes,
|
||||
Cmd: c.Cmd,
|
||||
Env: c.Env,
|
||||
}
|
||||
for _, r := range c.Routes {
|
||||
cs.Routes = append(cs.Routes, &mcpv1.RouteSpec{
|
||||
Name: r.Name,
|
||||
Port: int32(r.Port),
|
||||
Mode: r.Mode,
|
||||
Hostname: r.Hostname,
|
||||
})
|
||||
}
|
||||
spec.Components = append(spec.Components, cs)
|
||||
}
|
||||
|
||||
return spec
|
||||
}
|
||||
@@ -169,7 +225,7 @@ func FromProto(spec *mcpv1.ServiceSpec, node string) *ServiceDef {
|
||||
}
|
||||
|
||||
for _, c := range spec.GetComponents() {
|
||||
def.Components = append(def.Components, ComponentDef{
|
||||
cd := ComponentDef{
|
||||
Name: c.GetName(),
|
||||
Image: c.GetImage(),
|
||||
Network: c.GetNetwork(),
|
||||
@@ -178,8 +234,18 @@ func FromProto(spec *mcpv1.ServiceSpec, node string) *ServiceDef {
|
||||
Ports: c.GetPorts(),
|
||||
Volumes: c.GetVolumes(),
|
||||
Cmd: c.GetCmd(),
|
||||
Env: c.GetEnv(),
|
||||
}
|
||||
for _, r := range c.GetRoutes() {
|
||||
cd.Routes = append(cd.Routes, RouteDef{
|
||||
Name: r.GetName(),
|
||||
Port: int(r.GetPort()),
|
||||
Mode: r.GetMode(),
|
||||
Hostname: r.GetHostname(),
|
||||
})
|
||||
}
|
||||
def.Components = append(def.Components, cd)
|
||||
}
|
||||
|
||||
return def
|
||||
}
|
||||
|
||||
@@ -261,6 +261,203 @@ image = "img:latest"
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadWriteWithRoutes(t *testing.T) {
|
||||
def := &ServiceDef{
|
||||
Name: "myservice",
|
||||
Node: "rift",
|
||||
Active: boolPtr(true),
|
||||
Components: []ComponentDef{
|
||||
{
|
||||
Name: "api",
|
||||
Image: "img:latest",
|
||||
Network: "docker_default",
|
||||
Routes: []RouteDef{
|
||||
{Name: "rest", Port: 8443, Mode: "l7", Hostname: "api.example.com"},
|
||||
{Name: "grpc", Port: 9443, Mode: "l4"},
|
||||
},
|
||||
Env: []string{"FOO=bar"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "myservice.toml")
|
||||
|
||||
if err := Write(path, def); err != nil {
|
||||
t.Fatalf("write: %v", err)
|
||||
}
|
||||
|
||||
got, err := Load(path)
|
||||
if err != nil {
|
||||
t.Fatalf("load: %v", err)
|
||||
}
|
||||
|
||||
if len(got.Components[0].Routes) != 2 {
|
||||
t.Fatalf("routes: got %d, want 2", len(got.Components[0].Routes))
|
||||
}
|
||||
|
||||
r := got.Components[0].Routes[0]
|
||||
if r.Name != "rest" || r.Port != 8443 || r.Mode != "l7" || r.Hostname != "api.example.com" {
|
||||
t.Fatalf("route[0] mismatch: %+v", r)
|
||||
}
|
||||
r2 := got.Components[0].Routes[1]
|
||||
if r2.Name != "grpc" || r2.Port != 9443 || r2.Mode != "l4" {
|
||||
t.Fatalf("route[1] mismatch: %+v", r2)
|
||||
}
|
||||
|
||||
if len(got.Components[0].Env) != 1 || got.Components[0].Env[0] != "FOO=bar" {
|
||||
t.Fatalf("env mismatch: %v", got.Components[0].Env)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteValidation(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
def *ServiceDef
|
||||
wantErr string
|
||||
}{
|
||||
{
|
||||
name: "route missing port",
|
||||
def: &ServiceDef{
|
||||
Name: "svc", Node: "rift",
|
||||
Components: []ComponentDef{{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{{Name: "rest", Port: 0}},
|
||||
}},
|
||||
},
|
||||
wantErr: "route port must be > 0",
|
||||
},
|
||||
{
|
||||
name: "route invalid mode",
|
||||
def: &ServiceDef{
|
||||
Name: "svc", Node: "rift",
|
||||
Components: []ComponentDef{{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{{Port: 8443, Mode: "tcp"}},
|
||||
}},
|
||||
},
|
||||
wantErr: "route mode must be",
|
||||
},
|
||||
{
|
||||
name: "multi-route missing name",
|
||||
def: &ServiceDef{
|
||||
Name: "svc", Node: "rift",
|
||||
Components: []ComponentDef{{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{
|
||||
{Name: "rest", Port: 8443},
|
||||
{Port: 9443},
|
||||
},
|
||||
}},
|
||||
},
|
||||
wantErr: "route name is required when component has multiple routes",
|
||||
},
|
||||
{
|
||||
name: "duplicate route name",
|
||||
def: &ServiceDef{
|
||||
Name: "svc", Node: "rift",
|
||||
Components: []ComponentDef{{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{
|
||||
{Name: "rest", Port: 8443},
|
||||
{Name: "rest", Port: 9443},
|
||||
},
|
||||
}},
|
||||
},
|
||||
wantErr: "duplicate route name",
|
||||
},
|
||||
{
|
||||
name: "single unnamed route is valid",
|
||||
def: &ServiceDef{
|
||||
Name: "svc", Node: "rift",
|
||||
Components: []ComponentDef{{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{{Port: 8443}},
|
||||
}},
|
||||
},
|
||||
wantErr: "",
|
||||
},
|
||||
{
|
||||
name: "valid l4 mode",
|
||||
def: &ServiceDef{
|
||||
Name: "svc", Node: "rift",
|
||||
Components: []ComponentDef{{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{{Port: 8443, Mode: "l4"}},
|
||||
}},
|
||||
},
|
||||
wantErr: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := validate(tt.def)
|
||||
if tt.wantErr == "" {
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
t.Fatal("expected validation error")
|
||||
}
|
||||
if got := err.Error(); !strings.Contains(got, tt.wantErr) {
|
||||
t.Fatalf("error %q does not contain %q", got, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProtoConversionWithRoutes(t *testing.T) {
|
||||
def := &ServiceDef{
|
||||
Name: "svc",
|
||||
Node: "rift",
|
||||
Active: boolPtr(true),
|
||||
Components: []ComponentDef{
|
||||
{
|
||||
Name: "api",
|
||||
Image: "img:v1",
|
||||
Routes: []RouteDef{
|
||||
{Name: "rest", Port: 8443, Mode: "l7", Hostname: "api.example.com"},
|
||||
{Name: "grpc", Port: 9443, Mode: "l4"},
|
||||
},
|
||||
Env: []string{"PORT_REST=12345", "PORT_GRPC=12346"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
spec := ToProto(def)
|
||||
if len(spec.Components[0].Routes) != 2 {
|
||||
t.Fatalf("proto routes: got %d, want 2", len(spec.Components[0].Routes))
|
||||
}
|
||||
r := spec.Components[0].Routes[0]
|
||||
if r.GetName() != "rest" || r.GetPort() != 8443 || r.GetMode() != "l7" || r.GetHostname() != "api.example.com" {
|
||||
t.Fatalf("proto route[0] mismatch: %+v", r)
|
||||
}
|
||||
if len(spec.Components[0].Env) != 2 {
|
||||
t.Fatalf("proto env: got %d, want 2", len(spec.Components[0].Env))
|
||||
}
|
||||
|
||||
got := FromProto(spec, "rift")
|
||||
if len(got.Components[0].Routes) != 2 {
|
||||
t.Fatalf("round-trip routes: got %d, want 2", len(got.Components[0].Routes))
|
||||
}
|
||||
gotR := got.Components[0].Routes[0]
|
||||
if gotR.Name != "rest" || gotR.Port != 8443 || gotR.Mode != "l7" || gotR.Hostname != "api.example.com" {
|
||||
t.Fatalf("round-trip route[0] mismatch: %+v", gotR)
|
||||
}
|
||||
if len(got.Components[0].Env) != 2 {
|
||||
t.Fatalf("round-trip env: got %d, want 2", len(got.Components[0].Env))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProtoConversion(t *testing.T) {
|
||||
def := sampleDef()
|
||||
|
||||
|
||||
@@ -36,6 +36,13 @@ service McpAgentService {
|
||||
|
||||
// --- Service lifecycle ---
|
||||
|
||||
message RouteSpec {
|
||||
string name = 1; // route name (used for $PORT_<NAME>)
|
||||
int32 port = 2; // external port on mc-proxy
|
||||
string mode = 3; // "l4" or "l7"
|
||||
string hostname = 4; // optional public hostname override
|
||||
}
|
||||
|
||||
message ComponentSpec {
|
||||
string name = 1;
|
||||
string image = 2;
|
||||
@@ -45,6 +52,8 @@ message ComponentSpec {
|
||||
repeated string ports = 6;
|
||||
repeated string volumes = 7;
|
||||
repeated string cmd = 8;
|
||||
repeated RouteSpec routes = 9;
|
||||
repeated string env = 10;
|
||||
}
|
||||
|
||||
message ServiceSpec {
|
||||
|
||||
Reference in New Issue
Block a user