NodeConfig and MasterNodeConfig gain an optional addresses[] field for fallback addresses tried in order after the primary address. This provides resilience when Tailscale DNS is down or a node is only reachable via LAN.

- dialAgentMulti: tries each address with a 3s health check, returns first success
- forEachNode: uses multi-address dialing
- AgentPool.AddNodeMulti: master tries all addresses when connecting
- AllAddresses(): deduplicates primary + fallback addresses

Config example:

    [[nodes]]
    name = "rift"
    address = "rift.scylla-hammerhead.ts.net:9444"
    addresses = ["100.95.252.120:9444", "192.168.88.181:9444"]

Existing configs without addresses[] work unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
154 lines
5.0 KiB
Go
154 lines
5.0 KiB
Go
package main
|
|
|
|
import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
	"strings"
	"time"

	mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
	"git.wntrmute.dev/mc/mcp/internal/config"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)
|
|
|
|
// dialAgent connects to an agent at the given address and returns a gRPC
|
|
// client. The connection uses TLS and attaches the bearer token to every RPC.
|
|
func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) {
|
|
tlsConfig := &tls.Config{
|
|
MinVersion: tls.VersionTLS13,
|
|
}
|
|
|
|
if cfg.MCIAS.CACert != "" {
|
|
caCert, err := os.ReadFile(cfg.MCIAS.CACert) //nolint:gosec // trusted config path
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("read CA cert %q: %w", cfg.MCIAS.CACert, err)
|
|
}
|
|
pool := x509.NewCertPool()
|
|
if !pool.AppendCertsFromPEM(caCert) {
|
|
return nil, nil, fmt.Errorf("invalid CA cert %q", cfg.MCIAS.CACert)
|
|
}
|
|
tlsConfig.RootCAs = pool
|
|
}
|
|
|
|
token, err := loadBearerToken(cfg)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("load token: %w", err)
|
|
}
|
|
|
|
conn, err := grpc.NewClient(
|
|
address,
|
|
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
|
|
grpc.WithUnaryInterceptor(tokenInterceptor(token)),
|
|
grpc.WithStreamInterceptor(streamTokenInterceptor(token)),
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("dial %q: %w", address, err)
|
|
}
|
|
|
|
return mcpv1.NewMcpAgentServiceClient(conn), conn, nil
|
|
}
|
|
|
|
// dialAgentMulti tries each address in order and returns the first successful
|
|
// connection. Provides resilience when Tailscale DNS is down or a node is
|
|
// reachable via LAN but not Tailnet.
|
|
func dialAgentMulti(addresses []string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) {
|
|
if len(addresses) == 0 {
|
|
return nil, nil, fmt.Errorf("no addresses to dial")
|
|
}
|
|
if len(addresses) == 1 {
|
|
return dialAgent(addresses[0], cfg)
|
|
}
|
|
|
|
var lastErr error
|
|
for _, addr := range addresses {
|
|
client, conn, err := dialAgent(addr, cfg)
|
|
if err != nil {
|
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
|
continue
|
|
}
|
|
// Quick health check to verify the connection actually works.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
_, err = client.NodeStatus(ctx, &mcpv1.NodeStatusRequest{})
|
|
cancel()
|
|
if err != nil {
|
|
_ = conn.Close()
|
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
|
continue
|
|
}
|
|
return client, conn, nil
|
|
}
|
|
return nil, nil, fmt.Errorf("all addresses failed, last error: %w", lastErr)
|
|
}
|
|
|
|
// dialMaster connects to the master at the given address and returns a gRPC
|
|
// client for the McpMasterService.
|
|
func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) {
|
|
tlsConfig := &tls.Config{
|
|
MinVersion: tls.VersionTLS13,
|
|
}
|
|
|
|
if cfg.MCIAS.CACert != "" {
|
|
caCert, err := os.ReadFile(cfg.MCIAS.CACert) //nolint:gosec // trusted config path
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("read CA cert %q: %w", cfg.MCIAS.CACert, err)
|
|
}
|
|
pool := x509.NewCertPool()
|
|
if !pool.AppendCertsFromPEM(caCert) {
|
|
return nil, nil, fmt.Errorf("invalid CA cert %q", cfg.MCIAS.CACert)
|
|
}
|
|
tlsConfig.RootCAs = pool
|
|
}
|
|
|
|
token, err := loadBearerToken(cfg)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("load token: %w", err)
|
|
}
|
|
|
|
conn, err := grpc.NewClient(
|
|
address,
|
|
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
|
|
grpc.WithUnaryInterceptor(tokenInterceptor(token)),
|
|
grpc.WithStreamInterceptor(streamTokenInterceptor(token)),
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("dial master %q: %w", address, err)
|
|
}
|
|
|
|
return mcpv1.NewMcpMasterServiceClient(conn), conn, nil
|
|
}
|
|
|
|
// tokenInterceptor returns a gRPC client interceptor that attaches the
|
|
// bearer token to outgoing RPC metadata.
|
|
func tokenInterceptor(token string) grpc.UnaryClientInterceptor {
|
|
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
|
|
return invoker(ctx, method, req, reply, cc, opts...)
|
|
}
|
|
}
|
|
|
|
// streamTokenInterceptor returns a gRPC client stream interceptor that
|
|
// attaches the bearer token to outgoing stream metadata.
|
|
func streamTokenInterceptor(token string) grpc.StreamClientInterceptor {
|
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
|
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
|
|
return streamer(ctx, desc, cc, method, opts...)
|
|
}
|
|
}
|
|
|
|
// loadBearerToken reads the token from file or env var.
|
|
func loadBearerToken(cfg *config.CLIConfig) (string, error) {
|
|
if token := os.Getenv("MCP_TOKEN"); token != "" {
|
|
return token, nil
|
|
}
|
|
token, err := os.ReadFile(cfg.Auth.TokenPath) //nolint:gosec // trusted config path
|
|
if err != nil {
|
|
return "", fmt.Errorf("read token from %q: %w (run 'mcp login' first)", cfg.Auth.TokenPath, err)
|
|
}
|
|
return strings.TrimSpace(string(token)), nil
|
|
}
|