Add edge routing and health check RPCs (Phase 2)
New agent RPCs for v2 multi-node orchestration:

- SetupEdgeRoute: provisions TLS cert from Metacrypt, resolves backend hostname to Tailnet IP, validates it's in 100.64.0.0/10, registers L7 route in mc-proxy. Rejects backend_tls=false.
- RemoveEdgeRoute: removes mc-proxy route, cleans up TLS cert, removes registry entry.
- ListEdgeRoutes: returns all edge routes with cert serial/expiry.
- HealthCheck: returns agent health and container count.

New database table (migration 4): edge_routes stores hostname, backend info, and cert paths for persistence across agent restarts.

ProxyRouter gains CertPath/KeyPath helpers for consistent cert path construction.

Security:

- Backend hostname must resolve to a Tailnet IP (100.64.0.0/10)
- backend_tls=false is rejected (no cleartext to backends)
- Cert provisioning failure fails the setup (no route to missing cert)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
196
internal/agent/edge_rpc.go
Normal file
196
internal/agent/edge_rpc.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
mcproxy "git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
|
||||
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||
)
|
||||
|
||||
// SetupEdgeRoute provisions a TLS cert and registers an mc-proxy route for a
|
||||
// public hostname. Called by the master on edge nodes.
|
||||
func (a *Agent) SetupEdgeRoute(ctx context.Context, req *mcpv1.SetupEdgeRouteRequest) (*mcpv1.SetupEdgeRouteResponse, error) {
|
||||
a.Logger.Info("SetupEdgeRoute", "hostname", req.GetHostname(),
|
||||
"backend_hostname", req.GetBackendHostname(), "backend_port", req.GetBackendPort())
|
||||
|
||||
// Validate required fields.
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
if req.GetBackendHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend_hostname is required")
|
||||
}
|
||||
if req.GetBackendPort() == 0 {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend_port is required")
|
||||
}
|
||||
if !req.GetBackendTls() {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend_tls must be true")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
// Resolve the backend hostname to a Tailnet IP.
|
||||
ips, err := net.LookupHost(req.GetBackendHostname())
|
||||
if err != nil || len(ips) == 0 {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "cannot resolve backend_hostname %q: %v", req.GetBackendHostname(), err)
|
||||
}
|
||||
backendIP := ips[0]
|
||||
|
||||
// Validate the resolved IP is a Tailnet address (100.64.0.0/10).
|
||||
ip := net.ParseIP(backendIP)
|
||||
if ip == nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "resolved IP %q is not valid", backendIP)
|
||||
}
|
||||
_, tailnet, _ := net.ParseCIDR("100.64.0.0/10")
|
||||
if !tailnet.Contains(ip) {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "resolved IP %s is not a Tailnet address", backendIP)
|
||||
}
|
||||
|
||||
backend := fmt.Sprintf("%s:%d", backendIP, req.GetBackendPort())
|
||||
|
||||
// Provision TLS cert for the public hostname if cert provisioner is available.
|
||||
certPath := ""
|
||||
keyPath := ""
|
||||
if a.Certs != nil {
|
||||
if err := a.Certs.EnsureCert(ctx, req.GetHostname(), []string{req.GetHostname()}); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "provision cert for %s: %v", req.GetHostname(), err)
|
||||
}
|
||||
certPath = a.Proxy.CertPath(req.GetHostname())
|
||||
keyPath = a.Proxy.KeyPath(req.GetHostname())
|
||||
} else {
|
||||
// No cert provisioner — check if certs already exist on disk.
|
||||
certPath = a.Proxy.CertPath(req.GetHostname())
|
||||
keyPath = a.Proxy.KeyPath(req.GetHostname())
|
||||
if _, err := os.Stat(certPath); err != nil {
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "no cert provisioner and cert not found at %s", certPath)
|
||||
}
|
||||
}
|
||||
|
||||
// Register the L7 route in mc-proxy.
|
||||
route := mcproxy.Route{
|
||||
Hostname: req.GetHostname(),
|
||||
Backend: backend,
|
||||
Mode: "l7",
|
||||
TLSCert: certPath,
|
||||
TLSKey: keyPath,
|
||||
BackendTLS: true,
|
||||
}
|
||||
if err := a.Proxy.AddRoute(ctx, ":443", route); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "add mc-proxy route: %v", err)
|
||||
}
|
||||
|
||||
// Persist the edge route in the registry.
|
||||
if err := registry.CreateEdgeRoute(a.DB, req.GetHostname(), req.GetBackendHostname(), int(req.GetBackendPort()), certPath, keyPath); err != nil {
|
||||
a.Logger.Warn("failed to persist edge route", "hostname", req.GetHostname(), "err", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("edge route established",
|
||||
"hostname", req.GetHostname(), "backend", backend, "cert", certPath)
|
||||
|
||||
return &mcpv1.SetupEdgeRouteResponse{}, nil
|
||||
}
|
||||
|
||||
// RemoveEdgeRoute removes an mc-proxy route and cleans up the TLS cert for a
|
||||
// public hostname. Called by the master on edge nodes.
|
||||
func (a *Agent) RemoveEdgeRoute(ctx context.Context, req *mcpv1.RemoveEdgeRouteRequest) (*mcpv1.RemoveEdgeRouteResponse, error) {
|
||||
a.Logger.Info("RemoveEdgeRoute", "hostname", req.GetHostname())
|
||||
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
// Remove the mc-proxy route.
|
||||
if err := a.Proxy.RemoveRoute(ctx, ":443", req.GetHostname()); err != nil {
|
||||
a.Logger.Warn("remove mc-proxy route", "hostname", req.GetHostname(), "err", err)
|
||||
// Continue — clean up cert and registry even if route removal fails.
|
||||
}
|
||||
|
||||
// Remove the TLS cert.
|
||||
if a.Certs != nil {
|
||||
if err := a.Certs.RemoveCert(req.GetHostname()); err != nil {
|
||||
a.Logger.Warn("remove cert", "hostname", req.GetHostname(), "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from registry.
|
||||
if err := registry.DeleteEdgeRoute(a.DB, req.GetHostname()); err != nil {
|
||||
a.Logger.Warn("delete edge route from registry", "hostname", req.GetHostname(), "err", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("edge route removed", "hostname", req.GetHostname())
|
||||
return &mcpv1.RemoveEdgeRouteResponse{}, nil
|
||||
}
|
||||
|
||||
// ListEdgeRoutes returns all edge routes managed by this agent.
|
||||
func (a *Agent) ListEdgeRoutes(_ context.Context, _ *mcpv1.ListEdgeRoutesRequest) (*mcpv1.ListEdgeRoutesResponse, error) {
|
||||
a.Logger.Debug("ListEdgeRoutes called")
|
||||
|
||||
routes, err := registry.ListEdgeRoutes(a.DB)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "list edge routes: %v", err)
|
||||
}
|
||||
|
||||
resp := &mcpv1.ListEdgeRoutesResponse{}
|
||||
for _, r := range routes {
|
||||
er := &mcpv1.EdgeRoute{
|
||||
Hostname: r.Hostname,
|
||||
BackendHostname: r.BackendHostname,
|
||||
BackendPort: int32(r.BackendPort), //nolint:gosec // port is a small positive integer
|
||||
}
|
||||
|
||||
// Read cert metadata if available.
|
||||
if r.TLSCert != "" {
|
||||
if certData, readErr := os.ReadFile(r.TLSCert); readErr == nil { //nolint:gosec // path from registry, not user input
|
||||
if block, _ := pem.Decode(certData); block != nil {
|
||||
if cert, parseErr := x509.ParseCertificate(block.Bytes); parseErr == nil {
|
||||
er.CertSerial = cert.SerialNumber.String()
|
||||
er.CertExpires = cert.NotAfter.UTC().Format(time.RFC3339)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resp.Routes = append(resp.Routes, er)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// HealthCheck returns the agent's health status. Called by the master when
|
||||
// heartbeats are missed.
|
||||
func (a *Agent) HealthCheck(_ context.Context, _ *mcpv1.HealthCheckRequest) (*mcpv1.HealthCheckResponse, error) {
|
||||
a.Logger.Debug("HealthCheck called")
|
||||
|
||||
st := "healthy"
|
||||
containers := int32(0)
|
||||
|
||||
// Count running containers if the runtime is available.
|
||||
if a.Runtime != nil {
|
||||
if list, err := a.Runtime.List(context.Background()); err == nil {
|
||||
containers = int32(len(list)) //nolint:gosec // container count is small
|
||||
} else {
|
||||
st = "degraded"
|
||||
}
|
||||
}
|
||||
|
||||
return &mcpv1.HealthCheckResponse{
|
||||
Status: st,
|
||||
Containers: containers,
|
||||
}, nil
|
||||
}
|
||||
@@ -48,6 +48,16 @@ func (p *ProxyRouter) Close() error {
|
||||
return p.client.Close()
|
||||
}
|
||||
|
||||
// CertPath returns the expected TLS certificate path for a given name.
|
||||
func (p *ProxyRouter) CertPath(name string) string {
|
||||
return filepath.Join(p.certDir, name+".pem")
|
||||
}
|
||||
|
||||
// KeyPath returns the expected TLS key path for a given name.
|
||||
func (p *ProxyRouter) KeyPath(name string) string {
|
||||
return filepath.Join(p.certDir, name+".key")
|
||||
}
|
||||
|
||||
// GetStatus returns the mc-proxy server status.
|
||||
func (p *ProxyRouter) GetStatus(ctx context.Context) (*mcproxy.Status, error) {
|
||||
if p == nil {
|
||||
|
||||
@@ -142,4 +142,18 @@ var migrations = []string{
|
||||
FOREIGN KEY (service, component) REFERENCES components(service, name) ON DELETE CASCADE
|
||||
);
|
||||
`,
|
||||
|
||||
// Migration 3: service comment
|
||||
`ALTER TABLE services ADD COLUMN comment TEXT NOT NULL DEFAULT '';`,
|
||||
|
||||
// Migration 4: edge routes (v2 — public routes managed by the master)
|
||||
`CREATE TABLE IF NOT EXISTS edge_routes (
|
||||
hostname TEXT NOT NULL PRIMARY KEY,
|
||||
backend_hostname TEXT NOT NULL,
|
||||
backend_port INTEGER NOT NULL,
|
||||
tls_cert TEXT NOT NULL DEFAULT '',
|
||||
tls_key TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);`,
|
||||
}
|
||||
|
||||
93
internal/registry/edge_routes.go
Normal file
93
internal/registry/edge_routes.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EdgeRoute is one row of the edge_routes table: a public edge route managed
// by the master.
type EdgeRoute struct {
	// Hostname is the public hostname of the route; it is the table's
	// primary key.
	Hostname string
	// BackendHostname and BackendPort identify the backend the route
	// forwards to.
	BackendHostname string
	BackendPort     int
	// TLSCert and TLSKey are the paths of the certificate and key files
	// recorded for the route.
	TLSCert, TLSKey string
	// CreatedAt and UpdatedAt are parsed from the row's created_at and
	// updated_at columns.
	CreatedAt, UpdatedAt time.Time
}
|
||||
|
||||
// CreateEdgeRoute upserts the edge route for hostname. A new row gets both
// created_at and updated_at set to the current time; when a row for hostname
// already exists, its backend and TLS columns are overwritten and only
// updated_at is refreshed.
func CreateEdgeRoute(db *sql.DB, hostname, backendHostname string, backendPort int, tlsCert, tlsKey string) error {
	const upsert = `
		INSERT INTO edge_routes (hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at)
		VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))
		ON CONFLICT(hostname) DO UPDATE SET
			backend_hostname = excluded.backend_hostname,
			backend_port = excluded.backend_port,
			tls_cert = excluded.tls_cert,
			tls_key = excluded.tls_key,
			updated_at = datetime('now')
	`

	if _, err := db.Exec(upsert, hostname, backendHostname, backendPort, tlsCert, tlsKey); err != nil {
		return fmt.Errorf("create edge route %s: %w", hostname, err)
	}
	return nil
}
|
||||
|
||||
// GetEdgeRoute returns a single edge route by hostname.
|
||||
func GetEdgeRoute(db *sql.DB, hostname string) (*EdgeRoute, error) {
|
||||
var r EdgeRoute
|
||||
var createdAt, updatedAt string
|
||||
err := db.QueryRow(`
|
||||
SELECT hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at
|
||||
FROM edge_routes WHERE hostname = ?
|
||||
`, hostname).Scan(&r.Hostname, &r.BackendHostname, &r.BackendPort, &r.TLSCert, &r.TLSKey, &createdAt, &updatedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get edge route %s: %w", hostname, err)
|
||||
}
|
||||
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
||||
r.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
// ListEdgeRoutes returns all edge routes.
|
||||
func ListEdgeRoutes(db *sql.DB) ([]*EdgeRoute, error) {
|
||||
rows, err := db.Query(`
|
||||
SELECT hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at
|
||||
FROM edge_routes ORDER BY hostname
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list edge routes: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
var routes []*EdgeRoute
|
||||
for rows.Next() {
|
||||
var r EdgeRoute
|
||||
var createdAt, updatedAt string
|
||||
if err := rows.Scan(&r.Hostname, &r.BackendHostname, &r.BackendPort, &r.TLSCert, &r.TLSKey, &createdAt, &updatedAt); err != nil {
|
||||
return nil, fmt.Errorf("scan edge route: %w", err)
|
||||
}
|
||||
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
||||
r.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
|
||||
routes = append(routes, &r)
|
||||
}
|
||||
return routes, rows.Err()
|
||||
}
|
||||
|
||||
// DeleteEdgeRoute removes the edge route stored for hostname.
//
// It returns an error when the DELETE statement fails, when the driver cannot
// report how many rows were affected, or when no row matched hostname.
func DeleteEdgeRoute(db *sql.DB, hostname string) error {
	result, err := db.Exec(`DELETE FROM edge_routes WHERE hostname = ?`, hostname)
	if err != nil {
		return fmt.Errorf("delete edge route %s: %w", hostname, err)
	}

	// Surface a RowsAffected failure as itself; treating it as "zero rows"
	// would misreport it as a missing route.
	n, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("delete edge route %s: rows affected: %w", hostname, err)
	}
	if n == 0 {
		return fmt.Errorf("edge route %s not found", hostname)
	}
	return nil
}
|
||||
Reference in New Issue
Block a user