Add agent registration, heartbeats, and monitoring (Phase 4)
Master side:
- Register RPC: identity-bound (agent-rift → rift), allowlist check, max nodes limit, upserts node in registry, updates agent pool
- Heartbeat RPC: derives node name from MCIAS identity (not request), updates container count and last-heartbeat timestamp
- HeartbeatMonitor: background goroutine checks for missed heartbeats (90s threshold), probes agents via HealthCheck, marks unhealthy

Agent side:
- HeartbeatClient: connects to master via env vars (MCP_MASTER_ADDRESS, MCP_MASTER_CA_CERT, MCP_MASTER_TOKEN_PATH), registers on startup with exponential backoff, sends heartbeats every 30s

Proto: added Register and Heartbeat RPCs + messages to master.proto.

Architecture v2 Phase 4.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
235
internal/master/registration.go
Normal file
235
internal/master/registration.go
Normal file
@@ -0,0 +1,235 @@
|
||||
package master
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/auth"
|
||||
"git.wntrmute.dev/mc/mcp/internal/masterdb"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// Register handles agent self-registration. Identity-bound: the agent's
|
||||
// MCIAS service name must match the claimed node name (agent-rift → rift).
|
||||
func (m *Master) Register(ctx context.Context, req *mcpv1.RegisterRequest) (*mcpv1.RegisterResponse, error) {
|
||||
// Extract caller identity from the auth context.
|
||||
tokenInfo := auth.TokenInfoFromContext(ctx)
|
||||
if tokenInfo == nil {
|
||||
return nil, status.Error(codes.Unauthenticated, "no auth context")
|
||||
}
|
||||
|
||||
// Identity binding: agent-rift can only register name="rift".
|
||||
expectedName := strings.TrimPrefix(tokenInfo.Username, "agent-")
|
||||
if expectedName == tokenInfo.Username {
|
||||
// Not an agent-* account — also allow mcp-agent (legacy).
|
||||
expectedName = req.GetName()
|
||||
}
|
||||
if req.GetName() != expectedName {
|
||||
m.Logger.Warn("registration rejected: name mismatch",
|
||||
"claimed", req.GetName(), "identity", tokenInfo.Username)
|
||||
return nil, status.Errorf(codes.PermissionDenied,
|
||||
"identity %q cannot register as %q", tokenInfo.Username, req.GetName())
|
||||
}
|
||||
|
||||
// Check allowlist.
|
||||
if len(m.Config.Registration.AllowedAgents) > 0 {
|
||||
allowed := false
|
||||
for _, a := range m.Config.Registration.AllowedAgents {
|
||||
if a == tokenInfo.Username {
|
||||
allowed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !allowed {
|
||||
m.Logger.Warn("registration rejected: not in allowlist",
|
||||
"identity", tokenInfo.Username)
|
||||
return nil, status.Errorf(codes.PermissionDenied,
|
||||
"identity %q not in registration allowlist", tokenInfo.Username)
|
||||
}
|
||||
}
|
||||
|
||||
// Check max nodes.
|
||||
nodes, err := masterdb.ListNodes(m.DB)
|
||||
if err == nil && len(nodes) >= m.Config.Registration.MaxNodes {
|
||||
// Check if this is a re-registration (existing node).
|
||||
found := false
|
||||
for _, n := range nodes {
|
||||
if n.Name == req.GetName() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil, status.Error(codes.ResourceExhausted, "max nodes reached")
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert node in registry.
|
||||
role := req.GetRole()
|
||||
if role == "" {
|
||||
role = "worker"
|
||||
}
|
||||
arch := req.GetArch()
|
||||
if arch == "" {
|
||||
arch = "amd64"
|
||||
}
|
||||
|
||||
if err := masterdb.UpsertNode(m.DB, req.GetName(), req.GetAddress(), role, arch); err != nil {
|
||||
m.Logger.Error("registration upsert failed", "node", req.GetName(), "err", err)
|
||||
return nil, status.Error(codes.Internal, "registration failed")
|
||||
}
|
||||
if err := masterdb.UpdateNodeStatus(m.DB, req.GetName(), "healthy"); err != nil {
|
||||
m.Logger.Warn("update node status", "node", req.GetName(), "err", err)
|
||||
}
|
||||
|
||||
// Update the agent pool connection.
|
||||
if addErr := m.Pool.AddNode(req.GetName(), req.GetAddress()); addErr != nil {
|
||||
m.Logger.Warn("pool update failed", "node", req.GetName(), "err", addErr)
|
||||
}
|
||||
|
||||
m.Logger.Info("agent registered",
|
||||
"node", req.GetName(), "address", req.GetAddress(),
|
||||
"role", role, "arch", arch, "identity", tokenInfo.Username)
|
||||
|
||||
return &mcpv1.RegisterResponse{Accepted: true}, nil
|
||||
}
|
||||
|
||||
// Heartbeat handles agent heartbeats. Updates the node's resource data
|
||||
// and last-heartbeat timestamp. Derives the node name from the MCIAS
|
||||
// identity, not the request (security: don't trust self-reported name).
|
||||
func (m *Master) Heartbeat(ctx context.Context, req *mcpv1.HeartbeatRequest) (*mcpv1.HeartbeatResponse, error) {
|
||||
// Derive node name from identity.
|
||||
tokenInfo := auth.TokenInfoFromContext(ctx)
|
||||
if tokenInfo == nil {
|
||||
return nil, status.Error(codes.Unauthenticated, "no auth context")
|
||||
}
|
||||
|
||||
nodeName := strings.TrimPrefix(tokenInfo.Username, "agent-")
|
||||
if nodeName == tokenInfo.Username {
|
||||
// Legacy mcp-agent account — use the request name.
|
||||
nodeName = req.GetName()
|
||||
}
|
||||
|
||||
// Verify the node is registered.
|
||||
node, err := masterdb.GetNode(m.DB, nodeName)
|
||||
if err != nil || node == nil {
|
||||
return nil, status.Errorf(codes.NotFound, "node %q not registered", nodeName)
|
||||
}
|
||||
|
||||
// Update heartbeat data.
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
_, err = m.DB.Exec(`
|
||||
UPDATE nodes SET
|
||||
containers = ?,
|
||||
status = 'healthy',
|
||||
last_heartbeat = ?,
|
||||
updated_at = datetime('now')
|
||||
WHERE name = ?
|
||||
`, req.GetContainers(), now, nodeName)
|
||||
if err != nil {
|
||||
m.Logger.Warn("heartbeat update failed", "node", nodeName, "err", err)
|
||||
}
|
||||
|
||||
return &mcpv1.HeartbeatResponse{Acknowledged: true}, nil
|
||||
}
|
||||
|
||||
// HeartbeatMonitor runs in the background, checking for agents that have
|
||||
// missed heartbeats and probing them via HealthCheck.
|
||||
type HeartbeatMonitor struct {
|
||||
master *Master
|
||||
interval time.Duration // heartbeat check interval (default: 30s)
|
||||
timeout time.Duration // missed heartbeat threshold (default: 90s)
|
||||
stop chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewHeartbeatMonitor creates a heartbeat monitor.
|
||||
func NewHeartbeatMonitor(m *Master) *HeartbeatMonitor {
|
||||
return &HeartbeatMonitor{
|
||||
master: m,
|
||||
interval: 30 * time.Second,
|
||||
timeout: 90 * time.Second,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the heartbeat monitoring loop.
|
||||
func (hm *HeartbeatMonitor) Start() {
|
||||
hm.wg.Add(1)
|
||||
go func() {
|
||||
defer hm.wg.Done()
|
||||
// Initial warm-up: don't alert for the first cycle.
|
||||
time.Sleep(hm.timeout)
|
||||
|
||||
ticker := time.NewTicker(hm.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-hm.stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
hm.check()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop stops the heartbeat monitor.
|
||||
func (hm *HeartbeatMonitor) Stop() {
|
||||
close(hm.stop)
|
||||
hm.wg.Wait()
|
||||
}
|
||||
|
||||
func (hm *HeartbeatMonitor) check() {
|
||||
nodes, err := masterdb.ListNodes(hm.master.DB)
|
||||
if err != nil {
|
||||
hm.master.Logger.Warn("heartbeat check: list nodes", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for _, node := range nodes {
|
||||
if node.Status == "unhealthy" {
|
||||
continue // already marked, don't spam probes
|
||||
}
|
||||
|
||||
if node.LastHeartbeat == nil {
|
||||
continue // never sent a heartbeat, skip
|
||||
}
|
||||
|
||||
if now.Sub(*node.LastHeartbeat) > hm.timeout {
|
||||
hm.master.Logger.Warn("missed heartbeats, probing",
|
||||
"node", node.Name,
|
||||
"last_heartbeat", node.LastHeartbeat.Format(time.RFC3339))
|
||||
|
||||
// Probe the agent.
|
||||
client, err := hm.master.Pool.Get(node.Name)
|
||||
if err != nil {
|
||||
hm.master.Logger.Warn("probe failed: no connection",
|
||||
"node", node.Name, "err", err)
|
||||
_ = masterdb.UpdateNodeStatus(hm.master.DB, node.Name, "unhealthy")
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(),
|
||||
hm.master.Config.Timeouts.HealthCheck.Duration)
|
||||
_, probeErr := client.HealthCheck(ctx, &mcpv1.HealthCheckRequest{})
|
||||
cancel()
|
||||
|
||||
if probeErr != nil {
|
||||
hm.master.Logger.Warn("probe failed",
|
||||
"node", node.Name, "err", probeErr)
|
||||
_ = masterdb.UpdateNodeStatus(hm.master.DB, node.Name, "unhealthy")
|
||||
} else {
|
||||
// Probe succeeded — node is alive, just not sending heartbeats.
|
||||
hm.master.Logger.Info("probe succeeded (heartbeats stale)",
|
||||
"node", node.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user