diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 581e272..374e822 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -131,6 +131,15 @@ func Run(cfg *config.AgentConfig, version string) error { } } + // Start heartbeat client (registers with master and sends heartbeats). + hbClient, hbErr := NewHeartbeatClient(*cfg, logger) + if hbErr != nil { + logger.Warn("heartbeat client failed to start", "err", hbErr) + } else if hbClient != nil { + hbClient.Start() + logger.Info("heartbeat client started", "master", cfg.Master.Address) + } + mon.Start() ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) @@ -144,6 +153,9 @@ func Run(cfg *config.AgentConfig, version string) error { select { case <-ctx.Done(): logger.Info("shutting down") + if hbClient != nil { + hbClient.Stop() + } mon.Stop() server.GracefulStop() _ = proxy.Close() diff --git a/internal/agent/heartbeat.go b/internal/agent/heartbeat.go index 49d4027..da9f94d 100644 --- a/internal/agent/heartbeat.go +++ b/internal/agent/heartbeat.go @@ -44,17 +44,31 @@ type HeartbeatClient struct { // NewHeartbeatClient creates a client that registers with the master and // sends periodic heartbeats. Returns nil if master address is not configured. func NewHeartbeatClient(cfg config.AgentConfig, logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) }) (*HeartbeatClient, error) { - masterAddr := os.Getenv("MCP_MASTER_ADDRESS") - masterCACert := os.Getenv("MCP_MASTER_CA_CERT") - masterToken := os.Getenv("MCP_MASTER_TOKEN_PATH") + // Config takes precedence, env vars as fallback. + masterAddr := cfg.Master.Address + if masterAddr == "" { + masterAddr = os.Getenv("MCP_MASTER_ADDRESS") + } + masterCACert := cfg.Master.CACert + if masterCACert == "" { + masterCACert = os.Getenv("MCP_MASTER_CA_CERT") + } + masterTokenPath := cfg.Master.TokenPath + if masterTokenPath == "" { + masterTokenPath = os.Getenv("MCP_MASTER_TOKEN_PATH") + } + role := cfg.Master.Role + if role == "" { + role = "worker" + } if masterAddr == "" { return nil, nil // master not configured } token := "" - if masterToken != "" { - data, err := os.ReadFile(masterToken) //nolint:gosec // trusted config + if masterTokenPath != "" { + data, err := os.ReadFile(masterTokenPath) //nolint:gosec // trusted config if err != nil { return nil, fmt.Errorf("read master token: %w", err) } @@ -92,7 +106,7 @@ func NewHeartbeatClient(cfg config.AgentConfig, logger interface{ Info(string, . client: mcpv1.NewMcpMasterServiceClient(conn), conn: conn, nodeName: cfg.Agent.NodeName, - role: "worker", // default; master node sets this via config + role: role, address: cfg.Server.GRPCAddr, arch: runtime.GOARCH, interval: 30 * time.Second, diff --git a/internal/config/agent.go b/internal/config/agent.go index ff90f9d..4b4809d 100644 --- a/internal/config/agent.go +++ b/internal/config/agent.go @@ -19,7 +19,17 @@ type AgentConfig struct { MCNS MCNSConfig `toml:"mcns"` Monitor MonitorConfig `toml:"monitor"` Log LogConfig `toml:"log"` - Boot BootConfig `toml:"boot"` + Boot BootConfig `toml:"boot"` + Master AgentMasterConfig `toml:"master"` +} + +// AgentMasterConfig holds the optional master connection settings. +// When configured, the agent self-registers and sends heartbeats. +type AgentMasterConfig struct { + Address string `toml:"address"` // master gRPC address + CACert string `toml:"ca_cert"` // CA cert to verify master's TLS + TokenPath string `toml:"token_path"` // MCIAS service token for auth + Role string `toml:"role"` // "worker", "edge", or "master" } // BootConfig holds the boot sequence for the master node.