Two drift-reporting bugs: 1. The monitor listed only the podman runtime, so unikernel VMs always showed observed=unknown (false drift). It now takes a ContainerLister and the agent passes a merged lister (containers + VMs), mirroring listAllContainers. 2. The monitor computed the lookup name as service+"-"+component, which is wrong when component==service (the name collapses to just the service, e.g. "uktest"/"mc-proxy"). It now uses the canonical naming.ContainerNameFor — extracted to a shared package so the agent and monitor can't disagree. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
167 lines
4.3 KiB
Go
167 lines
4.3 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
|
"git.wntrmute.dev/mc/mcp/internal/naming"
|
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
|
)
|
|
|
|
// ContainerLister reports the observed containers on the node. A full
|
|
// runtime.Runtime satisfies it; the agent passes a lister that merges the
|
|
// container and unikernel runtimes so the monitor sees VMs too.
|
|
type ContainerLister interface {
|
|
List(ctx context.Context) ([]runtime.ContainerInfo, error)
|
|
}
|
|
|
|
// Monitor watches container states and compares them to the registry,
|
|
// recording events and firing alerts on drift or flapping.
|
|
type Monitor struct {
|
|
db *sql.DB
|
|
lister ContainerLister
|
|
cfg config.MonitorConfig
|
|
logger *slog.Logger
|
|
alerter *Alerter
|
|
stopCh chan struct{}
|
|
done chan struct{}
|
|
|
|
prevState map[string]string // key: "service/component", value: observed state
|
|
}
|
|
|
|
// New creates a Monitor with the given dependencies. lister reports observed
|
|
// containers (and unikernel VMs, via a merged lister).
|
|
func New(db *sql.DB, lister ContainerLister, cfg config.MonitorConfig, nodeName string, logger *slog.Logger) *Monitor {
|
|
return &Monitor{
|
|
db: db,
|
|
lister: lister,
|
|
cfg: cfg,
|
|
logger: logger,
|
|
alerter: NewAlerter(cfg, nodeName, db, logger),
|
|
stopCh: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
prevState: make(map[string]string),
|
|
}
|
|
}
|
|
|
|
// Start launches the monitoring goroutine.
|
|
func (m *Monitor) Start() {
|
|
go m.run()
|
|
}
|
|
|
|
// Stop signals the monitoring goroutine to stop and waits for it to exit.
|
|
func (m *Monitor) Stop() {
|
|
close(m.stopCh)
|
|
<-m.done
|
|
}
|
|
|
|
func (m *Monitor) run() {
|
|
defer close(m.done)
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
m.logger.Error("monitor panic recovered", "panic", fmt.Sprintf("%v", r))
|
|
}
|
|
}()
|
|
|
|
ticker := time.NewTicker(m.cfg.Interval.Duration)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-m.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
m.tick()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Monitor) tick() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
m.logger.Error("monitor tick panic recovered", "panic", fmt.Sprintf("%v", r))
|
|
}
|
|
}()
|
|
|
|
ctx := context.Background()
|
|
|
|
// Get the current runtime state of all containers.
|
|
containers, err := m.lister.List(ctx)
|
|
if err != nil {
|
|
m.logger.Error("monitor: list containers", "error", err)
|
|
return
|
|
}
|
|
|
|
// Index runtime containers by name for fast lookup.
|
|
runtimeState := make(map[string]string, len(containers))
|
|
for _, c := range containers {
|
|
runtimeState[c.Name] = c.State
|
|
}
|
|
|
|
// Walk all registered services and their components.
|
|
services, err := registry.ListServices(m.db)
|
|
if err != nil {
|
|
m.logger.Error("monitor: list services", "error", err)
|
|
return
|
|
}
|
|
|
|
seen := make(map[string]struct{})
|
|
|
|
for _, svc := range services {
|
|
components, err := registry.ListComponents(m.db, svc.Name)
|
|
if err != nil {
|
|
m.logger.Error("monitor: list components", "error", err, "service", svc.Name)
|
|
continue
|
|
}
|
|
|
|
for _, comp := range components {
|
|
key := comp.Service + "/" + comp.Name
|
|
seen[key] = struct{}{}
|
|
containerName := naming.ContainerNameFor(comp.Service, comp.Name)
|
|
|
|
observed := "unknown"
|
|
if state, ok := runtimeState[containerName]; ok {
|
|
observed = state
|
|
}
|
|
|
|
prev, hasPrev := m.prevState[key]
|
|
if !hasPrev {
|
|
prev = comp.ObservedState
|
|
}
|
|
|
|
if observed != prev {
|
|
if err := registry.InsertEvent(m.db, comp.Service, comp.Name, prev, observed); err != nil {
|
|
m.logger.Error("monitor: insert event", "error", err, "key", key)
|
|
}
|
|
|
|
if err := registry.UpdateComponentState(m.db, comp.Service, comp.Name, "", observed); err != nil {
|
|
m.logger.Error("monitor: update observed state", "error", err, "key", key)
|
|
}
|
|
|
|
m.logger.Info("state change", "service", comp.Service, "component", comp.Name, "prev", prev, "observed", observed)
|
|
}
|
|
|
|
m.alerter.Evaluate(comp.Service, comp.Name, comp.DesiredState, observed, prev)
|
|
|
|
m.prevState[key] = observed
|
|
}
|
|
}
|
|
|
|
// Evict entries for components that no longer exist in the registry.
|
|
for key := range m.prevState {
|
|
if _, ok := seen[key]; !ok {
|
|
delete(m.prevState, key)
|
|
}
|
|
}
|
|
|
|
// Prune old events.
|
|
if _, err := registry.PruneEvents(m.db, time.Now().Add(-m.cfg.Retention.Duration)); err != nil {
|
|
m.logger.Error("monitor: prune events", "error", err)
|
|
}
|
|
}
|