Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3b08caaa0a | |||
| 6351b68ef6 | |||
| fa4d022bc1 | |||
| 9d543998dc | |||
| f9f6f339f4 |
@@ -43,6 +43,7 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
root.AddCommand(snapshotCmd())
|
root.AddCommand(snapshotCmd())
|
||||||
|
root.AddCommand(recoverCmd())
|
||||||
|
|
||||||
if err := root.Execute(); err != nil {
|
if err := root.Execute(); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|||||||
68
cmd/mcp-agent/recover.go
Normal file
68
cmd/mcp-agent/recover.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/agent"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func recoverCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "recover",
|
||||||
|
Short: "Recreate containers from the agent registry",
|
||||||
|
Long: `Recover recreates containers from the agent's SQLite registry for all
|
||||||
|
services whose desired state is "running" but which don't have a running
|
||||||
|
container in podman.
|
||||||
|
|
||||||
|
This is the recovery path after a podman database loss (e.g., after a
|
||||||
|
UID change, podman reset, or reboot that cleared container state).
|
||||||
|
|
||||||
|
Images must be cached locally — recover does not pull from MCR.`,
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
cfg, err := config.LoadAgentConfig(cfgPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("load config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
|
||||||
|
Level: slog.LevelInfo,
|
||||||
|
}))
|
||||||
|
|
||||||
|
db, err := registry.Open(cfg.Database.Path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open registry: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = db.Close() }()
|
||||||
|
|
||||||
|
proxy, err := agent.NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("mc-proxy not available, routes will not be registered", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
certs, err := agent.NewCertProvisioner(cfg.Metacrypt, cfg.MCProxy.CertDir, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("cert provisioner not available", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &agent.Agent{
|
||||||
|
Config: cfg,
|
||||||
|
DB: db,
|
||||||
|
Runtime: &runtime.Podman{},
|
||||||
|
Logger: logger,
|
||||||
|
PortAlloc: agent.NewPortAllocator(),
|
||||||
|
Proxy: proxy,
|
||||||
|
Certs: certs,
|
||||||
|
Version: version,
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.Recover(context.Background())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
@@ -52,6 +53,38 @@ func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClie
|
|||||||
return mcpv1.NewMcpAgentServiceClient(conn), conn, nil
|
return mcpv1.NewMcpAgentServiceClient(conn), conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dialAgentMulti tries each address in order and returns the first successful
|
||||||
|
// connection. Provides resilience when Tailscale DNS is down or a node is
|
||||||
|
// reachable via LAN but not Tailnet.
|
||||||
|
func dialAgentMulti(addresses []string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) {
|
||||||
|
if len(addresses) == 0 {
|
||||||
|
return nil, nil, fmt.Errorf("no addresses to dial")
|
||||||
|
}
|
||||||
|
if len(addresses) == 1 {
|
||||||
|
return dialAgent(addresses[0], cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
for _, addr := range addresses {
|
||||||
|
client, conn, err := dialAgent(addr, cfg)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Quick health check to verify the connection actually works.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
_, err = client.NodeStatus(ctx, &mcpv1.NodeStatusRequest{})
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
_ = conn.Close()
|
||||||
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return client, conn, nil
|
||||||
|
}
|
||||||
|
return nil, nil, fmt.Errorf("all addresses failed, last error: %w", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
// dialMaster connects to the master at the given address and returns a gRPC
|
// dialMaster connects to the master at the given address and returns a gRPC
|
||||||
// client for the McpMasterService.
|
// client for the McpMasterService.
|
||||||
func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) {
|
func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) {
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// findNodeAddress looks up a node by name in the CLI config and returns
|
// findNodeAddress looks up a node by name in the CLI config and returns
|
||||||
// its address.
|
// its primary address.
|
||||||
func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
|
func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
|
||||||
for _, n := range cfg.Nodes {
|
for _, n := range cfg.Nodes {
|
||||||
if n.Name == nodeName {
|
if n.Name == nodeName {
|
||||||
@@ -19,6 +19,16 @@ func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
|
|||||||
return "", fmt.Errorf("node %q not found in config", nodeName)
|
return "", fmt.Errorf("node %q not found in config", nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// findNode looks up a node by name in the CLI config.
|
||||||
|
func findNode(cfg *config.CLIConfig, nodeName string) (*config.NodeConfig, error) {
|
||||||
|
for i := range cfg.Nodes {
|
||||||
|
if cfg.Nodes[i].Name == nodeName {
|
||||||
|
return &cfg.Nodes[i], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("node %q not found in config", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
// printComponentResults prints the result of each component operation.
|
// printComponentResults prints the result of each component operation.
|
||||||
func printComponentResults(results []*mcpv1.ComponentResult) {
|
func printComponentResults(results []*mcpv1.ComponentResult) {
|
||||||
for _, r := range results {
|
for _, r := range results {
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func forEachNode(fn func(node config.NodeConfig, client mcpv1.McpAgentServiceCli
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, node := range cfg.Nodes {
|
for _, node := range cfg.Nodes {
|
||||||
client, conn, err := dialAgent(node.Address, cfg)
|
client, conn, err := dialAgentMulti(node.AllAddresses(), cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
|
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
|
||||||
continue
|
continue
|
||||||
@@ -85,7 +85,7 @@ func psCmd() *cobra.Command {
|
|||||||
Short: "Live check: query runtime on all agents",
|
Short: "Live check: query runtime on all agents",
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
w := newTable()
|
w := newTable()
|
||||||
_, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME")
|
_, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME\t")
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
|
if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
|
||||||
@@ -96,19 +96,25 @@ func psCmd() *cobra.Command {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, svc := range resp.GetServices() {
|
for _, svc := range resp.GetServices() {
|
||||||
|
comment := svc.GetComment()
|
||||||
for _, comp := range svc.GetComponents() {
|
for _, comp := range svc.GetComponents() {
|
||||||
uptime := "-"
|
uptime := "-"
|
||||||
if comp.GetStarted() != nil {
|
if comp.GetStarted() != nil {
|
||||||
d := now.Sub(comp.GetStarted().AsTime())
|
d := now.Sub(comp.GetStarted().AsTime())
|
||||||
uptime = formatDuration(d)
|
uptime = formatDuration(d)
|
||||||
}
|
}
|
||||||
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n",
|
col7 := ""
|
||||||
|
if comment != "" {
|
||||||
|
col7 = "# " + comment
|
||||||
|
}
|
||||||
|
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
|
||||||
svc.GetName(),
|
svc.GetName(),
|
||||||
comp.GetName(),
|
comp.GetName(),
|
||||||
node.Name,
|
node.Name,
|
||||||
comp.GetObservedState(),
|
comp.GetObservedState(),
|
||||||
comp.GetVersion(),
|
comp.GetVersion(),
|
||||||
uptime,
|
uptime,
|
||||||
|
col7,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -712,6 +712,238 @@ func (x *NodeInfo) GetServices() int32 {
|
|||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RegisterRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
|
||||||
|
Role string `protobuf:"bytes,2,opt,name=role,proto3" json:"role,omitempty"` // "worker", "edge", or "master"
|
||||||
|
Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"` // agent gRPC address
|
||||||
|
Arch string `protobuf:"bytes,4,opt,name=arch,proto3" json:"arch,omitempty"` // "amd64" or "arm64"
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) Reset() {
|
||||||
|
*x = RegisterRequest{}
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[12]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*RegisterRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[12]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*RegisterRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_proto_mcp_v1_master_proto_rawDescGZIP(), []int{12}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) GetName() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Name
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) GetRole() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Role
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) GetAddress() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Address
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterRequest) GetArch() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Arch
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
type RegisterResponse struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Accepted bool `protobuf:"varint,1,opt,name=accepted,proto3" json:"accepted,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterResponse) Reset() {
|
||||||
|
*x = RegisterResponse{}
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[13]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterResponse) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*RegisterResponse) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *RegisterResponse) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[13]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead.
|
||||||
|
func (*RegisterResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return file_proto_mcp_v1_master_proto_rawDescGZIP(), []int{13}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *RegisterResponse) GetAccepted() bool {
|
||||||
|
if x != nil {
|
||||||
|
return x.Accepted
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type HeartbeatRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
|
||||||
|
CpuMillicores int64 `protobuf:"varint,2,opt,name=cpu_millicores,json=cpuMillicores,proto3" json:"cpu_millicores,omitempty"`
|
||||||
|
MemoryBytes int64 `protobuf:"varint,3,opt,name=memory_bytes,json=memoryBytes,proto3" json:"memory_bytes,omitempty"`
|
||||||
|
DiskBytes int64 `protobuf:"varint,4,opt,name=disk_bytes,json=diskBytes,proto3" json:"disk_bytes,omitempty"`
|
||||||
|
Containers int32 `protobuf:"varint,5,opt,name=containers,proto3" json:"containers,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) Reset() {
|
||||||
|
*x = HeartbeatRequest{}
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[14]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*HeartbeatRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[14]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use HeartbeatRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*HeartbeatRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_proto_mcp_v1_master_proto_rawDescGZIP(), []int{14}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) GetName() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Name
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) GetCpuMillicores() int64 {
|
||||||
|
if x != nil {
|
||||||
|
return x.CpuMillicores
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) GetMemoryBytes() int64 {
|
||||||
|
if x != nil {
|
||||||
|
return x.MemoryBytes
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) GetDiskBytes() int64 {
|
||||||
|
if x != nil {
|
||||||
|
return x.DiskBytes
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatRequest) GetContainers() int32 {
|
||||||
|
if x != nil {
|
||||||
|
return x.Containers
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
type HeartbeatResponse struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Acknowledged bool `protobuf:"varint,1,opt,name=acknowledged,proto3" json:"acknowledged,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatResponse) Reset() {
|
||||||
|
*x = HeartbeatResponse{}
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[15]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatResponse) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*HeartbeatResponse) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *HeartbeatResponse) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_proto_mcp_v1_master_proto_msgTypes[15]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use HeartbeatResponse.ProtoReflect.Descriptor instead.
|
||||||
|
func (*HeartbeatResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return file_proto_mcp_v1_master_proto_rawDescGZIP(), []int{15}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *HeartbeatResponse) GetAcknowledged() bool {
|
||||||
|
if x != nil {
|
||||||
|
return x.Acknowledged
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
var File_proto_mcp_v1_master_proto protoreflect.FileDescriptor
|
var File_proto_mcp_v1_master_proto protoreflect.FileDescriptor
|
||||||
|
|
||||||
const file_proto_mcp_v1_master_proto_rawDesc = "" +
|
const file_proto_mcp_v1_master_proto_rawDesc = "" +
|
||||||
@@ -765,12 +997,32 @@ const file_proto_mcp_v1_master_proto_rawDesc = "" +
|
|||||||
"containers\x18\x06 \x01(\x05R\n" +
|
"containers\x18\x06 \x01(\x05R\n" +
|
||||||
"containers\x12%\n" +
|
"containers\x12%\n" +
|
||||||
"\x0elast_heartbeat\x18\a \x01(\tR\rlastHeartbeat\x12\x1a\n" +
|
"\x0elast_heartbeat\x18\a \x01(\tR\rlastHeartbeat\x12\x1a\n" +
|
||||||
"\bservices\x18\b \x01(\x05R\bservices2\xa9\x02\n" +
|
"\bservices\x18\b \x01(\x05R\bservices\"g\n" +
|
||||||
|
"\x0fRegisterRequest\x12\x12\n" +
|
||||||
|
"\x04name\x18\x01 \x01(\tR\x04name\x12\x12\n" +
|
||||||
|
"\x04role\x18\x02 \x01(\tR\x04role\x12\x18\n" +
|
||||||
|
"\aaddress\x18\x03 \x01(\tR\aaddress\x12\x12\n" +
|
||||||
|
"\x04arch\x18\x04 \x01(\tR\x04arch\".\n" +
|
||||||
|
"\x10RegisterResponse\x12\x1a\n" +
|
||||||
|
"\baccepted\x18\x01 \x01(\bR\baccepted\"\xaf\x01\n" +
|
||||||
|
"\x10HeartbeatRequest\x12\x12\n" +
|
||||||
|
"\x04name\x18\x01 \x01(\tR\x04name\x12%\n" +
|
||||||
|
"\x0ecpu_millicores\x18\x02 \x01(\x03R\rcpuMillicores\x12!\n" +
|
||||||
|
"\fmemory_bytes\x18\x03 \x01(\x03R\vmemoryBytes\x12\x1d\n" +
|
||||||
|
"\n" +
|
||||||
|
"disk_bytes\x18\x04 \x01(\x03R\tdiskBytes\x12\x1e\n" +
|
||||||
|
"\n" +
|
||||||
|
"containers\x18\x05 \x01(\x05R\n" +
|
||||||
|
"containers\"7\n" +
|
||||||
|
"\x11HeartbeatResponse\x12\"\n" +
|
||||||
|
"\facknowledged\x18\x01 \x01(\bR\facknowledged2\xaa\x03\n" +
|
||||||
"\x10McpMasterService\x12C\n" +
|
"\x10McpMasterService\x12C\n" +
|
||||||
"\x06Deploy\x12\x1b.mcp.v1.MasterDeployRequest\x1a\x1c.mcp.v1.MasterDeployResponse\x12I\n" +
|
"\x06Deploy\x12\x1b.mcp.v1.MasterDeployRequest\x1a\x1c.mcp.v1.MasterDeployResponse\x12I\n" +
|
||||||
"\bUndeploy\x12\x1d.mcp.v1.MasterUndeployRequest\x1a\x1e.mcp.v1.MasterUndeployResponse\x12C\n" +
|
"\bUndeploy\x12\x1d.mcp.v1.MasterUndeployRequest\x1a\x1e.mcp.v1.MasterUndeployResponse\x12C\n" +
|
||||||
"\x06Status\x12\x1b.mcp.v1.MasterStatusRequest\x1a\x1c.mcp.v1.MasterStatusResponse\x12@\n" +
|
"\x06Status\x12\x1b.mcp.v1.MasterStatusRequest\x1a\x1c.mcp.v1.MasterStatusResponse\x12@\n" +
|
||||||
"\tListNodes\x12\x18.mcp.v1.ListNodesRequest\x1a\x19.mcp.v1.ListNodesResponseB*Z(git.wntrmute.dev/mc/mcp/gen/mcp/v1;mcpv1b\x06proto3"
|
"\tListNodes\x12\x18.mcp.v1.ListNodesRequest\x1a\x19.mcp.v1.ListNodesResponse\x12=\n" +
|
||||||
|
"\bRegister\x12\x17.mcp.v1.RegisterRequest\x1a\x18.mcp.v1.RegisterResponse\x12@\n" +
|
||||||
|
"\tHeartbeat\x12\x18.mcp.v1.HeartbeatRequest\x1a\x19.mcp.v1.HeartbeatResponseB*Z(git.wntrmute.dev/mc/mcp/gen/mcp/v1;mcpv1b\x06proto3"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
file_proto_mcp_v1_master_proto_rawDescOnce sync.Once
|
file_proto_mcp_v1_master_proto_rawDescOnce sync.Once
|
||||||
@@ -784,7 +1036,7 @@ func file_proto_mcp_v1_master_proto_rawDescGZIP() []byte {
|
|||||||
return file_proto_mcp_v1_master_proto_rawDescData
|
return file_proto_mcp_v1_master_proto_rawDescData
|
||||||
}
|
}
|
||||||
|
|
||||||
var file_proto_mcp_v1_master_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
|
var file_proto_mcp_v1_master_proto_msgTypes = make([]protoimpl.MessageInfo, 16)
|
||||||
var file_proto_mcp_v1_master_proto_goTypes = []any{
|
var file_proto_mcp_v1_master_proto_goTypes = []any{
|
||||||
(*MasterDeployRequest)(nil), // 0: mcp.v1.MasterDeployRequest
|
(*MasterDeployRequest)(nil), // 0: mcp.v1.MasterDeployRequest
|
||||||
(*MasterDeployResponse)(nil), // 1: mcp.v1.MasterDeployResponse
|
(*MasterDeployResponse)(nil), // 1: mcp.v1.MasterDeployResponse
|
||||||
@@ -798,10 +1050,14 @@ var file_proto_mcp_v1_master_proto_goTypes = []any{
|
|||||||
(*ListNodesRequest)(nil), // 9: mcp.v1.ListNodesRequest
|
(*ListNodesRequest)(nil), // 9: mcp.v1.ListNodesRequest
|
||||||
(*ListNodesResponse)(nil), // 10: mcp.v1.ListNodesResponse
|
(*ListNodesResponse)(nil), // 10: mcp.v1.ListNodesResponse
|
||||||
(*NodeInfo)(nil), // 11: mcp.v1.NodeInfo
|
(*NodeInfo)(nil), // 11: mcp.v1.NodeInfo
|
||||||
(*ServiceSpec)(nil), // 12: mcp.v1.ServiceSpec
|
(*RegisterRequest)(nil), // 12: mcp.v1.RegisterRequest
|
||||||
|
(*RegisterResponse)(nil), // 13: mcp.v1.RegisterResponse
|
||||||
|
(*HeartbeatRequest)(nil), // 14: mcp.v1.HeartbeatRequest
|
||||||
|
(*HeartbeatResponse)(nil), // 15: mcp.v1.HeartbeatResponse
|
||||||
|
(*ServiceSpec)(nil), // 16: mcp.v1.ServiceSpec
|
||||||
}
|
}
|
||||||
var file_proto_mcp_v1_master_proto_depIdxs = []int32{
|
var file_proto_mcp_v1_master_proto_depIdxs = []int32{
|
||||||
12, // 0: mcp.v1.MasterDeployRequest.service:type_name -> mcp.v1.ServiceSpec
|
16, // 0: mcp.v1.MasterDeployRequest.service:type_name -> mcp.v1.ServiceSpec
|
||||||
2, // 1: mcp.v1.MasterDeployResponse.deploy_result:type_name -> mcp.v1.StepResult
|
2, // 1: mcp.v1.MasterDeployResponse.deploy_result:type_name -> mcp.v1.StepResult
|
||||||
2, // 2: mcp.v1.MasterDeployResponse.edge_route_result:type_name -> mcp.v1.StepResult
|
2, // 2: mcp.v1.MasterDeployResponse.edge_route_result:type_name -> mcp.v1.StepResult
|
||||||
2, // 3: mcp.v1.MasterDeployResponse.dns_result:type_name -> mcp.v1.StepResult
|
2, // 3: mcp.v1.MasterDeployResponse.dns_result:type_name -> mcp.v1.StepResult
|
||||||
@@ -812,12 +1068,16 @@ var file_proto_mcp_v1_master_proto_depIdxs = []int32{
|
|||||||
3, // 8: mcp.v1.McpMasterService.Undeploy:input_type -> mcp.v1.MasterUndeployRequest
|
3, // 8: mcp.v1.McpMasterService.Undeploy:input_type -> mcp.v1.MasterUndeployRequest
|
||||||
5, // 9: mcp.v1.McpMasterService.Status:input_type -> mcp.v1.MasterStatusRequest
|
5, // 9: mcp.v1.McpMasterService.Status:input_type -> mcp.v1.MasterStatusRequest
|
||||||
9, // 10: mcp.v1.McpMasterService.ListNodes:input_type -> mcp.v1.ListNodesRequest
|
9, // 10: mcp.v1.McpMasterService.ListNodes:input_type -> mcp.v1.ListNodesRequest
|
||||||
1, // 11: mcp.v1.McpMasterService.Deploy:output_type -> mcp.v1.MasterDeployResponse
|
12, // 11: mcp.v1.McpMasterService.Register:input_type -> mcp.v1.RegisterRequest
|
||||||
4, // 12: mcp.v1.McpMasterService.Undeploy:output_type -> mcp.v1.MasterUndeployResponse
|
14, // 12: mcp.v1.McpMasterService.Heartbeat:input_type -> mcp.v1.HeartbeatRequest
|
||||||
6, // 13: mcp.v1.McpMasterService.Status:output_type -> mcp.v1.MasterStatusResponse
|
1, // 13: mcp.v1.McpMasterService.Deploy:output_type -> mcp.v1.MasterDeployResponse
|
||||||
10, // 14: mcp.v1.McpMasterService.ListNodes:output_type -> mcp.v1.ListNodesResponse
|
4, // 14: mcp.v1.McpMasterService.Undeploy:output_type -> mcp.v1.MasterUndeployResponse
|
||||||
11, // [11:15] is the sub-list for method output_type
|
6, // 15: mcp.v1.McpMasterService.Status:output_type -> mcp.v1.MasterStatusResponse
|
||||||
7, // [7:11] is the sub-list for method input_type
|
10, // 16: mcp.v1.McpMasterService.ListNodes:output_type -> mcp.v1.ListNodesResponse
|
||||||
|
13, // 17: mcp.v1.McpMasterService.Register:output_type -> mcp.v1.RegisterResponse
|
||||||
|
15, // 18: mcp.v1.McpMasterService.Heartbeat:output_type -> mcp.v1.HeartbeatResponse
|
||||||
|
13, // [13:19] is the sub-list for method output_type
|
||||||
|
7, // [7:13] is the sub-list for method input_type
|
||||||
7, // [7:7] is the sub-list for extension type_name
|
7, // [7:7] is the sub-list for extension type_name
|
||||||
7, // [7:7] is the sub-list for extension extendee
|
7, // [7:7] is the sub-list for extension extendee
|
||||||
0, // [0:7] is the sub-list for field type_name
|
0, // [0:7] is the sub-list for field type_name
|
||||||
@@ -835,7 +1095,7 @@ func file_proto_mcp_v1_master_proto_init() {
|
|||||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||||
RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_mcp_v1_master_proto_rawDesc), len(file_proto_mcp_v1_master_proto_rawDesc)),
|
RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_mcp_v1_master_proto_rawDesc), len(file_proto_mcp_v1_master_proto_rawDesc)),
|
||||||
NumEnums: 0,
|
NumEnums: 0,
|
||||||
NumMessages: 12,
|
NumMessages: 16,
|
||||||
NumExtensions: 0,
|
NumExtensions: 0,
|
||||||
NumServices: 1,
|
NumServices: 1,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ const (
|
|||||||
McpMasterService_Undeploy_FullMethodName = "/mcp.v1.McpMasterService/Undeploy"
|
McpMasterService_Undeploy_FullMethodName = "/mcp.v1.McpMasterService/Undeploy"
|
||||||
McpMasterService_Status_FullMethodName = "/mcp.v1.McpMasterService/Status"
|
McpMasterService_Status_FullMethodName = "/mcp.v1.McpMasterService/Status"
|
||||||
McpMasterService_ListNodes_FullMethodName = "/mcp.v1.McpMasterService/ListNodes"
|
McpMasterService_ListNodes_FullMethodName = "/mcp.v1.McpMasterService/ListNodes"
|
||||||
|
McpMasterService_Register_FullMethodName = "/mcp.v1.McpMasterService/Register"
|
||||||
|
McpMasterService_Heartbeat_FullMethodName = "/mcp.v1.McpMasterService/Heartbeat"
|
||||||
)
|
)
|
||||||
|
|
||||||
// McpMasterServiceClient is the client API for McpMasterService service.
|
// McpMasterServiceClient is the client API for McpMasterService service.
|
||||||
@@ -40,6 +42,9 @@ type McpMasterServiceClient interface {
|
|||||||
Undeploy(ctx context.Context, in *MasterUndeployRequest, opts ...grpc.CallOption) (*MasterUndeployResponse, error)
|
Undeploy(ctx context.Context, in *MasterUndeployRequest, opts ...grpc.CallOption) (*MasterUndeployResponse, error)
|
||||||
Status(ctx context.Context, in *MasterStatusRequest, opts ...grpc.CallOption) (*MasterStatusResponse, error)
|
Status(ctx context.Context, in *MasterStatusRequest, opts ...grpc.CallOption) (*MasterStatusResponse, error)
|
||||||
ListNodes(ctx context.Context, in *ListNodesRequest, opts ...grpc.CallOption) (*ListNodesResponse, error)
|
ListNodes(ctx context.Context, in *ListNodesRequest, opts ...grpc.CallOption) (*ListNodesResponse, error)
|
||||||
|
// Agent registration and health (called by agents).
|
||||||
|
Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error)
|
||||||
|
Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type mcpMasterServiceClient struct {
|
type mcpMasterServiceClient struct {
|
||||||
@@ -90,6 +95,26 @@ func (c *mcpMasterServiceClient) ListNodes(ctx context.Context, in *ListNodesReq
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *mcpMasterServiceClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
out := new(RegisterResponse)
|
||||||
|
err := c.cc.Invoke(ctx, McpMasterService_Register_FullMethodName, in, out, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mcpMasterServiceClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
out := new(HeartbeatResponse)
|
||||||
|
err := c.cc.Invoke(ctx, McpMasterService_Heartbeat_FullMethodName, in, out, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// McpMasterServiceServer is the server API for McpMasterService service.
|
// McpMasterServiceServer is the server API for McpMasterService service.
|
||||||
// All implementations must embed UnimplementedMcpMasterServiceServer
|
// All implementations must embed UnimplementedMcpMasterServiceServer
|
||||||
// for forward compatibility.
|
// for forward compatibility.
|
||||||
@@ -103,6 +128,9 @@ type McpMasterServiceServer interface {
|
|||||||
Undeploy(context.Context, *MasterUndeployRequest) (*MasterUndeployResponse, error)
|
Undeploy(context.Context, *MasterUndeployRequest) (*MasterUndeployResponse, error)
|
||||||
Status(context.Context, *MasterStatusRequest) (*MasterStatusResponse, error)
|
Status(context.Context, *MasterStatusRequest) (*MasterStatusResponse, error)
|
||||||
ListNodes(context.Context, *ListNodesRequest) (*ListNodesResponse, error)
|
ListNodes(context.Context, *ListNodesRequest) (*ListNodesResponse, error)
|
||||||
|
// Agent registration and health (called by agents).
|
||||||
|
Register(context.Context, *RegisterRequest) (*RegisterResponse, error)
|
||||||
|
Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error)
|
||||||
mustEmbedUnimplementedMcpMasterServiceServer()
|
mustEmbedUnimplementedMcpMasterServiceServer()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,6 +153,12 @@ func (UnimplementedMcpMasterServiceServer) Status(context.Context, *MasterStatus
|
|||||||
func (UnimplementedMcpMasterServiceServer) ListNodes(context.Context, *ListNodesRequest) (*ListNodesResponse, error) {
|
func (UnimplementedMcpMasterServiceServer) ListNodes(context.Context, *ListNodesRequest) (*ListNodesResponse, error) {
|
||||||
return nil, status.Error(codes.Unimplemented, "method ListNodes not implemented")
|
return nil, status.Error(codes.Unimplemented, "method ListNodes not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedMcpMasterServiceServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "method Register not implemented")
|
||||||
|
}
|
||||||
|
func (UnimplementedMcpMasterServiceServer) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "method Heartbeat not implemented")
|
||||||
|
}
|
||||||
func (UnimplementedMcpMasterServiceServer) mustEmbedUnimplementedMcpMasterServiceServer() {}
|
func (UnimplementedMcpMasterServiceServer) mustEmbedUnimplementedMcpMasterServiceServer() {}
|
||||||
func (UnimplementedMcpMasterServiceServer) testEmbeddedByValue() {}
|
func (UnimplementedMcpMasterServiceServer) testEmbeddedByValue() {}
|
||||||
|
|
||||||
@@ -218,6 +252,42 @@ func _McpMasterService_ListNodes_Handler(srv interface{}, ctx context.Context, d
|
|||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _McpMasterService_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(RegisterRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(McpMasterServiceServer).Register(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: McpMasterService_Register_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(McpMasterServiceServer).Register(ctx, req.(*RegisterRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _McpMasterService_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(HeartbeatRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(McpMasterServiceServer).Heartbeat(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: McpMasterService_Heartbeat_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(McpMasterServiceServer).Heartbeat(ctx, req.(*HeartbeatRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// McpMasterService_ServiceDesc is the grpc.ServiceDesc for McpMasterService service.
|
// McpMasterService_ServiceDesc is the grpc.ServiceDesc for McpMasterService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
@@ -241,6 +311,14 @@ var McpMasterService_ServiceDesc = grpc.ServiceDesc{
|
|||||||
MethodName: "ListNodes",
|
MethodName: "ListNodes",
|
||||||
Handler: _McpMasterService_ListNodes_Handler,
|
Handler: _McpMasterService_ListNodes_Handler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MethodName: "Register",
|
||||||
|
Handler: _McpMasterService_Register_Handler,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
MethodName: "Heartbeat",
|
||||||
|
Handler: _McpMasterService_Heartbeat_Handler,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{},
|
Streams: []grpc.StreamDesc{},
|
||||||
Metadata: "proto/mcp/v1/master.proto",
|
Metadata: "proto/mcp/v1/master.proto",
|
||||||
|
|||||||
@@ -119,6 +119,27 @@ func Run(cfg *config.AgentConfig, version string) error {
|
|||||||
"runtime", cfg.Agent.ContainerRuntime,
|
"runtime", cfg.Agent.ContainerRuntime,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Run boot sequence before starting the gRPC server.
|
||||||
|
// On the master node, this starts foundation services (MCIAS, MCNS)
|
||||||
|
// before core services, ensuring dependencies are met.
|
||||||
|
if len(cfg.Boot.Sequence) > 0 {
|
||||||
|
bootCtx, bootCancel := context.WithCancel(context.Background())
|
||||||
|
defer bootCancel()
|
||||||
|
if err := a.RunBootSequence(bootCtx); err != nil {
|
||||||
|
logger.Error("boot sequence failed", "err", err)
|
||||||
|
// Continue starting the gRPC server — partial boot is better than no agent.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start heartbeat client (registers with master and sends heartbeats).
|
||||||
|
hbClient, hbErr := NewHeartbeatClient(*cfg, logger)
|
||||||
|
if hbErr != nil {
|
||||||
|
logger.Warn("heartbeat client failed to start", "err", hbErr)
|
||||||
|
} else if hbClient != nil {
|
||||||
|
hbClient.Start()
|
||||||
|
logger.Info("heartbeat client started", "master", cfg.Master.Address)
|
||||||
|
}
|
||||||
|
|
||||||
mon.Start()
|
mon.Start()
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
@@ -132,6 +153,9 @@ func Run(cfg *config.AgentConfig, version string) error {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info("shutting down")
|
logger.Info("shutting down")
|
||||||
|
if hbClient != nil {
|
||||||
|
hbClient.Stop()
|
||||||
|
}
|
||||||
mon.Stop()
|
mon.Stop()
|
||||||
server.GracefulStop()
|
server.GracefulStop()
|
||||||
_ = proxy.Close()
|
_ = proxy.Close()
|
||||||
|
|||||||
202
internal/agent/boot.go
Normal file
202
internal/agent/boot.go
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RunBootSequence executes the boot stages defined in the agent config.
|
||||||
|
// Each stage's services must be healthy before the next stage starts.
|
||||||
|
// This is used on the master node to start foundation services (MCIAS,
|
||||||
|
// MCNS) before core services (Metacrypt, MCR) before the master itself.
|
||||||
|
//
|
||||||
|
// If no boot sequence is configured, this is a no-op.
|
||||||
|
func (a *Agent) RunBootSequence(ctx context.Context) error {
|
||||||
|
stages := a.Config.Boot.Sequence
|
||||||
|
if len(stages) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("boot sequence starting", "stages", len(stages))
|
||||||
|
|
||||||
|
for i, stage := range stages {
|
||||||
|
a.Logger.Info("boot stage starting",
|
||||||
|
"stage", stage.Name,
|
||||||
|
"services", stage.Services,
|
||||||
|
"timeout", stage.Timeout.Duration,
|
||||||
|
"health", stage.Health,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Use the recover logic to start any services in this stage
|
||||||
|
// that aren't already running.
|
||||||
|
if err := a.Recover(ctx); err != nil {
|
||||||
|
a.Logger.Warn("boot stage recover failed", "stage", stage.Name, "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all services in this stage to be healthy.
|
||||||
|
timeout := stage.Timeout.Duration
|
||||||
|
if timeout == 0 {
|
||||||
|
timeout = 60 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.waitForHealthy(ctx, stage, timeout, i == 0); err != nil {
|
||||||
|
if i == 0 {
|
||||||
|
// Foundation stage: block and retry indefinitely.
|
||||||
|
a.Logger.Error("foundation stage failed — retrying indefinitely",
|
||||||
|
"stage", stage.Name, "err", err)
|
||||||
|
for {
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
if retryErr := a.waitForHealthy(ctx, stage, timeout, true); retryErr == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Non-foundation: log and proceed.
|
||||||
|
a.Logger.Warn("boot stage not fully healthy, proceeding",
|
||||||
|
"stage", stage.Name, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("boot stage complete", "stage", stage.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("boot sequence complete")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForHealthy waits until all services in the stage pass their health check.
|
||||||
|
func (a *Agent) waitForHealthy(ctx context.Context, stage config.BootStage, timeout time.Duration, isFoundation bool) error {
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
|
||||||
|
for _, svc := range stage.Services {
|
||||||
|
for {
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
return fmt.Errorf("timeout waiting for %s", svc)
|
||||||
|
}
|
||||||
|
|
||||||
|
healthy, err := a.checkServiceHealth(ctx, svc, stage.Health)
|
||||||
|
if err == nil && healthy {
|
||||||
|
a.Logger.Info("service healthy", "service", svc, "check", stage.Health)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkServiceHealth probes a service using the specified health check method.
|
||||||
|
func (a *Agent) checkServiceHealth(ctx context.Context, serviceName, method string) (bool, error) {
|
||||||
|
// Find the service's port from the registry.
|
||||||
|
port, err := a.findServicePort(serviceName)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch method {
|
||||||
|
case "tcp", "":
|
||||||
|
return a.checkTCP(ctx, port)
|
||||||
|
case "grpc":
|
||||||
|
return a.checkGRPC(ctx, port)
|
||||||
|
default:
|
||||||
|
// Unknown method, fall back to TCP.
|
||||||
|
return a.checkTCP(ctx, port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// findServicePort finds the first mapped port for a service from the registry
|
||||||
|
// or from the running container.
|
||||||
|
func (a *Agent) findServicePort(serviceName string) (int, error) {
|
||||||
|
// Check the running containers for a mapped port.
|
||||||
|
containers, err := a.Runtime.List(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("list containers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range containers {
|
||||||
|
// Container name might be "service" or "service-component"
|
||||||
|
if c.Name == serviceName || len(c.Name) > len(serviceName) && c.Name[:len(serviceName)+1] == serviceName+"-" {
|
||||||
|
// Parse the first port mapping to get the host port.
|
||||||
|
for _, p := range c.Ports {
|
||||||
|
// Port format: "127.0.0.1:28443->8443/tcp" or "8443/tcp"
|
||||||
|
port := parseHostPort(p)
|
||||||
|
if port > 0 {
|
||||||
|
return port, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, fmt.Errorf("no port found for service %s", serviceName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseHostPort extracts the host port from a podman port mapping string.
|
||||||
|
func parseHostPort(mapping string) int {
|
||||||
|
// Format: "127.0.0.1:28443->8443/tcp" or "0.0.0.0:53->53/tcp"
|
||||||
|
for i := len(mapping) - 1; i >= 0; i-- {
|
||||||
|
if mapping[i] == ':' {
|
||||||
|
// Found the host:port separator
|
||||||
|
rest := mapping[i+1:]
|
||||||
|
// Find the -> separator
|
||||||
|
for j := 0; j < len(rest); j++ {
|
||||||
|
if rest[j] == '-' {
|
||||||
|
portStr := rest[:j]
|
||||||
|
var port int
|
||||||
|
for _, ch := range portStr {
|
||||||
|
if ch >= '0' && ch <= '9' {
|
||||||
|
port = port*10 + int(ch-'0')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if port > 0 {
|
||||||
|
return port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkTCP attempts a TCP connection to localhost:port.
|
||||||
|
func (a *Agent) checkTCP(ctx context.Context, port int) (bool, error) {
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d", port)
|
||||||
|
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
_ = conn.Close()
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkGRPC calls the standard gRPC health check on localhost:port.
|
||||||
|
func (a *Agent) checkGRPC(ctx context.Context, port int) (bool, error) {
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d", port)
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer func() { _ = conn.Close() }()
|
||||||
|
|
||||||
|
client := healthpb.NewHealthClient(conn)
|
||||||
|
resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.GetStatus() == healthpb.HealthCheckResponse_SERVING, nil
|
||||||
|
}
|
||||||
197
internal/agent/heartbeat.go
Normal file
197
internal/agent/heartbeat.go
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MasterConfig holds the optional master connection settings for the agent.
|
||||||
|
// When configured, the agent self-registers and sends periodic heartbeats.
|
||||||
|
type MasterConfig struct {
|
||||||
|
Address string `toml:"address"` // master gRPC address
|
||||||
|
CACert string `toml:"ca_cert"` // CA cert to verify master's TLS
|
||||||
|
TokenPath string `toml:"token_path"` // MCIAS service token for auth
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeartbeatClient manages the agent's connection to the master for
|
||||||
|
// registration and heartbeats.
|
||||||
|
type HeartbeatClient struct {
|
||||||
|
client mcpv1.McpMasterServiceClient
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
nodeName string
|
||||||
|
role string
|
||||||
|
address string // agent's own gRPC address
|
||||||
|
arch string
|
||||||
|
interval time.Duration
|
||||||
|
stop chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
|
logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) }
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHeartbeatClient creates a client that registers with the master and
|
||||||
|
// sends periodic heartbeats. Returns nil if master address is not configured.
|
||||||
|
func NewHeartbeatClient(cfg config.AgentConfig, logger interface{ Info(string, ...any); Warn(string, ...any); Error(string, ...any) }) (*HeartbeatClient, error) {
|
||||||
|
// Config takes precedence, env vars as fallback.
|
||||||
|
masterAddr := cfg.Master.Address
|
||||||
|
if masterAddr == "" {
|
||||||
|
masterAddr = os.Getenv("MCP_MASTER_ADDRESS")
|
||||||
|
}
|
||||||
|
masterCACert := cfg.Master.CACert
|
||||||
|
if masterCACert == "" {
|
||||||
|
masterCACert = os.Getenv("MCP_MASTER_CA_CERT")
|
||||||
|
}
|
||||||
|
masterTokenPath := cfg.Master.TokenPath
|
||||||
|
if masterTokenPath == "" {
|
||||||
|
masterTokenPath = os.Getenv("MCP_MASTER_TOKEN_PATH")
|
||||||
|
}
|
||||||
|
role := cfg.Master.Role
|
||||||
|
if role == "" {
|
||||||
|
role = "worker"
|
||||||
|
}
|
||||||
|
|
||||||
|
if masterAddr == "" {
|
||||||
|
return nil, nil // master not configured
|
||||||
|
}
|
||||||
|
|
||||||
|
token := ""
|
||||||
|
if masterTokenPath != "" {
|
||||||
|
data, err := os.ReadFile(masterTokenPath) //nolint:gosec // trusted config
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read master token: %w", err)
|
||||||
|
}
|
||||||
|
token = strings.TrimSpace(string(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsConfig := &tls.Config{MinVersion: tls.VersionTLS13}
|
||||||
|
if masterCACert != "" {
|
||||||
|
caCert, err := os.ReadFile(masterCACert) //nolint:gosec // trusted config
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read master CA cert: %w", err)
|
||||||
|
}
|
||||||
|
pool := x509.NewCertPool()
|
||||||
|
if !pool.AppendCertsFromPEM(caCert) {
|
||||||
|
return nil, fmt.Errorf("invalid master CA cert")
|
||||||
|
}
|
||||||
|
tlsConfig.RootCAs = pool
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := grpc.NewClient(
|
||||||
|
masterAddr,
|
||||||
|
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
|
||||||
|
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||||
|
if token != "" {
|
||||||
|
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
|
||||||
|
}
|
||||||
|
return invoker(ctx, method, req, reply, cc, opts...)
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("dial master: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &HeartbeatClient{
|
||||||
|
client: mcpv1.NewMcpMasterServiceClient(conn),
|
||||||
|
conn: conn,
|
||||||
|
nodeName: cfg.Agent.NodeName,
|
||||||
|
role: role,
|
||||||
|
address: cfg.Server.GRPCAddr,
|
||||||
|
arch: runtime.GOARCH,
|
||||||
|
interval: 30 * time.Second,
|
||||||
|
stop: make(chan struct{}),
|
||||||
|
logger: logger,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start registers with the master and begins the heartbeat loop.
|
||||||
|
func (hc *HeartbeatClient) Start() {
|
||||||
|
if hc == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register with the master (retry with backoff).
|
||||||
|
hc.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer hc.wg.Done()
|
||||||
|
|
||||||
|
backoff := time.Second
|
||||||
|
for {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
resp, err := hc.client.Register(ctx, &mcpv1.RegisterRequest{
|
||||||
|
Name: hc.nodeName,
|
||||||
|
Role: hc.role,
|
||||||
|
Address: hc.address,
|
||||||
|
Arch: hc.arch,
|
||||||
|
})
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err == nil && resp.GetAccepted() {
|
||||||
|
hc.logger.Info("registered with master",
|
||||||
|
"node", hc.nodeName, "master_accepted", true)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
hc.logger.Warn("registration failed, retrying",
|
||||||
|
"node", hc.nodeName, "err", err, "backoff", backoff)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-hc.stop:
|
||||||
|
return
|
||||||
|
case <-time.After(backoff):
|
||||||
|
}
|
||||||
|
|
||||||
|
backoff *= 2
|
||||||
|
if backoff > 60*time.Second {
|
||||||
|
backoff = 60 * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heartbeat loop.
|
||||||
|
ticker := time.NewTicker(hc.interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-hc.stop:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
hc.sendHeartbeat()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *HeartbeatClient) sendHeartbeat() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err := hc.client.Heartbeat(ctx, &mcpv1.HeartbeatRequest{
|
||||||
|
Name: hc.nodeName,
|
||||||
|
Containers: 0, // TODO: count from runtime
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
hc.logger.Warn("heartbeat failed", "node", hc.nodeName, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the heartbeat loop and closes the master connection.
|
||||||
|
func (hc *HeartbeatClient) Stop() {
|
||||||
|
if hc == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
close(hc.stop)
|
||||||
|
hc.wg.Wait()
|
||||||
|
_ = hc.conn.Close()
|
||||||
|
}
|
||||||
139
internal/agent/recover.go
Normal file
139
internal/agent/recover.go
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recover recreates containers from the agent's registry for all services
|
||||||
|
// whose desired state is "running" but which don't have a running container
|
||||||
|
// in podman. This is the recovery path after a podman database loss (e.g.,
|
||||||
|
// after a UID change or podman reset).
|
||||||
|
//
|
||||||
|
// Recover does NOT pull images — it assumes the images are cached locally.
|
||||||
|
// If an image is missing, that component is skipped with a warning.
|
||||||
|
func (a *Agent) Recover(ctx context.Context) error {
|
||||||
|
services, err := registry.ListServices(a.DB)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list services: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the list of currently running containers from podman.
|
||||||
|
running, err := a.Runtime.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
a.Logger.Warn("cannot list containers, assuming none running", "err", err)
|
||||||
|
running = nil
|
||||||
|
}
|
||||||
|
runningSet := make(map[string]bool)
|
||||||
|
for _, c := range running {
|
||||||
|
runningSet[c.Name] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var recovered, skipped, already int
|
||||||
|
|
||||||
|
for _, svc := range services {
|
||||||
|
if !svc.Active {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
comps, err := registry.ListComponents(a.DB, svc.Name)
|
||||||
|
if err != nil {
|
||||||
|
a.Logger.Warn("list components", "service", svc.Name, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, comp := range comps {
|
||||||
|
if comp.DesiredState != "running" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
containerName := svc.Name + "-" + comp.Name
|
||||||
|
if comp.Name == svc.Name {
|
||||||
|
containerName = svc.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if container is already running.
|
||||||
|
if runningSet[containerName] {
|
||||||
|
already++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("recovering container",
|
||||||
|
"service", svc.Name,
|
||||||
|
"component", comp.Name,
|
||||||
|
"image", comp.Image,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Remove any stale container with the same name.
|
||||||
|
_ = a.Runtime.Remove(ctx, containerName)
|
||||||
|
|
||||||
|
// Build the container spec from the registry.
|
||||||
|
spec := runtime.ContainerSpec{
|
||||||
|
Name: containerName,
|
||||||
|
Image: comp.Image,
|
||||||
|
Network: comp.Network,
|
||||||
|
User: comp.UserSpec,
|
||||||
|
Restart: comp.Restart,
|
||||||
|
Volumes: comp.Volumes,
|
||||||
|
Cmd: comp.Cmd,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate ports from routes if the component has routes.
|
||||||
|
if len(comp.Routes) > 0 && a.PortAlloc != nil {
|
||||||
|
ports, env, allocErr := a.allocateRoutePorts(svc.Name, comp.Name, comp.Routes)
|
||||||
|
if allocErr != nil {
|
||||||
|
a.Logger.Warn("allocate route ports", "container", containerName, "err", allocErr)
|
||||||
|
spec.Ports = comp.Ports
|
||||||
|
} else {
|
||||||
|
spec.Ports = append(comp.Ports, ports...)
|
||||||
|
spec.Env = append(spec.Env, env...)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
spec.Ports = comp.Ports
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.Runtime.Run(ctx, spec); err != nil {
|
||||||
|
a.Logger.Error("recover container failed",
|
||||||
|
"container", containerName,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
skipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-register mc-proxy routes.
|
||||||
|
if a.Proxy != nil && len(comp.Routes) > 0 {
|
||||||
|
hostPorts, hpErr := registry.GetRouteHostPorts(a.DB, svc.Name, comp.Name)
|
||||||
|
if hpErr == nil {
|
||||||
|
if proxyErr := a.Proxy.RegisterRoutes(ctx, svc.Name, comp.Routes, hostPorts); proxyErr != nil {
|
||||||
|
a.Logger.Warn("re-register routes", "service", svc.Name, "err", proxyErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provision TLS certs if needed.
|
||||||
|
if a.Certs != nil && hasL7Routes(comp.Routes) {
|
||||||
|
hostnames := l7Hostnames(svc.Name, comp.Routes)
|
||||||
|
if certErr := a.Certs.EnsureCert(ctx, svc.Name, hostnames); certErr != nil {
|
||||||
|
a.Logger.Warn("cert provisioning", "service", svc.Name, "err", certErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
recovered++
|
||||||
|
a.Logger.Info("container recovered", "container", containerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("recovery complete",
|
||||||
|
"recovered", recovered,
|
||||||
|
"skipped", skipped,
|
||||||
|
"already_running", already,
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasL7Routes and l7Hostnames are defined in deploy.go.
|
||||||
@@ -19,6 +19,33 @@ type AgentConfig struct {
|
|||||||
MCNS MCNSConfig `toml:"mcns"`
|
MCNS MCNSConfig `toml:"mcns"`
|
||||||
Monitor MonitorConfig `toml:"monitor"`
|
Monitor MonitorConfig `toml:"monitor"`
|
||||||
Log LogConfig `toml:"log"`
|
Log LogConfig `toml:"log"`
|
||||||
|
Boot BootConfig `toml:"boot"`
|
||||||
|
Master AgentMasterConfig `toml:"master"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentMasterConfig holds the optional master connection settings.
|
||||||
|
// When configured, the agent self-registers and sends heartbeats.
|
||||||
|
type AgentMasterConfig struct {
|
||||||
|
Address string `toml:"address"` // master gRPC address
|
||||||
|
CACert string `toml:"ca_cert"` // CA cert to verify master's TLS
|
||||||
|
TokenPath string `toml:"token_path"` // MCIAS service token for auth
|
||||||
|
Role string `toml:"role"` // "worker", "edge", or "master"
|
||||||
|
}
|
||||||
|
|
||||||
|
// BootConfig holds the boot sequence for the master node.
|
||||||
|
// Each stage's services must be healthy before the next stage starts.
|
||||||
|
// Worker and edge nodes don't use this — they wait for the master.
|
||||||
|
type BootConfig struct {
|
||||||
|
Sequence []BootStage `toml:"sequence"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BootStage defines a group of services that must be started and healthy
|
||||||
|
// before the next stage begins.
|
||||||
|
type BootStage struct {
|
||||||
|
Name string `toml:"name"`
|
||||||
|
Services []string `toml:"services"`
|
||||||
|
Timeout Duration `toml:"timeout"`
|
||||||
|
Health string `toml:"health"` // "tcp", "grpc", or "http"
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetacryptConfig holds the Metacrypt CA integration settings for
|
// MetacryptConfig holds the Metacrypt CA integration settings for
|
||||||
|
|||||||
@@ -50,9 +50,28 @@ type AuthConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NodeConfig defines a managed node that the CLI connects to.
|
// NodeConfig defines a managed node that the CLI connects to.
|
||||||
|
// Address is the primary address. Addresses is an optional list of
|
||||||
|
// fallback addresses tried in order if the primary fails. This
|
||||||
|
// provides resilience when Tailscale DNS is down or a node is
|
||||||
|
// reachable via LAN but not Tailnet.
|
||||||
type NodeConfig struct {
|
type NodeConfig struct {
|
||||||
Name string `toml:"name"`
|
Name string `toml:"name"`
|
||||||
Address string `toml:"address"`
|
Address string `toml:"address"`
|
||||||
|
Addresses []string `toml:"addresses,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllAddresses returns the node's primary address followed by any
|
||||||
|
// fallback addresses, deduplicated.
|
||||||
|
func (n NodeConfig) AllAddresses() []string {
|
||||||
|
seen := make(map[string]bool)
|
||||||
|
var addrs []string
|
||||||
|
for _, a := range append([]string{n.Address}, n.Addresses...) {
|
||||||
|
if a != "" && !seen[a] {
|
||||||
|
seen[a] = true
|
||||||
|
addrs = append(addrs, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return addrs
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadCLIConfig reads and validates a CLI configuration file.
|
// LoadCLIConfig reads and validates a CLI configuration file.
|
||||||
|
|||||||
@@ -64,9 +64,24 @@ type TimeoutsConfig struct {
|
|||||||
type MasterNodeConfig struct {
|
type MasterNodeConfig struct {
|
||||||
Name string `toml:"name"`
|
Name string `toml:"name"`
|
||||||
Address string `toml:"address"`
|
Address string `toml:"address"`
|
||||||
|
Addresses []string `toml:"addresses,omitempty"`
|
||||||
Role string `toml:"role"` // "worker", "edge", or "master"
|
Role string `toml:"role"` // "worker", "edge", or "master"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AllAddresses returns the node's primary address followed by any
|
||||||
|
// fallback addresses, deduplicated.
|
||||||
|
func (n MasterNodeConfig) AllAddresses() []string {
|
||||||
|
seen := make(map[string]bool)
|
||||||
|
var addrs []string
|
||||||
|
for _, a := range append([]string{n.Address}, n.Addresses...) {
|
||||||
|
if a != "" && !seen[a] {
|
||||||
|
seen[a] = true
|
||||||
|
addrs = append(addrs, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return addrs
|
||||||
|
}
|
||||||
|
|
||||||
// LoadMasterConfig reads and validates a master configuration file.
|
// LoadMasterConfig reads and validates a master configuration file.
|
||||||
func LoadMasterConfig(path string) (*MasterConfig, error) {
|
func LoadMasterConfig(path string) (*MasterConfig, error) {
|
||||||
data, err := os.ReadFile(path) //nolint:gosec // config path from trusted CLI flag
|
data, err := os.ReadFile(path) //nolint:gosec // config path from trusted CLI flag
|
||||||
|
|||||||
@@ -140,22 +140,31 @@ func NewAgentPool(caCertPath, token string) *AgentPool {
|
|||||||
|
|
||||||
// AddNode dials an agent and adds it to the pool.
|
// AddNode dials an agent and adds it to the pool.
|
||||||
func (p *AgentPool) AddNode(name, address string) error {
|
func (p *AgentPool) AddNode(name, address string) error {
|
||||||
client, err := DialAgent(address, p.caCert, p.token)
|
return p.AddNodeMulti(name, []string{address})
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddNodeMulti tries each address in order and adds the first successful
|
||||||
|
// connection to the pool.
|
||||||
|
func (p *AgentPool) AddNodeMulti(name string, addresses []string) error {
|
||||||
|
var lastErr error
|
||||||
|
for _, addr := range addresses {
|
||||||
|
client, err := DialAgent(addr, p.caCert, p.token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("add node %s: %w", name, err)
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
client.Node = name
|
client.Node = name
|
||||||
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
// Close existing connection if re-adding.
|
|
||||||
if old, ok := p.clients[name]; ok {
|
if old, ok := p.clients[name]; ok {
|
||||||
_ = old.Close()
|
_ = old.Close()
|
||||||
}
|
}
|
||||||
p.clients[name] = client
|
p.clients[name] = client
|
||||||
|
p.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return fmt.Errorf("add node %s: all addresses failed: %w", name, lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns the agent client for a node.
|
// Get returns the agent client for a node.
|
||||||
func (p *AgentPool) Get(name string) (*AgentClient, error) {
|
func (p *AgentPool) Get(name string) (*AgentClient, error) {
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ func Run(cfg *config.MasterConfig, version string) error {
|
|||||||
// Create agent connection pool.
|
// Create agent connection pool.
|
||||||
pool := NewAgentPool(cfg.Master.CACert, token)
|
pool := NewAgentPool(cfg.Master.CACert, token)
|
||||||
for _, n := range cfg.Nodes {
|
for _, n := range cfg.Nodes {
|
||||||
if addErr := pool.AddNode(n.Name, n.Address); addErr != nil {
|
if addErr := pool.AddNodeMulti(n.Name, n.AllAddresses()); addErr != nil {
|
||||||
logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
|
logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
|
||||||
// Non-fatal: the node may come up later.
|
// Non-fatal: the node may come up later.
|
||||||
}
|
}
|
||||||
@@ -124,6 +124,10 @@ func Run(cfg *config.MasterConfig, version string) error {
|
|||||||
"nodes", len(cfg.Nodes),
|
"nodes", len(cfg.Nodes),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Start heartbeat monitor.
|
||||||
|
hbMonitor := NewHeartbeatMonitor(m)
|
||||||
|
hbMonitor.Start()
|
||||||
|
|
||||||
// Signal handling.
|
// Signal handling.
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
defer stop()
|
defer stop()
|
||||||
@@ -136,10 +140,12 @@ func Run(cfg *config.MasterConfig, version string) error {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info("shutting down")
|
logger.Info("shutting down")
|
||||||
|
hbMonitor.Stop()
|
||||||
server.GracefulStop()
|
server.GracefulStop()
|
||||||
pool.Close()
|
pool.Close()
|
||||||
return nil
|
return nil
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
|
hbMonitor.Stop()
|
||||||
pool.Close()
|
pool.Close()
|
||||||
return fmt.Errorf("serve: %w", err)
|
return fmt.Errorf("serve: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
235
internal/master/registration.go
Normal file
235
internal/master/registration.go
Normal file
@@ -0,0 +1,235 @@
|
|||||||
|
package master
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/auth"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/masterdb"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register handles agent self-registration. Identity-bound: the agent's
|
||||||
|
// MCIAS service name must match the claimed node name (agent-rift → rift).
|
||||||
|
func (m *Master) Register(ctx context.Context, req *mcpv1.RegisterRequest) (*mcpv1.RegisterResponse, error) {
|
||||||
|
// Extract caller identity from the auth context.
|
||||||
|
tokenInfo := auth.TokenInfoFromContext(ctx)
|
||||||
|
if tokenInfo == nil {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, "no auth context")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identity binding: agent-rift can only register name="rift".
|
||||||
|
expectedName := strings.TrimPrefix(tokenInfo.Username, "agent-")
|
||||||
|
if expectedName == tokenInfo.Username {
|
||||||
|
// Not an agent-* account — also allow mcp-agent (legacy).
|
||||||
|
expectedName = req.GetName()
|
||||||
|
}
|
||||||
|
if req.GetName() != expectedName {
|
||||||
|
m.Logger.Warn("registration rejected: name mismatch",
|
||||||
|
"claimed", req.GetName(), "identity", tokenInfo.Username)
|
||||||
|
return nil, status.Errorf(codes.PermissionDenied,
|
||||||
|
"identity %q cannot register as %q", tokenInfo.Username, req.GetName())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check allowlist.
|
||||||
|
if len(m.Config.Registration.AllowedAgents) > 0 {
|
||||||
|
allowed := false
|
||||||
|
for _, a := range m.Config.Registration.AllowedAgents {
|
||||||
|
if a == tokenInfo.Username {
|
||||||
|
allowed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
m.Logger.Warn("registration rejected: not in allowlist",
|
||||||
|
"identity", tokenInfo.Username)
|
||||||
|
return nil, status.Errorf(codes.PermissionDenied,
|
||||||
|
"identity %q not in registration allowlist", tokenInfo.Username)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check max nodes.
|
||||||
|
nodes, err := masterdb.ListNodes(m.DB)
|
||||||
|
if err == nil && len(nodes) >= m.Config.Registration.MaxNodes {
|
||||||
|
// Check if this is a re-registration (existing node).
|
||||||
|
found := false
|
||||||
|
for _, n := range nodes {
|
||||||
|
if n.Name == req.GetName() {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
return nil, status.Error(codes.ResourceExhausted, "max nodes reached")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upsert node in registry.
|
||||||
|
role := req.GetRole()
|
||||||
|
if role == "" {
|
||||||
|
role = "worker"
|
||||||
|
}
|
||||||
|
arch := req.GetArch()
|
||||||
|
if arch == "" {
|
||||||
|
arch = "amd64"
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := masterdb.UpsertNode(m.DB, req.GetName(), req.GetAddress(), role, arch); err != nil {
|
||||||
|
m.Logger.Error("registration upsert failed", "node", req.GetName(), "err", err)
|
||||||
|
return nil, status.Error(codes.Internal, "registration failed")
|
||||||
|
}
|
||||||
|
if err := masterdb.UpdateNodeStatus(m.DB, req.GetName(), "healthy"); err != nil {
|
||||||
|
m.Logger.Warn("update node status", "node", req.GetName(), "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the agent pool connection.
|
||||||
|
if addErr := m.Pool.AddNode(req.GetName(), req.GetAddress()); addErr != nil {
|
||||||
|
m.Logger.Warn("pool update failed", "node", req.GetName(), "err", addErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Logger.Info("agent registered",
|
||||||
|
"node", req.GetName(), "address", req.GetAddress(),
|
||||||
|
"role", role, "arch", arch, "identity", tokenInfo.Username)
|
||||||
|
|
||||||
|
return &mcpv1.RegisterResponse{Accepted: true}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heartbeat handles agent heartbeats. Updates the node's resource data
|
||||||
|
// and last-heartbeat timestamp. Derives the node name from the MCIAS
|
||||||
|
// identity, not the request (security: don't trust self-reported name).
|
||||||
|
func (m *Master) Heartbeat(ctx context.Context, req *mcpv1.HeartbeatRequest) (*mcpv1.HeartbeatResponse, error) {
|
||||||
|
// Derive node name from identity.
|
||||||
|
tokenInfo := auth.TokenInfoFromContext(ctx)
|
||||||
|
if tokenInfo == nil {
|
||||||
|
return nil, status.Error(codes.Unauthenticated, "no auth context")
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeName := strings.TrimPrefix(tokenInfo.Username, "agent-")
|
||||||
|
if nodeName == tokenInfo.Username {
|
||||||
|
// Legacy mcp-agent account — use the request name.
|
||||||
|
nodeName = req.GetName()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the node is registered.
|
||||||
|
node, err := masterdb.GetNode(m.DB, nodeName)
|
||||||
|
if err != nil || node == nil {
|
||||||
|
return nil, status.Errorf(codes.NotFound, "node %q not registered", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update heartbeat data.
|
||||||
|
now := time.Now().UTC().Format(time.RFC3339)
|
||||||
|
_, err = m.DB.Exec(`
|
||||||
|
UPDATE nodes SET
|
||||||
|
containers = ?,
|
||||||
|
status = 'healthy',
|
||||||
|
last_heartbeat = ?,
|
||||||
|
updated_at = datetime('now')
|
||||||
|
WHERE name = ?
|
||||||
|
`, req.GetContainers(), now, nodeName)
|
||||||
|
if err != nil {
|
||||||
|
m.Logger.Warn("heartbeat update failed", "node", nodeName, "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &mcpv1.HeartbeatResponse{Acknowledged: true}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeartbeatMonitor runs in the background, checking for agents that have
|
||||||
|
// missed heartbeats and probing them via HealthCheck.
|
||||||
|
type HeartbeatMonitor struct {
|
||||||
|
master *Master
|
||||||
|
interval time.Duration // heartbeat check interval (default: 30s)
|
||||||
|
timeout time.Duration // missed heartbeat threshold (default: 90s)
|
||||||
|
stop chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHeartbeatMonitor creates a heartbeat monitor.
|
||||||
|
func NewHeartbeatMonitor(m *Master) *HeartbeatMonitor {
|
||||||
|
return &HeartbeatMonitor{
|
||||||
|
master: m,
|
||||||
|
interval: 30 * time.Second,
|
||||||
|
timeout: 90 * time.Second,
|
||||||
|
stop: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins the heartbeat monitoring loop.
|
||||||
|
func (hm *HeartbeatMonitor) Start() {
|
||||||
|
hm.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer hm.wg.Done()
|
||||||
|
// Initial warm-up: don't alert for the first cycle.
|
||||||
|
time.Sleep(hm.timeout)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(hm.interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-hm.stop:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
hm.check()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the heartbeat monitor.
|
||||||
|
func (hm *HeartbeatMonitor) Stop() {
|
||||||
|
close(hm.stop)
|
||||||
|
hm.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hm *HeartbeatMonitor) check() {
|
||||||
|
nodes, err := masterdb.ListNodes(hm.master.DB)
|
||||||
|
if err != nil {
|
||||||
|
hm.master.Logger.Warn("heartbeat check: list nodes", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
for _, node := range nodes {
|
||||||
|
if node.Status == "unhealthy" {
|
||||||
|
continue // already marked, don't spam probes
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.LastHeartbeat == nil {
|
||||||
|
continue // never sent a heartbeat, skip
|
||||||
|
}
|
||||||
|
|
||||||
|
if now.Sub(*node.LastHeartbeat) > hm.timeout {
|
||||||
|
hm.master.Logger.Warn("missed heartbeats, probing",
|
||||||
|
"node", node.Name,
|
||||||
|
"last_heartbeat", node.LastHeartbeat.Format(time.RFC3339))
|
||||||
|
|
||||||
|
// Probe the agent.
|
||||||
|
client, err := hm.master.Pool.Get(node.Name)
|
||||||
|
if err != nil {
|
||||||
|
hm.master.Logger.Warn("probe failed: no connection",
|
||||||
|
"node", node.Name, "err", err)
|
||||||
|
_ = masterdb.UpdateNodeStatus(hm.master.DB, node.Name, "unhealthy")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(),
|
||||||
|
hm.master.Config.Timeouts.HealthCheck.Duration)
|
||||||
|
_, probeErr := client.HealthCheck(ctx, &mcpv1.HealthCheckRequest{})
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if probeErr != nil {
|
||||||
|
hm.master.Logger.Warn("probe failed",
|
||||||
|
"node", node.Name, "err", probeErr)
|
||||||
|
_ = masterdb.UpdateNodeStatus(hm.master.DB, node.Name, "unhealthy")
|
||||||
|
} else {
|
||||||
|
// Probe succeeded — node is alive, just not sending heartbeats.
|
||||||
|
hm.master.Logger.Info("probe succeeded (heartbeats stale)",
|
||||||
|
"node", node.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,6 +16,10 @@ service McpMasterService {
|
|||||||
rpc Undeploy(MasterUndeployRequest) returns (MasterUndeployResponse);
|
rpc Undeploy(MasterUndeployRequest) returns (MasterUndeployResponse);
|
||||||
rpc Status(MasterStatusRequest) returns (MasterStatusResponse);
|
rpc Status(MasterStatusRequest) returns (MasterStatusResponse);
|
||||||
rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);
|
rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);
|
||||||
|
|
||||||
|
// Agent registration and health (called by agents).
|
||||||
|
rpc Register(RegisterRequest) returns (RegisterResponse);
|
||||||
|
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Deploy ---
|
// --- Deploy ---
|
||||||
@@ -93,3 +97,30 @@ message NodeInfo {
|
|||||||
string last_heartbeat = 7; // RFC3339
|
string last_heartbeat = 7; // RFC3339
|
||||||
int32 services = 8; // placement count
|
int32 services = 8; // placement count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Registration ---
|
||||||
|
|
||||||
|
message RegisterRequest {
|
||||||
|
string name = 1;
|
||||||
|
string role = 2; // "worker", "edge", or "master"
|
||||||
|
string address = 3; // agent gRPC address
|
||||||
|
string arch = 4; // "amd64" or "arm64"
|
||||||
|
}
|
||||||
|
|
||||||
|
message RegisterResponse {
|
||||||
|
bool accepted = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Heartbeat ---
|
||||||
|
|
||||||
|
message HeartbeatRequest {
|
||||||
|
string name = 1;
|
||||||
|
int64 cpu_millicores = 2;
|
||||||
|
int64 memory_bytes = 3;
|
||||||
|
int64 disk_bytes = 4;
|
||||||
|
int32 containers = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message HeartbeatResponse {
|
||||||
|
bool acknowledged = 1;
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user