Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 714320c018 | |||
| fa8ba6fac1 | |||
| f66758b92b | |||
| 09d0d197c3 | |||
| 52914d50b0 | |||
| bb4bee51ba | |||
| 4ac8a6d60b | |||
| d8f45ca520 | |||
| 95f86157b4 | |||
| 93e26d3789 | |||
| 3d2edb7c26 |
87
cmd/mcp/dns.go
Normal file
87
cmd/mcp/dns.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func dnsCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "dns",
|
||||
Short: "List all DNS zones and records from MCNS",
|
||||
RunE: runDNS,
|
||||
}
|
||||
}
|
||||
|
||||
func runDNS(_ *cobra.Command, _ []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
// DNS is centralized — query the first reachable agent.
|
||||
resp, nodeName, err := queryDNS(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(resp.GetZones()) == 0 {
|
||||
fmt.Println("no DNS zones configured")
|
||||
return nil
|
||||
}
|
||||
|
||||
_ = nodeName
|
||||
for i, zone := range resp.GetZones() {
|
||||
if i > 0 {
|
||||
fmt.Println()
|
||||
}
|
||||
fmt.Printf("ZONE: %s\n", zone.GetName())
|
||||
|
||||
if len(zone.GetRecords()) == 0 {
|
||||
fmt.Println(" (no records)")
|
||||
continue
|
||||
}
|
||||
|
||||
w := newTable()
|
||||
_, _ = fmt.Fprintln(w, " NAME\tTYPE\tVALUE\tTTL")
|
||||
for _, r := range zone.GetRecords() {
|
||||
_, _ = fmt.Fprintf(w, " %s\t%s\t%s\t%d\n",
|
||||
r.GetName(), r.GetType(), r.GetValue(), r.GetTtl())
|
||||
}
|
||||
_ = w.Flush()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// queryDNS tries each configured agent and returns the first successful
|
||||
// DNS listing. DNS is centralized so any agent with MCNS configured works.
|
||||
func queryDNS(cfg *config.CLIConfig) (*mcpv1.ListDNSRecordsResponse, string, error) {
|
||||
for _, node := range cfg.Nodes {
|
||||
client, conn, err := dialAgent(node.Address, cfg)
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
resp, err := client.ListDNSRecords(ctx, &mcpv1.ListDNSRecordsRequest{})
|
||||
cancel()
|
||||
_ = conn.Close()
|
||||
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: list DNS: %v\n", node.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
return resp, node.Name, nil
|
||||
}
|
||||
|
||||
return nil, "", fmt.Errorf("no reachable agent with DNS configured")
|
||||
}
|
||||
@@ -14,8 +14,8 @@ import (
|
||||
|
||||
func stopCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "stop <service>",
|
||||
Short: "Stop all components, set active=false",
|
||||
Use: "stop <service>[/<component>]",
|
||||
Short: "Stop components (or all), set active=false",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
@@ -23,7 +23,7 @@ func stopCmd() *cobra.Command {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
serviceName := args[0]
|
||||
serviceName, component := parseServiceArg(args[0])
|
||||
defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml")
|
||||
|
||||
def, err := servicedef.Load(defPath)
|
||||
@@ -31,11 +31,14 @@ func stopCmd() *cobra.Command {
|
||||
return fmt.Errorf("load service def: %w", err)
|
||||
}
|
||||
|
||||
// Only flip active=false when stopping the whole service.
|
||||
if component == "" {
|
||||
active := false
|
||||
def.Active = &active
|
||||
if err := servicedef.Write(defPath, def); err != nil {
|
||||
return fmt.Errorf("write service def: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, def.Node)
|
||||
if err != nil {
|
||||
@@ -50,6 +53,7 @@ func stopCmd() *cobra.Command {
|
||||
|
||||
resp, err := client.StopService(context.Background(), &mcpv1.StopServiceRequest{
|
||||
Name: serviceName,
|
||||
Component: component,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("stop service: %w", err)
|
||||
@@ -63,8 +67,8 @@ func stopCmd() *cobra.Command {
|
||||
|
||||
func startCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "start <service>",
|
||||
Short: "Start all components, set active=true",
|
||||
Use: "start <service>[/<component>]",
|
||||
Short: "Start components (or all), set active=true",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
@@ -72,7 +76,7 @@ func startCmd() *cobra.Command {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
serviceName := args[0]
|
||||
serviceName, component := parseServiceArg(args[0])
|
||||
defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml")
|
||||
|
||||
def, err := servicedef.Load(defPath)
|
||||
@@ -80,11 +84,14 @@ func startCmd() *cobra.Command {
|
||||
return fmt.Errorf("load service def: %w", err)
|
||||
}
|
||||
|
||||
// Only flip active=true when starting the whole service.
|
||||
if component == "" {
|
||||
active := true
|
||||
def.Active = &active
|
||||
if err := servicedef.Write(defPath, def); err != nil {
|
||||
return fmt.Errorf("write service def: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, def.Node)
|
||||
if err != nil {
|
||||
@@ -99,6 +106,7 @@ func startCmd() *cobra.Command {
|
||||
|
||||
resp, err := client.StartService(context.Background(), &mcpv1.StartServiceRequest{
|
||||
Name: serviceName,
|
||||
Component: component,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("start service: %w", err)
|
||||
@@ -112,8 +120,8 @@ func startCmd() *cobra.Command {
|
||||
|
||||
func restartCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "restart <service>",
|
||||
Short: "Restart all components",
|
||||
Use: "restart <service>[/<component>]",
|
||||
Short: "Restart components (or all)",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
@@ -121,7 +129,7 @@ func restartCmd() *cobra.Command {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
serviceName := args[0]
|
||||
serviceName, component := parseServiceArg(args[0])
|
||||
defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml")
|
||||
|
||||
def, err := servicedef.Load(defPath)
|
||||
@@ -142,6 +150,7 @@ func restartCmd() *cobra.Command {
|
||||
|
||||
resp, err := client.RestartService(context.Background(), &mcpv1.RestartServiceRequest{
|
||||
Name: serviceName,
|
||||
Component: component,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("restart service: %w", err)
|
||||
|
||||
@@ -52,6 +52,8 @@ func main() {
|
||||
root.AddCommand(purgeCmd())
|
||||
root.AddCommand(logsCmd())
|
||||
root.AddCommand(editCmd())
|
||||
root.AddCommand(dnsCmd())
|
||||
root.AddCommand(routeCmd())
|
||||
|
||||
if err := root.Execute(); err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
225
cmd/mcp/route.go
Normal file
225
cmd/mcp/route.go
Normal file
@@ -0,0 +1,225 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||
)
|
||||
|
||||
func routeCmd() *cobra.Command {
|
||||
var nodeName string
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "route",
|
||||
Short: "Manage mc-proxy routes",
|
||||
}
|
||||
|
||||
list := &cobra.Command{
|
||||
Use: "list",
|
||||
Short: "List mc-proxy routes",
|
||||
RunE: func(_ *cobra.Command, _ []string) error {
|
||||
return runRouteList(nodeName)
|
||||
},
|
||||
}
|
||||
|
||||
var (
|
||||
routeMode string
|
||||
backendTLS bool
|
||||
tlsCert string
|
||||
tlsKey string
|
||||
)
|
||||
|
||||
add := &cobra.Command{
|
||||
Use: "add <listener> <hostname> <backend>",
|
||||
Short: "Add a route to mc-proxy",
|
||||
Long: "Add a route. Example: mcp route add -n rift :443 mcq.svc.mcp.metacircular.net 127.0.0.1:48080 --mode l7 --tls-cert /srv/mc-proxy/certs/mcq.pem --tls-key /srv/mc-proxy/certs/mcq.key",
|
||||
Args: cobra.ExactArgs(3),
|
||||
RunE: func(_ *cobra.Command, args []string) error {
|
||||
return runRouteAdd(nodeName, args, routeMode, backendTLS, tlsCert, tlsKey)
|
||||
},
|
||||
}
|
||||
add.Flags().StringVar(&routeMode, "mode", "l4", "route mode (l4 or l7)")
|
||||
add.Flags().BoolVar(&backendTLS, "backend-tls", false, "re-encrypt traffic to backend")
|
||||
add.Flags().StringVar(&tlsCert, "tls-cert", "", "path to TLS cert on the node (required for l7)")
|
||||
add.Flags().StringVar(&tlsKey, "tls-key", "", "path to TLS key on the node (required for l7)")
|
||||
|
||||
remove := &cobra.Command{
|
||||
Use: "remove <listener> <hostname>",
|
||||
Short: "Remove a route from mc-proxy",
|
||||
Long: "Remove a route. Example: mcp route remove -n rift :443 mcq.metacircular.net",
|
||||
Args: cobra.ExactArgs(2),
|
||||
RunE: func(_ *cobra.Command, args []string) error {
|
||||
return runRouteRemove(nodeName, args)
|
||||
},
|
||||
}
|
||||
|
||||
cmd.PersistentFlags().StringVarP(&nodeName, "node", "n", "", "target node (required)")
|
||||
|
||||
cmd.AddCommand(list, add, remove)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func runRouteList(nodeName string) error {
|
||||
if nodeName == "" {
|
||||
return runRouteListAll()
|
||||
}
|
||||
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, nodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.ListProxyRoutes(ctx, &mcpv1.ListProxyRoutesRequest{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("list routes: %w", err)
|
||||
}
|
||||
|
||||
printRoutes(nodeName, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRouteListAll() error {
|
||||
first := true
|
||||
return forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.ListProxyRoutes(ctx, &mcpv1.ListProxyRoutesRequest{})
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: list routes: %v\n", node.Name, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !first {
|
||||
fmt.Println()
|
||||
}
|
||||
first = false
|
||||
|
||||
printRoutes(node.Name, resp)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func printRoutes(nodeName string, resp *mcpv1.ListProxyRoutesResponse) {
|
||||
fmt.Printf("NODE: %s\n", nodeName)
|
||||
fmt.Printf("mc-proxy %s\n", resp.GetVersion())
|
||||
if resp.GetStartedAt() != nil {
|
||||
uptime := time.Since(resp.GetStartedAt().AsTime()).Truncate(time.Second)
|
||||
fmt.Printf("uptime: %s\n", uptime)
|
||||
}
|
||||
fmt.Printf("connections: %d\n", resp.GetTotalConnections())
|
||||
fmt.Println()
|
||||
|
||||
for _, ls := range resp.GetListeners() {
|
||||
fmt.Printf(" %s routes=%d active=%d\n",
|
||||
ls.GetAddr(), ls.GetRouteCount(), ls.GetActiveConnections())
|
||||
for _, r := range ls.GetRoutes() {
|
||||
mode := r.GetMode()
|
||||
if mode == "" {
|
||||
mode = "l4"
|
||||
}
|
||||
extra := ""
|
||||
if r.GetBackendTls() {
|
||||
extra = " (re-encrypt)"
|
||||
}
|
||||
fmt.Printf(" %s %s → %s%s\n", mode, r.GetHostname(), r.GetBackend(), extra)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runRouteAdd(nodeName string, args []string, mode string, backendTLS bool, tlsCert, tlsKey string) error {
|
||||
if nodeName == "" {
|
||||
return fmt.Errorf("--node is required")
|
||||
}
|
||||
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, nodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = client.AddProxyRoute(ctx, &mcpv1.AddProxyRouteRequest{
|
||||
ListenerAddr: args[0],
|
||||
Hostname: args[1],
|
||||
Backend: args[2],
|
||||
Mode: mode,
|
||||
BackendTls: backendTLS,
|
||||
TlsCert: tlsCert,
|
||||
TlsKey: tlsKey,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("add route: %w", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Added route: %s %s → %s on %s (%s)\n", mode, args[1], args[2], args[0], nodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRouteRemove(nodeName string, args []string) error {
|
||||
if nodeName == "" {
|
||||
return fmt.Errorf("--node is required")
|
||||
}
|
||||
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, nodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = client.RemoveProxyRoute(ctx, &mcpv1.RemoveProxyRouteRequest{
|
||||
ListenerAddr: args[0],
|
||||
Hostname: args[1],
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("remove route: %w", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Removed route: %s from %s (%s)\n", args[1], args[0], nodeName)
|
||||
return nil
|
||||
}
|
||||
@@ -10,7 +10,7 @@
|
||||
let
|
||||
system = "x86_64-linux";
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
version = pkgs.lib.removePrefix "v" (self.gitDescribe or self.shortRev or self.dirtyShortRev or "unknown");
|
||||
version = "0.8.3";
|
||||
in
|
||||
{
|
||||
packages.${system} = {
|
||||
|
||||
1365
gen/mcp/v1/mcp.pb.go
1365
gen/mcp/v1/mcp.pb.go
File diff suppressed because it is too large
Load Diff
@@ -33,6 +33,14 @@ const (
|
||||
McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile"
|
||||
McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile"
|
||||
McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus"
|
||||
McpAgentService_ListDNSRecords_FullMethodName = "/mcp.v1.McpAgentService/ListDNSRecords"
|
||||
McpAgentService_ListProxyRoutes_FullMethodName = "/mcp.v1.McpAgentService/ListProxyRoutes"
|
||||
McpAgentService_AddProxyRoute_FullMethodName = "/mcp.v1.McpAgentService/AddProxyRoute"
|
||||
McpAgentService_RemoveProxyRoute_FullMethodName = "/mcp.v1.McpAgentService/RemoveProxyRoute"
|
||||
McpAgentService_SetupEdgeRoute_FullMethodName = "/mcp.v1.McpAgentService/SetupEdgeRoute"
|
||||
McpAgentService_RemoveEdgeRoute_FullMethodName = "/mcp.v1.McpAgentService/RemoveEdgeRoute"
|
||||
McpAgentService_ListEdgeRoutes_FullMethodName = "/mcp.v1.McpAgentService/ListEdgeRoutes"
|
||||
McpAgentService_HealthCheck_FullMethodName = "/mcp.v1.McpAgentService/HealthCheck"
|
||||
McpAgentService_Logs_FullMethodName = "/mcp.v1.McpAgentService/Logs"
|
||||
)
|
||||
|
||||
@@ -61,6 +69,18 @@ type McpAgentServiceClient interface {
|
||||
PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error)
|
||||
// Node
|
||||
NodeStatus(ctx context.Context, in *NodeStatusRequest, opts ...grpc.CallOption) (*NodeStatusResponse, error)
|
||||
// DNS (query MCNS)
|
||||
ListDNSRecords(ctx context.Context, in *ListDNSRecordsRequest, opts ...grpc.CallOption) (*ListDNSRecordsResponse, error)
|
||||
// Proxy routes (query mc-proxy)
|
||||
ListProxyRoutes(ctx context.Context, in *ListProxyRoutesRequest, opts ...grpc.CallOption) (*ListProxyRoutesResponse, error)
|
||||
AddProxyRoute(ctx context.Context, in *AddProxyRouteRequest, opts ...grpc.CallOption) (*AddProxyRouteResponse, error)
|
||||
RemoveProxyRoute(ctx context.Context, in *RemoveProxyRouteRequest, opts ...grpc.CallOption) (*RemoveProxyRouteResponse, error)
|
||||
// Edge routing (called by master on edge nodes)
|
||||
SetupEdgeRoute(ctx context.Context, in *SetupEdgeRouteRequest, opts ...grpc.CallOption) (*SetupEdgeRouteResponse, error)
|
||||
RemoveEdgeRoute(ctx context.Context, in *RemoveEdgeRouteRequest, opts ...grpc.CallOption) (*RemoveEdgeRouteResponse, error)
|
||||
ListEdgeRoutes(ctx context.Context, in *ListEdgeRoutesRequest, opts ...grpc.CallOption) (*ListEdgeRoutesResponse, error)
|
||||
// Health (called by master on missed heartbeats)
|
||||
HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
|
||||
// Logs
|
||||
Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error)
|
||||
}
|
||||
@@ -213,6 +233,86 @@ func (c *mcpAgentServiceClient) NodeStatus(ctx context.Context, in *NodeStatusRe
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) ListDNSRecords(ctx context.Context, in *ListDNSRecordsRequest, opts ...grpc.CallOption) (*ListDNSRecordsResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ListDNSRecordsResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_ListDNSRecords_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) ListProxyRoutes(ctx context.Context, in *ListProxyRoutesRequest, opts ...grpc.CallOption) (*ListProxyRoutesResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ListProxyRoutesResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_ListProxyRoutes_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) AddProxyRoute(ctx context.Context, in *AddProxyRouteRequest, opts ...grpc.CallOption) (*AddProxyRouteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(AddProxyRouteResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_AddProxyRoute_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) RemoveProxyRoute(ctx context.Context, in *RemoveProxyRouteRequest, opts ...grpc.CallOption) (*RemoveProxyRouteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(RemoveProxyRouteResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_RemoveProxyRoute_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) SetupEdgeRoute(ctx context.Context, in *SetupEdgeRouteRequest, opts ...grpc.CallOption) (*SetupEdgeRouteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(SetupEdgeRouteResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_SetupEdgeRoute_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) RemoveEdgeRoute(ctx context.Context, in *RemoveEdgeRouteRequest, opts ...grpc.CallOption) (*RemoveEdgeRouteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(RemoveEdgeRouteResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_RemoveEdgeRoute_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) ListEdgeRoutes(ctx context.Context, in *ListEdgeRoutesRequest, opts ...grpc.CallOption) (*ListEdgeRoutesResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ListEdgeRoutesResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_ListEdgeRoutes_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(HealthCheckResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_HealthCheck_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &McpAgentService_ServiceDesc.Streams[0], McpAgentService_Logs_FullMethodName, cOpts...)
|
||||
@@ -257,6 +357,18 @@ type McpAgentServiceServer interface {
|
||||
PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error)
|
||||
// Node
|
||||
NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error)
|
||||
// DNS (query MCNS)
|
||||
ListDNSRecords(context.Context, *ListDNSRecordsRequest) (*ListDNSRecordsResponse, error)
|
||||
// Proxy routes (query mc-proxy)
|
||||
ListProxyRoutes(context.Context, *ListProxyRoutesRequest) (*ListProxyRoutesResponse, error)
|
||||
AddProxyRoute(context.Context, *AddProxyRouteRequest) (*AddProxyRouteResponse, error)
|
||||
RemoveProxyRoute(context.Context, *RemoveProxyRouteRequest) (*RemoveProxyRouteResponse, error)
|
||||
// Edge routing (called by master on edge nodes)
|
||||
SetupEdgeRoute(context.Context, *SetupEdgeRouteRequest) (*SetupEdgeRouteResponse, error)
|
||||
RemoveEdgeRoute(context.Context, *RemoveEdgeRouteRequest) (*RemoveEdgeRouteResponse, error)
|
||||
ListEdgeRoutes(context.Context, *ListEdgeRoutesRequest) (*ListEdgeRoutesResponse, error)
|
||||
// Health (called by master on missed heartbeats)
|
||||
HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
|
||||
// Logs
|
||||
Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error
|
||||
mustEmbedUnimplementedMcpAgentServiceServer()
|
||||
@@ -311,6 +423,30 @@ func (UnimplementedMcpAgentServiceServer) PullFile(context.Context, *PullFileReq
|
||||
func (UnimplementedMcpAgentServiceServer) NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method NodeStatus not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) ListDNSRecords(context.Context, *ListDNSRecordsRequest) (*ListDNSRecordsResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ListDNSRecords not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) ListProxyRoutes(context.Context, *ListProxyRoutesRequest) (*ListProxyRoutesResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ListProxyRoutes not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) AddProxyRoute(context.Context, *AddProxyRouteRequest) (*AddProxyRouteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method AddProxyRoute not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) RemoveProxyRoute(context.Context, *RemoveProxyRouteRequest) (*RemoveProxyRouteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method RemoveProxyRoute not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) SetupEdgeRoute(context.Context, *SetupEdgeRouteRequest) (*SetupEdgeRouteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method SetupEdgeRoute not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) RemoveEdgeRoute(context.Context, *RemoveEdgeRouteRequest) (*RemoveEdgeRouteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method RemoveEdgeRoute not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) ListEdgeRoutes(context.Context, *ListEdgeRoutesRequest) (*ListEdgeRoutesResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ListEdgeRoutes not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method HealthCheck not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method Logs not implemented")
|
||||
}
|
||||
@@ -587,6 +723,150 @@ func _McpAgentService_NodeStatus_Handler(srv interface{}, ctx context.Context, d
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_ListDNSRecords_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListDNSRecordsRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).ListDNSRecords(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_ListDNSRecords_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).ListDNSRecords(ctx, req.(*ListDNSRecordsRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_ListProxyRoutes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListProxyRoutesRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).ListProxyRoutes(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_ListProxyRoutes_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).ListProxyRoutes(ctx, req.(*ListProxyRoutesRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_AddProxyRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(AddProxyRouteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).AddProxyRoute(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_AddProxyRoute_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).AddProxyRoute(ctx, req.(*AddProxyRouteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_RemoveProxyRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(RemoveProxyRouteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).RemoveProxyRoute(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_RemoveProxyRoute_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).RemoveProxyRoute(ctx, req.(*RemoveProxyRouteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_SetupEdgeRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SetupEdgeRouteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).SetupEdgeRoute(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_SetupEdgeRoute_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).SetupEdgeRoute(ctx, req.(*SetupEdgeRouteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_RemoveEdgeRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(RemoveEdgeRouteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).RemoveEdgeRoute(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_RemoveEdgeRoute_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).RemoveEdgeRoute(ctx, req.(*RemoveEdgeRouteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_ListEdgeRoutes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListEdgeRoutesRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).ListEdgeRoutes(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_ListEdgeRoutes_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).ListEdgeRoutes(ctx, req.(*ListEdgeRoutesRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(HealthCheckRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).HealthCheck(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_HealthCheck_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_Logs_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(LogsRequest)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
@@ -661,6 +941,38 @@ var McpAgentService_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "NodeStatus",
|
||||
Handler: _McpAgentService_NodeStatus_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ListDNSRecords",
|
||||
Handler: _McpAgentService_ListDNSRecords_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ListProxyRoutes",
|
||||
Handler: _McpAgentService_ListProxyRoutes_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "AddProxyRoute",
|
||||
Handler: _McpAgentService_AddProxyRoute_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "RemoveProxyRoute",
|
||||
Handler: _McpAgentService_RemoveProxyRoute_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "SetupEdgeRoute",
|
||||
Handler: _McpAgentService_SetupEdgeRoute_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "RemoveEdgeRoute",
|
||||
Handler: _McpAgentService_RemoveEdgeRoute_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ListEdgeRoutes",
|
||||
Handler: _McpAgentService_ListEdgeRoutes_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "HealthCheck",
|
||||
Handler: _McpAgentService_HealthCheck_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
|
||||
@@ -134,7 +134,8 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
Error: fmt.Sprintf("allocate route ports: %v", err),
|
||||
}
|
||||
}
|
||||
runSpec.Ports = ports
|
||||
// Merge explicit ports from the spec with route-allocated ports.
|
||||
runSpec.Ports = append(cs.GetPorts(), ports...)
|
||||
runSpec.Env = append(runSpec.Env, env...)
|
||||
} else {
|
||||
// Legacy: use ports directly from the spec.
|
||||
|
||||
@@ -26,8 +26,8 @@ type DNSRegistrar struct {
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// dnsRecord is the JSON representation of an MCNS record.
|
||||
type dnsRecord struct {
|
||||
// DNSRecord is the JSON representation of an MCNS record.
|
||||
type DNSRecord struct {
|
||||
ID int `json:"ID"`
|
||||
Name string `json:"Name"`
|
||||
Type string `json:"Type"`
|
||||
@@ -136,8 +136,87 @@ func (d *DNSRegistrar) RemoveRecord(ctx context.Context, serviceName string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
// DNSZone is the JSON representation of an MCNS zone.
|
||||
type DNSZone struct {
|
||||
Name string `json:"Name"`
|
||||
}
|
||||
|
||||
// ListZones returns all zones from MCNS.
|
||||
func (d *DNSRegistrar) ListZones(ctx context.Context) ([]DNSZone, error) {
|
||||
if d == nil {
|
||||
return nil, fmt.Errorf("DNS registrar not configured")
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/v1/zones", d.serverURL)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create list zones request: %w", err)
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+d.token)
|
||||
|
||||
resp, err := d.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list zones: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read list zones response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("list zones: mcns returned %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var envelope struct {
|
||||
Zones []DNSZone `json:"zones"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return nil, fmt.Errorf("parse list zones response: %w", err)
|
||||
}
|
||||
return envelope.Zones, nil
|
||||
}
|
||||
|
||||
// ListZoneRecords returns all records in the given zone (no filters).
|
||||
func (d *DNSRegistrar) ListZoneRecords(ctx context.Context, zone string) ([]DNSRecord, error) {
|
||||
if d == nil {
|
||||
return nil, fmt.Errorf("DNS registrar not configured")
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, zone)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create list zone records request: %w", err)
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+d.token)
|
||||
|
||||
resp, err := d.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list zone records: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read list zone records response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("list zone records: mcns returned %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var envelope struct {
|
||||
Records []DNSRecord `json:"records"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return nil, fmt.Errorf("parse list zone records response: %w", err)
|
||||
}
|
||||
return envelope.Records, nil
|
||||
}
|
||||
|
||||
// listRecords returns A records matching the service name in the zone.
|
||||
func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]dnsRecord, error) {
|
||||
func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]DNSRecord, error) {
|
||||
url := fmt.Sprintf("%s/v1/zones/%s/records?name=%s&type=A", d.serverURL, d.zone, serviceName)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
@@ -161,7 +240,7 @@ func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]d
|
||||
}
|
||||
|
||||
var envelope struct {
|
||||
Records []dnsRecord `json:"records"`
|
||||
Records []DNSRecord `json:"records"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return nil, fmt.Errorf("parse list response: %w", err)
|
||||
|
||||
40
internal/agent/dns_rpc.go
Normal file
40
internal/agent/dns_rpc.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
)
|
||||
|
||||
// ListDNSRecords queries MCNS for all zones and their records.
|
||||
func (a *Agent) ListDNSRecords(ctx context.Context, _ *mcpv1.ListDNSRecordsRequest) (*mcpv1.ListDNSRecordsResponse, error) {
|
||||
a.Logger.Debug("ListDNSRecords called")
|
||||
|
||||
zones, err := a.DNS.ListZones(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list zones: %w", err)
|
||||
}
|
||||
|
||||
resp := &mcpv1.ListDNSRecordsResponse{}
|
||||
for _, z := range zones {
|
||||
records, err := a.DNS.ListZoneRecords(ctx, z.Name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list records for zone %q: %w", z.Name, err)
|
||||
}
|
||||
|
||||
zone := &mcpv1.DNSZone{Name: z.Name}
|
||||
for _, r := range records {
|
||||
zone.Records = append(zone.Records, &mcpv1.DNSRecord{
|
||||
Id: int64(r.ID),
|
||||
Name: r.Name,
|
||||
Type: r.Type,
|
||||
Value: r.Value,
|
||||
Ttl: int32(r.TTL), //nolint:gosec // TTL is bounded
|
||||
})
|
||||
}
|
||||
resp.Zones = append(resp.Zones, zone)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func TestEnsureRecordSkipsWhenExists(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
// Return an existing record with the correct value.
|
||||
resp := map[string][]dnsRecord{"records": {{ID: 1, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
resp := map[string][]DNSRecord{"records": {{ID: 1, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
return
|
||||
@@ -124,7 +124,7 @@ func TestEnsureRecordUpdatesWrongValue(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
// Return a record with a stale value.
|
||||
resp := map[string][]dnsRecord{"records": {{ID: 42, Name: "myservice", Type: "A", Value: "10.0.0.1", TTL: 300}}}
|
||||
resp := map[string][]DNSRecord{"records": {{ID: 42, Name: "myservice", Type: "A", Value: "10.0.0.1", TTL: 300}}}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
return
|
||||
@@ -160,7 +160,7 @@ func TestRemoveRecordDeletes(t *testing.T) {
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
resp := map[string][]dnsRecord{"records": {{ID: 7, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
resp := map[string][]DNSRecord{"records": {{ID: 7, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
return
|
||||
|
||||
196
internal/agent/edge_rpc.go
Normal file
196
internal/agent/edge_rpc.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
mcproxy "git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
|
||||
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||
)
|
||||
|
||||
// SetupEdgeRoute provisions a TLS cert and registers an mc-proxy route for a
|
||||
// public hostname. Called by the master on edge nodes.
|
||||
func (a *Agent) SetupEdgeRoute(ctx context.Context, req *mcpv1.SetupEdgeRouteRequest) (*mcpv1.SetupEdgeRouteResponse, error) {
|
||||
a.Logger.Info("SetupEdgeRoute", "hostname", req.GetHostname(),
|
||||
"backend_hostname", req.GetBackendHostname(), "backend_port", req.GetBackendPort())
|
||||
|
||||
// Validate required fields.
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
if req.GetBackendHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend_hostname is required")
|
||||
}
|
||||
if req.GetBackendPort() == 0 {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend_port is required")
|
||||
}
|
||||
if !req.GetBackendTls() {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend_tls must be true")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
// Resolve the backend hostname to a Tailnet IP.
|
||||
ips, err := net.LookupHost(req.GetBackendHostname())
|
||||
if err != nil || len(ips) == 0 {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "cannot resolve backend_hostname %q: %v", req.GetBackendHostname(), err)
|
||||
}
|
||||
backendIP := ips[0]
|
||||
|
||||
// Validate the resolved IP is a Tailnet address (100.64.0.0/10).
|
||||
ip := net.ParseIP(backendIP)
|
||||
if ip == nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "resolved IP %q is not valid", backendIP)
|
||||
}
|
||||
_, tailnet, _ := net.ParseCIDR("100.64.0.0/10")
|
||||
if !tailnet.Contains(ip) {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "resolved IP %s is not a Tailnet address", backendIP)
|
||||
}
|
||||
|
||||
backend := fmt.Sprintf("%s:%d", backendIP, req.GetBackendPort())
|
||||
|
||||
// Provision TLS cert for the public hostname if cert provisioner is available.
|
||||
certPath := ""
|
||||
keyPath := ""
|
||||
if a.Certs != nil {
|
||||
if err := a.Certs.EnsureCert(ctx, req.GetHostname(), []string{req.GetHostname()}); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "provision cert for %s: %v", req.GetHostname(), err)
|
||||
}
|
||||
certPath = a.Proxy.CertPath(req.GetHostname())
|
||||
keyPath = a.Proxy.KeyPath(req.GetHostname())
|
||||
} else {
|
||||
// No cert provisioner — check if certs already exist on disk.
|
||||
certPath = a.Proxy.CertPath(req.GetHostname())
|
||||
keyPath = a.Proxy.KeyPath(req.GetHostname())
|
||||
if _, err := os.Stat(certPath); err != nil {
|
||||
return nil, status.Errorf(codes.FailedPrecondition, "no cert provisioner and cert not found at %s", certPath)
|
||||
}
|
||||
}
|
||||
|
||||
// Register the L7 route in mc-proxy.
|
||||
route := mcproxy.Route{
|
||||
Hostname: req.GetHostname(),
|
||||
Backend: backend,
|
||||
Mode: "l7",
|
||||
TLSCert: certPath,
|
||||
TLSKey: keyPath,
|
||||
BackendTLS: true,
|
||||
}
|
||||
if err := a.Proxy.AddRoute(ctx, ":443", route); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "add mc-proxy route: %v", err)
|
||||
}
|
||||
|
||||
// Persist the edge route in the registry.
|
||||
if err := registry.CreateEdgeRoute(a.DB, req.GetHostname(), req.GetBackendHostname(), int(req.GetBackendPort()), certPath, keyPath); err != nil {
|
||||
a.Logger.Warn("failed to persist edge route", "hostname", req.GetHostname(), "err", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("edge route established",
|
||||
"hostname", req.GetHostname(), "backend", backend, "cert", certPath)
|
||||
|
||||
return &mcpv1.SetupEdgeRouteResponse{}, nil
|
||||
}
|
||||
|
||||
// RemoveEdgeRoute removes an mc-proxy route and cleans up the TLS cert for a
|
||||
// public hostname. Called by the master on edge nodes.
|
||||
func (a *Agent) RemoveEdgeRoute(ctx context.Context, req *mcpv1.RemoveEdgeRouteRequest) (*mcpv1.RemoveEdgeRouteResponse, error) {
|
||||
a.Logger.Info("RemoveEdgeRoute", "hostname", req.GetHostname())
|
||||
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
// Remove the mc-proxy route.
|
||||
if err := a.Proxy.RemoveRoute(ctx, ":443", req.GetHostname()); err != nil {
|
||||
a.Logger.Warn("remove mc-proxy route", "hostname", req.GetHostname(), "err", err)
|
||||
// Continue — clean up cert and registry even if route removal fails.
|
||||
}
|
||||
|
||||
// Remove the TLS cert.
|
||||
if a.Certs != nil {
|
||||
if err := a.Certs.RemoveCert(req.GetHostname()); err != nil {
|
||||
a.Logger.Warn("remove cert", "hostname", req.GetHostname(), "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from registry.
|
||||
if err := registry.DeleteEdgeRoute(a.DB, req.GetHostname()); err != nil {
|
||||
a.Logger.Warn("delete edge route from registry", "hostname", req.GetHostname(), "err", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("edge route removed", "hostname", req.GetHostname())
|
||||
return &mcpv1.RemoveEdgeRouteResponse{}, nil
|
||||
}
|
||||
|
||||
// ListEdgeRoutes returns all edge routes managed by this agent.
|
||||
func (a *Agent) ListEdgeRoutes(_ context.Context, _ *mcpv1.ListEdgeRoutesRequest) (*mcpv1.ListEdgeRoutesResponse, error) {
|
||||
a.Logger.Debug("ListEdgeRoutes called")
|
||||
|
||||
routes, err := registry.ListEdgeRoutes(a.DB)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "list edge routes: %v", err)
|
||||
}
|
||||
|
||||
resp := &mcpv1.ListEdgeRoutesResponse{}
|
||||
for _, r := range routes {
|
||||
er := &mcpv1.EdgeRoute{
|
||||
Hostname: r.Hostname,
|
||||
BackendHostname: r.BackendHostname,
|
||||
BackendPort: int32(r.BackendPort), //nolint:gosec // port is a small positive integer
|
||||
}
|
||||
|
||||
// Read cert metadata if available.
|
||||
if r.TLSCert != "" {
|
||||
if certData, readErr := os.ReadFile(r.TLSCert); readErr == nil { //nolint:gosec // path from registry, not user input
|
||||
if block, _ := pem.Decode(certData); block != nil {
|
||||
if cert, parseErr := x509.ParseCertificate(block.Bytes); parseErr == nil {
|
||||
er.CertSerial = cert.SerialNumber.String()
|
||||
er.CertExpires = cert.NotAfter.UTC().Format(time.RFC3339)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resp.Routes = append(resp.Routes, er)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// HealthCheck returns the agent's health status. Called by the master when
|
||||
// heartbeats are missed.
|
||||
func (a *Agent) HealthCheck(_ context.Context, _ *mcpv1.HealthCheckRequest) (*mcpv1.HealthCheckResponse, error) {
|
||||
a.Logger.Debug("HealthCheck called")
|
||||
|
||||
st := "healthy"
|
||||
containers := int32(0)
|
||||
|
||||
// Count running containers if the runtime is available.
|
||||
if a.Runtime != nil {
|
||||
if list, err := a.Runtime.List(context.Background()); err == nil {
|
||||
containers = int32(len(list)) //nolint:gosec // container count is small
|
||||
} else {
|
||||
st = "degraded"
|
||||
}
|
||||
}
|
||||
|
||||
return &mcpv1.HealthCheckResponse{
|
||||
Status: st,
|
||||
Containers: containers,
|
||||
}, nil
|
||||
}
|
||||
@@ -12,9 +12,9 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// StopService stops all components of a service.
|
||||
// StopService stops all components of a service, or a single component if specified.
|
||||
func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest) (*mcpv1.StopServiceResponse, error) {
|
||||
a.Logger.Info("StopService", "service", req.GetName())
|
||||
a.Logger.Info("StopService", "service", req.GetName(), "component", req.GetComponent())
|
||||
|
||||
if req.GetName() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "service name is required")
|
||||
@@ -25,6 +25,13 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest)
|
||||
return nil, status.Errorf(codes.Internal, "list components: %v", err)
|
||||
}
|
||||
|
||||
if target := req.GetComponent(); target != "" {
|
||||
components, err = filterComponents(components, req.GetName(), target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var results []*mcpv1.ComponentResult
|
||||
for _, c := range components {
|
||||
containerName := ContainerNameFor(req.GetName(), c.Name)
|
||||
@@ -59,10 +66,10 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest)
|
||||
return &mcpv1.StopServiceResponse{Results: results}, nil
|
||||
}
|
||||
|
||||
// StartService starts all components of a service. If a container already
|
||||
// exists but is stopped, it is removed first so a fresh one can be created.
|
||||
// StartService starts all components of a service, or a single component if specified.
|
||||
// If a container already exists but is stopped, it is removed first so a fresh one can be created.
|
||||
func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest) (*mcpv1.StartServiceResponse, error) {
|
||||
a.Logger.Info("StartService", "service", req.GetName())
|
||||
a.Logger.Info("StartService", "service", req.GetName(), "component", req.GetComponent())
|
||||
|
||||
if req.GetName() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "service name is required")
|
||||
@@ -73,6 +80,13 @@ func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest
|
||||
return nil, status.Errorf(codes.Internal, "list components: %v", err)
|
||||
}
|
||||
|
||||
if target := req.GetComponent(); target != "" {
|
||||
components, err = filterComponents(components, req.GetName(), target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var results []*mcpv1.ComponentResult
|
||||
for _, c := range components {
|
||||
r := startComponent(ctx, a, req.GetName(), &c)
|
||||
@@ -82,10 +96,10 @@ func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest
|
||||
return &mcpv1.StartServiceResponse{Results: results}, nil
|
||||
}
|
||||
|
||||
// RestartService restarts all components of a service by stopping, removing,
|
||||
// and re-creating each container. The desired_state is not changed.
|
||||
// RestartService restarts all components of a service, or a single component if specified,
|
||||
// by stopping, removing, and re-creating each container. The desired_state is not changed.
|
||||
func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceRequest) (*mcpv1.RestartServiceResponse, error) {
|
||||
a.Logger.Info("RestartService", "service", req.GetName())
|
||||
a.Logger.Info("RestartService", "service", req.GetName(), "component", req.GetComponent())
|
||||
|
||||
if req.GetName() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "service name is required")
|
||||
@@ -96,6 +110,13 @@ func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceReq
|
||||
return nil, status.Errorf(codes.Internal, "list components: %v", err)
|
||||
}
|
||||
|
||||
if target := req.GetComponent(); target != "" {
|
||||
components, err = filterComponents(components, req.GetName(), target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var results []*mcpv1.ComponentResult
|
||||
for _, c := range components {
|
||||
r := restartComponent(ctx, a, req.GetName(), &c)
|
||||
@@ -167,6 +188,16 @@ func componentToSpec(service string, c *registry.Component) runtime.ContainerSpe
|
||||
}
|
||||
}
|
||||
|
||||
// filterComponents returns only the component matching target, or an error if not found.
|
||||
func filterComponents(components []registry.Component, service, target string) ([]registry.Component, error) {
|
||||
for _, c := range components {
|
||||
if c.Name == target {
|
||||
return []registry.Component{c}, nil
|
||||
}
|
||||
}
|
||||
return nil, status.Errorf(codes.NotFound, "component %q not found in service %q", target, service)
|
||||
}
|
||||
|
||||
// componentExists checks whether a component already exists in the registry.
|
||||
func componentExists(db *sql.DB, service, name string) bool {
|
||||
_, err := registry.GetComponent(db, service, name)
|
||||
|
||||
@@ -48,6 +48,40 @@ func (p *ProxyRouter) Close() error {
|
||||
return p.client.Close()
|
||||
}
|
||||
|
||||
// CertPath returns the expected TLS certificate path for a given name.
|
||||
func (p *ProxyRouter) CertPath(name string) string {
|
||||
return filepath.Join(p.certDir, name+".pem")
|
||||
}
|
||||
|
||||
// KeyPath returns the expected TLS key path for a given name.
|
||||
func (p *ProxyRouter) KeyPath(name string) string {
|
||||
return filepath.Join(p.certDir, name+".key")
|
||||
}
|
||||
|
||||
// GetStatus returns the mc-proxy server status.
|
||||
func (p *ProxyRouter) GetStatus(ctx context.Context) (*mcproxy.Status, error) {
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("mc-proxy not configured")
|
||||
}
|
||||
return p.client.GetStatus(ctx)
|
||||
}
|
||||
|
||||
// AddRoute adds a single route to mc-proxy.
|
||||
func (p *ProxyRouter) AddRoute(ctx context.Context, listenerAddr string, route mcproxy.Route) error {
|
||||
if p == nil {
|
||||
return fmt.Errorf("mc-proxy not configured")
|
||||
}
|
||||
return p.client.AddRoute(ctx, listenerAddr, route)
|
||||
}
|
||||
|
||||
// RemoveRoute removes a single route from mc-proxy.
|
||||
func (p *ProxyRouter) RemoveRoute(ctx context.Context, listenerAddr, hostname string) error {
|
||||
if p == nil {
|
||||
return fmt.Errorf("mc-proxy not configured")
|
||||
}
|
||||
return p.client.RemoveRoute(ctx, listenerAddr, hostname)
|
||||
}
|
||||
|
||||
// RegisterRoutes registers all routes for a service component with mc-proxy.
|
||||
// It uses the assigned host ports from the registry.
|
||||
func (p *ProxyRouter) RegisterRoutes(ctx context.Context, serviceName string, routes []registry.Route, hostPorts map[string]int) error {
|
||||
|
||||
113
internal/agent/proxy_rpc.go
Normal file
113
internal/agent/proxy_rpc.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
// ListProxyRoutes queries mc-proxy for its current status and routes.
|
||||
func (a *Agent) ListProxyRoutes(ctx context.Context, _ *mcpv1.ListProxyRoutesRequest) (*mcpv1.ListProxyRoutesResponse, error) {
|
||||
a.Logger.Debug("ListProxyRoutes called")
|
||||
|
||||
status, err := a.Proxy.GetStatus(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get mc-proxy status: %w", err)
|
||||
}
|
||||
|
||||
resp := &mcpv1.ListProxyRoutesResponse{
|
||||
Version: status.Version,
|
||||
TotalConnections: status.TotalConnections,
|
||||
}
|
||||
if !status.StartedAt.IsZero() {
|
||||
resp.StartedAt = timestamppb.New(status.StartedAt)
|
||||
}
|
||||
|
||||
for _, ls := range status.Listeners {
|
||||
listener := &mcpv1.ProxyListenerInfo{
|
||||
Addr: ls.Addr,
|
||||
RouteCount: int32(ls.RouteCount), //nolint:gosec // bounded
|
||||
ActiveConnections: ls.ActiveConnections,
|
||||
}
|
||||
for _, r := range ls.Routes {
|
||||
listener.Routes = append(listener.Routes, &mcpv1.ProxyRouteInfo{
|
||||
Hostname: r.Hostname,
|
||||
Backend: r.Backend,
|
||||
Mode: r.Mode,
|
||||
BackendTls: r.BackendTLS,
|
||||
})
|
||||
}
|
||||
resp.Listeners = append(resp.Listeners, listener)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// AddProxyRoute adds a route to mc-proxy.
|
||||
func (a *Agent) AddProxyRoute(ctx context.Context, req *mcpv1.AddProxyRouteRequest) (*mcpv1.AddProxyRouteResponse, error) {
|
||||
if req.GetListenerAddr() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "listener_addr is required")
|
||||
}
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
if req.GetBackend() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend is required")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
route := mcproxy.Route{
|
||||
Hostname: req.GetHostname(),
|
||||
Backend: req.GetBackend(),
|
||||
Mode: req.GetMode(),
|
||||
BackendTLS: req.GetBackendTls(),
|
||||
TLSCert: req.GetTlsCert(),
|
||||
TLSKey: req.GetTlsKey(),
|
||||
}
|
||||
|
||||
if err := a.Proxy.AddRoute(ctx, req.GetListenerAddr(), route); err != nil {
|
||||
return nil, fmt.Errorf("add route: %w", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("route added",
|
||||
"listener", req.GetListenerAddr(),
|
||||
"hostname", req.GetHostname(),
|
||||
"backend", req.GetBackend(),
|
||||
"mode", req.GetMode(),
|
||||
)
|
||||
|
||||
return &mcpv1.AddProxyRouteResponse{}, nil
|
||||
}
|
||||
|
||||
// RemoveProxyRoute removes a route from mc-proxy.
|
||||
func (a *Agent) RemoveProxyRoute(ctx context.Context, req *mcpv1.RemoveProxyRouteRequest) (*mcpv1.RemoveProxyRouteResponse, error) {
|
||||
if req.GetListenerAddr() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "listener_addr is required")
|
||||
}
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
if err := a.Proxy.RemoveRoute(ctx, req.GetListenerAddr(), req.GetHostname()); err != nil {
|
||||
return nil, fmt.Errorf("remove route: %w", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("route removed",
|
||||
"listener", req.GetListenerAddr(),
|
||||
"hostname", req.GetHostname(),
|
||||
)
|
||||
|
||||
return &mcpv1.RemoveProxyRouteResponse{}, nil
|
||||
}
|
||||
@@ -142,4 +142,18 @@ var migrations = []string{
|
||||
FOREIGN KEY (service, component) REFERENCES components(service, name) ON DELETE CASCADE
|
||||
);
|
||||
`,
|
||||
|
||||
// Migration 3: service comment
|
||||
`ALTER TABLE services ADD COLUMN comment TEXT NOT NULL DEFAULT '';`,
|
||||
|
||||
// Migration 4: edge routes (v2 — public routes managed by the master)
|
||||
`CREATE TABLE IF NOT EXISTS edge_routes (
|
||||
hostname TEXT NOT NULL PRIMARY KEY,
|
||||
backend_hostname TEXT NOT NULL,
|
||||
backend_port INTEGER NOT NULL,
|
||||
tls_cert TEXT NOT NULL DEFAULT '',
|
||||
tls_key TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);`,
|
||||
}
|
||||
|
||||
93
internal/registry/edge_routes.go
Normal file
93
internal/registry/edge_routes.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EdgeRoute represents a public edge route managed by the master.
|
||||
type EdgeRoute struct {
|
||||
Hostname string
|
||||
BackendHostname string
|
||||
BackendPort int
|
||||
TLSCert string
|
||||
TLSKey string
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
// CreateEdgeRoute inserts or replaces an edge route.
|
||||
func CreateEdgeRoute(db *sql.DB, hostname, backendHostname string, backendPort int, tlsCert, tlsKey string) error {
|
||||
_, err := db.Exec(`
|
||||
INSERT INTO edge_routes (hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))
|
||||
ON CONFLICT(hostname) DO UPDATE SET
|
||||
backend_hostname = excluded.backend_hostname,
|
||||
backend_port = excluded.backend_port,
|
||||
tls_cert = excluded.tls_cert,
|
||||
tls_key = excluded.tls_key,
|
||||
updated_at = datetime('now')
|
||||
`, hostname, backendHostname, backendPort, tlsCert, tlsKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create edge route %s: %w", hostname, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEdgeRoute returns a single edge route by hostname.
|
||||
func GetEdgeRoute(db *sql.DB, hostname string) (*EdgeRoute, error) {
|
||||
var r EdgeRoute
|
||||
var createdAt, updatedAt string
|
||||
err := db.QueryRow(`
|
||||
SELECT hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at
|
||||
FROM edge_routes WHERE hostname = ?
|
||||
`, hostname).Scan(&r.Hostname, &r.BackendHostname, &r.BackendPort, &r.TLSCert, &r.TLSKey, &createdAt, &updatedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get edge route %s: %w", hostname, err)
|
||||
}
|
||||
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
||||
r.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
// ListEdgeRoutes returns all edge routes.
|
||||
func ListEdgeRoutes(db *sql.DB) ([]*EdgeRoute, error) {
|
||||
rows, err := db.Query(`
|
||||
SELECT hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at
|
||||
FROM edge_routes ORDER BY hostname
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list edge routes: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
var routes []*EdgeRoute
|
||||
for rows.Next() {
|
||||
var r EdgeRoute
|
||||
var createdAt, updatedAt string
|
||||
if err := rows.Scan(&r.Hostname, &r.BackendHostname, &r.BackendPort, &r.TLSCert, &r.TLSKey, &createdAt, &updatedAt); err != nil {
|
||||
return nil, fmt.Errorf("scan edge route: %w", err)
|
||||
}
|
||||
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
|
||||
r.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
|
||||
routes = append(routes, &r)
|
||||
}
|
||||
return routes, rows.Err()
|
||||
}
|
||||
|
||||
// DeleteEdgeRoute removes an edge route by hostname.
|
||||
func DeleteEdgeRoute(db *sql.DB, hostname string) error {
|
||||
result, err := db.Exec(`DELETE FROM edge_routes WHERE hostname = ?`, hostname)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete edge route %s: %w", hostname, err)
|
||||
}
|
||||
n, _ := result.RowsAffected()
|
||||
if n == 0 {
|
||||
return fmt.Errorf("edge route %s not found", hostname)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -180,36 +180,33 @@ func (p *Podman) Inspect(ctx context.Context, name string) (ContainerInfo, error
|
||||
}
|
||||
|
||||
// Logs returns an exec.Cmd that streams container logs. For containers
|
||||
// using the journald log driver, it uses journalctl (podman logs can't
|
||||
// read journald outside the originating user session). For k8s-file or
|
||||
// other drivers, it uses podman logs directly.
|
||||
// using the journald log driver, it tries journalctl first (podman logs
|
||||
// can't read journald outside the originating user session). If journalctl
|
||||
// can't access the journal, it falls back to podman logs.
|
||||
func (p *Podman) Logs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
|
||||
// Check if this container uses the journald log driver.
|
||||
inspectCmd := exec.CommandContext(ctx, p.command(), "inspect", "--format", "{{.HostConfig.LogConfig.Type}}", containerName) //nolint:gosec
|
||||
if out, err := inspectCmd.Output(); err == nil && strings.TrimSpace(string(out)) == "journald" {
|
||||
if p.journalAccessible(ctx, containerName) {
|
||||
return p.journalLogs(ctx, containerName, tail, follow, since)
|
||||
}
|
||||
}
|
||||
|
||||
args := []string{"logs"}
|
||||
if tail > 0 {
|
||||
args = append(args, "--tail", fmt.Sprintf("%d", tail))
|
||||
return p.podmanLogs(ctx, containerName, tail, follow, timestamps, since)
|
||||
}
|
||||
|
||||
// journalAccessible probes whether journalctl can read logs for the container.
|
||||
func (p *Podman) journalAccessible(ctx context.Context, containerName string) bool {
|
||||
args := []string{"--no-pager", "-n", "0"}
|
||||
if os.Getuid() != 0 {
|
||||
args = append(args, "--user")
|
||||
}
|
||||
if follow {
|
||||
args = append(args, "--follow")
|
||||
}
|
||||
if timestamps {
|
||||
args = append(args, "--timestamps")
|
||||
}
|
||||
if since != "" {
|
||||
args = append(args, "--since", since)
|
||||
}
|
||||
args = append(args, containerName)
|
||||
return exec.CommandContext(ctx, p.command(), args...) //nolint:gosec // args built programmatically
|
||||
args = append(args, "CONTAINER_NAME="+containerName)
|
||||
cmd := exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec
|
||||
return cmd.Run() == nil
|
||||
}
|
||||
|
||||
// journalLogs returns a journalctl command filtered by container name.
|
||||
// For rootless podman, container logs go to the user journal, so we
|
||||
// need --user to read them.
|
||||
func (p *Podman) journalLogs(ctx context.Context, containerName string, tail int, follow bool, since string) *exec.Cmd {
|
||||
args := []string{"--no-pager", "--output", "cat"}
|
||||
if os.Getuid() != 0 {
|
||||
@@ -228,6 +225,25 @@ func (p *Podman) journalLogs(ctx context.Context, containerName string, tail int
|
||||
return exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec // args built programmatically
|
||||
}
|
||||
|
||||
// podmanLogs returns a podman logs command.
|
||||
func (p *Podman) podmanLogs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
|
||||
args := []string{"logs"}
|
||||
if tail > 0 {
|
||||
args = append(args, "--tail", fmt.Sprintf("%d", tail))
|
||||
}
|
||||
if follow {
|
||||
args = append(args, "--follow")
|
||||
}
|
||||
if timestamps {
|
||||
args = append(args, "--timestamps")
|
||||
}
|
||||
if since != "" {
|
||||
args = append(args, "--since", since)
|
||||
}
|
||||
args = append(args, containerName)
|
||||
return exec.CommandContext(ctx, p.command(), args...) //nolint:gosec // args built programmatically
|
||||
}
|
||||
|
||||
// Login authenticates to a container registry using the given token as
|
||||
// the password. This enables non-interactive push with service account
|
||||
// tokens (MCR accepts MCIAS JWTs as passwords).
|
||||
|
||||
@@ -34,6 +34,22 @@ service McpAgentService {
|
||||
// Node
|
||||
rpc NodeStatus(NodeStatusRequest) returns (NodeStatusResponse);
|
||||
|
||||
// DNS (query MCNS)
|
||||
rpc ListDNSRecords(ListDNSRecordsRequest) returns (ListDNSRecordsResponse);
|
||||
|
||||
// Proxy routes (query mc-proxy)
|
||||
rpc ListProxyRoutes(ListProxyRoutesRequest) returns (ListProxyRoutesResponse);
|
||||
rpc AddProxyRoute(AddProxyRouteRequest) returns (AddProxyRouteResponse);
|
||||
rpc RemoveProxyRoute(RemoveProxyRouteRequest) returns (RemoveProxyRouteResponse);
|
||||
|
||||
// Edge routing (called by master on edge nodes)
|
||||
rpc SetupEdgeRoute(SetupEdgeRouteRequest) returns (SetupEdgeRouteResponse);
|
||||
rpc RemoveEdgeRoute(RemoveEdgeRouteRequest) returns (RemoveEdgeRouteResponse);
|
||||
rpc ListEdgeRoutes(ListEdgeRoutesRequest) returns (ListEdgeRoutesResponse);
|
||||
|
||||
// Health (called by master on missed heartbeats)
|
||||
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
|
||||
|
||||
// Logs
|
||||
rpc Logs(LogsRequest) returns (stream LogsResponse);
|
||||
}
|
||||
@@ -64,6 +80,7 @@ message ServiceSpec {
|
||||
string name = 1;
|
||||
bool active = 2;
|
||||
repeated ComponentSpec components = 3;
|
||||
string comment = 4;
|
||||
}
|
||||
|
||||
message DeployRequest {
|
||||
@@ -84,6 +101,7 @@ message ComponentResult {
|
||||
|
||||
message StopServiceRequest {
|
||||
string name = 1;
|
||||
string component = 2;
|
||||
}
|
||||
|
||||
message StopServiceResponse {
|
||||
@@ -92,6 +110,7 @@ message StopServiceResponse {
|
||||
|
||||
message StartServiceRequest {
|
||||
string name = 1;
|
||||
string component = 2;
|
||||
}
|
||||
|
||||
message StartServiceResponse {
|
||||
@@ -100,6 +119,7 @@ message StartServiceResponse {
|
||||
|
||||
message RestartServiceRequest {
|
||||
string name = 1;
|
||||
string component = 2;
|
||||
}
|
||||
|
||||
message RestartServiceResponse {
|
||||
@@ -140,6 +160,7 @@ message ServiceInfo {
|
||||
string name = 1;
|
||||
bool active = 2;
|
||||
repeated ComponentInfo components = 3;
|
||||
string comment = 4;
|
||||
}
|
||||
|
||||
message ComponentInfo {
|
||||
@@ -301,3 +322,108 @@ message LogsRequest {
|
||||
message LogsResponse {
|
||||
bytes data = 1;
|
||||
}
|
||||
|
||||
// --- DNS ---
|
||||
|
||||
message ListDNSRecordsRequest {}
|
||||
|
||||
message DNSZone {
|
||||
string name = 1;
|
||||
repeated DNSRecord records = 2;
|
||||
}
|
||||
|
||||
message DNSRecord {
|
||||
int64 id = 1;
|
||||
string name = 2;
|
||||
string type = 3;
|
||||
string value = 4;
|
||||
int32 ttl = 5;
|
||||
}
|
||||
|
||||
message ListDNSRecordsResponse {
|
||||
repeated DNSZone zones = 1;
|
||||
}
|
||||
|
||||
// --- Proxy routes ---
|
||||
|
||||
message ListProxyRoutesRequest {}
|
||||
|
||||
message ProxyRouteInfo {
|
||||
string hostname = 1;
|
||||
string backend = 2;
|
||||
string mode = 3;
|
||||
bool backend_tls = 4;
|
||||
}
|
||||
|
||||
message ProxyListenerInfo {
|
||||
string addr = 1;
|
||||
int32 route_count = 2;
|
||||
int64 active_connections = 3;
|
||||
repeated ProxyRouteInfo routes = 4;
|
||||
}
|
||||
|
||||
message ListProxyRoutesResponse {
|
||||
string version = 1;
|
||||
int64 total_connections = 2;
|
||||
google.protobuf.Timestamp started_at = 3;
|
||||
repeated ProxyListenerInfo listeners = 4;
|
||||
}
|
||||
|
||||
message AddProxyRouteRequest {
|
||||
string listener_addr = 1; // e.g. ":443"
|
||||
string hostname = 2;
|
||||
string backend = 3;
|
||||
string mode = 4; // "l4" or "l7"
|
||||
bool backend_tls = 5;
|
||||
string tls_cert = 6; // path to TLS cert (required for l7)
|
||||
string tls_key = 7; // path to TLS key (required for l7)
|
||||
}
|
||||
|
||||
message AddProxyRouteResponse {}
|
||||
|
||||
message RemoveProxyRouteRequest {
|
||||
string listener_addr = 1; // e.g. ":443"
|
||||
string hostname = 2;
|
||||
}
|
||||
|
||||
message RemoveProxyRouteResponse {}
|
||||
|
||||
// --- Edge routes (v2) ---
|
||||
|
||||
message SetupEdgeRouteRequest {
|
||||
string hostname = 1; // public hostname (e.g. "mcq.metacircular.net")
|
||||
string backend_hostname = 2; // internal .svc.mcp hostname
|
||||
int32 backend_port = 3; // port on worker's mc-proxy
|
||||
bool backend_tls = 4; // MUST be true; agent rejects false
|
||||
}
|
||||
|
||||
message SetupEdgeRouteResponse {}
|
||||
|
||||
message RemoveEdgeRouteRequest {
|
||||
string hostname = 1;
|
||||
}
|
||||
|
||||
message RemoveEdgeRouteResponse {}
|
||||
|
||||
message ListEdgeRoutesRequest {}
|
||||
|
||||
message ListEdgeRoutesResponse {
|
||||
repeated EdgeRoute routes = 1;
|
||||
}
|
||||
|
||||
message EdgeRoute {
|
||||
string hostname = 1;
|
||||
string backend_hostname = 2;
|
||||
int32 backend_port = 3;
|
||||
string cert_serial = 4;
|
||||
string cert_expires = 5; // RFC3339
|
||||
}
|
||||
|
||||
// --- Health check (v2) ---
|
||||
|
||||
message HealthCheckRequest {}
|
||||
|
||||
message HealthCheckResponse {
|
||||
string status = 1; // "healthy" or "degraded"
|
||||
int32 containers = 2;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user