Files
mcp/internal/master/agentclient.go
Kyle Isom 20735e4b41 Add agent client and connection pool for master
AgentClient wraps a gRPC connection to a single agent with typed
forwarding methods (Deploy, UndeployService, SetupEdgeRoute, etc.).
AgentPool manages connections to multiple agents keyed by node name.

Follows the same TLS 1.3 + token interceptor pattern as cmd/mcp/dial.go
but runs server-side with the master's own MCIAS service token.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:35:16 -07:00

191 lines
5.8 KiB
Go

// Package master implements the mcp-master orchestrator.
package master
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"strings"
"sync"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// AgentClient wraps a gRPC connection to a single mcp-agent.
type AgentClient struct {
conn *grpc.ClientConn
client mcpv1.McpAgentServiceClient
Node string
}
// DialAgent connects to an agent at the given address using TLS 1.3.
// The token is attached to every outgoing RPC via metadata.
func DialAgent(address, caCertPath, token string) (*AgentClient, error) {
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS13,
}
if caCertPath != "" {
caCert, err := os.ReadFile(caCertPath) //nolint:gosec // trusted config path
if err != nil {
return nil, fmt.Errorf("read CA cert %q: %w", caCertPath, err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("invalid CA cert %q", caCertPath)
}
tlsConfig.RootCAs = pool
}
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
grpc.WithUnaryInterceptor(agentTokenInterceptor(token)),
grpc.WithStreamInterceptor(agentStreamTokenInterceptor(token)),
)
if err != nil {
return nil, fmt.Errorf("dial agent %q: %w", address, err)
}
return &AgentClient{
conn: conn,
client: mcpv1.NewMcpAgentServiceClient(conn),
}, nil
}
// Close closes the underlying gRPC connection.
func (c *AgentClient) Close() error {
if c == nil || c.conn == nil {
return nil
}
return c.conn.Close()
}
// Deploy forwards a deploy request to the agent.
func (c *AgentClient) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.DeployResponse, error) {
return c.client.Deploy(ctx, req)
}
// UndeployService forwards an undeploy request to the agent.
func (c *AgentClient) UndeployService(ctx context.Context, req *mcpv1.UndeployServiceRequest) (*mcpv1.UndeployServiceResponse, error) {
return c.client.UndeployService(ctx, req)
}
// GetServiceStatus queries a service's status on the agent.
func (c *AgentClient) GetServiceStatus(ctx context.Context, req *mcpv1.GetServiceStatusRequest) (*mcpv1.GetServiceStatusResponse, error) {
return c.client.GetServiceStatus(ctx, req)
}
// ListServices lists all services on the agent.
func (c *AgentClient) ListServices(ctx context.Context, req *mcpv1.ListServicesRequest) (*mcpv1.ListServicesResponse, error) {
return c.client.ListServices(ctx, req)
}
// SetupEdgeRoute sets up an edge route on the agent.
func (c *AgentClient) SetupEdgeRoute(ctx context.Context, req *mcpv1.SetupEdgeRouteRequest) (*mcpv1.SetupEdgeRouteResponse, error) {
return c.client.SetupEdgeRoute(ctx, req)
}
// RemoveEdgeRoute removes an edge route from the agent.
func (c *AgentClient) RemoveEdgeRoute(ctx context.Context, req *mcpv1.RemoveEdgeRouteRequest) (*mcpv1.RemoveEdgeRouteResponse, error) {
return c.client.RemoveEdgeRoute(ctx, req)
}
// ListEdgeRoutes lists edge routes on the agent.
func (c *AgentClient) ListEdgeRoutes(ctx context.Context, req *mcpv1.ListEdgeRoutesRequest) (*mcpv1.ListEdgeRoutesResponse, error) {
return c.client.ListEdgeRoutes(ctx, req)
}
// HealthCheck checks the agent's health.
func (c *AgentClient) HealthCheck(ctx context.Context, req *mcpv1.HealthCheckRequest) (*mcpv1.HealthCheckResponse, error) {
return c.client.HealthCheck(ctx, req)
}
// agentTokenInterceptor attaches the bearer token to outgoing RPCs.
func agentTokenInterceptor(token string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
return invoker(ctx, method, req, reply, cc, opts...)
}
}
func agentStreamTokenInterceptor(token string) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
return streamer(ctx, desc, cc, method, opts...)
}
}
// AgentPool manages connections to multiple agents, keyed by node name.
type AgentPool struct {
mu sync.RWMutex
clients map[string]*AgentClient
caCert string
token string
}
// NewAgentPool creates a pool with the given CA cert and service token.
func NewAgentPool(caCertPath, token string) *AgentPool {
return &AgentPool{
clients: make(map[string]*AgentClient),
caCert: caCertPath,
token: token,
}
}
// AddNode dials an agent and adds it to the pool.
func (p *AgentPool) AddNode(name, address string) error {
client, err := DialAgent(address, p.caCert, p.token)
if err != nil {
return fmt.Errorf("add node %s: %w", name, err)
}
client.Node = name
p.mu.Lock()
defer p.mu.Unlock()
// Close existing connection if re-adding.
if old, ok := p.clients[name]; ok {
_ = old.Close()
}
p.clients[name] = client
return nil
}
// Get returns the agent client for a node.
func (p *AgentPool) Get(name string) (*AgentClient, error) {
p.mu.RLock()
defer p.mu.RUnlock()
client, ok := p.clients[name]
if !ok {
return nil, fmt.Errorf("node %q not found in pool", name)
}
return client, nil
}
// Close closes all agent connections.
func (p *AgentPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.clients {
_ = c.Close()
}
p.clients = make(map[string]*AgentClient)
}
// LoadServiceToken reads a token from a file path.
func LoadServiceToken(path string) (string, error) {
data, err := os.ReadFile(path) //nolint:gosec // trusted config path
if err != nil {
return "", fmt.Errorf("read service token %q: %w", path, err)
}
return strings.TrimSpace(string(data)), nil
}