// Package monitor compares the state of running containers with the state
// recorded in the registry and records the differences.
package monitor

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"git.wntrmute.dev/mc/mcp/internal/config"
	"git.wntrmute.dev/mc/mcp/internal/registry"
	"git.wntrmute.dev/mc/mcp/internal/runtime"
)

// Monitor watches container states and compares them to the registry,
// recording events and firing alerts on drift or flapping.
//
// A Monitor is single-use: Start launches at most one goroutine and, once
// Stop has been called, the Monitor cannot be started again.
type Monitor struct {
	db      *sql.DB
	runtime runtime.Runtime
	cfg     config.MonitorConfig
	logger  *slog.Logger
	alerter *Alerter

	stopCh chan struct{} // closed by Stop to ask the goroutine to exit
	done   chan struct{} // closed once the goroutine has exited

	startOnce sync.Once // guards launching run (or closing done if never started)
	stopOnce  sync.Once // guards closing stopCh

	// prevState holds the state observed on the previous tick.
	// Key: "service/component", value: observed state. It is read and
	// written only by tick, so it needs no locking as long as a single
	// goroutine runs tick.
	prevState map[string]string
}

// New creates a Monitor with the given dependencies. The returned Monitor is
// idle until Start is called.
func New(db *sql.DB, rt runtime.Runtime, cfg config.MonitorConfig, nodeName string, logger *slog.Logger) *Monitor {
	return &Monitor{
		db:        db,
		runtime:   rt,
		cfg:       cfg,
		logger:    logger,
		alerter:   NewAlerter(cfg, nodeName, db, logger),
		stopCh:    make(chan struct{}),
		done:      make(chan struct{}),
		prevState: make(map[string]string),
	}
}

// Start launches the monitoring goroutine. Calls after the first are no-ops,
// so there is never more than one goroutine touching prevState or closing
// done.
func (m *Monitor) Start() {
	m.startOnce.Do(func() { go m.run() })
}

// Stop signals the monitoring goroutine to stop and waits for it to exit.
// It is safe to call Stop more than once, and safe to call it on a Monitor
// that was never started.
func (m *Monitor) Stop() {
	m.stopOnce.Do(func() { close(m.stopCh) })

	// If Start never ran, claim its slot: this closes done so the receive
	// below returns immediately, and turns any later Start into a no-op.
	// If Start already ran, this Do does nothing and run closes done.
	m.startOnce.Do(func() { close(m.done) })

	<-m.done
}

// run is the body of the monitoring goroutine. It calls tick once per
// configured interval until stopCh is closed, and closes done on the way out.
func (m *Monitor) run() {
	// Deferred calls run last-in-first-out: the ticker is stopped, then a
	// panic (if any) is recovered and logged, and only then is done closed.
	defer close(m.done)
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error("monitor panic recovered", "panic", fmt.Sprintf("%v", r))
		}
	}()

	// time.NewTicker panics on a non-positive duration; report the
	// misconfiguration explicitly instead of through the recover handler.
	interval := m.cfg.Interval.Duration
	if interval <= 0 {
		m.logger.Error("monitor: non-positive interval, monitoring disabled", "interval", interval)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-m.stopCh:
			return
		case <-ticker.C:
			m.tick()
		}
	}
}

// tick performs one monitoring pass: it lists the runtime's containers,
// walks every registered service and component, records an event and updates
// the registry when a component's observed state differs from the previous
// one, hands every component to the alerter, and finally prunes old events.
//
// A panic inside a pass is recovered and logged so that one bad pass does
// not end the monitoring loop.
func (m *Monitor) tick() {
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error("monitor tick panic recovered", "panic", fmt.Sprintf("%v", r))
		}
	}()

	ctx := context.Background()

	// Get the current runtime state of all containers.
	containers, err := m.runtime.List(ctx)
	if err != nil {
		m.logger.Error("monitor: list containers", "error", err)
		return
	}

	// Index runtime containers by name for fast lookup.
	runtimeState := make(map[string]string, len(containers))
	for _, c := range containers {
		runtimeState[c.Name] = c.State
	}

	// Walk all registered services and their components.
	services, err := registry.ListServices(m.db)
	if err != nil {
		m.logger.Error("monitor: list services", "error", err)
		return
	}

	seen := make(map[string]struct{})
	// complete stays true only if every service's components were listed.
	complete := true

	for _, svc := range services {
		components, err := registry.ListComponents(m.db, svc.Name)
		if err != nil {
			m.logger.Error("monitor: list components", "error", err, "service", svc.Name)
			complete = false
			continue
		}

		for _, comp := range components {
			key := comp.Service + "/" + comp.Name
			seen[key] = struct{}{}

			// Containers are looked up as "<service>-<component>"; a
			// component with no matching container is reported as "unknown".
			containerName := comp.Service + "-" + comp.Name
			observed := "unknown"
			if state, ok := runtimeState[containerName]; ok {
				observed = state
			}

			// On the first pass for a component, fall back to the observed
			// state stored in the registry.
			prev, hasPrev := m.prevState[key]
			if !hasPrev {
				prev = comp.ObservedState
			}

			if observed != prev {
				if err := registry.InsertEvent(m.db, comp.Service, comp.Name, prev, observed); err != nil {
					m.logger.Error("monitor: insert event", "error", err, "key", key)
				}
				// NOTE(review): the empty fourth argument presumably means
				// "leave the desired state unchanged" — confirm against
				// registry.UpdateComponentState.
				if err := registry.UpdateComponentState(m.db, comp.Service, comp.Name, "", observed); err != nil {
					m.logger.Error("monitor: update observed state", "error", err, "key", key)
				}
				m.logger.Info("state change", "service", comp.Service, "component", comp.Name, "prev", prev, "observed", observed)
			}

			// The alerter sees every component on every pass, not only the
			// ones whose state changed.
			m.alerter.Evaluate(comp.Service, comp.Name, comp.DesiredState, observed, prev)

			m.prevState[key] = observed
		}
	}

	// Evict entries for components that no longer exist in the registry.
	// Skip this when a component listing failed: the keys of that service
	// are missing from seen only because of the error, and dropping them
	// would discard the last observed state on a transient failure.
	if complete {
		for key := range m.prevState {
			if _, ok := seen[key]; !ok {
				delete(m.prevState, key)
			}
		}
	}

	// Prune old events, using now minus the configured retention as cutoff.
	if _, err := registry.PruneEvents(m.db, time.Now().Add(-m.cfg.Retention.Duration)); err != nil {
		m.logger.Error("monitor: prune events", "error", err)
	}
}