Fix design-vs-implementation gaps found in verification
Critical fixes:
- Wire monitor subsystem to agent startup (was dead code)
- Implement NodeStatus RPC (disk, memory, CPU, runtime version, uptime)
- Deploy respects active=false (sets desired_state=stopped, not always running)

Medium fixes:
- Add Started field to runtime.ContainerInfo, populate from podman inspect
- Populate ComponentInfo.started in status handlers for uptime display
- Add Monitor field to Agent struct for graceful shutdown

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2
go.mod
2
go.mod
@@ -5,6 +5,7 @@ go 1.25.7
|
|||||||
require (
|
require (
|
||||||
github.com/pelletier/go-toml/v2 v2.3.0
|
github.com/pelletier/go-toml/v2 v2.3.0
|
||||||
github.com/spf13/cobra v1.10.2
|
github.com/spf13/cobra v1.10.2
|
||||||
|
golang.org/x/sys v0.42.0
|
||||||
google.golang.org/grpc v1.79.3
|
google.golang.org/grpc v1.79.3
|
||||||
google.golang.org/protobuf v1.36.11
|
google.golang.org/protobuf v1.36.11
|
||||||
modernc.org/sqlite v1.47.0
|
modernc.org/sqlite v1.47.0
|
||||||
@@ -19,7 +20,6 @@ require (
|
|||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
github.com/spf13/pflag v1.0.9 // indirect
|
github.com/spf13/pflag v1.0.9 // indirect
|
||||||
golang.org/x/net v0.48.0 // indirect
|
golang.org/x/net v0.48.0 // indirect
|
||||||
golang.org/x/sys v0.42.0 // indirect
|
|
||||||
golang.org/x/text v0.32.0 // indirect
|
golang.org/x/text v0.32.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
|
||||||
modernc.org/libc v1.70.0 // indirect
|
modernc.org/libc v1.70.0 // indirect
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
|
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
|
||||||
"git.wntrmute.dev/kyle/mcp/internal/auth"
|
"git.wntrmute.dev/kyle/mcp/internal/auth"
|
||||||
"git.wntrmute.dev/kyle/mcp/internal/config"
|
"git.wntrmute.dev/kyle/mcp/internal/config"
|
||||||
|
"git.wntrmute.dev/kyle/mcp/internal/monitor"
|
||||||
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
||||||
"git.wntrmute.dev/kyle/mcp/internal/runtime"
|
"git.wntrmute.dev/kyle/mcp/internal/runtime"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@@ -28,6 +29,7 @@ type Agent struct {
|
|||||||
Config *config.AgentConfig
|
Config *config.AgentConfig
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Runtime runtime.Runtime
|
Runtime runtime.Runtime
|
||||||
|
Monitor *monitor.Monitor
|
||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,10 +48,13 @@ func Run(cfg *config.AgentConfig) error {
|
|||||||
|
|
||||||
rt := &runtime.Podman{}
|
rt := &runtime.Podman{}
|
||||||
|
|
||||||
|
mon := monitor.New(db, rt, cfg.Monitor, cfg.Agent.NodeName, logger)
|
||||||
|
|
||||||
a := &Agent{
|
a := &Agent{
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
DB: db,
|
DB: db,
|
||||||
Runtime: rt,
|
Runtime: rt,
|
||||||
|
Monitor: mon,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +91,8 @@ func Run(cfg *config.AgentConfig) error {
|
|||||||
"runtime", cfg.Agent.ContainerRuntime,
|
"runtime", cfg.Agent.ContainerRuntime,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Graceful shutdown on signal.
|
mon.Start()
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
@@ -98,9 +104,11 @@ func Run(cfg *config.AgentConfig) error {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Info("shutting down")
|
logger.Info("shutting down")
|
||||||
|
mon.Stop()
|
||||||
server.GracefulStop()
|
server.GracefulStop()
|
||||||
return nil
|
return nil
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
|
mon.Stop()
|
||||||
return fmt.Errorf("serve: %w", err)
|
return fmt.Errorf("serve: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,8 +37,9 @@ func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.De
|
|||||||
}
|
}
|
||||||
|
|
||||||
var results []*mcpv1.ComponentResult
|
var results []*mcpv1.ComponentResult
|
||||||
|
active := spec.GetActive()
|
||||||
for _, cs := range components {
|
for _, cs := range components {
|
||||||
result := a.deployComponent(ctx, serviceName, cs)
|
result := a.deployComponent(ctx, serviceName, cs, active)
|
||||||
results = append(results, result)
|
results = append(results, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,11 +47,16 @@ func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.De
|
|||||||
}
|
}
|
||||||
|
|
||||||
// deployComponent handles the full deploy lifecycle for a single component.
|
// deployComponent handles the full deploy lifecycle for a single component.
|
||||||
func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcpv1.ComponentSpec) *mcpv1.ComponentResult {
|
func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcpv1.ComponentSpec, active bool) *mcpv1.ComponentResult {
|
||||||
compName := cs.GetName()
|
compName := cs.GetName()
|
||||||
containerName := serviceName + "-" + compName
|
containerName := serviceName + "-" + compName
|
||||||
|
|
||||||
a.Logger.Info("deploying component", "service", serviceName, "component", compName)
|
desiredState := "running"
|
||||||
|
if !active {
|
||||||
|
desiredState = "stopped"
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("deploying component", "service", serviceName, "component", compName, "desired", desiredState)
|
||||||
|
|
||||||
regComp := &registry.Component{
|
regComp := &registry.Component{
|
||||||
Name: compName,
|
Name: compName,
|
||||||
@@ -59,7 +65,7 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
|||||||
Network: cs.GetNetwork(),
|
Network: cs.GetNetwork(),
|
||||||
UserSpec: cs.GetUser(),
|
UserSpec: cs.GetUser(),
|
||||||
Restart: cs.GetRestart(),
|
Restart: cs.GetRestart(),
|
||||||
DesiredState: "running",
|
DesiredState: desiredState,
|
||||||
Version: runtime.ExtractVersion(cs.GetImage()),
|
Version: runtime.ExtractVersion(cs.GetImage()),
|
||||||
Ports: cs.GetPorts(),
|
Ports: cs.GetPorts(),
|
||||||
Volumes: cs.GetVolumes(),
|
Volumes: cs.GetVolumes(),
|
||||||
|
|||||||
67
internal/agent/nodestatus.go
Normal file
67
internal/agent/nodestatus.go
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os/exec"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
|
||||||
|
"git.wntrmute.dev/kyle/mcp/internal/registry"
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NodeStatus returns information about this agent's node.
|
||||||
|
func (a *Agent) NodeStatus(ctx context.Context, _ *mcpv1.NodeStatusRequest) (*mcpv1.NodeStatusResponse, error) {
|
||||||
|
services, err := registry.ListServices(a.DB)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var componentCount uint32
|
||||||
|
for _, svc := range services {
|
||||||
|
comps, _ := registry.ListComponents(a.DB, svc.Name)
|
||||||
|
componentCount += uint32(len(comps)) //nolint:gosec // bounded by service count
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &mcpv1.NodeStatusResponse{
|
||||||
|
NodeName: a.Config.Agent.NodeName,
|
||||||
|
Runtime: a.Config.Agent.ContainerRuntime,
|
||||||
|
ServiceCount: uint32(len(services)), //nolint:gosec // bounded
|
||||||
|
ComponentCount: componentCount,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runtime version.
|
||||||
|
cmd := exec.CommandContext(ctx, a.Config.Agent.ContainerRuntime, "--version") //nolint:gosec // trusted config
|
||||||
|
if out, err := cmd.Output(); err == nil {
|
||||||
|
resp.RuntimeVersion = strings.TrimSpace(string(out))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disk usage for /srv.
|
||||||
|
var stat unix.Statfs_t
|
||||||
|
if err := unix.Statfs("/srv", &stat); err == nil {
|
||||||
|
resp.DiskTotalBytes = stat.Blocks * uint64(stat.Bsize) //nolint:gosec // kernel values
|
||||||
|
resp.DiskFreeBytes = stat.Bavail * uint64(stat.Bsize) //nolint:gosec // kernel values
|
||||||
|
}
|
||||||
|
|
||||||
|
// Memory.
|
||||||
|
var sysinfo unix.Sysinfo_t
|
||||||
|
if err := unix.Sysinfo(&sysinfo); err == nil {
|
||||||
|
resp.MemoryTotalBytes = sysinfo.Totalram
|
||||||
|
resp.MemoryFreeBytes = sysinfo.Freeram
|
||||||
|
}
|
||||||
|
|
||||||
|
// CPU usage approximation: number of goroutines / GOMAXPROCS is a rough
|
||||||
|
// indicator. Real CPU monitoring would use /proc/stat, which is a v2 concern.
|
||||||
|
resp.CpuUsagePercent = float64(runtime.NumGoroutine()) / float64(runtime.GOMAXPROCS(0)) * 100
|
||||||
|
|
||||||
|
// Uptime: use sysinfo.
|
||||||
|
if err := unix.Sysinfo(&sysinfo); err == nil {
|
||||||
|
bootTime := time.Now().Add(-time.Duration(sysinfo.Uptime) * time.Second)
|
||||||
|
resp.UptimeSince = timestamppb.New(bootTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
@@ -97,6 +97,9 @@ func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, er
|
|||||||
|
|
||||||
if rc, ok := runtimeByName[containerName]; ok {
|
if rc, ok := runtimeByName[containerName]; ok {
|
||||||
ci.ObservedState = rc.State
|
ci.ObservedState = rc.State
|
||||||
|
if !rc.Started.IsZero() {
|
||||||
|
ci.Started = timestamppb.New(rc.Started)
|
||||||
|
}
|
||||||
matched[containerName] = true
|
matched[containerName] = true
|
||||||
} else {
|
} else {
|
||||||
ci.ObservedState = "removed"
|
ci.ObservedState = "removed"
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Podman implements the Runtime interface using the podman CLI.
|
// Podman implements the Runtime interface using the podman CLI.
|
||||||
@@ -99,6 +100,7 @@ type podmanInspectResult struct {
|
|||||||
} `json:"Config"`
|
} `json:"Config"`
|
||||||
State struct {
|
State struct {
|
||||||
Status string `json:"Status"`
|
Status string `json:"Status"`
|
||||||
|
StartedAt string `json:"StartedAt"`
|
||||||
} `json:"State"`
|
} `json:"State"`
|
||||||
HostConfig struct {
|
HostConfig struct {
|
||||||
RestartPolicy struct {
|
RestartPolicy struct {
|
||||||
@@ -142,6 +144,9 @@ func (p *Podman) Inspect(ctx context.Context, name string) (ContainerInfo, error
|
|||||||
Cmd: r.Config.Cmd,
|
Cmd: r.Config.Cmd,
|
||||||
Version: ExtractVersion(r.Config.Image),
|
Version: ExtractVersion(r.Config.Image),
|
||||||
}
|
}
|
||||||
|
if t, err := time.Parse(time.RFC3339Nano, r.State.StartedAt); err == nil {
|
||||||
|
info.Started = t
|
||||||
|
}
|
||||||
|
|
||||||
info.Network = r.HostConfig.NetworkMode
|
info.Network = r.HostConfig.NetworkMode
|
||||||
if len(r.NetworkSettings.Networks) > 0 {
|
if len(r.NetworkSettings.Networks) > 0 {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package runtime
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ContainerSpec describes a container to create and run.
|
// ContainerSpec describes a container to create and run.
|
||||||
@@ -29,6 +30,7 @@ type ContainerInfo struct {
|
|||||||
Volumes []string
|
Volumes []string
|
||||||
Cmd []string
|
Cmd []string
|
||||||
Version string // extracted from image tag
|
Version string // extracted from image tag
|
||||||
|
Started time.Time // when the container started (zero if not running)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runtime is the container runtime abstraction.
|
// Runtime is the container runtime abstraction.
|
||||||
|
|||||||
Reference in New Issue
Block a user