Files
mcp/internal/monitor/monitor.go
Kyle Isom 08b3e2a472 Migrate module path from kyle/ to mc/ org
All import paths updated to git.wntrmute.dev/mc/. Bumps mcdsl to v1.2.0,
mc-proxy to v1.1.0.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 02:07:42 -07:00

158 lines
3.8 KiB
Go

package monitor
import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"
"git.wntrmute.dev/mc/mcp/internal/config"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
)
// Monitor periodically compares the container states reported by the runtime
// with the components stored in the registry. State changes are written to
// the registry as events, and every observation is passed to the Alerter.
type Monitor struct {
	// db is the registry database handle; it is also given to the Alerter.
	db *sql.DB
	// runtime is asked for the current container list on every pass.
	runtime runtime.Runtime
	// cfg supplies the polling interval and the event retention window.
	cfg config.MonitorConfig
	// logger receives state-change and error messages.
	logger *slog.Logger
	// alerter is evaluated once per component on every pass.
	alerter *Alerter
	// stopCh is closed by Stop to ask the monitoring goroutine to exit.
	stopCh chan struct{}
	// done is closed by the monitoring goroutine when it returns.
	done chan struct{}
	// prevState caches the state observed on the previous pass.
	// Keys have the form "service/component"; values are observed states.
	prevState map[string]string
}
// New assembles a Monitor from its dependencies. It builds the Alerter from
// cfg, nodeName, db and logger, allocates the stop/done channels, and starts
// with an empty previous-state cache. No goroutine is started here; call
// Start to begin monitoring.
func New(db *sql.DB, rt runtime.Runtime, cfg config.MonitorConfig, nodeName string, logger *slog.Logger) *Monitor {
	m := new(Monitor)

	// Injected dependencies.
	m.db = db
	m.runtime = rt
	m.cfg = cfg
	m.logger = logger
	m.alerter = NewAlerter(cfg, nodeName, db, logger)

	// Lifecycle channels and per-component state cache.
	m.stopCh = make(chan struct{})
	m.done = make(chan struct{})
	m.prevState = make(map[string]string)

	return m
}
// Start begins monitoring in a background goroutine and returns immediately.
// Pair it with Stop to shut the goroutine down.
//
// NOTE(review): nothing here guards against a second call; each call spawns
// another run loop sharing the same channels, so callers should invoke Start
// once per Monitor.
func (m *Monitor) Start() {
	go m.run()
}
// Stop signals the monitoring goroutine to stop and waits for it to exit.
//
// Stop may be called repeatedly: only the first call closes the stop channel,
// later calls just wait on (the already closed) done channel and return. The
// closed-check and the close are not atomic, so concurrent calls to Stop must
// still be serialised by the caller.
//
// Stop blocks until run closes done; if Start was never called it blocks
// indefinitely.
func (m *Monitor) Stop() {
	select {
	case <-m.stopCh:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(m.stopCh)
	}
	<-m.done
}
// run is the body of the monitoring goroutine. It performs one pass per
// ticker interval until the stop channel is closed.
func (m *Monitor) run() {
	// Deferred calls execute last-in-first-out: the ticker is stopped, then a
	// panic (if any) is recovered and logged, and finally done is closed so
	// that Stop is released even after a panic.
	defer close(m.done)
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		m.logger.Error("monitor panic recovered", "panic", fmt.Sprintf("%v", r))
	}()

	interval := m.cfg.Interval.Duration
	t := time.NewTicker(interval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			m.tick()
		case <-m.stopCh:
			return
		}
	}
}
// tick performs a single monitoring pass:
//
//  1. list the containers known to the runtime,
//  2. for every registered component, compare the observed container state
//     with the previously seen state, recording an event and updating the
//     registry when it changed,
//  3. hand every observation to the alerter,
//  4. evict cached state for components that left the registry, and
//  5. prune events older than the configured retention.
//
// A panic anywhere in the pass is recovered and logged so that one bad pass
// does not terminate the monitoring loop.
func (m *Monitor) tick() {
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error("monitor tick panic recovered", "panic", fmt.Sprintf("%v", r))
		}
	}()

	// Bind the runtime query to the monitor's lifetime: when Stop closes
	// stopCh the context is cancelled, so Stop is not held up by a slow
	// runtime call (provided the runtime implementation honours ctx).
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		select {
		case <-m.stopCh:
			cancel()
		case <-ctx.Done():
			// The pass finished first; nothing left to cancel.
		}
	}()

	// Get the current runtime state of all containers.
	containers, err := m.runtime.List(ctx)
	if err != nil {
		if ctx.Err() != nil {
			// The context is only ever cancelled by a stop request here.
			m.logger.Debug("monitor: list containers interrupted by stop", "error", err)
			return
		}
		m.logger.Error("monitor: list containers", "error", err)
		return
	}

	// Index runtime containers by name for fast lookup.
	runtimeState := make(map[string]string, len(containers))
	for _, c := range containers {
		runtimeState[c.Name] = c.State
	}

	// Walk all registered services and their components.
	services, err := registry.ListServices(m.db)
	if err != nil {
		m.logger.Error("monitor: list services", "error", err)
		return
	}

	seen := make(map[string]struct{}, len(m.prevState))
	// complete stays true only if every service's components could be
	// listed; eviction below is skipped otherwise.
	complete := true

	for _, svc := range services {
		components, err := registry.ListComponents(m.db, svc.Name)
		if err != nil {
			m.logger.Error("monitor: list components", "error", err, "service", svc.Name)
			complete = false
			continue
		}

		for _, comp := range components {
			key := comp.Service + "/" + comp.Name
			seen[key] = struct{}{}

			// Containers are looked up as "<service>-<component>"; a
			// component without a matching container is reported as "unknown".
			containerName := comp.Service + "-" + comp.Name
			observed := "unknown"
			if state, ok := runtimeState[containerName]; ok {
				observed = state
			}

			// Without a cached value (first pass for this component), fall
			// back to the observed state stored in the registry.
			prev, hasPrev := m.prevState[key]
			if !hasPrev {
				prev = comp.ObservedState
			}

			if observed != prev {
				if err := registry.InsertEvent(m.db, comp.Service, comp.Name, prev, observed); err != nil {
					m.logger.Error("monitor: insert event", "error", err, "key", key)
				}
				// NOTE(review): the empty string is passed in the position
				// before the observed state; presumably it leaves the
				// desired state untouched — confirm against
				// registry.UpdateComponentState.
				if err := registry.UpdateComponentState(m.db, comp.Service, comp.Name, "", observed); err != nil {
					m.logger.Error("monitor: update observed state", "error", err, "key", key)
				}
				m.logger.Info("state change", "service", comp.Service, "component", comp.Name, "prev", prev, "observed", observed)
			}

			// The alerter sees every observation, not only changes.
			m.alerter.Evaluate(comp.Service, comp.Name, comp.DesiredState, observed, prev)
			m.prevState[key] = observed
		}
	}

	// Evict entries for components that no longer exist in the registry.
	// This is only safe when the walk above was complete: after a failed
	// listing, absence from seen does not mean the component is gone, and
	// evicting would discard its cached state because of a transient error.
	if complete {
		for key := range m.prevState {
			if _, ok := seen[key]; !ok {
				delete(m.prevState, key)
			}
		}
	}

	// Prune old events.
	if _, err := registry.PruneEvents(m.db, time.Now().Add(-m.cfg.Retention.Duration)); err != nil {
		m.logger.Error("monitor: prune events", "error", err)
	}
}