P2.2-P2.9, P3.2-P3.10, P4.1-P4.3: Complete Phases 2, 3, and 4

11 work units built in parallel and merged:

Agent handlers (Phase 2):
- P2.2 Deploy: pull images, stop/remove/run containers, update registry
- P2.3 Lifecycle: stop/start/restart with desired_state tracking
- P2.4 Status: list (registry), live check (runtime), get status (drift+events)
- P2.5 Sync: receive desired state, reconcile unmanaged containers
- P2.6 File transfer: push/pull scoped to /srv/<service>/, path validation
- P2.7 Adopt: match <service>-* containers, derive component names
- P2.8 Monitor: continuous watch loop, drift/flap alerting, event pruning
- P2.9 Snapshot: VACUUM INTO database backup command

CLI commands (Phase 3):
- P3.2 Login, P3.3 Deploy, P3.4 Stop/Start/Restart
- P3.5 List/Ps/Status, P3.6 Sync, P3.7 Adopt
- P3.8 Service show/edit/export, P3.9 Push/Pull, P3.10 Node list/add/remove

Deployment artifacts (Phase 4):
- Systemd units (agent service + backup timer)
- Example configs (CLI + agent)
- Install script (idempotent)

All packages: build, vet, lint (0 issues), test (all pass).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-26 12:21:18 -07:00
parent d7cc970133
commit 8f913ddf9b
33 changed files with 3593 additions and 62 deletions

114
internal/agent/adopt.go Normal file
View File

@@ -0,0 +1,114 @@
package agent
import (
"context"
"fmt"
"strings"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
// AdoptContainers discovers running containers that match the given service
// name and registers them in the component registry. Containers named
// "<service>-*" or exactly "<service>" are matched.
func (a *Agent) AdoptContainers(ctx context.Context, req *mcpv1.AdoptContainersRequest) (*mcpv1.AdoptContainersResponse, error) {
service := req.GetService()
if service == "" {
return nil, fmt.Errorf("service name is required")
}
containers, err := a.Runtime.List(ctx)
if err != nil {
return nil, fmt.Errorf("list containers: %w", err)
}
prefix := service + "-"
// Filter matching containers before modifying any state.
type match struct {
container runtime.ContainerInfo
component string
}
var matches []match
for _, c := range containers {
switch {
case c.Name == service:
matches = append(matches, match{c, service})
case strings.HasPrefix(c.Name, prefix):
matches = append(matches, match{c, strings.TrimPrefix(c.Name, prefix)})
}
}
if len(matches) == 0 {
return &mcpv1.AdoptContainersResponse{}, nil
}
// Ensure the service exists once, before adopting any containers.
if err := registry.CreateService(a.DB, service, true); err != nil {
if _, getErr := registry.GetService(a.DB, service); getErr != nil {
return nil, fmt.Errorf("create service %q: %w", service, err)
}
}
var results []*mcpv1.AdoptResult
for _, m := range matches {
a.Logger.Info("adopting", "service", service, "container", m.container.Name, "component", m.component)
// Inspect the container to get full details (List only returns
// name, image, state, and version).
info, err := a.Runtime.Inspect(ctx, m.container.Name)
if err != nil {
results = append(results, &mcpv1.AdoptResult{
Container: m.container.Name,
Component: m.component,
Success: false,
Error: fmt.Sprintf("inspect container: %v", err),
})
continue
}
comp := &registry.Component{
Name: m.component,
Service: service,
Image: info.Image,
Network: info.Network,
UserSpec: info.User,
Restart: info.Restart,
DesiredState: desiredFromObserved(info.State),
ObservedState: info.State,
Version: info.Version,
Ports: info.Ports,
Volumes: info.Volumes,
Cmd: info.Cmd,
}
if createErr := registry.CreateComponent(a.DB, comp); createErr != nil {
results = append(results, &mcpv1.AdoptResult{
Container: m.container.Name,
Component: m.component,
Success: false,
Error: "already managed",
})
continue
}
results = append(results, &mcpv1.AdoptResult{
Container: m.container.Name,
Component: m.component,
Success: true,
})
}
return &mcpv1.AdoptContainersResponse{Results: results}, nil
}
// desiredFromObserved maps an observed container state to the desired state.
// Running containers should stay running; everything else is treated as stopped.
func desiredFromObserved(observed string) string {
if observed == "running" {
return "running"
}
return "stopped"
}

View File

@@ -0,0 +1,244 @@
package agent
import (
"context"
"testing"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
func TestAdoptContainers(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{
Name: "metacrypt-api",
Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0",
State: "running",
Network: "docker_default",
User: "0:0",
Restart: "unless-stopped",
Ports: []string{"127.0.0.1:18443:8443"},
Volumes: []string{"/srv/metacrypt:/srv/metacrypt"},
Cmd: []string{"server"},
Version: "v1.0.0",
},
{
Name: "metacrypt-web",
Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt-web:v1.0.0",
State: "running",
Network: "docker_default",
Restart: "unless-stopped",
Ports: []string{"127.0.0.1:18080:8080"},
Version: "v1.0.0",
},
{
Name: "unrelated-container",
Image: "nginx:latest",
State: "running",
},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"})
if err != nil {
t.Fatalf("AdoptContainers: %v", err)
}
if len(resp.Results) != 2 {
t.Fatalf("expected 2 results, got %d", len(resp.Results))
}
for _, r := range resp.Results {
if !r.Success {
t.Fatalf("expected success for %q, got error: %s", r.Container, r.Error)
}
}
// Verify components were created in the registry.
components, err := registry.ListComponents(a.DB, "metacrypt")
if err != nil {
t.Fatalf("ListComponents: %v", err)
}
if len(components) != 2 {
t.Fatalf("expected 2 components, got %d", len(components))
}
api, err := registry.GetComponent(a.DB, "metacrypt", "api")
if err != nil {
t.Fatalf("GetComponent api: %v", err)
}
if api.Image != "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0" {
t.Fatalf("api image: got %q", api.Image)
}
if api.DesiredState != "running" {
t.Fatalf("api desired state: got %q, want running", api.DesiredState)
}
if api.Network != "docker_default" {
t.Fatalf("api network: got %q, want docker_default", api.Network)
}
}
func TestAdoptContainersNoMatch(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "unrelated", Image: "nginx:latest", State: "running"},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"})
if err != nil {
t.Fatalf("AdoptContainers: %v", err)
}
if len(resp.Results) != 0 {
t.Fatalf("expected 0 results, got %d", len(resp.Results))
}
}
func TestAdoptContainersAlreadyManaged(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{
Name: "metacrypt-api",
Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0",
State: "running",
Network: "docker_default",
Restart: "unless-stopped",
Version: "v1.0.0",
},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
// First adopt succeeds.
resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"})
if err != nil {
t.Fatalf("first adopt: %v", err)
}
if len(resp.Results) != 1 || !resp.Results[0].Success {
t.Fatalf("first adopt should succeed: %+v", resp.Results)
}
// Second adopt should report "already managed".
resp, err = a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"})
if err != nil {
t.Fatalf("second adopt: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
if resp.Results[0].Success {
t.Fatal("second adopt should fail for already-managed container")
}
if resp.Results[0].Error != "already managed" {
t.Fatalf("expected 'already managed' error, got %q", resp.Results[0].Error)
}
}
func TestAdoptContainersSingleComponent(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{
Name: "mc-proxy",
Image: "mcr.svc.mcp.metacircular.net:8443/mc-proxy:v1.0.0",
State: "running",
Network: "host",
Restart: "unless-stopped",
Version: "v1.0.0",
},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "mc-proxy"})
if err != nil {
t.Fatalf("AdoptContainers: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
if resp.Results[0].Component != "mc-proxy" {
t.Fatalf("expected component name 'mc-proxy', got %q", resp.Results[0].Component)
}
if !resp.Results[0].Success {
t.Fatalf("expected success, got error: %s", resp.Results[0].Error)
}
}
func TestAdoptContainersStoppedContainer(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{
Name: "metacrypt-api",
Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0",
State: "exited",
Network: "docker_default",
Restart: "unless-stopped",
Version: "v1.0.0",
},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"})
if err != nil {
t.Fatalf("AdoptContainers: %v", err)
}
if len(resp.Results) != 1 || !resp.Results[0].Success {
t.Fatalf("expected success: %+v", resp.Results)
}
comp, err := registry.GetComponent(a.DB, "metacrypt", "api")
if err != nil {
t.Fatalf("GetComponent: %v", err)
}
if comp.DesiredState != "stopped" {
t.Fatalf("desired state: got %q, want stopped", comp.DesiredState)
}
if comp.ObservedState != "exited" {
t.Fatalf("observed state: got %q, want exited", comp.ObservedState)
}
}
func TestDesiredFromObserved(t *testing.T) {
tests := []struct {
observed string
want string
}{
{"running", "running"},
{"exited", "stopped"},
{"stopped", "stopped"},
{"created", "stopped"},
{"", "stopped"},
}
for _, tt := range tests {
got := desiredFromObserved(tt.observed)
if got != tt.want {
t.Errorf("desiredFromObserved(%q) = %q, want %q", tt.observed, got, tt.want)
}
}
}
func TestAdoptContainersEmptyService(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
_, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: ""})
if err == nil {
t.Fatal("expected error for empty service name")
}
}

139
internal/agent/deploy.go Normal file
View File

@@ -0,0 +1,139 @@
package agent
import (
"context"
"database/sql"
"errors"
"fmt"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
// Deploy deploys a service (or a single component of it) to this node.
func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.DeployResponse, error) {
spec := req.GetService()
if spec == nil {
return nil, fmt.Errorf("deploy: missing service spec")
}
serviceName := spec.GetName()
a.Logger.Info("deploying", "service", serviceName)
if err := ensureService(a.DB, serviceName, spec.GetActive()); err != nil {
return nil, fmt.Errorf("deploy: ensure service %q: %w", serviceName, err)
}
components := spec.GetComponents()
if target := req.GetComponent(); target != "" {
var filtered []*mcpv1.ComponentSpec
for _, cs := range components {
if cs.GetName() == target {
filtered = append(filtered, cs)
}
}
components = filtered
}
var results []*mcpv1.ComponentResult
for _, cs := range components {
result := a.deployComponent(ctx, serviceName, cs)
results = append(results, result)
}
return &mcpv1.DeployResponse{Results: results}, nil
}
// deployComponent handles the full deploy lifecycle for a single component.
func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcpv1.ComponentSpec) *mcpv1.ComponentResult {
compName := cs.GetName()
containerName := serviceName + "-" + compName
a.Logger.Info("deploying component", "service", serviceName, "component", compName)
regComp := &registry.Component{
Name: compName,
Service: serviceName,
Image: cs.GetImage(),
Network: cs.GetNetwork(),
UserSpec: cs.GetUser(),
Restart: cs.GetRestart(),
DesiredState: "running",
Version: runtime.ExtractVersion(cs.GetImage()),
Ports: cs.GetPorts(),
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
}
if err := ensureComponent(a.DB, regComp); err != nil {
return &mcpv1.ComponentResult{
Name: compName,
Error: fmt.Sprintf("ensure component: %v", err),
}
}
if err := a.Runtime.Pull(ctx, cs.GetImage()); err != nil {
return &mcpv1.ComponentResult{
Name: compName,
Error: fmt.Sprintf("pull image: %v", err),
}
}
_ = a.Runtime.Stop(ctx, containerName) // may not exist yet
_ = a.Runtime.Remove(ctx, containerName) // may not exist yet
runSpec := runtime.ContainerSpec{
Name: containerName,
Image: cs.GetImage(),
Network: cs.GetNetwork(),
User: cs.GetUser(),
Restart: cs.GetRestart(),
Ports: cs.GetPorts(),
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
}
if err := a.Runtime.Run(ctx, runSpec); err != nil {
_ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed")
return &mcpv1.ComponentResult{
Name: compName,
Error: fmt.Sprintf("run container: %v", err),
}
}
if err := registry.UpdateComponentState(a.DB, serviceName, compName, "running", "running"); err != nil {
a.Logger.Warn("failed to update component state", "service", serviceName, "component", compName, "err", err)
}
return &mcpv1.ComponentResult{
Name: compName,
Success: true,
}
}
// ensureService creates the service if it does not exist, or updates its
// active flag if it does.
func ensureService(db *sql.DB, name string, active bool) error {
_, err := registry.GetService(db, name)
if errors.Is(err, sql.ErrNoRows) {
return registry.CreateService(db, name, active)
}
if err != nil {
return err
}
return registry.UpdateServiceActive(db, name, active)
}
// ensureComponent creates the component if it does not exist, or updates its
// spec if it does.
func ensureComponent(db *sql.DB, c *registry.Component) error {
_, err := registry.GetComponent(db, c.Service, c.Name)
if errors.Is(err, sql.ErrNoRows) {
c.ObservedState = "unknown"
return registry.CreateComponent(db, c)
}
if err != nil {
return err
}
return registry.UpdateComponentSpec(db, c)
}

152
internal/agent/files.go Normal file
View File

@@ -0,0 +1,152 @@
package agent
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// validatePath validates and resolves a relative path within a service's
// /srv/<service>/ directory. It rejects path traversal, absolute paths,
// and symlink escapes.
func validatePath(service, relPath string) (string, error) {
if service == "" {
return "", fmt.Errorf("empty service name")
}
if relPath == "" {
return "", fmt.Errorf("empty path")
}
if filepath.IsAbs(relPath) {
return "", fmt.Errorf("absolute path not allowed: %s", relPath)
}
cleaned := filepath.Clean(relPath)
if strings.Contains(cleaned, "..") {
return "", fmt.Errorf("path traversal not allowed: %s", relPath)
}
serviceDir := filepath.Join("/srv", service)
fullPath := filepath.Join(serviceDir, cleaned)
if !strings.HasPrefix(fullPath, serviceDir+"/") {
return "", fmt.Errorf("path escapes service directory: %s", relPath)
}
parentDir := filepath.Dir(fullPath)
if _, err := os.Stat(parentDir); err == nil {
resolved, err := filepath.EvalSymlinks(parentDir)
if err != nil {
return "", fmt.Errorf("resolve symlinks: %w", err)
}
if !strings.HasPrefix(resolved, serviceDir) {
return "", fmt.Errorf("symlink escapes service directory: %s", relPath)
}
}
return fullPath, nil
}
// PushFile writes a file to the node's filesystem under /srv/<service>/.
func (a *Agent) PushFile(ctx context.Context, req *mcpv1.PushFileRequest) (*mcpv1.PushFileResponse, error) {
if req.Service == "" {
return nil, status.Errorf(codes.InvalidArgument, "service name required")
}
if req.Path == "" {
return nil, status.Errorf(codes.InvalidArgument, "path required")
}
fullPath, err := validatePath(req.Service, req.Path)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid path: %v", err)
}
a.Logger.Info("push file", "service", req.Service, "path", req.Path)
dir := filepath.Dir(fullPath)
if err := os.MkdirAll(dir, 0750); err != nil {
return nil, status.Errorf(codes.Internal, "create directories: %v", err)
}
// Atomic write: temp file in the same directory, then rename.
tmp, err := os.CreateTemp(dir, ".mcp-push-*")
if err != nil {
return nil, status.Errorf(codes.Internal, "create temp file: %v", err)
}
tmpName := tmp.Name()
cleanup := func() { _ = os.Remove(tmpName) }
if _, err := tmp.Write(req.Content); err != nil {
_ = tmp.Close()
cleanup()
return nil, status.Errorf(codes.Internal, "write temp file: %v", err)
}
if err := tmp.Close(); err != nil {
cleanup()
return nil, status.Errorf(codes.Internal, "close temp file: %v", err)
}
mode := os.FileMode(req.Mode)
if mode == 0 {
mode = 0600
}
if err := os.Chmod(tmpName, mode); err != nil {
cleanup()
return nil, status.Errorf(codes.Internal, "set permissions: %v", err)
}
if err := os.Rename(tmpName, fullPath); err != nil {
cleanup()
return nil, status.Errorf(codes.Internal, "rename to target: %v", err)
}
return &mcpv1.PushFileResponse{Success: true}, nil
}
// PullFile reads a file from the node's filesystem under /srv/<service>/.
func (a *Agent) PullFile(ctx context.Context, req *mcpv1.PullFileRequest) (*mcpv1.PullFileResponse, error) {
if req.Service == "" {
return nil, status.Errorf(codes.InvalidArgument, "service name required")
}
if req.Path == "" {
return nil, status.Errorf(codes.InvalidArgument, "path required")
}
fullPath, err := validatePath(req.Service, req.Path)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid path: %v", err)
}
a.Logger.Info("pull file", "service", req.Service, "path", req.Path)
f, err := os.Open(fullPath) //nolint:gosec // path validated by validatePath
if err != nil {
if os.IsNotExist(err) {
return nil, status.Errorf(codes.NotFound, "file not found: %s", req.Path)
}
return nil, status.Errorf(codes.Internal, "open file: %v", err)
}
defer f.Close() //nolint:errcheck
info, err := f.Stat()
if err != nil {
return nil, status.Errorf(codes.Internal, "stat file: %v", err)
}
content, err := io.ReadAll(f)
if err != nil {
return nil, status.Errorf(codes.Internal, "read file: %v", err)
}
return &mcpv1.PullFileResponse{
Content: content,
Mode: uint32(info.Mode().Perm()),
}, nil
}

View File

@@ -0,0 +1,82 @@
package agent
import (
"testing"
)
func TestValidatePath(t *testing.T) {
tests := []struct {
name string
service string
path string
want string
wantErr bool
}{
{
name: "valid simple path",
service: "mcr",
path: "mcr.toml",
want: "/srv/mcr/mcr.toml",
},
{
name: "valid nested path",
service: "mcr",
path: "certs/cert.pem",
want: "/srv/mcr/certs/cert.pem",
},
{
name: "reject traversal",
service: "mcr",
path: "../etc/passwd",
wantErr: true,
},
{
name: "reject absolute path",
service: "mcr",
path: "/etc/passwd",
wantErr: true,
},
{
name: "reject empty service",
service: "",
path: "mcr.toml",
wantErr: true,
},
{
name: "reject empty path",
service: "mcr",
path: "",
wantErr: true,
},
{
name: "reject double dot in middle",
service: "mcr",
path: "certs/../../etc/passwd",
wantErr: true,
},
{
name: "valid deeply nested",
service: "metacrypt",
path: "data/keys/primary.key",
want: "/srv/metacrypt/data/keys/primary.key",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := validatePath(tt.service, tt.path)
if tt.wantErr {
if err == nil {
t.Fatalf("expected error, got path %q", got)
}
return
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != tt.want {
t.Errorf("got %q, want %q", got, tt.want)
}
})
}
}

160
internal/agent/lifecycle.go Normal file
View File

@@ -0,0 +1,160 @@
package agent
import (
"context"
"database/sql"
"fmt"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// StopService stops all components of a service.
func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest) (*mcpv1.StopServiceResponse, error) {
a.Logger.Info("StopService", "service", req.GetName())
if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required")
}
components, err := registry.ListComponents(a.DB, req.GetName())
if err != nil {
return nil, status.Errorf(codes.Internal, "list components: %v", err)
}
var results []*mcpv1.ComponentResult
for _, c := range components {
containerName := req.GetName() + "-" + c.Name
r := &mcpv1.ComponentResult{Name: c.Name, Success: true}
if err := a.Runtime.Stop(ctx, containerName); err != nil {
a.Logger.Info("stop container (ignored)", "container", containerName, "error", err)
}
if err := registry.UpdateComponentState(a.DB, req.GetName(), c.Name, "stopped", "stopped"); err != nil {
r.Success = false
r.Error = fmt.Sprintf("update state: %v", err)
}
results = append(results, r)
}
return &mcpv1.StopServiceResponse{Results: results}, nil
}
// StartService starts all components of a service. If a container already
// exists but is stopped, it is removed first so a fresh one can be created.
func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest) (*mcpv1.StartServiceResponse, error) {
a.Logger.Info("StartService", "service", req.GetName())
if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required")
}
components, err := registry.ListComponents(a.DB, req.GetName())
if err != nil {
return nil, status.Errorf(codes.Internal, "list components: %v", err)
}
var results []*mcpv1.ComponentResult
for _, c := range components {
r := startComponent(ctx, a, req.GetName(), &c)
results = append(results, r)
}
return &mcpv1.StartServiceResponse{Results: results}, nil
}
// RestartService restarts all components of a service by stopping, removing,
// and re-creating each container. The desired_state is not changed.
func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceRequest) (*mcpv1.RestartServiceResponse, error) {
a.Logger.Info("RestartService", "service", req.GetName())
if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required")
}
components, err := registry.ListComponents(a.DB, req.GetName())
if err != nil {
return nil, status.Errorf(codes.Internal, "list components: %v", err)
}
var results []*mcpv1.ComponentResult
for _, c := range components {
r := restartComponent(ctx, a, req.GetName(), &c)
results = append(results, r)
}
return &mcpv1.RestartServiceResponse{Results: results}, nil
}
// startComponent removes any existing container and runs a fresh one from
// the registry spec, then updates state to running.
func startComponent(ctx context.Context, a *Agent, service string, c *registry.Component) *mcpv1.ComponentResult {
containerName := service + "-" + c.Name
r := &mcpv1.ComponentResult{Name: c.Name, Success: true}
// Remove any pre-existing container; ignore errors for non-existent ones.
_ = a.Runtime.Stop(ctx, containerName)
_ = a.Runtime.Remove(ctx, containerName)
spec := componentToSpec(service, c)
if err := a.Runtime.Run(ctx, spec); err != nil {
r.Success = false
r.Error = fmt.Sprintf("run container: %v", err)
return r
}
if err := registry.UpdateComponentState(a.DB, service, c.Name, "running", "running"); err != nil {
r.Success = false
r.Error = fmt.Sprintf("update state: %v", err)
}
return r
}
// restartComponent stops, removes, and re-creates a container without
// changing the desired_state in the registry.
func restartComponent(ctx context.Context, a *Agent, service string, c *registry.Component) *mcpv1.ComponentResult {
containerName := service + "-" + c.Name
r := &mcpv1.ComponentResult{Name: c.Name, Success: true}
_ = a.Runtime.Stop(ctx, containerName)
_ = a.Runtime.Remove(ctx, containerName)
spec := componentToSpec(service, c)
if err := a.Runtime.Run(ctx, spec); err != nil {
r.Success = false
r.Error = fmt.Sprintf("run container: %v", err)
_ = registry.UpdateComponentState(a.DB, service, c.Name, "", "stopped")
return r
}
if err := registry.UpdateComponentState(a.DB, service, c.Name, "", "running"); err != nil {
r.Success = false
r.Error = fmt.Sprintf("update state: %v", err)
}
return r
}
// componentToSpec builds a runtime.ContainerSpec from a registry Component.
func componentToSpec(service string, c *registry.Component) runtime.ContainerSpec {
return runtime.ContainerSpec{
Name: service + "-" + c.Name,
Image: c.Image,
Network: c.Network,
User: c.UserSpec,
Restart: c.Restart,
Ports: c.Ports,
Volumes: c.Volumes,
Cmd: c.Cmd,
}
}
// componentExists checks whether a component already exists in the registry.
func componentExists(db *sql.DB, service, name string) bool {
_, err := registry.GetComponent(db, service, name)
return err == nil
}

219
internal/agent/status.go Normal file
View File

@@ -0,0 +1,219 @@
package agent
import (
"context"
"fmt"
"strings"
"time"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
"google.golang.org/protobuf/types/known/timestamppb"
)
// buildServiceInfo converts a registry Service and its components into a proto
// ServiceInfo message.
func buildServiceInfo(svc registry.Service, components []registry.Component) *mcpv1.ServiceInfo {
info := &mcpv1.ServiceInfo{
Name: svc.Name,
Active: svc.Active,
}
for _, c := range components {
info.Components = append(info.Components, &mcpv1.ComponentInfo{
Name: c.Name,
Image: c.Image,
DesiredState: c.DesiredState,
ObservedState: c.ObservedState,
Version: c.Version,
})
}
return info
}
// ListServices returns all services and their components from the registry.
// It does not query the container runtime.
func (a *Agent) ListServices(ctx context.Context, req *mcpv1.ListServicesRequest) (*mcpv1.ListServicesResponse, error) {
a.Logger.Debug("ListServices called")
services, err := registry.ListServices(a.DB)
if err != nil {
return nil, fmt.Errorf("list services: %w", err)
}
resp := &mcpv1.ListServicesResponse{}
for _, svc := range services {
components, err := registry.ListComponents(a.DB, svc.Name)
if err != nil {
return nil, fmt.Errorf("list components for %q: %w", svc.Name, err)
}
resp.Services = append(resp.Services, buildServiceInfo(svc, components))
}
return resp, nil
}
// liveCheckServices queries the container runtime and reconciles with the
// registry. It returns a list of ServiceInfo messages with updated observed
// state. This shared logic is used by both LiveCheck and GetServiceStatus.
func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, error) {
containers, err := a.Runtime.List(ctx)
if err != nil {
return nil, fmt.Errorf("runtime list: %w", err)
}
runtimeByName := make(map[string]runtime.ContainerInfo, len(containers))
for _, c := range containers {
runtimeByName[c.Name] = c
}
matched := make(map[string]bool)
services, err := registry.ListServices(a.DB)
if err != nil {
return nil, fmt.Errorf("list services: %w", err)
}
var result []*mcpv1.ServiceInfo
for _, svc := range services {
components, err := registry.ListComponents(a.DB, svc.Name)
if err != nil {
return nil, fmt.Errorf("list components for %q: %w", svc.Name, err)
}
info := &mcpv1.ServiceInfo{
Name: svc.Name,
Active: svc.Active,
}
for _, comp := range components {
containerName := svc.Name + "-" + comp.Name
ci := &mcpv1.ComponentInfo{
Name: comp.Name,
Image: comp.Image,
DesiredState: comp.DesiredState,
Version: comp.Version,
}
if rc, ok := runtimeByName[containerName]; ok {
ci.ObservedState = rc.State
matched[containerName] = true
} else {
ci.ObservedState = "removed"
}
info.Components = append(info.Components, ci)
}
result = append(result, info)
}
for _, c := range containers {
if matched[c.Name] {
continue
}
svcName, compName := splitContainerName(c.Name)
result = append(result, &mcpv1.ServiceInfo{
Name: svcName,
Active: false,
Components: []*mcpv1.ComponentInfo{
{
Name: compName,
Image: c.Image,
DesiredState: "ignore",
ObservedState: c.State,
Version: c.Version,
},
},
})
}
return result, nil
}
// LiveCheck queries the container runtime, reconciles against the registry,
// and returns the updated state for all services.
func (a *Agent) LiveCheck(ctx context.Context, req *mcpv1.LiveCheckRequest) (*mcpv1.LiveCheckResponse, error) {
a.Logger.Debug("LiveCheck called")
services, err := a.liveCheckServices(ctx)
if err != nil {
return nil, fmt.Errorf("live check: %w", err)
}
return &mcpv1.LiveCheckResponse{Services: services}, nil
}
// GetServiceStatus performs a live check, detects drift, and returns recent
// events. If a service name is provided, results are filtered to that service.
func (a *Agent) GetServiceStatus(ctx context.Context, req *mcpv1.GetServiceStatusRequest) (*mcpv1.GetServiceStatusResponse, error) {
a.Logger.Debug("GetServiceStatus called", "service", req.GetName())
services, err := a.liveCheckServices(ctx)
if err != nil {
return nil, fmt.Errorf("live check: %w", err)
}
if req.GetName() != "" {
var filtered []*mcpv1.ServiceInfo
for _, svc := range services {
if svc.Name == req.GetName() {
filtered = append(filtered, svc)
}
}
services = filtered
}
var drift []*mcpv1.DriftInfo
for _, svc := range services {
for _, comp := range svc.Components {
if comp.DesiredState == "ignore" {
continue
}
if comp.DesiredState != comp.ObservedState {
drift = append(drift, &mcpv1.DriftInfo{
Service: svc.Name,
Component: comp.Name,
DesiredState: comp.DesiredState,
ObservedState: comp.ObservedState,
})
}
}
}
since := time.Now().Add(-1 * time.Hour)
svcFilter := req.GetName()
events, err := registry.QueryEvents(a.DB, svcFilter, "", since, 50)
if err != nil {
return nil, fmt.Errorf("query events: %w", err)
}
var protoEvents []*mcpv1.EventInfo
for _, e := range events {
protoEvents = append(protoEvents, &mcpv1.EventInfo{
Service: e.Service,
Component: e.Component,
PrevState: e.PrevState,
NewState: e.NewState,
Timestamp: timestamppb.New(e.Timestamp),
})
}
return &mcpv1.GetServiceStatusResponse{
Services: services,
Drift: drift,
RecentEvents: protoEvents,
}, nil
}
// splitContainerName splits a container name like "metacrypt-api" into service
// and component parts. If there is no hyphen, the whole name is used as both
// the service and component name.
func splitContainerName(name string) (service, component string) {
if i := strings.Index(name, "-"); i >= 0 {
return name[:i], name[i+1:]
}
return name, name
}

View File

@@ -0,0 +1,274 @@
package agent
import (
"context"
"testing"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
func TestListServices(t *testing.T) {
a := newTestAgent(t, &fakeRuntime{})
ctx := context.Background()
// Empty registry.
resp, err := a.ListServices(ctx, &mcpv1.ListServicesRequest{})
if err != nil {
t.Fatalf("ListServices: %v", err)
}
if len(resp.Services) != 0 {
t.Fatalf("expected 0 services, got %d", len(resp.Services))
}
// Add a service with components.
if err := registry.CreateService(a.DB, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "api", Service: "metacrypt",
Image: "img:v1", DesiredState: "running", ObservedState: "running",
Version: "v1",
}); err != nil {
t.Fatalf("create component: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "web", Service: "metacrypt",
Image: "img-web:v1", DesiredState: "running", ObservedState: "unknown",
Version: "v1",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err = a.ListServices(ctx, &mcpv1.ListServicesRequest{})
if err != nil {
t.Fatalf("ListServices: %v", err)
}
if len(resp.Services) != 1 {
t.Fatalf("expected 1 service, got %d", len(resp.Services))
}
svc := resp.Services[0]
if svc.Name != "metacrypt" || !svc.Active {
t.Fatalf("unexpected service: %+v", svc)
}
if len(svc.Components) != 2 {
t.Fatalf("expected 2 components, got %d", len(svc.Components))
}
if svc.Components[0].Name != "api" {
t.Fatalf("expected first component 'api', got %q", svc.Components[0].Name)
}
}
func TestLiveCheck(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "metacrypt-api", Image: "img:v1", State: "running", Version: "v1"},
{Name: "unmanaged-thing", Image: "other:latest", State: "running", Version: "latest"},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
// Set up registry with one service and one component.
if err := registry.CreateService(a.DB, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "api", Service: "metacrypt",
Image: "img:v1", DesiredState: "running", ObservedState: "unknown",
Version: "v1",
}); err != nil {
t.Fatalf("create component: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "web", Service: "metacrypt",
Image: "img-web:v1", DesiredState: "running", ObservedState: "unknown",
Version: "v1",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.LiveCheck(ctx, &mcpv1.LiveCheckRequest{})
if err != nil {
t.Fatalf("LiveCheck: %v", err)
}
// Should have 2 entries: the registered service and the unmanaged container.
if len(resp.Services) != 2 {
t.Fatalf("expected 2 service entries, got %d", len(resp.Services))
}
// First entry: metacrypt (registered).
mc := resp.Services[0]
if mc.Name != "metacrypt" {
t.Fatalf("expected 'metacrypt', got %q", mc.Name)
}
if len(mc.Components) != 2 {
t.Fatalf("expected 2 components, got %d", len(mc.Components))
}
// api should be running (found in runtime).
api := mc.Components[0]
if api.Name != "api" || api.ObservedState != "running" {
t.Fatalf("api: expected observed_state=running, got %q", api.ObservedState)
}
// web should be removed (not found in runtime).
web := mc.Components[1]
if web.Name != "web" || web.ObservedState != "removed" {
t.Fatalf("web: expected observed_state=removed, got %q", web.ObservedState)
}
// Second entry: unmanaged container.
unmanaged := resp.Services[1]
if unmanaged.Name != "unmanaged" {
t.Fatalf("expected unmanaged service name 'unmanaged', got %q", unmanaged.Name)
}
if len(unmanaged.Components) != 1 {
t.Fatalf("expected 1 unmanaged component, got %d", len(unmanaged.Components))
}
uc := unmanaged.Components[0]
if uc.DesiredState != "ignore" {
t.Fatalf("unmanaged desired_state: expected 'ignore', got %q", uc.DesiredState)
}
if uc.ObservedState != "running" {
t.Fatalf("unmanaged observed_state: expected 'running', got %q", uc.ObservedState)
}
}
func TestGetServiceStatus_DriftDetection(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "metacrypt-api", Image: "img:v1", State: "exited", Version: "v1"},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "api", Service: "metacrypt",
Image: "img:v1", DesiredState: "running", ObservedState: "running",
Version: "v1",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// Add an event so we can verify it appears.
if err := registry.InsertEvent(a.DB, "metacrypt", "api", "running", "exited"); err != nil {
t.Fatalf("insert event: %v", err)
}
resp, err := a.GetServiceStatus(ctx, &mcpv1.GetServiceStatusRequest{Name: "metacrypt"})
if err != nil {
t.Fatalf("GetServiceStatus: %v", err)
}
if len(resp.Services) != 1 {
t.Fatalf("expected 1 service, got %d", len(resp.Services))
}
if resp.Services[0].Name != "metacrypt" {
t.Fatalf("expected 'metacrypt', got %q", resp.Services[0].Name)
}
// Drift: desired=running, observed=exited.
if len(resp.Drift) != 1 {
t.Fatalf("expected 1 drift entry, got %d", len(resp.Drift))
}
d := resp.Drift[0]
if d.Service != "metacrypt" || d.Component != "api" {
t.Fatalf("drift: unexpected service/component: %q/%q", d.Service, d.Component)
}
if d.DesiredState != "running" || d.ObservedState != "exited" {
t.Fatalf("drift: desired=%q, observed=%q", d.DesiredState, d.ObservedState)
}
// Events.
if len(resp.RecentEvents) != 1 {
t.Fatalf("expected 1 event, got %d", len(resp.RecentEvents))
}
ev := resp.RecentEvents[0]
if ev.PrevState != "running" || ev.NewState != "exited" {
t.Fatalf("event: prev=%q, new=%q", ev.PrevState, ev.NewState)
}
}
func TestGetServiceStatus_FilterByName(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "metacrypt-api", Image: "img:v1", State: "running", Version: "v1"},
{Name: "mcr-api", Image: "mcr:v1", State: "running", Version: "v1"},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
for _, svc := range []string{"metacrypt", "mcr"} {
if err := registry.CreateService(a.DB, svc, true); err != nil {
t.Fatalf("create service %q: %v", svc, err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "api", Service: svc,
Image: svc + ":v1", DesiredState: "running", ObservedState: "unknown",
Version: "v1",
}); err != nil {
t.Fatalf("create component for %q: %v", svc, err)
}
}
resp, err := a.GetServiceStatus(ctx, &mcpv1.GetServiceStatusRequest{Name: "mcr"})
if err != nil {
t.Fatalf("GetServiceStatus: %v", err)
}
if len(resp.Services) != 1 {
t.Fatalf("expected 1 service, got %d", len(resp.Services))
}
if resp.Services[0].Name != "mcr" {
t.Fatalf("expected 'mcr', got %q", resp.Services[0].Name)
}
}
func TestGetServiceStatus_IgnoreSkipsDrift(t *testing.T) {
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "random-thing", Image: "img:v1", State: "running", Version: "v1"},
},
}
a := newTestAgent(t, rt)
ctx := context.Background()
// No registered services, only an unmanaged container.
resp, err := a.GetServiceStatus(ctx, &mcpv1.GetServiceStatusRequest{})
if err != nil {
t.Fatalf("GetServiceStatus: %v", err)
}
// The unmanaged container should not appear in drift.
if len(resp.Drift) != 0 {
t.Fatalf("expected 0 drift entries for unmanaged, got %d", len(resp.Drift))
}
}
func TestSplitContainerName(t *testing.T) {
tests := []struct {
name string
service string
comp string
}{
{"metacrypt-api", "metacrypt", "api"},
{"metacrypt-web-ui", "metacrypt", "web-ui"},
{"standalone", "standalone", "standalone"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc, comp := splitContainerName(tt.name)
if svc != tt.service || comp != tt.comp {
t.Fatalf("splitContainerName(%q) = (%q, %q), want (%q, %q)",
tt.name, svc, comp, tt.service, tt.comp)
}
})
}
}

183
internal/agent/sync.go Normal file
View File

@@ -0,0 +1,183 @@
package agent
import (
"context"
"fmt"
"strings"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// SyncDesiredState reconciles the agent's registry with the declared service
// specs. It creates or updates services and components, then discovers
// untracked containers and adds them with desired_state "ignore".
func (a *Agent) SyncDesiredState(ctx context.Context, req *mcpv1.SyncDesiredStateRequest) (*mcpv1.SyncDesiredStateResponse, error) {
a.Logger.Info("SyncDesiredState", "services", len(req.GetServices()))
known := make(map[string]bool)
var results []*mcpv1.ServiceSyncResult
for _, spec := range req.GetServices() {
if spec.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required")
}
r, names, err := a.syncService(ctx, spec)
if err != nil {
return nil, err
}
for _, n := range names {
known[n] = true
}
results = append(results, r)
}
// Reconciliation: find containers not in the registry and add them.
if err := a.reconcileUntracked(ctx, known); err != nil {
a.Logger.Info("reconcile untracked containers failed", "error", err)
}
return &mcpv1.SyncDesiredStateResponse{Results: results}, nil
}
// syncService creates or updates a single service and its components.
// It returns the sync result and a list of container names that belong to
// this service.
func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1.ServiceSyncResult, []string, error) {
result := &mcpv1.ServiceSyncResult{Name: spec.GetName()}
var changes []string
var containerNames []string
desiredState := "running"
if !spec.GetActive() {
desiredState = "stopped"
}
// Create or update the service record.
existing, err := registry.GetService(a.DB, spec.GetName())
if err != nil {
// Service does not exist; create it.
if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive()); err != nil {
return nil, nil, status.Errorf(codes.Internal, "create service %q: %v", spec.GetName(), err)
}
changes = append(changes, "created service")
} else if existing.Active != spec.GetActive() {
if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive()); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update service %q: %v", spec.GetName(), err)
}
changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive()))
}
// Create or update each component.
for _, cs := range spec.GetComponents() {
containerName := spec.GetName() + "-" + cs.GetName()
containerNames = append(containerNames, containerName)
comp := protoToComponent(spec.GetName(), cs, desiredState)
if componentExists(a.DB, spec.GetName(), cs.GetName()) {
if err := registry.UpdateComponentSpec(a.DB, comp); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update component %q/%q: %v", spec.GetName(), cs.GetName(), err)
}
if err := registry.UpdateComponentState(a.DB, spec.GetName(), cs.GetName(), desiredState, ""); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update component state %q/%q: %v", spec.GetName(), cs.GetName(), err)
}
changes = append(changes, fmt.Sprintf("updated %s", cs.GetName()))
} else {
if err := registry.CreateComponent(a.DB, comp); err != nil {
return nil, nil, status.Errorf(codes.Internal, "create component %q/%q: %v", spec.GetName(), cs.GetName(), err)
}
changes = append(changes, fmt.Sprintf("created %s", cs.GetName()))
}
}
result.Changed = len(changes) > 0
result.Summary = strings.Join(changes, "; ")
if !result.Changed {
result.Summary = "no changes"
}
a.Logger.Info("sync service", "service", spec.GetName(), "changed", result.Changed, "summary", result.Summary)
return result, containerNames, nil
}
// reconcileUntracked lists all containers from the runtime and adds any that
// are not already tracked in the registry with desired_state "ignore".
func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) error {
containers, err := a.Runtime.List(ctx)
if err != nil {
return fmt.Errorf("list containers: %w", err)
}
for _, c := range containers {
if known[c.Name] {
continue
}
service, component, ok := parseContainerName(c.Name)
if !ok {
continue
}
if componentExists(a.DB, service, component) {
continue
}
if _, err := registry.GetService(a.DB, service); err != nil {
if err := registry.CreateService(a.DB, service, true); err != nil {
a.Logger.Info("reconcile: create service failed", "service", service, "error", err)
continue
}
}
comp := &registry.Component{
Name: component,
Service: service,
Image: c.Image,
Network: c.Network,
UserSpec: c.User,
Restart: c.Restart,
DesiredState: "ignore",
ObservedState: c.State,
Version: runtime.ExtractVersion(c.Image),
}
if err := registry.CreateComponent(a.DB, comp); err != nil {
a.Logger.Info("reconcile: create component failed", "container", c.Name, "error", err)
continue
}
a.Logger.Info("reconcile: adopted untracked container", "container", c.Name, "desired_state", "ignore")
}
return nil
}
// protoToComponent converts a proto ComponentSpec to a registry Component.
func protoToComponent(service string, cs *mcpv1.ComponentSpec, desiredState string) *registry.Component {
return &registry.Component{
Name: cs.GetName(),
Service: service,
Image: cs.GetImage(),
Network: cs.GetNetwork(),
UserSpec: cs.GetUser(),
Restart: cs.GetRestart(),
Ports: cs.GetPorts(),
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
DesiredState: desiredState,
Version: runtime.ExtractVersion(cs.GetImage()),
}
}
// parseContainerName splits "service-component" into its parts. Returns false
// if the name does not contain a hyphen.
func parseContainerName(name string) (service, component string, ok bool) {
i := strings.IndexByte(name, '-')
if i < 0 {
return "", "", false
}
return name[:i], name[i+1:], true
}

View File

@@ -0,0 +1,61 @@
package agent
import (
"context"
"log/slog"
"path/filepath"
"testing"
"git.wntrmute.dev/kyle/mcp/internal/config"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
// fakeRuntime implements runtime.Runtime for testing.
type fakeRuntime struct {
containers []runtime.ContainerInfo
inspectMap map[string]runtime.ContainerInfo
listErr error
}
func (f *fakeRuntime) Pull(_ context.Context, _ string) error { return nil }
func (f *fakeRuntime) Run(_ context.Context, _ runtime.ContainerSpec) error { return nil }
func (f *fakeRuntime) Stop(_ context.Context, _ string) error { return nil }
func (f *fakeRuntime) Remove(_ context.Context, _ string) error { return nil }
func (f *fakeRuntime) List(_ context.Context) ([]runtime.ContainerInfo, error) {
return f.containers, f.listErr
}
func (f *fakeRuntime) Inspect(_ context.Context, name string) (runtime.ContainerInfo, error) {
if f.inspectMap != nil {
if info, ok := f.inspectMap[name]; ok {
return info, nil
}
}
for _, c := range f.containers {
if c.Name == name {
return c, nil
}
}
return runtime.ContainerInfo{}, nil
}
// newTestAgent creates an Agent with a temp database for testing.
func newTestAgent(t *testing.T, rt runtime.Runtime) *Agent {
t.Helper()
db, err := registry.Open(filepath.Join(t.TempDir(), "test.db"))
if err != nil {
t.Fatalf("open test db: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
return &Agent{
Config: &config.AgentConfig{
Agent: config.AgentSettings{NodeName: "test-node"},
},
DB: db,
Runtime: rt,
Logger: slog.Default(),
}
}

View File

@@ -0,0 +1,113 @@
package monitor
import (
"database/sql"
"fmt"
"log/slog"
"os"
"os/exec"
"time"
"git.wntrmute.dev/kyle/mcp/internal/config"
"git.wntrmute.dev/kyle/mcp/internal/registry"
)
// Alerter evaluates state transitions and fires alerts for drift or flapping.
type Alerter struct {
command []string
cooldown time.Duration
flapThreshold int
flapWindow time.Duration
nodeName string
db *sql.DB
logger *slog.Logger
lastAlert map[string]time.Time // key: "service/component"
}
// NewAlerter creates an Alerter from monitoring configuration.
func NewAlerter(cfg config.MonitorConfig, nodeName string, db *sql.DB, logger *slog.Logger) *Alerter {
return &Alerter{
command: cfg.AlertCommand,
cooldown: cfg.Cooldown.Duration,
flapThreshold: cfg.FlapThreshold,
flapWindow: cfg.FlapWindow.Duration,
nodeName: nodeName,
db: db,
logger: logger,
lastAlert: make(map[string]time.Time),
}
}
// Evaluate checks a component's state transition and fires alerts as needed.
// It is called for every component on each monitor tick.
func (al *Alerter) Evaluate(service, component, desiredState, observedState, prevState string) {
if desiredState == "ignore" {
return
}
key := service + "/" + component
// Drift check: desired state does not match observed state.
if desiredState != observedState {
if al.cooledDown(key) {
al.fire("drift", service, component, desiredState, observedState, prevState, 0)
}
}
// Flap check: too many transitions in the flap window.
if observedState != prevState {
count, err := registry.CountEvents(al.db, service, component, time.Now().Add(-al.flapWindow))
if err != nil {
al.logger.Error("alerter: count events", "error", err, "key", key)
return
}
if count >= al.flapThreshold {
if al.cooledDown(key) {
al.fire("flapping", service, component, desiredState, observedState, prevState, count)
}
}
}
}
// cooledDown returns true and records the alert time if enough time has
// elapsed since the last alert for this key. Returns false if suppressed.
func (al *Alerter) cooledDown(key string) bool {
if last, ok := al.lastAlert[key]; ok {
if time.Since(last) < al.cooldown {
return false
}
}
al.lastAlert[key] = time.Now()
return true
}
func (al *Alerter) fire(alertType, service, component, desired, observed, prev string, transitions int) {
al.logger.Warn("alert",
"type", alertType,
"service", service,
"component", component,
"desired", desired,
"observed", observed,
)
if len(al.command) == 0 {
return
}
cmd := exec.Command(al.command[0], al.command[1:]...) //nolint:gosec // alert command from trusted config
cmd.Env = append(os.Environ(),
"MCP_COMPONENT="+component,
"MCP_SERVICE="+service,
"MCP_NODE="+al.nodeName,
"MCP_DESIRED="+desired,
"MCP_OBSERVED="+observed,
"MCP_PREV_STATE="+prev,
"MCP_ALERT_TYPE="+alertType,
"MCP_TRANSITIONS="+fmt.Sprintf("%d", transitions),
)
if out, err := cmd.CombinedOutput(); err != nil {
al.logger.Error("alert command failed", "error", err, "output", string(out))
}
}

157
internal/monitor/monitor.go Normal file
View File

@@ -0,0 +1,157 @@
package monitor
import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"
"git.wntrmute.dev/kyle/mcp/internal/config"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
// Monitor watches container states and compares them to the registry,
// recording events and firing alerts on drift or flapping.
type Monitor struct {
db *sql.DB
runtime runtime.Runtime
cfg config.MonitorConfig
logger *slog.Logger
alerter *Alerter
stopCh chan struct{}
done chan struct{}
prevState map[string]string // key: "service/component", value: observed state
}
// New creates a Monitor with the given dependencies.
func New(db *sql.DB, rt runtime.Runtime, cfg config.MonitorConfig, nodeName string, logger *slog.Logger) *Monitor {
return &Monitor{
db: db,
runtime: rt,
cfg: cfg,
logger: logger,
alerter: NewAlerter(cfg, nodeName, db, logger),
stopCh: make(chan struct{}),
done: make(chan struct{}),
prevState: make(map[string]string),
}
}
// Start launches the monitoring goroutine.
func (m *Monitor) Start() {
go m.run()
}
// Stop signals the monitoring goroutine to stop and waits for it to exit.
func (m *Monitor) Stop() {
close(m.stopCh)
<-m.done
}
func (m *Monitor) run() {
defer close(m.done)
defer func() {
if r := recover(); r != nil {
m.logger.Error("monitor panic recovered", "panic", fmt.Sprintf("%v", r))
}
}()
ticker := time.NewTicker(m.cfg.Interval.Duration)
defer ticker.Stop()
for {
select {
case <-m.stopCh:
return
case <-ticker.C:
m.tick()
}
}
}
func (m *Monitor) tick() {
defer func() {
if r := recover(); r != nil {
m.logger.Error("monitor tick panic recovered", "panic", fmt.Sprintf("%v", r))
}
}()
ctx := context.Background()
// Get the current runtime state of all containers.
containers, err := m.runtime.List(ctx)
if err != nil {
m.logger.Error("monitor: list containers", "error", err)
return
}
// Index runtime containers by name for fast lookup.
runtimeState := make(map[string]string, len(containers))
for _, c := range containers {
runtimeState[c.Name] = c.State
}
// Walk all registered services and their components.
services, err := registry.ListServices(m.db)
if err != nil {
m.logger.Error("monitor: list services", "error", err)
return
}
seen := make(map[string]struct{})
for _, svc := range services {
components, err := registry.ListComponents(m.db, svc.Name)
if err != nil {
m.logger.Error("monitor: list components", "error", err, "service", svc.Name)
continue
}
for _, comp := range components {
key := comp.Service + "/" + comp.Name
seen[key] = struct{}{}
containerName := comp.Service + "-" + comp.Name
observed := "unknown"
if state, ok := runtimeState[containerName]; ok {
observed = state
}
prev, hasPrev := m.prevState[key]
if !hasPrev {
prev = comp.ObservedState
}
if observed != prev {
if err := registry.InsertEvent(m.db, comp.Service, comp.Name, prev, observed); err != nil {
m.logger.Error("monitor: insert event", "error", err, "key", key)
}
if err := registry.UpdateComponentState(m.db, comp.Service, comp.Name, "", observed); err != nil {
m.logger.Error("monitor: update observed state", "error", err, "key", key)
}
m.logger.Info("state change", "service", comp.Service, "component", comp.Name, "prev", prev, "observed", observed)
}
m.alerter.Evaluate(comp.Service, comp.Name, comp.DesiredState, observed, prev)
m.prevState[key] = observed
}
}
// Evict entries for components that no longer exist in the registry.
for key := range m.prevState {
if _, ok := seen[key]; !ok {
delete(m.prevState, key)
}
}
// Prune old events.
if _, err := registry.PruneEvents(m.db, time.Now().Add(-m.cfg.Retention.Duration)); err != nil {
m.logger.Error("monitor: prune events", "error", err)
}
}

View File

@@ -0,0 +1,280 @@
package monitor
import (
"context"
"database/sql"
"log/slog"
"os"
"path/filepath"
"testing"
"time"
"git.wntrmute.dev/kyle/mcp/internal/config"
"git.wntrmute.dev/kyle/mcp/internal/registry"
"git.wntrmute.dev/kyle/mcp/internal/runtime"
)
func openTestDB(t *testing.T) *sql.DB {
t.Helper()
db, err := registry.Open(filepath.Join(t.TempDir(), "test.db"))
if err != nil {
t.Fatalf("open db: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
return db
}
func testLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
}
func testMonitorConfig() config.MonitorConfig {
return config.MonitorConfig{
Interval: config.Duration{Duration: 1 * time.Second},
Cooldown: config.Duration{Duration: 1 * time.Minute},
FlapThreshold: 3,
FlapWindow: config.Duration{Duration: 10 * time.Minute},
Retention: config.Duration{Duration: 24 * time.Hour},
}
}
// fakeRuntime implements runtime.Runtime for testing.
type fakeRuntime struct {
containers []runtime.ContainerInfo
}
func (f *fakeRuntime) Pull(_ context.Context, _ string) error { return nil }
func (f *fakeRuntime) Run(_ context.Context, _ runtime.ContainerSpec) error { return nil }
func (f *fakeRuntime) Stop(_ context.Context, _ string) error { return nil }
func (f *fakeRuntime) Remove(_ context.Context, _ string) error { return nil }
func (f *fakeRuntime) Inspect(_ context.Context, _ string) (runtime.ContainerInfo, error) {
return runtime.ContainerInfo{}, nil
}
func (f *fakeRuntime) List(_ context.Context) ([]runtime.ContainerInfo, error) {
return f.containers, nil
}
func TestAlerterDriftDetection(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
al := NewAlerter(cfg, "test-node", db, logger)
// Set up a service and component so CountEvents works.
if err := registry.CreateService(db, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(db, &registry.Component{
Name: "api", Service: "metacrypt", Image: "img:v1",
Restart: "unless-stopped", DesiredState: "running", ObservedState: "running",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// Desired is "running" but observed is "exited" -- drift should fire.
al.Evaluate("metacrypt", "api", "running", "exited", "running")
// Verify alert was recorded (lastAlert should be set).
key := "metacrypt/api"
if _, ok := al.lastAlert[key]; !ok {
t.Fatal("expected drift alert to be recorded in lastAlert")
}
}
func TestAlerterIgnoreState(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
al := NewAlerter(cfg, "test-node", db, logger)
// Components with desired_state "ignore" should not trigger alerts.
al.Evaluate("metacrypt", "api", "ignore", "exited", "running")
key := "metacrypt/api"
if _, ok := al.lastAlert[key]; ok {
t.Fatal("expected no alert for ignored component")
}
}
func TestAlerterCooldownSuppression(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
cfg.Cooldown.Duration = 1 * time.Hour // long cooldown
al := NewAlerter(cfg, "test-node", db, logger)
if err := registry.CreateService(db, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(db, &registry.Component{
Name: "api", Service: "metacrypt", Image: "img:v1",
Restart: "unless-stopped", DesiredState: "running", ObservedState: "running",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// First call should fire.
al.Evaluate("metacrypt", "api", "running", "exited", "running")
key := "metacrypt/api"
first, ok := al.lastAlert[key]
if !ok {
t.Fatal("expected first alert to fire")
}
// Second call should be suppressed (within cooldown).
al.Evaluate("metacrypt", "api", "running", "exited", "exited")
second := al.lastAlert[key]
if !second.Equal(first) {
t.Fatal("expected second alert to be suppressed by cooldown")
}
}
func TestAlerterFlapDetection(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
cfg.FlapThreshold = 2
cfg.FlapWindow.Duration = 10 * time.Minute
cfg.Cooldown.Duration = 0 // disable cooldown for this test
al := NewAlerter(cfg, "test-node", db, logger)
if err := registry.CreateService(db, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(db, &registry.Component{
Name: "api", Service: "metacrypt", Image: "img:v1",
Restart: "unless-stopped", DesiredState: "running", ObservedState: "unknown",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// Insert enough events to exceed the flap threshold.
for i := 0; i < 3; i++ {
if err := registry.InsertEvent(db, "metacrypt", "api", "running", "exited"); err != nil {
t.Fatalf("insert event %d: %v", i, err)
}
}
// Evaluate with a state transition -- should detect flapping.
al.Evaluate("metacrypt", "api", "running", "exited", "running")
key := "metacrypt/api"
if _, ok := al.lastAlert[key]; !ok {
t.Fatal("expected flap alert to fire")
}
}
func TestMonitorTickStateChange(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
if err := registry.CreateService(db, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(db, &registry.Component{
Name: "api", Service: "metacrypt", Image: "img:v1",
Restart: "unless-stopped", DesiredState: "running", ObservedState: "unknown",
}); err != nil {
t.Fatalf("create component: %v", err)
}
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "metacrypt-api", State: "running"},
},
}
m := New(db, rt, cfg, "test-node", logger)
// Run a single tick.
m.tick()
// Verify observed state was updated in the registry.
comp, err := registry.GetComponent(db, "metacrypt", "api")
if err != nil {
t.Fatalf("get component: %v", err)
}
if comp.ObservedState != "running" {
t.Fatalf("observed state: got %q, want %q", comp.ObservedState, "running")
}
// Verify an event was recorded (unknown -> running).
events, err := registry.QueryEvents(db, "metacrypt", "api", time.Now().Add(-1*time.Hour), 0)
if err != nil {
t.Fatalf("query events: %v", err)
}
if len(events) != 1 {
t.Fatalf("events: got %d, want 1", len(events))
}
if events[0].PrevState != "unknown" || events[0].NewState != "running" {
t.Fatalf("event: got %q->%q, want unknown->running", events[0].PrevState, events[0].NewState)
}
// Verify prevState map was updated.
if m.prevState["metacrypt/api"] != "running" {
t.Fatalf("prevState: got %q, want %q", m.prevState["metacrypt/api"], "running")
}
}
func TestMonitorStartStop(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
cfg.Interval.Duration = 50 * time.Millisecond
rt := &fakeRuntime{}
m := New(db, rt, cfg, "test-node", logger)
m.Start()
// Give it a moment to tick at least once.
time.Sleep(150 * time.Millisecond)
m.Stop()
// If Stop returns, the goroutine exited cleanly.
}
func TestMonitorNoChangeNoEvent(t *testing.T) {
db := openTestDB(t)
logger := testLogger()
cfg := testMonitorConfig()
if err := registry.CreateService(db, "metacrypt", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(db, &registry.Component{
Name: "api", Service: "metacrypt", Image: "img:v1",
Restart: "unless-stopped", DesiredState: "running", ObservedState: "running",
}); err != nil {
t.Fatalf("create component: %v", err)
}
rt := &fakeRuntime{
containers: []runtime.ContainerInfo{
{Name: "metacrypt-api", State: "running"},
},
}
m := New(db, rt, cfg, "test-node", logger)
// Seed prevState so that observed == prev (no change).
m.prevState["metacrypt/api"] = "running"
m.tick()
// No events should be recorded when state is unchanged.
events, err := registry.QueryEvents(db, "metacrypt", "api", time.Now().Add(-1*time.Hour), 0)
if err != nil {
t.Fatalf("query events: %v", err)
}
if len(events) != 0 {
t.Fatalf("events: got %d, want 0 (no state change)", len(events))
}
}