Phase B: Agent registers routes with mc-proxy on deploy
The agent connects to mc-proxy via Unix socket and automatically registers/removes routes during deploy and stop. This eliminates manual mcproxyctl usage or TOML editing. - New ProxyRouter abstraction wraps mc-proxy client library - Deploy: after container starts, registers routes with mc-proxy using host ports from the registry - Stop: removes routes from mc-proxy before stopping container - Config: [mcproxy] section with socket path and cert_dir - Nil-safe: if mc-proxy socket not configured, route registration is silently skipped (backward compatible) - L7 routes use certs from convention path (<cert_dir>/<service>.pem) - L4 routes use TLS passthrough (backend_tls=true) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -32,6 +32,7 @@ type Agent struct {
|
||||
Monitor *monitor.Monitor
|
||||
Logger *slog.Logger
|
||||
PortAlloc *PortAllocator
|
||||
Proxy *ProxyRouter
|
||||
}
|
||||
|
||||
// Run starts the agent: opens the database, sets up the gRPC server with
|
||||
@@ -51,6 +52,11 @@ func Run(cfg *config.AgentConfig) error {
|
||||
|
||||
mon := monitor.New(db, rt, cfg.Monitor, cfg.Agent.NodeName, logger)
|
||||
|
||||
proxy, err := NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect to mc-proxy: %w", err)
|
||||
}
|
||||
|
||||
a := &Agent{
|
||||
Config: cfg,
|
||||
DB: db,
|
||||
@@ -58,6 +64,7 @@ func Run(cfg *config.AgentConfig) error {
|
||||
Monitor: mon,
|
||||
Logger: logger,
|
||||
PortAlloc: NewPortAllocator(),
|
||||
Proxy: proxy,
|
||||
}
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
|
||||
@@ -108,6 +115,7 @@ func Run(cfg *config.AgentConfig) error {
|
||||
logger.Info("shutting down")
|
||||
mon.Stop()
|
||||
server.GracefulStop()
|
||||
_ = proxy.Close()
|
||||
return nil
|
||||
case err := <-errCh:
|
||||
mon.Stop()
|
||||
|
||||
@@ -146,6 +146,16 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
}
|
||||
}
|
||||
|
||||
// Register routes with mc-proxy after the container is running.
|
||||
if len(regRoutes) > 0 && a.Proxy != nil {
|
||||
hostPorts, err := registry.GetRouteHostPorts(a.DB, serviceName, compName)
|
||||
if err != nil {
|
||||
a.Logger.Warn("failed to get host ports for route registration", "service", serviceName, "component", compName, "err", err)
|
||||
} else if err := a.Proxy.RegisterRoutes(ctx, serviceName, regRoutes, hostPorts); err != nil {
|
||||
a.Logger.Warn("failed to register routes with mc-proxy", "service", serviceName, "component", compName, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := registry.UpdateComponentState(a.DB, serviceName, compName, "running", "running"); err != nil {
|
||||
a.Logger.Warn("failed to update component state", "service", serviceName, "component", compName, "err", err)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,13 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest)
|
||||
containerName := ContainerNameFor(req.GetName(), c.Name)
|
||||
r := &mcpv1.ComponentResult{Name: c.Name, Success: true}
|
||||
|
||||
// Remove routes from mc-proxy before stopping the container.
|
||||
if len(c.Routes) > 0 && a.Proxy != nil {
|
||||
if err := a.Proxy.RemoveRoutes(ctx, req.GetName(), c.Routes); err != nil {
|
||||
a.Logger.Warn("failed to remove routes", "service", req.GetName(), "component", c.Name, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.Runtime.Stop(ctx, containerName); err != nil {
|
||||
a.Logger.Info("stop container (ignored)", "container", containerName, "error", err)
|
||||
}
|
||||
|
||||
138
internal/agent/proxy.go
Normal file
138
internal/agent/proxy.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"path/filepath"
|
||||
|
||||
"git.wntrmute.dev/kyle/mc-proxy/client/mcproxy"
|
||||
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
||||
)
|
||||
|
||||
// ProxyRouter registers and removes routes with mc-proxy.
|
||||
// If the mc-proxy socket is not configured, it logs and returns nil
|
||||
// (route registration is optional).
|
||||
type ProxyRouter struct {
|
||||
client *mcproxy.Client
|
||||
certDir string
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewProxyRouter connects to mc-proxy via Unix socket. Returns nil
|
||||
// if socketPath is empty (route registration disabled).
|
||||
func NewProxyRouter(socketPath, certDir string, logger *slog.Logger) (*ProxyRouter, error) {
|
||||
if socketPath == "" {
|
||||
logger.Info("mc-proxy socket not configured, route registration disabled")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
client, err := mcproxy.Dial(socketPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect to mc-proxy at %s: %w", socketPath, err)
|
||||
}
|
||||
|
||||
logger.Info("connected to mc-proxy", "socket", socketPath)
|
||||
return &ProxyRouter{
|
||||
client: client,
|
||||
certDir: certDir,
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the mc-proxy connection.
|
||||
func (p *ProxyRouter) Close() error {
|
||||
if p == nil || p.client == nil {
|
||||
return nil
|
||||
}
|
||||
return p.client.Close()
|
||||
}
|
||||
|
||||
// RegisterRoutes registers all routes for a service component with mc-proxy.
|
||||
// It uses the assigned host ports from the registry.
|
||||
func (p *ProxyRouter) RegisterRoutes(ctx context.Context, serviceName string, routes []registry.Route, hostPorts map[string]int) error {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, r := range routes {
|
||||
hostPort, ok := hostPorts[r.Name]
|
||||
if !ok || hostPort == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
hostname := r.Hostname
|
||||
if hostname == "" {
|
||||
hostname = serviceName + ".svc.mcp.metacircular.net"
|
||||
}
|
||||
|
||||
listenerAddr := listenerForMode(r.Mode, r.Port)
|
||||
backend := fmt.Sprintf("127.0.0.1:%d", hostPort)
|
||||
|
||||
route := mcproxy.Route{
|
||||
Hostname: hostname,
|
||||
Backend: backend,
|
||||
Mode: r.Mode,
|
||||
BackendTLS: r.Mode == "l4", // L4 passthrough: backend handles TLS. L7: mc-proxy terminates.
|
||||
}
|
||||
|
||||
// L7 routes need TLS cert/key for mc-proxy to terminate TLS.
|
||||
if r.Mode == "l7" {
|
||||
route.TLSCert = filepath.Join(p.certDir, serviceName+".pem")
|
||||
route.TLSKey = filepath.Join(p.certDir, serviceName+".key")
|
||||
}
|
||||
|
||||
p.logger.Info("registering route",
|
||||
"service", serviceName,
|
||||
"hostname", hostname,
|
||||
"listener", listenerAddr,
|
||||
"backend", backend,
|
||||
"mode", r.Mode,
|
||||
)
|
||||
|
||||
if err := p.client.AddRoute(ctx, listenerAddr, route); err != nil {
|
||||
return fmt.Errorf("register route %s on %s: %w", hostname, listenerAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveRoutes removes all routes for a service component from mc-proxy.
|
||||
func (p *ProxyRouter) RemoveRoutes(ctx context.Context, serviceName string, routes []registry.Route) error {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, r := range routes {
|
||||
hostname := r.Hostname
|
||||
if hostname == "" {
|
||||
hostname = serviceName + ".svc.mcp.metacircular.net"
|
||||
}
|
||||
|
||||
listenerAddr := listenerForMode(r.Mode, r.Port)
|
||||
|
||||
p.logger.Info("removing route",
|
||||
"service", serviceName,
|
||||
"hostname", hostname,
|
||||
"listener", listenerAddr,
|
||||
)
|
||||
|
||||
if err := p.client.RemoveRoute(ctx, listenerAddr, hostname); err != nil {
|
||||
// Log but don't fail — the route may already be gone.
|
||||
p.logger.Warn("failed to remove route",
|
||||
"hostname", hostname,
|
||||
"listener", listenerAddr,
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// listenerForMode returns the mc-proxy listener address for a given
|
||||
// route mode and external port.
|
||||
func listenerForMode(mode string, port int) string {
|
||||
return fmt.Sprintf(":%d", port)
|
||||
}
|
||||
57
internal/agent/proxy_test.go
Normal file
57
internal/agent/proxy_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
||||
)
|
||||
|
||||
func TestListenerForMode(t *testing.T) {
|
||||
tests := []struct {
|
||||
mode string
|
||||
port int
|
||||
want string
|
||||
}{
|
||||
{"l4", 8443, ":8443"},
|
||||
{"l7", 443, ":443"},
|
||||
{"l4", 9443, ":9443"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := listenerForMode(tt.mode, tt.port)
|
||||
if got != tt.want {
|
||||
t.Errorf("listenerForMode(%q, %d) = %q, want %q", tt.mode, tt.port, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNilProxyRouterIsNoop(t *testing.T) {
|
||||
var p *ProxyRouter
|
||||
|
||||
// All methods should return nil on a nil ProxyRouter.
|
||||
if err := p.RegisterRoutes(nil, "svc", nil, nil); err != nil {
|
||||
t.Errorf("RegisterRoutes on nil: %v", err)
|
||||
}
|
||||
if err := p.RemoveRoutes(nil, "svc", nil); err != nil {
|
||||
t.Errorf("RemoveRoutes on nil: %v", err)
|
||||
}
|
||||
if err := p.Close(); err != nil {
|
||||
t.Errorf("Close on nil: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterRoutesSkipsZeroHostPort(t *testing.T) {
|
||||
// A nil ProxyRouter should be a no-op, so this tests the skip logic
|
||||
// indirectly. With a nil proxy, RegisterRoutes returns nil even
|
||||
// with routes that have zero host ports.
|
||||
var p *ProxyRouter
|
||||
|
||||
routes := []registry.Route{
|
||||
{Name: "rest", Port: 8443, Mode: "l4"},
|
||||
}
|
||||
hostPorts := map[string]int{"rest": 0}
|
||||
|
||||
if err := p.RegisterRoutes(nil, "svc", routes, hostPorts); err != nil {
|
||||
t.Errorf("RegisterRoutes: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user