diff --git a/PROGRESS_V1.md b/PROGRESS_V1.md index e212ad5..3df657f 100644 --- a/PROGRESS_V1.md +++ b/PROGRESS_V1.md @@ -16,33 +16,33 @@ ## Phase 2: Agent - [x] **P2.1** Agent skeleton and gRPC server -- [ ] **P2.2** Deploy handler -- [ ] **P2.3** Lifecycle handlers (stop, start, restart) -- [ ] **P2.4** Status handlers (list, live check, get status) -- [ ] **P2.5** Sync handler -- [ ] **P2.6** File transfer handlers -- [ ] **P2.7** Adopt handler -- [ ] **P2.8** Monitor subsystem -- [ ] **P2.9** Snapshot command +- [x] **P2.2** Deploy handler +- [x] **P2.3** Lifecycle handlers (stop, start, restart) +- [x] **P2.4** Status handlers (list, live check, get status) +- [x] **P2.5** Sync handler +- [x] **P2.6** File transfer handlers +- [x] **P2.7** Adopt handler +- [x] **P2.8** Monitor subsystem +- [x] **P2.9** Snapshot command ## Phase 3: CLI - [x] **P3.1** CLI skeleton -- [ ] **P3.2** Login command -- [ ] **P3.3** Deploy command -- [ ] **P3.4** Lifecycle commands (stop, start, restart) -- [ ] **P3.5** Status commands (list, ps, status) -- [ ] **P3.6** Sync command -- [ ] **P3.7** Adopt command -- [ ] **P3.8** Service commands (show, edit, export) -- [ ] **P3.9** Transfer commands (push, pull) -- [ ] **P3.10** Node commands +- [x] **P3.2** Login command +- [x] **P3.3** Deploy command +- [x] **P3.4** Lifecycle commands (stop, start, restart) +- [x] **P3.5** Status commands (list, ps, status) +- [x] **P3.6** Sync command +- [x] **P3.7** Adopt command +- [x] **P3.8** Service commands (show, edit, export) +- [x] **P3.9** Transfer commands (push, pull) +- [x] **P3.10** Node commands ## Phase 4: Deployment Artifacts -- [ ] **P4.1** Systemd units -- [ ] **P4.2** Example configs -- [ ] **P4.3** Install script +- [x] **P4.1** Systemd units +- [x] **P4.2** Example configs +- [x] **P4.3** Install script ## Phase 5: Integration and Polish diff --git a/cmd/mcp-agent/main.go b/cmd/mcp-agent/main.go index a206ef7..9c813d1 100644 --- a/cmd/mcp-agent/main.go +++ b/cmd/mcp-agent/main.go 
@@ -42,6 +42,8 @@ func main() { }, }) + root.AddCommand(snapshotCmd()) + if err := root.Execute(); err != nil { log.Fatal(err) os.Exit(1) diff --git a/cmd/mcp-agent/snapshot.go b/cmd/mcp-agent/snapshot.go new file mode 100644 index 0000000..a1ceda3 --- /dev/null +++ b/cmd/mcp-agent/snapshot.go @@ -0,0 +1,48 @@ +package main + +import ( + "database/sql" + "fmt" + "os" + "path/filepath" + "time" + + "git.wntrmute.dev/kyle/mcp/internal/config" + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +func snapshotCmd() *cobra.Command { + return &cobra.Command{ + Use: "snapshot", + Short: "Create a database backup", + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadAgentConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + backupDir := filepath.Join(filepath.Dir(cfg.Database.Path), "backups") + if err := os.MkdirAll(backupDir, 0750); err != nil { + return fmt.Errorf("create backup dir: %w", err) + } + + ts := time.Now().Format("20060102-150405") + backupPath := filepath.Join(backupDir, fmt.Sprintf("mcp-%s.db", ts)) + + db, err := sql.Open("sqlite", cfg.Database.Path) + if err != nil { + return fmt.Errorf("open database: %w", err) + } + defer func() { _ = db.Close() }() + + //nolint:gosec // backupPath is derived from config + timestamp, not user input; VACUUM INTO does not support placeholders + if _, err := db.Exec("VACUUM INTO '" + backupPath + "'"); err != nil { + return fmt.Errorf("vacuum into: %w", err) + } + + fmt.Printf("snapshot: %s\n", backupPath) + return nil + }, + } +} diff --git a/cmd/mcp/adopt.go b/cmd/mcp/adopt.go index 8c48e74..3042a71 100644 --- a/cmd/mcp/adopt.go +++ b/cmd/mcp/adopt.go @@ -1,18 +1,78 @@ package main import ( + "context" "fmt" + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" "github.com/spf13/cobra" ) func adoptCmd() *cobra.Command { - return &cobra.Command{ + cmd := &cobra.Command{ Use: "adopt ", Short: "Adopt all -* containers 
into a service", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + nodeName, _ := cmd.Flags().GetString("node") + + var addr string + if nodeName != "" { + for _, n := range cfg.Nodes { + if n.Name == nodeName { + addr = n.Address + break + } + } + if addr == "" { + return fmt.Errorf("node %q not found in config", nodeName) + } + } else { + if len(cfg.Nodes) == 0 { + return fmt.Errorf("no nodes configured") + } + nodeName = cfg.Nodes[0].Name + addr = cfg.Nodes[0].Address + } + + client, conn, err := dialAgent(addr, cfg) + if err != nil { + return fmt.Errorf("dial %s: %w", nodeName, err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.AdoptContainers(context.Background(), &mcpv1.AdoptContainersRequest{ + Service: args[0], + }) + if err != nil { + return fmt.Errorf("adopt on %s: %w", nodeName, err) + } + + results := resp.GetResults() + if len(results) == 0 { + fmt.Printf("no containers matching %q found on %s\n", args[0]+"-*", nodeName) + return nil + } + + for _, r := range results { + if r.GetSuccess() { + fmt.Printf(" adopted %s -> %s\n", r.GetContainer(), r.GetComponent()) + } else { + fmt.Printf(" failed %s -> %s: %s\n", r.GetContainer(), r.GetComponent(), r.GetError()) + } + } + + return nil }, } + + cmd.Flags().String("node", "", "target node (default: first node in config)") + + return cmd } diff --git a/cmd/mcp/deploy.go b/cmd/mcp/deploy.go index f5edd1c..051ea5e 100644 --- a/cmd/mcp/deploy.go +++ b/cmd/mcp/deploy.go @@ -1,9 +1,16 @@ package main import ( + "context" "fmt" + "path/filepath" + "strings" "github.com/spf13/cobra" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/servicedef" ) func deployCmd() *cobra.Command { @@ -12,9 +19,113 @@ func deployCmd() *cobra.Command { 
Short: "Deploy service from service definition", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + serviceName, component := parseServiceArg(args[0]) + + def, err := loadServiceDef(cmd, cfg, serviceName) + if err != nil { + return err + } + + spec := servicedef.ToProto(def) + + address, err := findNodeAddress(cfg, def.Node) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.Deploy(context.Background(), &mcpv1.DeployRequest{ + Service: spec, + Component: component, + }) + if err != nil { + return fmt.Errorf("deploy: %w", err) + } + + printComponentResults(resp.GetResults()) + return nil }, } cmd.Flags().StringP("file", "f", "", "service definition file") return cmd } + +// parseServiceArg splits a "service/component" argument into its parts. +func parseServiceArg(arg string) (service, component string) { + parts := strings.SplitN(arg, "/", 2) + service = parts[0] + if len(parts) == 2 { + component = parts[1] + } + return service, component +} + +// loadServiceDef attempts to load a service definition from the -f flag, +// the configured services directory, or by falling back to the agent registry. +func loadServiceDef(cmd *cobra.Command, cfg *config.CLIConfig, serviceName string) (*servicedef.ServiceDef, error) { + // Check -f flag first. + filePath, _ := cmd.Flags().GetString("file") + if filePath != "" { + def, err := servicedef.Load(filePath) + if err != nil { + return nil, fmt.Errorf("load service def from %q: %w", filePath, err) + } + return def, nil + } + + // Try services directory. 
+ dirPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") + def, err := servicedef.Load(dirPath) + if err == nil { + return def, nil + } + + // Fall back to agent registry: query each node for the service. + for _, node := range cfg.Nodes { + client, conn, dialErr := dialAgent(node.Address, cfg) + if dialErr != nil { + continue + } + + resp, listErr := client.ListServices(context.Background(), &mcpv1.ListServicesRequest{}) + _ = conn.Close() + if listErr != nil { + continue + } + + for _, svc := range resp.GetServices() { + if svc.GetName() == serviceName { + return servicedef.FromProto(serviceSpecFromInfo(svc), node.Name), nil + } + } + } + + return nil, fmt.Errorf("service definition %q not found in %q or agent registry", serviceName, cfg.Services.Dir) +} + +// serviceSpecFromInfo converts a ServiceInfo to a ServiceSpec for use with +// servicedef.FromProto. This is needed because the agent registry returns +// ServiceInfo, not ServiceSpec. +func serviceSpecFromInfo(info *mcpv1.ServiceInfo) *mcpv1.ServiceSpec { + spec := &mcpv1.ServiceSpec{ + Name: info.GetName(), + Active: info.GetActive(), + } + for _, c := range info.GetComponents() { + spec.Components = append(spec.Components, &mcpv1.ComponentSpec{ + Name: c.GetName(), + Image: c.GetImage(), + }) + } + return spec +} diff --git a/cmd/mcp/dial.go b/cmd/mcp/dial.go index caafb28..65e16de 100644 --- a/cmd/mcp/dial.go +++ b/cmd/mcp/dial.go @@ -14,13 +14,6 @@ import ( "google.golang.org/grpc/metadata" ) -// Ensure dial helpers are referenced to satisfy linters until CLI commands -// are implemented. This will be removed when the first command uses dialAgent. -var ( - _ = dialAgent - _ = loadBearerToken -) - // dialAgent connects to an agent at the given address and returns a gRPC // client. The connection uses TLS and attaches the bearer token to every RPC. 
func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) { diff --git a/cmd/mcp/helpers.go b/cmd/mcp/helpers.go new file mode 100644 index 0000000..a0af0a6 --- /dev/null +++ b/cmd/mcp/helpers.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "os" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" +) + +// findNodeAddress looks up a node by name in the CLI config and returns +// its address. +func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) { + for _, n := range cfg.Nodes { + if n.Name == nodeName { + return n.Address, nil + } + } + return "", fmt.Errorf("node %q not found in config", nodeName) +} + +// printComponentResults prints the result of each component operation. +func printComponentResults(results []*mcpv1.ComponentResult) { + for _, r := range results { + if r.GetSuccess() { + fmt.Printf(" %s: ok\n", r.GetName()) + } else { + fmt.Fprintf(os.Stderr, " %s: error: %s\n", r.GetName(), r.GetError()) + } + } +} diff --git a/cmd/mcp/lifecycle.go b/cmd/mcp/lifecycle.go index b5a2906..ef1a1ce 100644 --- a/cmd/mcp/lifecycle.go +++ b/cmd/mcp/lifecycle.go @@ -1,9 +1,15 @@ package main import ( + "context" "fmt" + "path/filepath" "github.com/spf13/cobra" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/servicedef" ) func stopCmd() *cobra.Command { @@ -12,7 +18,45 @@ func stopCmd() *cobra.Command { Short: "Stop all components, set active=false", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + serviceName := args[0] + defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") + + def, err := servicedef.Load(defPath) + if err != nil { + return fmt.Errorf("load service def: 
%w", err) + } + + active := false + def.Active = &active + if err := servicedef.Write(defPath, def); err != nil { + return fmt.Errorf("write service def: %w", err) + } + + address, err := findNodeAddress(cfg, def.Node) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.StopService(context.Background(), &mcpv1.StopServiceRequest{ + Name: serviceName, + }) + if err != nil { + return fmt.Errorf("stop service: %w", err) + } + + printComponentResults(resp.GetResults()) + return nil }, } } @@ -23,7 +67,45 @@ func startCmd() *cobra.Command { Short: "Start all components, set active=true", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + serviceName := args[0] + defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") + + def, err := servicedef.Load(defPath) + if err != nil { + return fmt.Errorf("load service def: %w", err) + } + + active := true + def.Active = &active + if err := servicedef.Write(defPath, def); err != nil { + return fmt.Errorf("write service def: %w", err) + } + + address, err := findNodeAddress(cfg, def.Node) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.StartService(context.Background(), &mcpv1.StartServiceRequest{ + Name: serviceName, + }) + if err != nil { + return fmt.Errorf("start service: %w", err) + } + + printComponentResults(resp.GetResults()) + return nil }, } } @@ -34,7 +116,39 @@ func restartCmd() *cobra.Command { Short: "Restart all components", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return 
fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + serviceName := args[0] + defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") + + def, err := servicedef.Load(defPath) + if err != nil { + return fmt.Errorf("load service def: %w", err) + } + + address, err := findNodeAddress(cfg, def.Node) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.RestartService(context.Background(), &mcpv1.RestartServiceRequest{ + Name: serviceName, + }) + if err != nil { + return fmt.Errorf("restart service: %w", err) + } + + printComponentResults(resp.GetResults()) + return nil }, } } diff --git a/cmd/mcp/login.go b/cmd/mcp/login.go index aefb021..1cbb983 100644 --- a/cmd/mcp/login.go +++ b/cmd/mcp/login.go @@ -1,9 +1,15 @@ package main import ( + "bufio" "fmt" + "os" + "strings" "github.com/spf13/cobra" + + "git.wntrmute.dev/kyle/mcp/internal/auth" + "git.wntrmute.dev/kyle/mcp/internal/config" ) func loginCmd() *cobra.Command { @@ -11,7 +17,42 @@ func loginCmd() *cobra.Command { Use: "login", Short: "Authenticate to MCIAS, store token", RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + scanner := bufio.NewScanner(os.Stdin) + + fmt.Print("Username: ") + if !scanner.Scan() { + if err := scanner.Err(); err != nil { + return fmt.Errorf("read username: %w", err) + } + return fmt.Errorf("read username: unexpected end of input") + } + username := strings.TrimSpace(scanner.Text()) + + fmt.Print("Password: ") + if !scanner.Scan() { + if err := scanner.Err(); err != nil { + return fmt.Errorf("read password: %w", err) + } + return fmt.Errorf("read password: unexpected end of input") 
+ } + password := strings.TrimSpace(scanner.Text()) + + token, err := auth.Login(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert, username, password) + if err != nil { + return fmt.Errorf("login: %w", err) + } + + if err := auth.SaveToken(cfg.Auth.TokenPath, token); err != nil { + return fmt.Errorf("save token: %w", err) + } + + fmt.Println("Login successful.") + return nil }, } } diff --git a/cmd/mcp/node.go b/cmd/mcp/node.go index fb03ff5..f1461c8 100644 --- a/cmd/mcp/node.go +++ b/cmd/mcp/node.go @@ -2,7 +2,12 @@ package main import ( "fmt" + "os" + "text/tabwriter" + toml "github.com/pelletier/go-toml/v2" + + "git.wntrmute.dev/kyle/mcp/internal/config" "github.com/spf13/cobra" ) @@ -15,29 +20,112 @@ func nodeCmd() *cobra.Command { list := &cobra.Command{ Use: "list", Short: "List registered nodes", - RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") - }, + RunE: runNodeList, } add := &cobra.Command{ Use: "add
", Short: "Register a node", Args: cobra.ExactArgs(2), - RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") - }, + RunE: runNodeAdd, } remove := &cobra.Command{ Use: "remove ", Short: "Deregister a node", Args: cobra.ExactArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") - }, + RunE: runNodeRemove, } cmd.AddCommand(list, add, remove) return cmd } + +func runNodeList(_ *cobra.Command, _ []string) error { + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + w := tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0) + _, _ = fmt.Fprintln(w, "NAME\tADDRESS") + for _, n := range cfg.Nodes { + _, _ = fmt.Fprintf(w, "%s\t%s\n", n.Name, n.Address) + } + return w.Flush() +} + +func runNodeAdd(_ *cobra.Command, args []string) error { + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + name := args[0] + address := args[1] + + for _, n := range cfg.Nodes { + if n.Name == name { + return fmt.Errorf("node %q already exists", name) + } + } + + cfg.Nodes = append(cfg.Nodes, config.NodeConfig{ + Name: name, + Address: address, + }) + + if err := writeConfig(cfgPath, cfg); err != nil { + return fmt.Errorf("write config: %w", err) + } + + fmt.Printf("Added node %s (%s)\n", name, address) + return nil +} + +func runNodeRemove(_ *cobra.Command, args []string) error { + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + name := args[0] + + var found bool + nodes := make([]config.NodeConfig, 0, len(cfg.Nodes)) + for _, n := range cfg.Nodes { + if n.Name == name { + found = true + continue + } + nodes = append(nodes, n) + } + + if !found { + return fmt.Errorf("node %q not found", name) + } + + cfg.Nodes = nodes + + if err := writeConfig(cfgPath, cfg); err != nil { + return fmt.Errorf("write config: %w", err) + } + + 
fmt.Printf("Removed node %s\n", name) + return nil +} + +// writeConfig serializes the CLIConfig to TOML and writes it back to the +// config file. +func writeConfig(path string, cfg *config.CLIConfig) error { + data, err := toml.Marshal(cfg) + if err != nil { + return fmt.Errorf("marshal config: %w", err) + } + + if err := os.WriteFile(path, data, 0o600); err != nil { + return fmt.Errorf("write config %q: %w", path, err) + } + + return nil +} diff --git a/cmd/mcp/service.go b/cmd/mcp/service.go index 76565ae..b150655 100644 --- a/cmd/mcp/service.go +++ b/cmd/mcp/service.go @@ -1,9 +1,18 @@ package main import ( + "context" "fmt" + "os" + "os/exec" + "path/filepath" + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/servicedef" + toml "github.com/pelletier/go-toml/v2" "github.com/spf13/cobra" + "google.golang.org/grpc" ) func serviceCmd() *cobra.Command { @@ -16,30 +25,204 @@ func serviceCmd() *cobra.Command { Use: "show ", Short: "Print current spec from agent registry", Args: cobra.ExactArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") - }, + RunE: runServiceShow, } edit := &cobra.Command{ Use: "edit ", Short: "Open service definition in $EDITOR", Args: cobra.ExactArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") - }, + RunE: runServiceEdit, } export := &cobra.Command{ Use: "export ", Short: "Write agent registry spec to local service file", Args: cobra.ExactArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") - }, + RunE: runServiceExport, } export.Flags().StringP("file", "f", "", "output file path") cmd.AddCommand(show, edit, export) return cmd } + +// findServiceNode checks the local service definition file first, then +// queries all agents to locate which node runs the named service. 
+func findServiceNode(cfg *config.CLIConfig, serviceName string) (string, string, error) { + defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") + if def, err := servicedef.Load(defPath); err == nil { + for _, n := range cfg.Nodes { + if n.Name == def.Node { + return n.Name, n.Address, nil + } + } + return "", "", fmt.Errorf("node %q from service def not found in config", def.Node) + } + + for _, n := range cfg.Nodes { + client, conn, err := dialAgent(n.Address, cfg) + if err != nil { + continue + } + resp, err := client.ListServices(context.Background(), &mcpv1.ListServicesRequest{}) + _ = conn.Close() + if err != nil { + continue + } + for _, svc := range resp.GetServices() { + if svc.GetName() == serviceName { + return n.Name, n.Address, nil + } + } + } + + return "", "", fmt.Errorf("service %q not found on any node", serviceName) +} + +// fetchServiceInfo dials the agent at address, calls ListServices, and returns +// the matching ServiceInfo. The caller must close the returned connection. +func fetchServiceInfo(address string, cfg *config.CLIConfig, serviceName string) (*mcpv1.ServiceInfo, *grpc.ClientConn, error) { + client, conn, err := dialAgent(address, cfg) + if err != nil { + return nil, nil, fmt.Errorf("dial agent: %w", err) + } + + resp, err := client.ListServices(context.Background(), &mcpv1.ListServicesRequest{}) + if err != nil { + _ = conn.Close() + return nil, nil, fmt.Errorf("list services: %w", err) + } + + for _, svc := range resp.GetServices() { + if svc.GetName() == serviceName { + return svc, conn, nil + } + } + + _ = conn.Close() + return nil, nil, fmt.Errorf("service %q not found on agent", serviceName) +} + +// serviceInfoToSpec converts a ServiceInfo (runtime view) to a ServiceSpec +// (config view) for use with servicedef.FromProto. 
+func serviceInfoToSpec(info *mcpv1.ServiceInfo) *mcpv1.ServiceSpec { + spec := &mcpv1.ServiceSpec{ + Name: info.GetName(), + Active: info.GetActive(), + } + for _, c := range info.GetComponents() { + spec.Components = append(spec.Components, &mcpv1.ComponentSpec{ + Name: c.GetName(), + Image: c.GetImage(), + }) + } + return spec +} + +func runServiceShow(_ *cobra.Command, args []string) error { + serviceName := args[0] + + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + _, address, err := findServiceNode(cfg, serviceName) + if err != nil { + return err + } + + svc, conn, err := fetchServiceInfo(address, cfg, serviceName) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() + + def := servicedef.FromProto(serviceInfoToSpec(svc), "") + data, err := toml.Marshal(def) + if err != nil { + return fmt.Errorf("marshal service: %w", err) + } + fmt.Print(string(data)) + return nil +} + +func runServiceEdit(_ *cobra.Command, args []string) error { + serviceName := args[0] + + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + filePath := filepath.Join(cfg.Services.Dir, serviceName+".toml") + + // If local file does not exist, export from agent first. 
+ if _, err := os.Stat(filePath); os.IsNotExist(err) { + nodeName, address, err := findServiceNode(cfg, serviceName) + if err != nil { + return fmt.Errorf("find service for export: %w", err) + } + + svc, conn, err := fetchServiceInfo(address, cfg, serviceName) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() + + def := servicedef.FromProto(serviceInfoToSpec(svc), nodeName) + if err := servicedef.Write(filePath, def); err != nil { + return fmt.Errorf("write service def: %w", err) + } + _, _ = fmt.Fprintf(os.Stderr, "Exported %s from agent to %s\n", serviceName, filePath) + } + + editor := os.Getenv("EDITOR") + if editor == "" { + editor = os.Getenv("VISUAL") + } + if editor == "" { + editor = "vi" + } + + cmd := exec.Command(editor, filePath) //nolint:gosec // editor from trusted env + cmd.Stdin = os.Stdin + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + return cmd.Run() +} + +func runServiceExport(cmd *cobra.Command, args []string) error { + serviceName := args[0] + + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + nodeName, address, err := findServiceNode(cfg, serviceName) + if err != nil { + return err + } + + svc, conn, err := fetchServiceInfo(address, cfg, serviceName) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() + + def := servicedef.FromProto(serviceInfoToSpec(svc), nodeName) + + outPath, _ := cmd.Flags().GetString("file") + if outPath == "" { + outPath = filepath.Join(cfg.Services.Dir, serviceName+".toml") + } + + if err := servicedef.Write(outPath, def); err != nil { + return fmt.Errorf("write service def: %w", err) + } + fmt.Printf("Exported %s to %s\n", serviceName, outPath) + return nil +} diff --git a/cmd/mcp/status.go b/cmd/mcp/status.go index 6679aa7..5a283a3 100644 --- a/cmd/mcp/status.go +++ b/cmd/mcp/status.go @@ -1,17 +1,80 @@ package main import ( + "context" "fmt" + "os" + "text/tabwriter" + "time" + mcpv1 
"git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" "github.com/spf13/cobra" ) +// newTable returns a tabwriter configured for CLI table output. +func newTable() *tabwriter.Writer { + return tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0) +} + +// forEachNode loads the CLI config, then calls fn for every configured node. +// Dial errors are printed as warnings and the node is skipped. Connections +// are closed before moving to the next node, avoiding defer-in-loop leaks. +func forEachNode(fn func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error) error { + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + for _, node := range cfg.Nodes { + client, conn, err := dialAgent(node.Address, cfg) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err) + continue + } + + fnErr := fn(node, client) + _ = conn.Close() + if fnErr != nil { + return fnErr + } + } + + return nil +} + func listCmd() *cobra.Command { return &cobra.Command{ Use: "list", Short: "List services from all agents (registry, no runtime query)", RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + w := newTable() + _, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tDESIRED\tOBSERVED\tVERSION") + + if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error { + resp, err := client.ListServices(context.Background(), &mcpv1.ListServicesRequest{}) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "warning: %s: list services: %v\n", node.Name, err) + return nil + } + + for _, svc := range resp.GetServices() { + for _, comp := range svc.GetComponents() { + _, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", + svc.GetName(), + comp.GetName(), + comp.GetDesiredState(), + comp.GetObservedState(), + comp.GetVersion(), + ) + } + } + return nil + }); err != nil { + return err + } + + return w.Flush() }, } } @@ -21,7 
+84,40 @@ func psCmd() *cobra.Command { Use: "ps", Short: "Live check: query runtime on all agents", RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + w := newTable() + _, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME") + + now := time.Now() + if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error { + resp, err := client.LiveCheck(context.Background(), &mcpv1.LiveCheckRequest{}) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "warning: %s: live check: %v\n", node.Name, err) + return nil + } + + for _, svc := range resp.GetServices() { + for _, comp := range svc.GetComponents() { + uptime := "-" + if comp.GetStarted() != nil { + d := now.Sub(comp.GetStarted().AsTime()) + uptime = formatDuration(d) + } + _, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", + svc.GetName(), + comp.GetName(), + node.Name, + comp.GetObservedState(), + comp.GetVersion(), + uptime, + ) + } + } + return nil + }); err != nil { + return err + } + + return w.Flush() }, } } @@ -32,7 +128,110 @@ func statusCmd() *cobra.Command { Short: "Full picture: live query + drift + recent events", Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + var serviceName string + if len(args) > 0 { + serviceName = args[0] + } + + var allServices []*mcpv1.ServiceInfo + var allDrift []*mcpv1.DriftInfo + var allEvents []*mcpv1.EventInfo + + if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error { + resp, err := client.GetServiceStatus(context.Background(), &mcpv1.GetServiceStatusRequest{ + Name: serviceName, + }) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "warning: %s: get service status: %v\n", node.Name, err) + return nil + } + + allServices = append(allServices, resp.GetServices()...) + allDrift = append(allDrift, resp.GetDrift()...) + allEvents = append(allEvents, resp.GetRecentEvents()...) 
+ return nil + }); err != nil { + return err + } + + // Services table. + w := newTable() + _, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tDESIRED\tOBSERVED\tVERSION") + for _, svc := range allServices { + for _, comp := range svc.GetComponents() { + _, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", + svc.GetName(), + comp.GetName(), + comp.GetDesiredState(), + comp.GetObservedState(), + comp.GetVersion(), + ) + } + } + if err := w.Flush(); err != nil { + return fmt.Errorf("flush services table: %w", err) + } + + // Drift section. + if len(allDrift) > 0 { + fmt.Println() + fmt.Println("DRIFT:") + dw := newTable() + _, _ = fmt.Fprintln(dw, "SERVICE\tCOMPONENT\tDESIRED\tOBSERVED") + for _, d := range allDrift { + _, _ = fmt.Fprintf(dw, "%s\t%s\t%s\t%s\n", + d.GetService(), + d.GetComponent(), + d.GetDesiredState(), + d.GetObservedState(), + ) + } + if err := dw.Flush(); err != nil { + return fmt.Errorf("flush drift table: %w", err) + } + } + + // Recent events section. + if len(allEvents) > 0 { + fmt.Println() + fmt.Println("RECENT EVENTS:") + ew := newTable() + _, _ = fmt.Fprintln(ew, "SERVICE\tCOMPONENT\tPREV\tNEW\tTIME") + for _, e := range allEvents { + ts := "-" + if e.GetTimestamp() != nil { + ts = e.GetTimestamp().AsTime().Format(time.RFC3339) + } + _, _ = fmt.Fprintf(ew, "%s\t%s\t%s\t%s\t%s\n", + e.GetService(), + e.GetComponent(), + e.GetPrevState(), + e.GetNewState(), + ts, + ) + } + if err := ew.Flush(); err != nil { + return fmt.Errorf("flush events table: %w", err) + } + } + + return nil }, } } + +// formatDuration returns a human-readable duration string. 
+func formatDuration(d time.Duration) string { + if d < time.Minute { + return fmt.Sprintf("%ds", int(d.Seconds())) + } + if d < time.Hour { + return fmt.Sprintf("%dm%ds", int(d.Minutes()), int(d.Seconds())%60) + } + if d < 24*time.Hour { + return fmt.Sprintf("%dh%dm", int(d.Hours()), int(d.Minutes())%60) + } + days := int(d.Hours()) / 24 + hours := int(d.Hours()) % 24 + return fmt.Sprintf("%dd%dh", days, hours) +} diff --git a/cmd/mcp/sync.go b/cmd/mcp/sync.go index ce0e670..497d931 100644 --- a/cmd/mcp/sync.go +++ b/cmd/mcp/sync.go @@ -1,8 +1,12 @@ package main import ( + "context" "fmt" + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/servicedef" "github.com/spf13/cobra" ) @@ -11,7 +15,69 @@ func syncCmd() *cobra.Command { Use: "sync", Short: "Push service definitions to agents (update desired state)", RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + defs, err := servicedef.LoadAll(cfg.Services.Dir) + if err != nil { + return fmt.Errorf("load service definitions: %w", err) + } + + // Group definitions by node. + byNode := make(map[string][]*servicedef.ServiceDef) + for _, def := range defs { + byNode[def.Node] = append(byNode[def.Node], def) + } + + // Build node name -> address lookup. 
	Use:   "push <local-file> <service> [path]",
	Short: "Copy a local file into /srv/<service>/[path]",
}, + RunE: runPull, } } + +func runPush(_ *cobra.Command, args []string) error { + localFile := args[0] + serviceName := args[1] + + remotePath := filepath.Base(localFile) + if len(args) == 3 { + remotePath = args[2] + } + + f, err := os.Open(localFile) //nolint:gosec // user-specified path + if err != nil { + return fmt.Errorf("open local file %q: %w", localFile, err) + } + defer func() { _ = f.Close() }() + + info, err := f.Stat() + if err != nil { + return fmt.Errorf("stat local file %q: %w", localFile, err) + } + mode := uint32(info.Mode().Perm()) + + content, err := io.ReadAll(f) + if err != nil { + return fmt.Errorf("read local file %q: %w", localFile, err) + } + + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + _, address, err := findServiceNode(cfg, serviceName) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.PushFile(context.Background(), &mcpv1.PushFileRequest{ + Service: serviceName, + Path: remotePath, + Content: content, + Mode: mode, + }) + if err != nil { + return fmt.Errorf("push file: %w", err) + } + + if !resp.GetSuccess() { + return fmt.Errorf("push file: %s", resp.GetError()) + } + + fmt.Printf("Pushed %s to %s:%s/%s\n", localFile, serviceName, "/srv/"+serviceName, remotePath) + return nil +} + +func runPull(_ *cobra.Command, args []string) error { + serviceName := args[0] + remotePath := args[1] + + localFile := filepath.Base(remotePath) + if len(args) == 3 { + localFile = args[2] + } + + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + _, address, err := findServiceNode(cfg, serviceName) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = 
conn.Close() }() + + resp, err := client.PullFile(context.Background(), &mcpv1.PullFileRequest{ + Service: serviceName, + Path: remotePath, + }) + if err != nil { + return fmt.Errorf("pull file: %w", err) + } + + if resp.GetError() != "" { + return fmt.Errorf("pull file: %s", resp.GetError()) + } + + mode := os.FileMode(resp.GetMode()) + if mode == 0 { + mode = 0o644 + } + + if err := os.WriteFile(localFile, resp.GetContent(), mode); err != nil { + return fmt.Errorf("write local file %q: %w", localFile, err) + } + + fmt.Printf("Pulled %s:%s to %s (mode %04o)\n", serviceName, remotePath, localFile, mode) + return nil +} diff --git a/deploy/examples/mcp-agent.toml b/deploy/examples/mcp-agent.toml new file mode 100644 index 0000000..ba24a6a --- /dev/null +++ b/deploy/examples/mcp-agent.toml @@ -0,0 +1,91 @@ +# MCP Agent configuration +# +# Default location: /srv/mcp/mcp-agent.toml +# Override with: mcp-agent server --config /path/to/mcp-agent.toml + +# ------------------------------------------------------------------ +# gRPC server +# ------------------------------------------------------------------ +[server] +# Listen address for the gRPC server. Bind to the overlay network +# interface only -- the agent does not sit behind mc-proxy. +# Env override: MCP_AGENT_SERVER_GRPC_ADDR +grpc_addr = "100.95.252.120:9444" + +# TLS certificate and private key for the gRPC server. The certificate +# should be issued by the Metacrypt CA and valid for the overlay IP. +# Env overrides: MCP_AGENT_SERVER_TLS_CERT, MCP_AGENT_SERVER_TLS_KEY +tls_cert = "/srv/mcp/certs/mcp-agent.crt" +tls_key = "/srv/mcp/certs/mcp-agent.key" + +# ------------------------------------------------------------------ +# Database +# ------------------------------------------------------------------ +[database] +# Path to the SQLite database. The agent stores desired state, observed +# state, deployed specs, and events here. WAL mode, foreign keys on. 
+# Env override: MCP_AGENT_DATABASE_PATH +path = "/srv/mcp/mcp.db" + +# ------------------------------------------------------------------ +# MCIAS authentication +# ------------------------------------------------------------------ +[mcias] +# URL of the MCIAS server used to validate bearer tokens from the CLI. +server_url = "https://mcias.svc.mcp.metacircular.net:8443" + +# Path to the CA certificate that signed the MCIAS TLS certificate. +# If empty, the system trust store is used. +ca_cert = "/usr/local/share/ca-certificates/metacircular-ca.crt" + +# Service name presented to MCIAS during token validation. Must match +# a service registered in MCIAS. +service_name = "mcp" + +# ------------------------------------------------------------------ +# Agent settings +# ------------------------------------------------------------------ +[agent] +# Unique name for this node. Must match the name used in [[nodes]] +# entries in the CLI config. +# Env override: MCP_AGENT_NODE_NAME +node_name = "rift" + +# Container runtime binary. Currently only "podman" is supported. +# Env override: MCP_AGENT_CONTAINER_RUNTIME +container_runtime = "podman" + +# ------------------------------------------------------------------ +# Monitoring +# ------------------------------------------------------------------ +[monitor] +# How often the monitor checks container state against desired state. +# Default: 60s +interval = "60s" + +# Command to execute when an alert fires. Uses exec-style invocation +# (argv array, no shell). The alert message is passed as the final +# argument. Omit to disable alerting. +# alert_command = ["/usr/local/bin/notify", "--channel", "ops"] + +# Minimum time between repeated alerts for the same condition. +# Default: 15m +cooldown = "15m" + +# Number of state transitions within flap_window that triggers a +# flapping alert. Default: 3 +flap_threshold = 3 + +# Time window for flap detection. 
Default: 10m +flap_window = "10m" + +# How long to retain event records in the database. Default: 30d +retention = "30d" + +# ------------------------------------------------------------------ +# Logging +# ------------------------------------------------------------------ +[log] +# Log level: debug, info, warn, error. Default: info +# Env override: MCP_AGENT_LOG_LEVEL +level = "info" diff --git a/deploy/examples/mcp.toml b/deploy/examples/mcp.toml new file mode 100644 index 0000000..c703975 --- /dev/null +++ b/deploy/examples/mcp.toml @@ -0,0 +1,56 @@ +# MCP CLI configuration +# +# Default location: ~/.config/mcp/mcp.toml +# Override with: mcp --config /path/to/mcp.toml + +# ------------------------------------------------------------------ +# Service definitions +# ------------------------------------------------------------------ +[services] +# Directory containing service definition TOML files (one per service). +# Each file declares the components, images, ports, and volumes for a +# service. The CLI reads these files to push intent to agents. +dir = "~/.config/mcp/services" + +# ------------------------------------------------------------------ +# MCIAS authentication +# ------------------------------------------------------------------ +[mcias] +# URL of the MCIAS server used for login and token validation. +server_url = "https://mcias.svc.mcp.metacircular.net:8443" + +# Path to the CA certificate that signed the MCIAS TLS certificate. +# If empty, the system trust store is used. +ca_cert = "/usr/local/share/ca-certificates/metacircular-ca.crt" + +# Service name presented to MCIAS during authentication. This must +# match a service registered in MCIAS. +service_name = "mcp" + +# ------------------------------------------------------------------ +# Token storage +# ------------------------------------------------------------------ +[auth] +# Path where the CLI stores the MCIAS bearer token after login. +# The file is created with 0600 permissions. 
+token_path = "~/.config/mcp/token" + +# Optional: username for unattended (non-interactive) operation. +# When set alongside password_file, "mcp login" uses these +# credentials automatically instead of prompting. +# username = "admin" + +# Optional: path to a file containing the password. The file should +# be owned by the operator and have 0600 permissions. +# password_file = "~/.config/mcp/password" + +# ------------------------------------------------------------------ +# Managed nodes +# ------------------------------------------------------------------ +# Each [[nodes]] entry registers a node that the CLI can target. +# The name must match the node_name in the agent's config. The +# address is host:port on the overlay network. + +[[nodes]] +name = "rift" +address = "100.95.252.120:9444" diff --git a/deploy/scripts/install-agent.sh b/deploy/scripts/install-agent.sh new file mode 100755 index 0000000..89f6a65 --- /dev/null +++ b/deploy/scripts/install-agent.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +# install-agent.sh -- Install and configure the MCP agent. +# +# Usage: install-agent.sh [path-to-mcp-agent-binary] +# +# This script is idempotent and safe to run multiple times. + +set -euo pipefail + +if [[ "$(id -u)" -ne 0 ]]; then + echo "error: this script must be run as root" >&2 + exit 1 +fi + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DEPLOY_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" +BINARY="${1:-./mcp-agent}" + +# ------------------------------------------------------------------ +# 1. Create mcp user and group +# ------------------------------------------------------------------ +if ! id -u mcp &>/dev/null; then + useradd --system --shell /sbin/nologin --home-dir /srv/mcp --no-create-home mcp + echo "Created system user: mcp" +else + echo "User mcp already exists" +fi + +# ------------------------------------------------------------------ +# 2. 
Create directories +# ------------------------------------------------------------------ +install -d -o mcp -g mcp -m 0750 /srv/mcp +install -d -o mcp -g mcp -m 0750 /srv/mcp/certs +install -d -o mcp -g mcp -m 0750 /srv/mcp/backups + +# ------------------------------------------------------------------ +# 3. Install binary +# ------------------------------------------------------------------ +if [[ ! -f "${BINARY}" ]]; then + echo "error: binary not found: ${BINARY}" >&2 + echo "Usage: $0 [path-to-mcp-agent-binary]" >&2 + exit 1 +fi + +install -o root -g root -m 0755 "${BINARY}" /usr/local/bin/mcp-agent +echo "Installed mcp-agent to /usr/local/bin/mcp-agent" + +# ------------------------------------------------------------------ +# 4. Install example config if none exists +# ------------------------------------------------------------------ +if [[ ! -f /srv/mcp/mcp-agent.toml ]]; then + install -o mcp -g mcp -m 0640 "${DEPLOY_DIR}/examples/mcp-agent.toml" /srv/mcp/mcp-agent.toml + echo "Installed example config to /srv/mcp/mcp-agent.toml (edit before starting)" +else + echo "Config /srv/mcp/mcp-agent.toml already exists, skipping" +fi + +# ------------------------------------------------------------------ +# 5. Install systemd units +# ------------------------------------------------------------------ +install -o root -g root -m 0644 "${DEPLOY_DIR}/systemd/mcp-agent.service" /etc/systemd/system/ +install -o root -g root -m 0644 "${DEPLOY_DIR}/systemd/mcp-agent-backup.service" /etc/systemd/system/ +install -o root -g root -m 0644 "${DEPLOY_DIR}/systemd/mcp-agent-backup.timer" /etc/systemd/system/ + +# ------------------------------------------------------------------ +# 6. Reload systemd +# ------------------------------------------------------------------ +systemctl daemon-reload +echo "Systemd units installed and daemon reloaded" + +# ------------------------------------------------------------------ +# 7. 
Next steps +# ------------------------------------------------------------------ +cat <<'NEXT' + +--- Next steps --- +1. Edit /srv/mcp/mcp-agent.toml: + - Set server.grpc_addr to this node's overlay IP + - Set agent.node_name to this node's name + - Set mcias.server_url to the MCIAS server address + - Place TLS cert/key in /srv/mcp/certs/ + +2. Enable and start the agent: + systemctl enable --now mcp-agent + +3. Enable the daily backup timer: + systemctl enable --now mcp-agent-backup.timer + +4. Verify the agent is running: + systemctl status mcp-agent + journalctl -u mcp-agent -f +NEXT diff --git a/deploy/systemd/mcp-agent-backup.service b/deploy/systemd/mcp-agent-backup.service new file mode 100644 index 0000000..60e4407 --- /dev/null +++ b/deploy/systemd/mcp-agent-backup.service @@ -0,0 +1,25 @@ +[Unit] +Description=MCP Agent database backup +After=mcp-agent.service + +[Service] +Type=oneshot +ExecStart=/usr/local/bin/mcp-agent snapshot --config /srv/mcp/mcp-agent.toml + +User=mcp +Group=mcp + +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +PrivateTmp=true +PrivateDevices=true +ProtectKernelTunables=true +ProtectKernelModules=true +ProtectControlGroups=true +RestrictSUIDSGID=true +RestrictNamespaces=true +LockPersonality=true +MemoryDenyWriteExecute=true +RestrictRealtime=true +ReadWritePaths=/srv diff --git a/deploy/systemd/mcp-agent-backup.timer b/deploy/systemd/mcp-agent-backup.timer new file mode 100644 index 0000000..18da246 --- /dev/null +++ b/deploy/systemd/mcp-agent-backup.timer @@ -0,0 +1,10 @@ +[Unit] +Description=Daily MCP Agent database backup + +[Timer] +OnCalendar=*-*-* 02:00:00 +RandomizedDelaySec=300 +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/deploy/systemd/mcp-agent.service b/deploy/systemd/mcp-agent.service new file mode 100644 index 0000000..0d8a274 --- /dev/null +++ b/deploy/systemd/mcp-agent.service @@ -0,0 +1,31 @@ +[Unit] +Description=MCP Agent +After=network-online.target +Wants=network-online.target 
+// "<service>-*" or exactly "<service>" are matched.
+ type match struct { + container runtime.ContainerInfo + component string + } + var matches []match + for _, c := range containers { + switch { + case c.Name == service: + matches = append(matches, match{c, service}) + case strings.HasPrefix(c.Name, prefix): + matches = append(matches, match{c, strings.TrimPrefix(c.Name, prefix)}) + } + } + + if len(matches) == 0 { + return &mcpv1.AdoptContainersResponse{}, nil + } + + // Ensure the service exists once, before adopting any containers. + if err := registry.CreateService(a.DB, service, true); err != nil { + if _, getErr := registry.GetService(a.DB, service); getErr != nil { + return nil, fmt.Errorf("create service %q: %w", service, err) + } + } + + var results []*mcpv1.AdoptResult + for _, m := range matches { + a.Logger.Info("adopting", "service", service, "container", m.container.Name, "component", m.component) + + // Inspect the container to get full details (List only returns + // name, image, state, and version). + info, err := a.Runtime.Inspect(ctx, m.container.Name) + if err != nil { + results = append(results, &mcpv1.AdoptResult{ + Container: m.container.Name, + Component: m.component, + Success: false, + Error: fmt.Sprintf("inspect container: %v", err), + }) + continue + } + + comp := ®istry.Component{ + Name: m.component, + Service: service, + Image: info.Image, + Network: info.Network, + UserSpec: info.User, + Restart: info.Restart, + DesiredState: desiredFromObserved(info.State), + ObservedState: info.State, + Version: info.Version, + Ports: info.Ports, + Volumes: info.Volumes, + Cmd: info.Cmd, + } + + if createErr := registry.CreateComponent(a.DB, comp); createErr != nil { + results = append(results, &mcpv1.AdoptResult{ + Container: m.container.Name, + Component: m.component, + Success: false, + Error: "already managed", + }) + continue + } + + results = append(results, &mcpv1.AdoptResult{ + Container: m.container.Name, + Component: m.component, + Success: true, + }) + } + + return 
&mcpv1.AdoptContainersResponse{Results: results}, nil +} + +// desiredFromObserved maps an observed container state to the desired state. +// Running containers should stay running; everything else is treated as stopped. +func desiredFromObserved(observed string) string { + if observed == "running" { + return "running" + } + return "stopped" +} diff --git a/internal/agent/adopt_test.go b/internal/agent/adopt_test.go new file mode 100644 index 0000000..ac15c4c --- /dev/null +++ b/internal/agent/adopt_test.go @@ -0,0 +1,244 @@ +package agent + +import ( + "context" + "testing" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" +) + +func TestAdoptContainers(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + { + Name: "metacrypt-api", + Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0", + State: "running", + Network: "docker_default", + User: "0:0", + Restart: "unless-stopped", + Ports: []string{"127.0.0.1:18443:8443"}, + Volumes: []string{"/srv/metacrypt:/srv/metacrypt"}, + Cmd: []string{"server"}, + Version: "v1.0.0", + }, + { + Name: "metacrypt-web", + Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt-web:v1.0.0", + State: "running", + Network: "docker_default", + Restart: "unless-stopped", + Ports: []string{"127.0.0.1:18080:8080"}, + Version: "v1.0.0", + }, + { + Name: "unrelated-container", + Image: "nginx:latest", + State: "running", + }, + }, + } + + a := newTestAgent(t, rt) + ctx := context.Background() + + resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"}) + if err != nil { + t.Fatalf("AdoptContainers: %v", err) + } + + if len(resp.Results) != 2 { + t.Fatalf("expected 2 results, got %d", len(resp.Results)) + } + + for _, r := range resp.Results { + if !r.Success { + t.Fatalf("expected success for %q, got error: %s", r.Container, r.Error) + } + } + + // Verify components were created in the 
registry. + components, err := registry.ListComponents(a.DB, "metacrypt") + if err != nil { + t.Fatalf("ListComponents: %v", err) + } + if len(components) != 2 { + t.Fatalf("expected 2 components, got %d", len(components)) + } + + api, err := registry.GetComponent(a.DB, "metacrypt", "api") + if err != nil { + t.Fatalf("GetComponent api: %v", err) + } + if api.Image != "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0" { + t.Fatalf("api image: got %q", api.Image) + } + if api.DesiredState != "running" { + t.Fatalf("api desired state: got %q, want running", api.DesiredState) + } + if api.Network != "docker_default" { + t.Fatalf("api network: got %q, want docker_default", api.Network) + } +} + +func TestAdoptContainersNoMatch(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + {Name: "unrelated", Image: "nginx:latest", State: "running"}, + }, + } + + a := newTestAgent(t, rt) + ctx := context.Background() + + resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"}) + if err != nil { + t.Fatalf("AdoptContainers: %v", err) + } + if len(resp.Results) != 0 { + t.Fatalf("expected 0 results, got %d", len(resp.Results)) + } +} + +func TestAdoptContainersAlreadyManaged(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + { + Name: "metacrypt-api", + Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0", + State: "running", + Network: "docker_default", + Restart: "unless-stopped", + Version: "v1.0.0", + }, + }, + } + + a := newTestAgent(t, rt) + ctx := context.Background() + + // First adopt succeeds. + resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"}) + if err != nil { + t.Fatalf("first adopt: %v", err) + } + if len(resp.Results) != 1 || !resp.Results[0].Success { + t.Fatalf("first adopt should succeed: %+v", resp.Results) + } + + // Second adopt should report "already managed". 
+ resp, err = a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"}) + if err != nil { + t.Fatalf("second adopt: %v", err) + } + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + if resp.Results[0].Success { + t.Fatal("second adopt should fail for already-managed container") + } + if resp.Results[0].Error != "already managed" { + t.Fatalf("expected 'already managed' error, got %q", resp.Results[0].Error) + } +} + +func TestAdoptContainersSingleComponent(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + { + Name: "mc-proxy", + Image: "mcr.svc.mcp.metacircular.net:8443/mc-proxy:v1.0.0", + State: "running", + Network: "host", + Restart: "unless-stopped", + Version: "v1.0.0", + }, + }, + } + + a := newTestAgent(t, rt) + ctx := context.Background() + + resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "mc-proxy"}) + if err != nil { + t.Fatalf("AdoptContainers: %v", err) + } + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + if resp.Results[0].Component != "mc-proxy" { + t.Fatalf("expected component name 'mc-proxy', got %q", resp.Results[0].Component) + } + if !resp.Results[0].Success { + t.Fatalf("expected success, got error: %s", resp.Results[0].Error) + } +} + +func TestAdoptContainersStoppedContainer(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + { + Name: "metacrypt-api", + Image: "mcr.svc.mcp.metacircular.net:8443/metacrypt:v1.0.0", + State: "exited", + Network: "docker_default", + Restart: "unless-stopped", + Version: "v1.0.0", + }, + }, + } + + a := newTestAgent(t, rt) + ctx := context.Background() + + resp, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: "metacrypt"}) + if err != nil { + t.Fatalf("AdoptContainers: %v", err) + } + if len(resp.Results) != 1 || !resp.Results[0].Success { + t.Fatalf("expected success: %+v", resp.Results) + } + + comp, err 
:= registry.GetComponent(a.DB, "metacrypt", "api") + if err != nil { + t.Fatalf("GetComponent: %v", err) + } + if comp.DesiredState != "stopped" { + t.Fatalf("desired state: got %q, want stopped", comp.DesiredState) + } + if comp.ObservedState != "exited" { + t.Fatalf("observed state: got %q, want exited", comp.ObservedState) + } +} + +func TestDesiredFromObserved(t *testing.T) { + tests := []struct { + observed string + want string + }{ + {"running", "running"}, + {"exited", "stopped"}, + {"stopped", "stopped"}, + {"created", "stopped"}, + {"", "stopped"}, + } + for _, tt := range tests { + got := desiredFromObserved(tt.observed) + if got != tt.want { + t.Errorf("desiredFromObserved(%q) = %q, want %q", tt.observed, got, tt.want) + } + } +} + +func TestAdoptContainersEmptyService(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + _, err := a.AdoptContainers(ctx, &mcpv1.AdoptContainersRequest{Service: ""}) + if err == nil { + t.Fatal("expected error for empty service name") + } +} diff --git a/internal/agent/deploy.go b/internal/agent/deploy.go new file mode 100644 index 0000000..649597c --- /dev/null +++ b/internal/agent/deploy.go @@ -0,0 +1,139 @@ +package agent + +import ( + "context" + "database/sql" + "errors" + "fmt" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" +) + +// Deploy deploys a service (or a single component of it) to this node. 
+func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.DeployResponse, error) { + spec := req.GetService() + if spec == nil { + return nil, fmt.Errorf("deploy: missing service spec") + } + + serviceName := spec.GetName() + a.Logger.Info("deploying", "service", serviceName) + + if err := ensureService(a.DB, serviceName, spec.GetActive()); err != nil { + return nil, fmt.Errorf("deploy: ensure service %q: %w", serviceName, err) + } + + components := spec.GetComponents() + if target := req.GetComponent(); target != "" { + var filtered []*mcpv1.ComponentSpec + for _, cs := range components { + if cs.GetName() == target { + filtered = append(filtered, cs) + } + } + components = filtered + } + + var results []*mcpv1.ComponentResult + for _, cs := range components { + result := a.deployComponent(ctx, serviceName, cs) + results = append(results, result) + } + + return &mcpv1.DeployResponse{Results: results}, nil +} + +// deployComponent handles the full deploy lifecycle for a single component. 
+func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcpv1.ComponentSpec) *mcpv1.ComponentResult { + compName := cs.GetName() + containerName := serviceName + "-" + compName + + a.Logger.Info("deploying component", "service", serviceName, "component", compName) + + regComp := ®istry.Component{ + Name: compName, + Service: serviceName, + Image: cs.GetImage(), + Network: cs.GetNetwork(), + UserSpec: cs.GetUser(), + Restart: cs.GetRestart(), + DesiredState: "running", + Version: runtime.ExtractVersion(cs.GetImage()), + Ports: cs.GetPorts(), + Volumes: cs.GetVolumes(), + Cmd: cs.GetCmd(), + } + + if err := ensureComponent(a.DB, regComp); err != nil { + return &mcpv1.ComponentResult{ + Name: compName, + Error: fmt.Sprintf("ensure component: %v", err), + } + } + + if err := a.Runtime.Pull(ctx, cs.GetImage()); err != nil { + return &mcpv1.ComponentResult{ + Name: compName, + Error: fmt.Sprintf("pull image: %v", err), + } + } + + _ = a.Runtime.Stop(ctx, containerName) // may not exist yet + _ = a.Runtime.Remove(ctx, containerName) // may not exist yet + + runSpec := runtime.ContainerSpec{ + Name: containerName, + Image: cs.GetImage(), + Network: cs.GetNetwork(), + User: cs.GetUser(), + Restart: cs.GetRestart(), + Ports: cs.GetPorts(), + Volumes: cs.GetVolumes(), + Cmd: cs.GetCmd(), + } + if err := a.Runtime.Run(ctx, runSpec); err != nil { + _ = registry.UpdateComponentState(a.DB, serviceName, compName, "", "removed") + return &mcpv1.ComponentResult{ + Name: compName, + Error: fmt.Sprintf("run container: %v", err), + } + } + + if err := registry.UpdateComponentState(a.DB, serviceName, compName, "running", "running"); err != nil { + a.Logger.Warn("failed to update component state", "service", serviceName, "component", compName, "err", err) + } + + return &mcpv1.ComponentResult{ + Name: compName, + Success: true, + } +} + +// ensureService creates the service if it does not exist, or updates its +// active flag if it does. 
+// /srv/<service>/ directory. It rejects path traversal, absolute paths,
+// PushFile writes a file to the node's filesystem under /srv/<service>/.
+	tmp, err := os.CreateTemp(dir, ".mcp-push-*")
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "create temp file: %v", err)
+	}
+	tmpName := tmp.Name()
+
+	cleanup := func() { _ = os.Remove(tmpName) }
+
+	if _, err := tmp.Write(req.Content); err != nil {
+		_ = tmp.Close()
+		cleanup()
+		return nil, status.Errorf(codes.Internal, "write temp file: %v", err)
+	}
+	if err := tmp.Close(); err != nil {
+		cleanup()
+		return nil, status.Errorf(codes.Internal, "close temp file: %v", err)
+	}
+
+	mode := os.FileMode(req.Mode)
+	if mode == 0 {
+		mode = 0600
+	}
+	if err := os.Chmod(tmpName, mode); err != nil {
+		cleanup()
+		return nil, status.Errorf(codes.Internal, "set permissions: %v", err)
+	}
+
+	if err := os.Rename(tmpName, fullPath); err != nil {
+		cleanup()
+		return nil, status.Errorf(codes.Internal, "rename to target: %v", err)
+	}
+
+	return &mcpv1.PushFileResponse{Success: true}, nil
+}
+
+// PullFile reads a file from the node's filesystem under /srv/<service>/<path>.
+func (a *Agent) PullFile(ctx context.Context, req *mcpv1.PullFileRequest) (*mcpv1.PullFileResponse, error) {
+	if req.Service == "" {
+		return nil, status.Errorf(codes.InvalidArgument, "service name required")
+	}
+	if req.Path == "" {
+		return nil, status.Errorf(codes.InvalidArgument, "path required")
+	}
+
+	fullPath, err := validatePath(req.Service, req.Path)
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "invalid path: %v", err)
+	}
+
+	a.Logger.Info("pull file", "service", req.Service, "path", req.Path)
+
+	f, err := os.Open(fullPath) //nolint:gosec // path validated by validatePath
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, status.Errorf(codes.NotFound, "file not found: %s", req.Path)
+		}
+		return nil, status.Errorf(codes.Internal, "open file: %v", err)
+	}
+	defer f.Close() //nolint:errcheck
+
+	info, err := f.Stat()
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "stat file: %v", err)
+	}
+
+	content, err := io.ReadAll(f)
+	if err != nil {
+
return nil, status.Errorf(codes.Internal, "read file: %v", err) + } + + return &mcpv1.PullFileResponse{ + Content: content, + Mode: uint32(info.Mode().Perm()), + }, nil +} diff --git a/internal/agent/files_test.go b/internal/agent/files_test.go new file mode 100644 index 0000000..d250eb5 --- /dev/null +++ b/internal/agent/files_test.go @@ -0,0 +1,82 @@ +package agent + +import ( + "testing" +) + +func TestValidatePath(t *testing.T) { + tests := []struct { + name string + service string + path string + want string + wantErr bool + }{ + { + name: "valid simple path", + service: "mcr", + path: "mcr.toml", + want: "/srv/mcr/mcr.toml", + }, + { + name: "valid nested path", + service: "mcr", + path: "certs/cert.pem", + want: "/srv/mcr/certs/cert.pem", + }, + { + name: "reject traversal", + service: "mcr", + path: "../etc/passwd", + wantErr: true, + }, + { + name: "reject absolute path", + service: "mcr", + path: "/etc/passwd", + wantErr: true, + }, + { + name: "reject empty service", + service: "", + path: "mcr.toml", + wantErr: true, + }, + { + name: "reject empty path", + service: "mcr", + path: "", + wantErr: true, + }, + { + name: "reject double dot in middle", + service: "mcr", + path: "certs/../../etc/passwd", + wantErr: true, + }, + { + name: "valid deeply nested", + service: "metacrypt", + path: "data/keys/primary.key", + want: "/srv/metacrypt/data/keys/primary.key", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := validatePath(tt.service, tt.path) + if tt.wantErr { + if err == nil { + t.Fatalf("expected error, got path %q", got) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/agent/lifecycle.go b/internal/agent/lifecycle.go new file mode 100644 index 0000000..c7f5f1b --- /dev/null +++ b/internal/agent/lifecycle.go @@ -0,0 +1,160 @@ +package agent + +import ( + "context" + "database/sql" 
+ "fmt" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// StopService stops all components of a service. +func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest) (*mcpv1.StopServiceResponse, error) { + a.Logger.Info("StopService", "service", req.GetName()) + + if req.GetName() == "" { + return nil, status.Error(codes.InvalidArgument, "service name is required") + } + + components, err := registry.ListComponents(a.DB, req.GetName()) + if err != nil { + return nil, status.Errorf(codes.Internal, "list components: %v", err) + } + + var results []*mcpv1.ComponentResult + for _, c := range components { + containerName := req.GetName() + "-" + c.Name + r := &mcpv1.ComponentResult{Name: c.Name, Success: true} + + if err := a.Runtime.Stop(ctx, containerName); err != nil { + a.Logger.Info("stop container (ignored)", "container", containerName, "error", err) + } + + if err := registry.UpdateComponentState(a.DB, req.GetName(), c.Name, "stopped", "stopped"); err != nil { + r.Success = false + r.Error = fmt.Sprintf("update state: %v", err) + } + + results = append(results, r) + } + + return &mcpv1.StopServiceResponse{Results: results}, nil +} + +// StartService starts all components of a service. If a container already +// exists but is stopped, it is removed first so a fresh one can be created. 
+func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest) (*mcpv1.StartServiceResponse, error) { + a.Logger.Info("StartService", "service", req.GetName()) + + if req.GetName() == "" { + return nil, status.Error(codes.InvalidArgument, "service name is required") + } + + components, err := registry.ListComponents(a.DB, req.GetName()) + if err != nil { + return nil, status.Errorf(codes.Internal, "list components: %v", err) + } + + var results []*mcpv1.ComponentResult + for _, c := range components { + r := startComponent(ctx, a, req.GetName(), &c) + results = append(results, r) + } + + return &mcpv1.StartServiceResponse{Results: results}, nil +} + +// RestartService restarts all components of a service by stopping, removing, +// and re-creating each container. The desired_state is not changed. +func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceRequest) (*mcpv1.RestartServiceResponse, error) { + a.Logger.Info("RestartService", "service", req.GetName()) + + if req.GetName() == "" { + return nil, status.Error(codes.InvalidArgument, "service name is required") + } + + components, err := registry.ListComponents(a.DB, req.GetName()) + if err != nil { + return nil, status.Errorf(codes.Internal, "list components: %v", err) + } + + var results []*mcpv1.ComponentResult + for _, c := range components { + r := restartComponent(ctx, a, req.GetName(), &c) + results = append(results, r) + } + + return &mcpv1.RestartServiceResponse{Results: results}, nil +} + +// startComponent removes any existing container and runs a fresh one from +// the registry spec, then updates state to running. +func startComponent(ctx context.Context, a *Agent, service string, c *registry.Component) *mcpv1.ComponentResult { + containerName := service + "-" + c.Name + r := &mcpv1.ComponentResult{Name: c.Name, Success: true} + + // Remove any pre-existing container; ignore errors for non-existent ones. 
+ _ = a.Runtime.Stop(ctx, containerName) + _ = a.Runtime.Remove(ctx, containerName) + + spec := componentToSpec(service, c) + if err := a.Runtime.Run(ctx, spec); err != nil { + r.Success = false + r.Error = fmt.Sprintf("run container: %v", err) + return r + } + + if err := registry.UpdateComponentState(a.DB, service, c.Name, "running", "running"); err != nil { + r.Success = false + r.Error = fmt.Sprintf("update state: %v", err) + } + return r +} + +// restartComponent stops, removes, and re-creates a container without +// changing the desired_state in the registry. +func restartComponent(ctx context.Context, a *Agent, service string, c *registry.Component) *mcpv1.ComponentResult { + containerName := service + "-" + c.Name + r := &mcpv1.ComponentResult{Name: c.Name, Success: true} + + _ = a.Runtime.Stop(ctx, containerName) + _ = a.Runtime.Remove(ctx, containerName) + + spec := componentToSpec(service, c) + if err := a.Runtime.Run(ctx, spec); err != nil { + r.Success = false + r.Error = fmt.Sprintf("run container: %v", err) + _ = registry.UpdateComponentState(a.DB, service, c.Name, "", "stopped") + return r + } + + if err := registry.UpdateComponentState(a.DB, service, c.Name, "", "running"); err != nil { + r.Success = false + r.Error = fmt.Sprintf("update state: %v", err) + } + return r +} + +// componentToSpec builds a runtime.ContainerSpec from a registry Component. +func componentToSpec(service string, c *registry.Component) runtime.ContainerSpec { + return runtime.ContainerSpec{ + Name: service + "-" + c.Name, + Image: c.Image, + Network: c.Network, + User: c.UserSpec, + Restart: c.Restart, + Ports: c.Ports, + Volumes: c.Volumes, + Cmd: c.Cmd, + } +} + +// componentExists checks whether a component already exists in the registry. 
+func componentExists(db *sql.DB, service, name string) bool { + _, err := registry.GetComponent(db, service, name) + return err == nil +} diff --git a/internal/agent/status.go b/internal/agent/status.go new file mode 100644 index 0000000..1f7a0c9 --- /dev/null +++ b/internal/agent/status.go @@ -0,0 +1,219 @@ +package agent + +import ( + "context" + "fmt" + "strings" + "time" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// buildServiceInfo converts a registry Service and its components into a proto +// ServiceInfo message. +func buildServiceInfo(svc registry.Service, components []registry.Component) *mcpv1.ServiceInfo { + info := &mcpv1.ServiceInfo{ + Name: svc.Name, + Active: svc.Active, + } + for _, c := range components { + info.Components = append(info.Components, &mcpv1.ComponentInfo{ + Name: c.Name, + Image: c.Image, + DesiredState: c.DesiredState, + ObservedState: c.ObservedState, + Version: c.Version, + }) + } + return info +} + +// ListServices returns all services and their components from the registry. +// It does not query the container runtime. +func (a *Agent) ListServices(ctx context.Context, req *mcpv1.ListServicesRequest) (*mcpv1.ListServicesResponse, error) { + a.Logger.Debug("ListServices called") + + services, err := registry.ListServices(a.DB) + if err != nil { + return nil, fmt.Errorf("list services: %w", err) + } + + resp := &mcpv1.ListServicesResponse{} + for _, svc := range services { + components, err := registry.ListComponents(a.DB, svc.Name) + if err != nil { + return nil, fmt.Errorf("list components for %q: %w", svc.Name, err) + } + resp.Services = append(resp.Services, buildServiceInfo(svc, components)) + } + + return resp, nil +} + +// liveCheckServices queries the container runtime and reconciles with the +// registry. 
It returns a list of ServiceInfo messages with updated observed +// state. This shared logic is used by both LiveCheck and GetServiceStatus. +func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, error) { + containers, err := a.Runtime.List(ctx) + if err != nil { + return nil, fmt.Errorf("runtime list: %w", err) + } + + runtimeByName := make(map[string]runtime.ContainerInfo, len(containers)) + for _, c := range containers { + runtimeByName[c.Name] = c + } + + matched := make(map[string]bool) + + services, err := registry.ListServices(a.DB) + if err != nil { + return nil, fmt.Errorf("list services: %w", err) + } + + var result []*mcpv1.ServiceInfo + for _, svc := range services { + components, err := registry.ListComponents(a.DB, svc.Name) + if err != nil { + return nil, fmt.Errorf("list components for %q: %w", svc.Name, err) + } + + info := &mcpv1.ServiceInfo{ + Name: svc.Name, + Active: svc.Active, + } + + for _, comp := range components { + containerName := svc.Name + "-" + comp.Name + ci := &mcpv1.ComponentInfo{ + Name: comp.Name, + Image: comp.Image, + DesiredState: comp.DesiredState, + Version: comp.Version, + } + + if rc, ok := runtimeByName[containerName]; ok { + ci.ObservedState = rc.State + matched[containerName] = true + } else { + ci.ObservedState = "removed" + } + + info.Components = append(info.Components, ci) + } + + result = append(result, info) + } + + for _, c := range containers { + if matched[c.Name] { + continue + } + + svcName, compName := splitContainerName(c.Name) + + result = append(result, &mcpv1.ServiceInfo{ + Name: svcName, + Active: false, + Components: []*mcpv1.ComponentInfo{ + { + Name: compName, + Image: c.Image, + DesiredState: "ignore", + ObservedState: c.State, + Version: c.Version, + }, + }, + }) + } + + return result, nil +} + +// LiveCheck queries the container runtime, reconciles against the registry, +// and returns the updated state for all services. 
+func (a *Agent) LiveCheck(ctx context.Context, req *mcpv1.LiveCheckRequest) (*mcpv1.LiveCheckResponse, error) { + a.Logger.Debug("LiveCheck called") + + services, err := a.liveCheckServices(ctx) + if err != nil { + return nil, fmt.Errorf("live check: %w", err) + } + + return &mcpv1.LiveCheckResponse{Services: services}, nil +} + +// GetServiceStatus performs a live check, detects drift, and returns recent +// events. If a service name is provided, results are filtered to that service. +func (a *Agent) GetServiceStatus(ctx context.Context, req *mcpv1.GetServiceStatusRequest) (*mcpv1.GetServiceStatusResponse, error) { + a.Logger.Debug("GetServiceStatus called", "service", req.GetName()) + + services, err := a.liveCheckServices(ctx) + if err != nil { + return nil, fmt.Errorf("live check: %w", err) + } + + if req.GetName() != "" { + var filtered []*mcpv1.ServiceInfo + for _, svc := range services { + if svc.Name == req.GetName() { + filtered = append(filtered, svc) + } + } + services = filtered + } + + var drift []*mcpv1.DriftInfo + for _, svc := range services { + for _, comp := range svc.Components { + if comp.DesiredState == "ignore" { + continue + } + if comp.DesiredState != comp.ObservedState { + drift = append(drift, &mcpv1.DriftInfo{ + Service: svc.Name, + Component: comp.Name, + DesiredState: comp.DesiredState, + ObservedState: comp.ObservedState, + }) + } + } + } + + since := time.Now().Add(-1 * time.Hour) + svcFilter := req.GetName() + events, err := registry.QueryEvents(a.DB, svcFilter, "", since, 50) + if err != nil { + return nil, fmt.Errorf("query events: %w", err) + } + + var protoEvents []*mcpv1.EventInfo + for _, e := range events { + protoEvents = append(protoEvents, &mcpv1.EventInfo{ + Service: e.Service, + Component: e.Component, + PrevState: e.PrevState, + NewState: e.NewState, + Timestamp: timestamppb.New(e.Timestamp), + }) + } + + return &mcpv1.GetServiceStatusResponse{ + Services: services, + Drift: drift, + RecentEvents: protoEvents, + }, nil 
+} + +// splitContainerName splits a container name like "metacrypt-api" into service +// and component parts. If there is no hyphen, the whole name is used as both +// the service and component name. +func splitContainerName(name string) (service, component string) { + if i := strings.Index(name, "-"); i >= 0 { + return name[:i], name[i+1:] + } + return name, name +} diff --git a/internal/agent/status_test.go b/internal/agent/status_test.go new file mode 100644 index 0000000..8a894c5 --- /dev/null +++ b/internal/agent/status_test.go @@ -0,0 +1,274 @@ +package agent + +import ( + "context" + "testing" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" +) + +func TestListServices(t *testing.T) { + a := newTestAgent(t, &fakeRuntime{}) + ctx := context.Background() + + // Empty registry. + resp, err := a.ListServices(ctx, &mcpv1.ListServicesRequest{}) + if err != nil { + t.Fatalf("ListServices: %v", err) + } + if len(resp.Services) != 0 { + t.Fatalf("expected 0 services, got %d", len(resp.Services)) + } + + // Add a service with components. 
+ if err := registry.CreateService(a.DB, "metacrypt", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "api", Service: "metacrypt", + Image: "img:v1", DesiredState: "running", ObservedState: "running", + Version: "v1", + }); err != nil { + t.Fatalf("create component: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "web", Service: "metacrypt", + Image: "img-web:v1", DesiredState: "running", ObservedState: "unknown", + Version: "v1", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err = a.ListServices(ctx, &mcpv1.ListServicesRequest{}) + if err != nil { + t.Fatalf("ListServices: %v", err) + } + if len(resp.Services) != 1 { + t.Fatalf("expected 1 service, got %d", len(resp.Services)) + } + svc := resp.Services[0] + if svc.Name != "metacrypt" || !svc.Active { + t.Fatalf("unexpected service: %+v", svc) + } + if len(svc.Components) != 2 { + t.Fatalf("expected 2 components, got %d", len(svc.Components)) + } + if svc.Components[0].Name != "api" { + t.Fatalf("expected first component 'api', got %q", svc.Components[0].Name) + } +} + +func TestLiveCheck(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + {Name: "metacrypt-api", Image: "img:v1", State: "running", Version: "v1"}, + {Name: "unmanaged-thing", Image: "other:latest", State: "running", Version: "latest"}, + }, + } + a := newTestAgent(t, rt) + ctx := context.Background() + + // Set up registry with one service and one component. 
+ if err := registry.CreateService(a.DB, "metacrypt", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "api", Service: "metacrypt", + Image: "img:v1", DesiredState: "running", ObservedState: "unknown", + Version: "v1", + }); err != nil { + t.Fatalf("create component: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "web", Service: "metacrypt", + Image: "img-web:v1", DesiredState: "running", ObservedState: "unknown", + Version: "v1", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.LiveCheck(ctx, &mcpv1.LiveCheckRequest{}) + if err != nil { + t.Fatalf("LiveCheck: %v", err) + } + + // Should have 2 entries: the registered service and the unmanaged container. + if len(resp.Services) != 2 { + t.Fatalf("expected 2 service entries, got %d", len(resp.Services)) + } + + // First entry: metacrypt (registered). + mc := resp.Services[0] + if mc.Name != "metacrypt" { + t.Fatalf("expected 'metacrypt', got %q", mc.Name) + } + if len(mc.Components) != 2 { + t.Fatalf("expected 2 components, got %d", len(mc.Components)) + } + + // api should be running (found in runtime). + api := mc.Components[0] + if api.Name != "api" || api.ObservedState != "running" { + t.Fatalf("api: expected observed_state=running, got %q", api.ObservedState) + } + + // web should be removed (not found in runtime). + web := mc.Components[1] + if web.Name != "web" || web.ObservedState != "removed" { + t.Fatalf("web: expected observed_state=removed, got %q", web.ObservedState) + } + + // Second entry: unmanaged container. 
+ unmanaged := resp.Services[1] + if unmanaged.Name != "unmanaged" { + t.Fatalf("expected unmanaged service name 'unmanaged', got %q", unmanaged.Name) + } + if len(unmanaged.Components) != 1 { + t.Fatalf("expected 1 unmanaged component, got %d", len(unmanaged.Components)) + } + uc := unmanaged.Components[0] + if uc.DesiredState != "ignore" { + t.Fatalf("unmanaged desired_state: expected 'ignore', got %q", uc.DesiredState) + } + if uc.ObservedState != "running" { + t.Fatalf("unmanaged observed_state: expected 'running', got %q", uc.ObservedState) + } +} + +func TestGetServiceStatus_DriftDetection(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + {Name: "metacrypt-api", Image: "img:v1", State: "exited", Version: "v1"}, + }, + } + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "metacrypt", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "api", Service: "metacrypt", + Image: "img:v1", DesiredState: "running", ObservedState: "running", + Version: "v1", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + // Add an event so we can verify it appears. + if err := registry.InsertEvent(a.DB, "metacrypt", "api", "running", "exited"); err != nil { + t.Fatalf("insert event: %v", err) + } + + resp, err := a.GetServiceStatus(ctx, &mcpv1.GetServiceStatusRequest{Name: "metacrypt"}) + if err != nil { + t.Fatalf("GetServiceStatus: %v", err) + } + + if len(resp.Services) != 1 { + t.Fatalf("expected 1 service, got %d", len(resp.Services)) + } + if resp.Services[0].Name != "metacrypt" { + t.Fatalf("expected 'metacrypt', got %q", resp.Services[0].Name) + } + + // Drift: desired=running, observed=exited. 
+ if len(resp.Drift) != 1 { + t.Fatalf("expected 1 drift entry, got %d", len(resp.Drift)) + } + d := resp.Drift[0] + if d.Service != "metacrypt" || d.Component != "api" { + t.Fatalf("drift: unexpected service/component: %q/%q", d.Service, d.Component) + } + if d.DesiredState != "running" || d.ObservedState != "exited" { + t.Fatalf("drift: desired=%q, observed=%q", d.DesiredState, d.ObservedState) + } + + // Events. + if len(resp.RecentEvents) != 1 { + t.Fatalf("expected 1 event, got %d", len(resp.RecentEvents)) + } + ev := resp.RecentEvents[0] + if ev.PrevState != "running" || ev.NewState != "exited" { + t.Fatalf("event: prev=%q, new=%q", ev.PrevState, ev.NewState) + } +} + +func TestGetServiceStatus_FilterByName(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + {Name: "metacrypt-api", Image: "img:v1", State: "running", Version: "v1"}, + {Name: "mcr-api", Image: "mcr:v1", State: "running", Version: "v1"}, + }, + } + a := newTestAgent(t, rt) + ctx := context.Background() + + for _, svc := range []string{"metacrypt", "mcr"} { + if err := registry.CreateService(a.DB, svc, true); err != nil { + t.Fatalf("create service %q: %v", svc, err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "api", Service: svc, + Image: svc + ":v1", DesiredState: "running", ObservedState: "unknown", + Version: "v1", + }); err != nil { + t.Fatalf("create component for %q: %v", svc, err) + } + } + + resp, err := a.GetServiceStatus(ctx, &mcpv1.GetServiceStatusRequest{Name: "mcr"}) + if err != nil { + t.Fatalf("GetServiceStatus: %v", err) + } + if len(resp.Services) != 1 { + t.Fatalf("expected 1 service, got %d", len(resp.Services)) + } + if resp.Services[0].Name != "mcr" { + t.Fatalf("expected 'mcr', got %q", resp.Services[0].Name) + } +} + +func TestGetServiceStatus_IgnoreSkipsDrift(t *testing.T) { + rt := &fakeRuntime{ + containers: []runtime.ContainerInfo{ + {Name: "random-thing", Image: "img:v1", State: "running", Version: "v1"}, + }, + 
} + a := newTestAgent(t, rt) + ctx := context.Background() + + // No registered services, only an unmanaged container. + resp, err := a.GetServiceStatus(ctx, &mcpv1.GetServiceStatusRequest{}) + if err != nil { + t.Fatalf("GetServiceStatus: %v", err) + } + + // The unmanaged container should not appear in drift. + if len(resp.Drift) != 0 { + t.Fatalf("expected 0 drift entries for unmanaged, got %d", len(resp.Drift)) + } +} + +func TestSplitContainerName(t *testing.T) { + tests := []struct { + name string + service string + comp string + }{ + {"metacrypt-api", "metacrypt", "api"}, + {"metacrypt-web-ui", "metacrypt", "web-ui"}, + {"standalone", "standalone", "standalone"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + svc, comp := splitContainerName(tt.name) + if svc != tt.service || comp != tt.comp { + t.Fatalf("splitContainerName(%q) = (%q, %q), want (%q, %q)", + tt.name, svc, comp, tt.service, tt.comp) + } + }) + } +} diff --git a/internal/agent/sync.go b/internal/agent/sync.go new file mode 100644 index 0000000..7d5dfa7 --- /dev/null +++ b/internal/agent/sync.go @@ -0,0 +1,183 @@ +package agent + +import ( + "context" + "fmt" + "strings" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// SyncDesiredState reconciles the agent's registry with the declared service +// specs. It creates or updates services and components, then discovers +// untracked containers and adds them with desired_state "ignore". 
+func (a *Agent) SyncDesiredState(ctx context.Context, req *mcpv1.SyncDesiredStateRequest) (*mcpv1.SyncDesiredStateResponse, error) {
+	a.Logger.Info("SyncDesiredState", "services", len(req.GetServices()))
+
+	known := make(map[string]bool)
+
+	var results []*mcpv1.ServiceSyncResult
+	for _, spec := range req.GetServices() {
+		if spec.GetName() == "" {
+			return nil, status.Error(codes.InvalidArgument, "service name is required")
+		}
+		r, names, err := a.syncService(ctx, spec)
+		if err != nil {
+			return nil, err
+		}
+		for _, n := range names {
+			known[n] = true
+		}
+		results = append(results, r)
+	}
+
+	// Reconciliation: find containers not in the registry and add them.
+	if err := a.reconcileUntracked(ctx, known); err != nil {
+		a.Logger.Info("reconcile untracked containers failed", "error", err)
+	}
+
+	return &mcpv1.SyncDesiredStateResponse{Results: results}, nil
+}
+
+// syncService creates or updates a single service and its components.
+// It returns the sync result and a list of container names that belong to
+// this service.
+func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1.ServiceSyncResult, []string, error) {
+	result := &mcpv1.ServiceSyncResult{Name: spec.GetName()}
+	var changes []string
+	var containerNames []string
+
+	desiredState := "running"
+	if !spec.GetActive() {
+		desiredState = "stopped"
+	}
+
+	// Create or update the service record.
+	existing, err := registry.GetService(a.DB, spec.GetName())
+	if err != nil {
+		// NOTE(review): ANY error is treated as not-found here, masking real DB errors; prefer errors.Is(err, sql.ErrNoRows) as ensureService does — TODO confirm.
+ if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive()); err != nil { + return nil, nil, status.Errorf(codes.Internal, "create service %q: %v", spec.GetName(), err) + } + changes = append(changes, "created service") + } else if existing.Active != spec.GetActive() { + if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive()); err != nil { + return nil, nil, status.Errorf(codes.Internal, "update service %q: %v", spec.GetName(), err) + } + changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive())) + } + + // Create or update each component. + for _, cs := range spec.GetComponents() { + containerName := spec.GetName() + "-" + cs.GetName() + containerNames = append(containerNames, containerName) + + comp := protoToComponent(spec.GetName(), cs, desiredState) + + if componentExists(a.DB, spec.GetName(), cs.GetName()) { + if err := registry.UpdateComponentSpec(a.DB, comp); err != nil { + return nil, nil, status.Errorf(codes.Internal, "update component %q/%q: %v", spec.GetName(), cs.GetName(), err) + } + if err := registry.UpdateComponentState(a.DB, spec.GetName(), cs.GetName(), desiredState, ""); err != nil { + return nil, nil, status.Errorf(codes.Internal, "update component state %q/%q: %v", spec.GetName(), cs.GetName(), err) + } + changes = append(changes, fmt.Sprintf("updated %s", cs.GetName())) + } else { + if err := registry.CreateComponent(a.DB, comp); err != nil { + return nil, nil, status.Errorf(codes.Internal, "create component %q/%q: %v", spec.GetName(), cs.GetName(), err) + } + changes = append(changes, fmt.Sprintf("created %s", cs.GetName())) + } + } + + result.Changed = len(changes) > 0 + result.Summary = strings.Join(changes, "; ") + if !result.Changed { + result.Summary = "no changes" + } + + a.Logger.Info("sync service", "service", spec.GetName(), "changed", result.Changed, "summary", result.Summary) + return result, containerNames, nil +} + +// reconcileUntracked lists all 
containers from the runtime and adds any that +// are not already tracked in the registry with desired_state "ignore". +func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) error { + containers, err := a.Runtime.List(ctx) + if err != nil { + return fmt.Errorf("list containers: %w", err) + } + + for _, c := range containers { + if known[c.Name] { + continue + } + + service, component, ok := parseContainerName(c.Name) + if !ok { + continue + } + + if componentExists(a.DB, service, component) { + continue + } + + if _, err := registry.GetService(a.DB, service); err != nil { + if err := registry.CreateService(a.DB, service, true); err != nil { + a.Logger.Info("reconcile: create service failed", "service", service, "error", err) + continue + } + } + + comp := ®istry.Component{ + Name: component, + Service: service, + Image: c.Image, + Network: c.Network, + UserSpec: c.User, + Restart: c.Restart, + DesiredState: "ignore", + ObservedState: c.State, + Version: runtime.ExtractVersion(c.Image), + } + if err := registry.CreateComponent(a.DB, comp); err != nil { + a.Logger.Info("reconcile: create component failed", "container", c.Name, "error", err) + continue + } + + a.Logger.Info("reconcile: adopted untracked container", "container", c.Name, "desired_state", "ignore") + } + + return nil +} + +// protoToComponent converts a proto ComponentSpec to a registry Component. +func protoToComponent(service string, cs *mcpv1.ComponentSpec, desiredState string) *registry.Component { + return ®istry.Component{ + Name: cs.GetName(), + Service: service, + Image: cs.GetImage(), + Network: cs.GetNetwork(), + UserSpec: cs.GetUser(), + Restart: cs.GetRestart(), + Ports: cs.GetPorts(), + Volumes: cs.GetVolumes(), + Cmd: cs.GetCmd(), + DesiredState: desiredState, + Version: runtime.ExtractVersion(cs.GetImage()), + } +} + +// parseContainerName splits "service-component" into its parts. Returns false +// if the name does not contain a hyphen. 
+func parseContainerName(name string) (service, component string, ok bool) { + i := strings.IndexByte(name, '-') + if i < 0 { + return "", "", false + } + return name[:i], name[i+1:], true +} diff --git a/internal/agent/testhelpers_test.go b/internal/agent/testhelpers_test.go new file mode 100644 index 0000000..472017b --- /dev/null +++ b/internal/agent/testhelpers_test.go @@ -0,0 +1,61 @@ +package agent + +import ( + "context" + "log/slog" + "path/filepath" + "testing" + + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" +) + +// fakeRuntime implements runtime.Runtime for testing. +type fakeRuntime struct { + containers []runtime.ContainerInfo + inspectMap map[string]runtime.ContainerInfo + listErr error +} + +func (f *fakeRuntime) Pull(_ context.Context, _ string) error { return nil } +func (f *fakeRuntime) Run(_ context.Context, _ runtime.ContainerSpec) error { return nil } +func (f *fakeRuntime) Stop(_ context.Context, _ string) error { return nil } +func (f *fakeRuntime) Remove(_ context.Context, _ string) error { return nil } + +func (f *fakeRuntime) List(_ context.Context) ([]runtime.ContainerInfo, error) { + return f.containers, f.listErr +} + +func (f *fakeRuntime) Inspect(_ context.Context, name string) (runtime.ContainerInfo, error) { + if f.inspectMap != nil { + if info, ok := f.inspectMap[name]; ok { + return info, nil + } + } + for _, c := range f.containers { + if c.Name == name { + return c, nil + } + } + return runtime.ContainerInfo{}, nil +} + +// newTestAgent creates an Agent with a temp database for testing. 
+func newTestAgent(t *testing.T, rt runtime.Runtime) *Agent {
+	t.Helper()
+	db, err := registry.Open(filepath.Join(t.TempDir(), "test.db"))
+	if err != nil {
+		t.Fatalf("open test db: %v", err)
+	}
+	t.Cleanup(func() { _ = db.Close() })
+
+	return &Agent{
+		Config: &config.AgentConfig{
+			Agent: config.AgentSettings{NodeName: "test-node"},
+		},
+		DB:      db,
+		Runtime: rt,
+		Logger:  slog.Default(),
+	}
+}
diff --git a/internal/monitor/alerting.go b/internal/monitor/alerting.go
new file mode 100644
index 0000000..befd77d
--- /dev/null
+++ b/internal/monitor/alerting.go
@@ -0,0 +1,113 @@
+package monitor
+
+import (
+	"database/sql"
+	"fmt"
+	"log/slog"
+	"os"
+	"os/exec"
+	"time"
+
+	"git.wntrmute.dev/kyle/mcp/internal/config"
+	"git.wntrmute.dev/kyle/mcp/internal/registry"
+)
+
+// Alerter evaluates state transitions and fires alerts for drift or flapping.
+type Alerter struct {
+	command       []string      // argv of the external alert hook; empty disables command execution
+	cooldown      time.Duration // minimum gap between alerts for the same service/component key
+	flapThreshold int           // event count within flapWindow at which a component is considered flapping
+	flapWindow    time.Duration // sliding window inspected when checking for flapping
+	nodeName      string
+	db            *sql.DB
+	logger        *slog.Logger
+	lastAlert     map[string]time.Time // key: "service/component" -> time the last alert fired (cooldown state)
+}
+
+// NewAlerter creates an Alerter from monitoring configuration.
+func NewAlerter(cfg config.MonitorConfig, nodeName string, db *sql.DB, logger *slog.Logger) *Alerter {
+	return &Alerter{
+		command:       cfg.AlertCommand,
+		cooldown:      cfg.Cooldown.Duration,
+		flapThreshold: cfg.FlapThreshold,
+		flapWindow:    cfg.FlapWindow.Duration,
+		nodeName:      nodeName,
+		db:            db,
+		logger:        logger,
+		lastAlert:     make(map[string]time.Time),
+	}
+}
+
+// Evaluate checks a component's state transition and fires alerts as needed.
+// It is called for every component on each monitor tick.
+func (al *Alerter) Evaluate(service, component, desiredState, observedState, prevState string) {
+	if desiredState == "ignore" {
+		return
+	}
+
+	key := service + "/" + component
+
+	// Drift check: observed state disagrees with desired state; alert unless within cooldown.
+	if desiredState != observedState {
+		if al.cooledDown(key) {
+			al.fire("drift", service, component, desiredState, observedState, prevState, 0)
+		}
+	}
+
+	// Flap check: on a transition, alert if recent events within flapWindow reach the threshold.
+	if observedState != prevState {
+		count, err := registry.CountEvents(al.db, service, component, time.Now().Add(-al.flapWindow))
+		if err != nil {
+			al.logger.Error("alerter: count events", "error", err, "key", key)
+			return
+		}
+
+		if count >= al.flapThreshold {
+			if al.cooledDown(key) {
+				al.fire("flapping", service, component, desiredState, observedState, prevState, count)
+			}
+		}
+	}
+}
+
+// cooledDown returns true and records the alert time if enough time has
+// elapsed since the last alert for this key. Returns false if suppressed.
+func (al *Alerter) cooledDown(key string) bool {
+	if last, ok := al.lastAlert[key]; ok {
+		if time.Since(last) < al.cooldown {
+			return false
+		}
+	}
+	al.lastAlert[key] = time.Now()
+	return true
+}
+// fire logs the alert and, if an alert command is configured, runs it with MCP_* environment variables describing the event.
+func (al *Alerter) fire(alertType, service, component, desired, observed, prev string, transitions int) {
+	al.logger.Warn("alert",
+		"type", alertType,
+		"service", service,
+		"component", component,
+		"desired", desired,
+		"observed", observed,
+	)
+
+	if len(al.command) == 0 {
+		return
+	}
+
+	cmd := exec.Command(al.command[0], al.command[1:]...) //nolint:gosec // alert command from trusted config
+	cmd.Env = append(os.Environ(),
+		"MCP_COMPONENT="+component,
+		"MCP_SERVICE="+service,
+		"MCP_NODE="+al.nodeName,
+		"MCP_DESIRED="+desired,
+		"MCP_OBSERVED="+observed,
+		"MCP_PREV_STATE="+prev,
+		"MCP_ALERT_TYPE="+alertType,
+		"MCP_TRANSITIONS="+fmt.Sprintf("%d", transitions),
+	)
+
+	if out, err := cmd.CombinedOutput(); err != nil {
+		al.logger.Error("alert command failed", "error", err, "output", string(out))
+	}
+}
diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go
new file mode 100644
index 0000000..0f4e02a
--- /dev/null
+++ b/internal/monitor/monitor.go
@@ -0,0 +1,157 @@
+package monitor
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"git.wntrmute.dev/kyle/mcp/internal/config"
+	"git.wntrmute.dev/kyle/mcp/internal/registry"
+	"git.wntrmute.dev/kyle/mcp/internal/runtime"
+)
+
+// Monitor watches container states and compares them to the registry,
+// recording events and firing alerts on drift or flapping.
+type Monitor struct {
+	db      *sql.DB
+	runtime runtime.Runtime
+	cfg     config.MonitorConfig
+	logger  *slog.Logger
+	alerter *Alerter
+	stopCh  chan struct{} // closed by Stop to signal the run loop to exit
+	done    chan struct{} // closed by run when the loop has exited
+
+	prevState map[string]string // key: "service/component", value: observed state
+}
+
+// New creates a Monitor with the given dependencies.
+func New(db *sql.DB, rt runtime.Runtime, cfg config.MonitorConfig, nodeName string, logger *slog.Logger) *Monitor {
+	return &Monitor{
+		db:        db,
+		runtime:   rt,
+		cfg:       cfg,
+		logger:    logger,
+		alerter:   NewAlerter(cfg, nodeName, db, logger),
+		stopCh:    make(chan struct{}),
+		done:      make(chan struct{}),
+		prevState: make(map[string]string),
+	}
+}
+
+// Start launches the monitoring goroutine.
+func (m *Monitor) Start() {
+	go m.run()
+}
+
+// Stop signals the monitoring goroutine to stop and waits for it to exit.
+func (m *Monitor) Stop() {
+	close(m.stopCh)
+	<-m.done
+}
+// run is the monitor loop: it ticks at the configured interval until Stop closes stopCh.
+func (m *Monitor) run() {
+	defer close(m.done)
+	defer func() { // last resort: a panic escaping tick's own recover ends the loop, but not the process
+		if r := recover(); r != nil {
+			m.logger.Error("monitor panic recovered", "panic", fmt.Sprintf("%v", r))
+		}
+	}()
+
+	ticker := time.NewTicker(m.cfg.Interval.Duration)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-m.stopCh:
+			return
+		case <-ticker.C:
+			m.tick()
+		}
+	}
+}
+// tick performs one reconciliation pass: observe runtime state, record transitions, evaluate alerts.
+func (m *Monitor) tick() {
+	defer func() { // per-tick recover so one bad pass does not kill the loop
+		if r := recover(); r != nil {
+			m.logger.Error("monitor tick panic recovered", "panic", fmt.Sprintf("%v", r))
+		}
+	}()
+
+	ctx := context.Background() // NOTE(review): no timeout; a hung runtime.List would stall this tick — confirm acceptable
+
+	// Get the current runtime state of all containers.
+	containers, err := m.runtime.List(ctx)
+	if err != nil {
+		m.logger.Error("monitor: list containers", "error", err)
+		return
+	}
+
+	// Index runtime containers by name for fast lookup.
+	runtimeState := make(map[string]string, len(containers))
+	for _, c := range containers {
+		runtimeState[c.Name] = c.State
+	}
+
+	// Walk all registered services and their components.
+	services, err := registry.ListServices(m.db)
+	if err != nil {
+		m.logger.Error("monitor: list services", "error", err)
+		return
+	}
+
+	seen := make(map[string]struct{})
+
+	for _, svc := range services {
+		components, err := registry.ListComponents(m.db, svc.Name)
+		if err != nil {
+			m.logger.Error("monitor: list components", "error", err, "service", svc.Name)
+			continue
+		}
+
+		for _, comp := range components {
+			key := comp.Service + "/" + comp.Name
+			seen[key] = struct{}{}
+			containerName := comp.Service + "-" + comp.Name
+
+			observed := "unknown" // component's container is absent from the runtime
+			if state, ok := runtimeState[containerName]; ok {
+				observed = state
+			}
+
+			prev, hasPrev := m.prevState[key]
+			if !hasPrev {
+				prev = comp.ObservedState // first tick for this key: fall back to the persisted state
+			}
+
+			if observed != prev {
+				if err := registry.InsertEvent(m.db, comp.Service, comp.Name, prev, observed); err != nil {
+					m.logger.Error("monitor: insert event", "error", err, "key", key)
+				}
+
+				if err := registry.UpdateComponentState(m.db, comp.Service, comp.Name, "", observed); err != nil {
+					m.logger.Error("monitor: update observed state", "error", err, "key", key)
+				}
+
+				m.logger.Info("state change", "service", comp.Service, "component", comp.Name, "prev", prev, "observed", observed)
+			}
+
+			m.alerter.Evaluate(comp.Service, comp.Name, comp.DesiredState, observed, prev)
+
+			m.prevState[key] = observed
+		}
+	}
+
+	// Evict entries for components that no longer exist in the registry.
+	for key := range m.prevState {
+		if _, ok := seen[key]; !ok {
+			delete(m.prevState, key)
+		}
+	}
+
+	// Prune events older than the configured retention window.
+	if _, err := registry.PruneEvents(m.db, time.Now().Add(-m.cfg.Retention.Duration)); err != nil {
+		m.logger.Error("monitor: prune events", "error", err)
+	}
+}
diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go
new file mode 100644
index 0000000..1b131ed
--- /dev/null
+++ b/internal/monitor/monitor_test.go
@@ -0,0 +1,280 @@
+package monitor
+
+import (
+	"context"
+	"database/sql"
+	"log/slog"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"git.wntrmute.dev/kyle/mcp/internal/config"
+	"git.wntrmute.dev/kyle/mcp/internal/registry"
+	"git.wntrmute.dev/kyle/mcp/internal/runtime"
+)
+
+func openTestDB(t *testing.T) *sql.DB {
+	t.Helper()
+	db, err := registry.Open(filepath.Join(t.TempDir(), "test.db"))
+	if err != nil {
+		t.Fatalf("open db: %v", err)
+	}
+	t.Cleanup(func() { _ = db.Close() })
+	return db
+}
+
+func testLogger() *slog.Logger {
+	return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
+}
+
+func testMonitorConfig() config.MonitorConfig {
+	return config.MonitorConfig{
+		Interval:      config.Duration{Duration: 1 * time.Second},
+		Cooldown:      config.Duration{Duration: 1 * time.Minute},
+		FlapThreshold: 3,
+		FlapWindow:    config.Duration{Duration: 10 * time.Minute},
+		Retention:     config.Duration{Duration: 24 * time.Hour},
+	}
+}
+
+// fakeRuntime implements runtime.Runtime for testing: List returns a fixed set, other methods are no-ops.
+type fakeRuntime struct {
+	containers []runtime.ContainerInfo
+}
+
+func (f *fakeRuntime) Pull(_ context.Context, _ string) error { return nil }
+func (f *fakeRuntime) Run(_ context.Context, _ runtime.ContainerSpec) error { return nil }
+func (f *fakeRuntime) Stop(_ context.Context, _ string) error { return nil }
+func (f *fakeRuntime) Remove(_ context.Context, _ string) error { return nil }
+
+func (f *fakeRuntime) Inspect(_ context.Context, _ string) (runtime.ContainerInfo, error) {
+	return runtime.ContainerInfo{}, nil
+}
+
+func (f *fakeRuntime) List(_ context.Context) ([]runtime.ContainerInfo, error) {
+	return f.containers, nil
+}
+
+func TestAlerterDriftDetection(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+
+	al := NewAlerter(cfg, "test-node", db, logger)
+
+	// Set up a service and component so CountEvents works.
+	if err := registry.CreateService(db, "metacrypt", true); err != nil {
+		t.Fatalf("create service: %v", err)
+	}
+	if err := registry.CreateComponent(db, &registry.Component{
+		Name: "api", Service: "metacrypt", Image: "img:v1",
+		Restart: "unless-stopped", DesiredState: "running", ObservedState: "running",
+	}); err != nil {
+		t.Fatalf("create component: %v", err)
+	}
+
+	// Desired is "running" but observed is "exited" -- drift should fire.
+	al.Evaluate("metacrypt", "api", "running", "exited", "running")
+
+	// Verify alert was recorded (lastAlert should be set).
+	key := "metacrypt/api"
+	if _, ok := al.lastAlert[key]; !ok {
+		t.Fatal("expected drift alert to be recorded in lastAlert")
+	}
+}
+
+func TestAlerterIgnoreState(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+
+	al := NewAlerter(cfg, "test-node", db, logger)
+
+	// Components with desired_state "ignore" should not trigger alerts.
+	al.Evaluate("metacrypt", "api", "ignore", "exited", "running")
+
+	key := "metacrypt/api"
+	if _, ok := al.lastAlert[key]; ok {
+		t.Fatal("expected no alert for ignored component")
+	}
+}
+
+func TestAlerterCooldownSuppression(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+	cfg.Cooldown.Duration = 1 * time.Hour // long cooldown so the second alert is guaranteed suppressed
+
+	al := NewAlerter(cfg, "test-node", db, logger)
+
+	if err := registry.CreateService(db, "metacrypt", true); err != nil {
+		t.Fatalf("create service: %v", err)
+	}
+	if err := registry.CreateComponent(db, &registry.Component{
+		Name: "api", Service: "metacrypt", Image: "img:v1",
+		Restart: "unless-stopped", DesiredState: "running", ObservedState: "running",
+	}); err != nil {
+		t.Fatalf("create component: %v", err)
+	}
+
+	// First call should fire.
+	al.Evaluate("metacrypt", "api", "running", "exited", "running")
+	key := "metacrypt/api"
+	first, ok := al.lastAlert[key]
+	if !ok {
+		t.Fatal("expected first alert to fire")
+	}
+
+	// Second call should be suppressed (within cooldown).
+	al.Evaluate("metacrypt", "api", "running", "exited", "exited")
+	second := al.lastAlert[key]
+	if !second.Equal(first) {
+		t.Fatal("expected second alert to be suppressed by cooldown")
+	}
+}
+
+func TestAlerterFlapDetection(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+	cfg.FlapThreshold = 2
+	cfg.FlapWindow.Duration = 10 * time.Minute
+	cfg.Cooldown.Duration = 0 // disable cooldown for this test
+
+	al := NewAlerter(cfg, "test-node", db, logger)
+
+	if err := registry.CreateService(db, "metacrypt", true); err != nil {
+		t.Fatalf("create service: %v", err)
+	}
+	if err := registry.CreateComponent(db, &registry.Component{
+		Name: "api", Service: "metacrypt", Image: "img:v1",
+		Restart: "unless-stopped", DesiredState: "running", ObservedState: "unknown",
+	}); err != nil {
+		t.Fatalf("create component: %v", err)
+	}
+
+	// Insert enough events (3) to exceed the flap threshold (2) within the window.
+	for i := 0; i < 3; i++ {
+		if err := registry.InsertEvent(db, "metacrypt", "api", "running", "exited"); err != nil {
+			t.Fatalf("insert event %d: %v", i, err)
+		}
+	}
+
+	// Evaluate with a state transition -- should detect flapping.
+	al.Evaluate("metacrypt", "api", "running", "exited", "running")
+
+	key := "metacrypt/api"
+	if _, ok := al.lastAlert[key]; !ok {
+		t.Fatal("expected flap alert to fire")
+	}
+}
+
+func TestMonitorTickStateChange(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+
+	if err := registry.CreateService(db, "metacrypt", true); err != nil {
+		t.Fatalf("create service: %v", err)
+	}
+	if err := registry.CreateComponent(db, &registry.Component{
+		Name: "api", Service: "metacrypt", Image: "img:v1",
+		Restart: "unless-stopped", DesiredState: "running", ObservedState: "unknown",
+	}); err != nil {
+		t.Fatalf("create component: %v", err)
+	}
+
+	rt := &fakeRuntime{
+		containers: []runtime.ContainerInfo{
+			{Name: "metacrypt-api", State: "running"},
+		},
+	}
+
+	m := New(db, rt, cfg, "test-node", logger)
+
+	// Run a single tick.
+	m.tick()
+
+	// Verify observed state was updated in the registry.
+	comp, err := registry.GetComponent(db, "metacrypt", "api")
+	if err != nil {
+		t.Fatalf("get component: %v", err)
+	}
+	if comp.ObservedState != "running" {
+		t.Fatalf("observed state: got %q, want %q", comp.ObservedState, "running")
+	}
+
+	// Verify an event was recorded (unknown -> running).
+	events, err := registry.QueryEvents(db, "metacrypt", "api", time.Now().Add(-1*time.Hour), 0)
+	if err != nil {
+		t.Fatalf("query events: %v", err)
+	}
+	if len(events) != 1 {
+		t.Fatalf("events: got %d, want 1", len(events))
+	}
+	if events[0].PrevState != "unknown" || events[0].NewState != "running" {
+		t.Fatalf("event: got %q->%q, want unknown->running", events[0].PrevState, events[0].NewState)
+	}
+
+	// Verify prevState map was updated.
+	if m.prevState["metacrypt/api"] != "running" {
+		t.Fatalf("prevState: got %q, want %q", m.prevState["metacrypt/api"], "running")
+	}
+}
+
+func TestMonitorStartStop(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+	cfg.Interval.Duration = 50 * time.Millisecond // fast ticks so the test finishes quickly
+
+	rt := &fakeRuntime{}
+	m := New(db, rt, cfg, "test-node", logger)
+
+	m.Start()
+
+	// Give it a moment to tick at least once.
+	time.Sleep(150 * time.Millisecond)
+
+	m.Stop()
+
+	// If Stop returns, the goroutine exited cleanly.
+}
+
+func TestMonitorNoChangeNoEvent(t *testing.T) {
+	db := openTestDB(t)
+	logger := testLogger()
+	cfg := testMonitorConfig()
+
+	if err := registry.CreateService(db, "metacrypt", true); err != nil {
+		t.Fatalf("create service: %v", err)
+	}
+	if err := registry.CreateComponent(db, &registry.Component{
+		Name: "api", Service: "metacrypt", Image: "img:v1",
+		Restart: "unless-stopped", DesiredState: "running", ObservedState: "running",
+	}); err != nil {
+		t.Fatalf("create component: %v", err)
+	}
+
+	rt := &fakeRuntime{
+		containers: []runtime.ContainerInfo{
+			{Name: "metacrypt-api", State: "running"},
+		},
+	}
+
+	m := New(db, rt, cfg, "test-node", logger)
+	// Seed prevState so that observed == prev (no change).
+	m.prevState["metacrypt/api"] = "running"
+
+	m.tick()
+
+	// No events should be recorded when state is unchanged.
+	events, err := registry.QueryEvents(db, "metacrypt", "api", time.Now().Add(-1*time.Hour), 0)
+	if err != nil {
+		t.Fatalf("query events: %v", err)
+	}
+	if len(events) != 0 {
+		t.Fatalf("events: got %d, want 0 (no state change)", len(events))
+	}
+}