Files
mcp/internal/agent/heartbeat.go
Kyle Isom 3b08caaa0a Add [master] config section to agent for registration
Heartbeat client now reads master connection settings from the agent
config ([master] section) with env var fallback. Includes address,
ca_cert, token_path, and role fields.

Agent's Run() creates and starts the heartbeat client automatically
when [master] is configured.

Tested on all three nodes: rift (master), svc (edge), orion (worker)
all registered with the master and sending heartbeats every 30s.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 13:53:15 -07:00

198 lines
5.0 KiB
Go

package agent
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"runtime"
"strings"
"sync"
"time"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/config"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// MasterConfig describes the agent's optional connection to the master.
// When it is configured, the agent self-registers and sends periodic
// heartbeats.
//
// NOTE(review): NewHeartbeatClient in this file reads its master settings
// (including a role) from config.AgentConfig rather than from this type;
// confirm whether this struct is still in use or should mirror that one.
type MasterConfig struct {
	// Address is the master's gRPC address.
	Address string `toml:"address"`

	// CACert is the CA cert used to verify the master's TLS certificate.
	CACert string `toml:"ca_cert"`

	// TokenPath locates the MCIAS service token used for auth.
	TokenPath string `toml:"token_path"`
}
// HeartbeatClient owns the agent's connection to the master and drives
// registration followed by periodic heartbeats. Instances are built by
// NewHeartbeatClient; Start launches the background goroutine and Stop
// shuts it down and closes the connection.
type HeartbeatClient struct {
	// Transport to the master: the generated service client and the
	// underlying gRPC connection it was created from.
	client mcpv1.McpMasterServiceClient
	conn   *grpc.ClientConn

	// Identity reported to the master in the Register request; nodeName is
	// also sent with every heartbeat.
	nodeName string
	role     string
	address  string // agent's own gRPC address
	arch     string

	// interval is the time between heartbeats once registration succeeded.
	interval time.Duration

	// stop is closed by Stop to end the background goroutine; wg lets Stop
	// wait for that goroutine to exit.
	stop chan struct{}
	wg   sync.WaitGroup

	// logger receives structured messages (message followed by key/value
	// pairs, as used by the calls in this file).
	logger interface {
		Info(string, ...any)
		Warn(string, ...any)
		Error(string, ...any)
	}
}
// NewHeartbeatClient builds the client that registers this agent with the
// master and sends it periodic heartbeats.
//
// The master address, CA cert path and token path are taken from cfg.Master;
// any of them left empty falls back to the MCP_MASTER_ADDRESS,
// MCP_MASTER_CA_CERT and MCP_MASTER_TOKEN_PATH environment variables
// respectively. An empty role defaults to "worker".
//
// If no master address can be resolved, the master is considered not
// configured and (nil, nil) is returned. An error is returned when the token
// file or CA cert cannot be read, when the CA cert contains no usable PEM
// certificate, or when the gRPC client cannot be created.
//
// The connection always uses TLS 1.3 or newer. When a CA cert is supplied it
// becomes the root pool for verifying the master. When a non-empty token was
// loaded, every unary call carries it as an "authorization: Bearer ..."
// metadata entry.
func NewHeartbeatClient(cfg config.AgentConfig, logger interface {
	Info(string, ...any)
	Warn(string, ...any)
	Error(string, ...any)
}) (*HeartbeatClient, error) {
	// Config takes precedence; the environment is consulted only when the
	// config value is empty.
	fromConfigOrEnv := func(value, envKey string) string {
		if value != "" {
			return value
		}
		return os.Getenv(envKey)
	}

	addr := fromConfigOrEnv(cfg.Master.Address, "MCP_MASTER_ADDRESS")
	if addr == "" {
		return nil, nil // master not configured
	}
	caPath := fromConfigOrEnv(cfg.Master.CACert, "MCP_MASTER_CA_CERT")
	tokenPath := fromConfigOrEnv(cfg.Master.TokenPath, "MCP_MASTER_TOKEN_PATH")

	nodeRole := cfg.Master.Role
	if nodeRole == "" {
		nodeRole = "worker"
	}

	// The token is read once, here; surrounding whitespace is dropped.
	var bearer string
	if tokenPath != "" {
		raw, err := os.ReadFile(tokenPath) //nolint:gosec // trusted config
		if err != nil {
			return nil, fmt.Errorf("read master token: %w", err)
		}
		bearer = strings.TrimSpace(string(raw))
	}

	tlsCfg := &tls.Config{MinVersion: tls.VersionTLS13}
	if caPath != "" {
		pemBytes, err := os.ReadFile(caPath) //nolint:gosec // trusted config
		if err != nil {
			return nil, fmt.Errorf("read master CA cert: %w", err)
		}
		roots := x509.NewCertPool()
		if ok := roots.AppendCertsFromPEM(pemBytes); !ok {
			return nil, fmt.Errorf("invalid master CA cert")
		}
		tlsCfg.RootCAs = roots
	}

	// attachToken adds the bearer token to outgoing unary calls; with an
	// empty token it forwards the call untouched.
	attachToken := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		if bearer != "" {
			ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+bearer)
		}
		return invoker(ctx, method, req, reply, cc, opts...)
	}

	conn, err := grpc.NewClient(
		addr,
		grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)),
		grpc.WithUnaryInterceptor(attachToken),
	)
	if err != nil {
		return nil, fmt.Errorf("dial master: %w", err)
	}

	hc := &HeartbeatClient{
		client:   mcpv1.NewMcpMasterServiceClient(conn),
		conn:     conn,
		nodeName: cfg.Agent.NodeName,
		role:     nodeRole,
		address:  cfg.Server.GRPCAddr,
		arch:     runtime.GOARCH,
		interval: 30 * time.Second,
		stop:     make(chan struct{}),
		logger:   logger,
	}
	return hc, nil
}
// Start launches the background goroutine that first registers the agent
// with the master and then sends heartbeats. It returns immediately and is a
// no-op on a nil receiver.
//
// Registration is retried until the master accepts it. Each attempt is
// bounded by a five-second timeout; the wait between attempts starts at one
// second, doubles after every failure and is capped at sixty seconds. Once
// registered, a heartbeat is sent every hc.interval. The goroutine exits when
// hc.stop is closed, which is observed while waiting between registration
// attempts and while waiting for the next heartbeat tick.
func (hc *HeartbeatClient) Start() {
	if hc == nil {
		return
	}

	hc.wg.Add(1)
	go func() {
		defer hc.wg.Done()

		// Phase 1: register with the master, retrying with backoff.
		for wait := time.Second; ; {
			reqCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
			resp, err := hc.client.Register(reqCtx, &mcpv1.RegisterRequest{
				Name:    hc.nodeName,
				Role:    hc.role,
				Address: hc.address,
				Arch:    hc.arch,
			})
			release()

			if err != nil || !resp.GetAccepted() {
				hc.logger.Warn("registration failed, retrying",
					"node", hc.nodeName, "err", err, "backoff", wait)

				select {
				case <-hc.stop:
					return
				case <-time.After(wait):
				}

				if wait *= 2; wait > 60*time.Second {
					wait = 60 * time.Second
				}
				continue
			}

			hc.logger.Info("registered with master",
				"node", hc.nodeName, "master_accepted", true)
			break
		}

		// Phase 2: heartbeat on every tick until stopped.
		tick := time.NewTicker(hc.interval)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				hc.sendHeartbeat()
			case <-hc.stop:
				return
			}
		}
	}()
}
// sendHeartbeat issues a single Heartbeat RPC to the master, bounded by a
// five-second timeout. A failed call is logged as a warning; nothing is
// returned to the caller.
func (hc *HeartbeatClient) sendHeartbeat() {
	beat := &mcpv1.HeartbeatRequest{
		Name:       hc.nodeName,
		Containers: 0, // TODO: count from runtime
	}

	rpcCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
	defer release()

	if _, err := hc.client.Heartbeat(rpcCtx, beat); err != nil {
		hc.logger.Warn("heartbeat failed", "node", hc.nodeName, "err", err)
	}
}
// Stop ends the background goroutine launched by Start, waits for it to
// exit, and then closes the connection to the master. It is a no-op on a nil
// receiver.
//
// Stop may be called more than once: calls made after the first one return
// immediately instead of panicking on a second close of the stop channel.
// Calls are not synchronized with each other, so Stop must not be invoked
// from several goroutines at the same time.
func (hc *HeartbeatClient) Stop() {
	if hc == nil {
		return
	}

	// A closed stop channel is always ready to receive, which tells us a
	// previous Stop already ran; closing it again would panic.
	select {
	case <-hc.stop:
		return
	default:
	}

	close(hc.stop)
	hc.wg.Wait()

	// Best-effort close during shutdown: there is no caller to report the
	// error to, so it is deliberately discarded.
	_ = hc.conn.Close()
}