diff --git a/PROGRESS_V1.md b/PROGRESS_V1.md index e3afa83..e212ad5 100644 --- a/PROGRESS_V1.md +++ b/PROGRESS_V1.md @@ -15,7 +15,7 @@ ## Phase 2: Agent -- [ ] **P2.1** Agent skeleton and gRPC server +- [x] **P2.1** Agent skeleton and gRPC server - [ ] **P2.2** Deploy handler - [ ] **P2.3** Lifecycle handlers (stop, start, restart) - [ ] **P2.4** Status handlers (list, live check, get status) @@ -27,7 +27,7 @@ ## Phase 3: CLI -- [ ] **P3.1** CLI skeleton +- [x] **P3.1** CLI skeleton - [ ] **P3.2** Login command - [ ] **P3.3** Deploy command - [ ] **P3.4** Lifecycle commands (stop, start, restart) diff --git a/cmd/mcp-agent/main.go b/cmd/mcp-agent/main.go index 1495d6e..a206ef7 100644 --- a/cmd/mcp-agent/main.go +++ b/cmd/mcp-agent/main.go @@ -2,8 +2,11 @@ package main import ( "fmt" + "log" "os" + "git.wntrmute.dev/kyle/mcp/internal/agent" + "git.wntrmute.dev/kyle/mcp/internal/config" "github.com/spf13/cobra" ) @@ -27,7 +30,20 @@ func main() { }, }) + root.AddCommand(&cobra.Command{ + Use: "server", + Short: "Start the agent server", + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadAgentConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + return agent.Run(cfg) + }, + }) + if err := root.Execute(); err != nil { + log.Fatal(err) os.Exit(1) } } diff --git a/cmd/mcp/dial.go b/cmd/mcp/dial.go new file mode 100644 index 0000000..caafb28 --- /dev/null +++ b/cmd/mcp/dial.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" +) + +// Ensure dial helpers are referenced to satisfy linters until CLI commands +// are implemented. This will be removed when the first command uses dialAgent. +var ( + _ = dialAgent + _ = loadBearerToken +) + +// dialAgent connects to an agent at the given address and returns a gRPC +// client. The connection uses TLS and attaches the bearer token to every RPC. +func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) { + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS13, + } + + if cfg.MCIAS.CACert != "" { + caCert, err := os.ReadFile(cfg.MCIAS.CACert) //nolint:gosec // trusted config path + if err != nil { + return nil, nil, fmt.Errorf("read CA cert %q: %w", cfg.MCIAS.CACert, err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(caCert) { + return nil, nil, fmt.Errorf("invalid CA cert %q", cfg.MCIAS.CACert) + } + tlsConfig.RootCAs = pool + } + + token, err := loadBearerToken(cfg) + if err != nil { + return nil, nil, fmt.Errorf("load token: %w", err) + } + + conn, err := grpc.NewClient( + address, + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + grpc.WithUnaryInterceptor(tokenInterceptor(token)), + ) + if err != nil { + return nil, nil, fmt.Errorf("dial %q: %w", address, err) + } + + return mcpv1.NewMcpAgentServiceClient(conn), conn, nil +} + +// tokenInterceptor returns a gRPC client interceptor that attaches the +// bearer token to outgoing RPC metadata. +func tokenInterceptor(token string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +// loadBearerToken reads the token from file or env var. +func loadBearerToken(cfg *config.CLIConfig) (string, error) { + if token := os.Getenv("MCP_TOKEN"); token != "" { + return token, nil + } + token, err := os.ReadFile(cfg.Auth.TokenPath) //nolint:gosec // trusted config path + if err != nil { + return "", fmt.Errorf("read token from %q: %w (run 'mcp login' first)", cfg.Auth.TokenPath, err) + } + return string(token), nil +} diff --git a/cmd/mcp/main.go b/cmd/mcp/main.go index b6489ee..aa56301 100644 --- a/cmd/mcp/main.go +++ b/cmd/mcp/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "log" "os" "github.com/spf13/cobra" @@ -19,15 +20,238 @@ func main() { } root.PersistentFlags().StringVarP(&cfgPath, "config", "c", "", "config file path") - root.AddCommand(&cobra.Command{ + root.AddCommand(versionCmd()) + root.AddCommand(loginCmd()) + root.AddCommand(deployCmd()) + root.AddCommand(stopCmd()) + root.AddCommand(startCmd()) + root.AddCommand(restartCmd()) + root.AddCommand(listCmd()) + root.AddCommand(psCmd()) + root.AddCommand(statusCmd()) + root.AddCommand(syncCmd()) + root.AddCommand(adoptCmd()) + root.AddCommand(serviceCmd()) + root.AddCommand(pushCmd()) + root.AddCommand(pullCmd()) + root.AddCommand(nodeCmd()) + + if err := root.Execute(); err != nil { + log.Fatal(err) + os.Exit(1) + } +} + +func versionCmd() *cobra.Command { + return &cobra.Command{ Use: "version", Short: "Print version", Run: func(cmd *cobra.Command, args []string) { fmt.Println(version) }, - }) - - if err := root.Execute(); err != nil { - os.Exit(1) } } + +func loginCmd() *cobra.Command { + return &cobra.Command{ + Use: "login", + Short: "Authenticate to MCIAS, store token", + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func deployCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "deploy [/]", + Short: "Deploy service from service definition", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + cmd.Flags().StringP("file", "f", "", "service definition file") + return cmd +} + +func stopCmd() *cobra.Command { + return &cobra.Command{ + Use: "stop ", + Short: "Stop all components, set active=false", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func startCmd() *cobra.Command { + return &cobra.Command{ + Use: "start ", + Short: "Start all components, set active=true", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func restartCmd() *cobra.Command { + return &cobra.Command{ + Use: "restart ", + Short: "Restart all components", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func listCmd() *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "List services from all agents (registry, no runtime query)", + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func psCmd() *cobra.Command { + return &cobra.Command{ + Use: "ps", + Short: "Live check: query runtime on all agents", + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func statusCmd() *cobra.Command { + return &cobra.Command{ + Use: "status [service]", + Short: "Full picture: live query + drift + recent events", + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func syncCmd() *cobra.Command { + return &cobra.Command{ + Use: "sync", + Short: "Push service definitions to agents (update desired state)", + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func adoptCmd() *cobra.Command { + return &cobra.Command{ + Use: "adopt ", + Short: "Adopt all -* containers into a service", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func serviceCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "service", + Short: "Service definition management", + } + + show := &cobra.Command{ + Use: "show ", + Short: "Print current spec from agent registry", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + + edit := &cobra.Command{ + Use: "edit ", + Short: "Open service definition in $EDITOR", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + + export := &cobra.Command{ + Use: "export ", + Short: "Write agent registry spec to local service file", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + export.Flags().StringP("file", "f", "", "output file path") + + cmd.AddCommand(show, edit, export) + return cmd +} + +func pushCmd() *cobra.Command { + return &cobra.Command{ + Use: "push [path]", + Short: "Copy a local file into /srv//[path]", + Args: cobra.RangeArgs(2, 3), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func pullCmd() *cobra.Command { + return &cobra.Command{ + Use: "pull [local-file]", + Short: "Copy a file from /srv// to local", + Args: cobra.RangeArgs(2, 3), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } +} + +func nodeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "node", + Short: "Node management", + } + + list := &cobra.Command{ + Use: "list", + Short: "List registered nodes", + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + + add := &cobra.Command{ + Use: "add
", + Short: "Register a node", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + + remove := &cobra.Command{ + Use: "remove ", + Short: "Deregister a node", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return fmt.Errorf("not implemented") + }, + } + + cmd.AddCommand(list, add, remove) + return cmd +} diff --git a/internal/agent/agent.go b/internal/agent/agent.go new file mode 100644 index 0000000..565b64f --- /dev/null +++ b/internal/agent/agent.go @@ -0,0 +1,119 @@ +package agent + +import ( + "context" + "crypto/tls" + "database/sql" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "syscall" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/auth" + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/registry" + "git.wntrmute.dev/kyle/mcp/internal/runtime" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// Agent is the MCP node agent. It manages containers, stores the registry, +// monitors for drift, and serves the gRPC API. +type Agent struct { + mcpv1.UnimplementedMcpAgentServiceServer + + Config *config.AgentConfig + DB *sql.DB + Runtime runtime.Runtime + Logger *slog.Logger +} + +// Run starts the agent: opens the database, sets up the gRPC server with +// TLS and auth, and blocks until SIGINT/SIGTERM. +func Run(cfg *config.AgentConfig) error { + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: parseLogLevel(cfg.Log.Level), + })) + + db, err := registry.Open(cfg.Database.Path) + if err != nil { + return fmt.Errorf("open registry: %w", err) + } + defer func() { _ = db.Close() }() + + rt := &runtime.Podman{} + + a := &Agent{ + Config: cfg, + DB: db, + Runtime: rt, + Logger: logger, + } + + tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey) + if err != nil { + return fmt.Errorf("load TLS cert: %w", err) + } + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + MinVersion: tls.VersionTLS13, + } + + validator, err := auth.NewMCIASValidator(cfg.MCIAS.ServerURL, cfg.MCIAS.CACert) + if err != nil { + return fmt.Errorf("create MCIAS validator: %w", err) + } + + server := grpc.NewServer( + grpc.Creds(credentials.NewTLS(tlsConfig)), + grpc.ChainUnaryInterceptor( + auth.AuthInterceptor(validator), + ), + ) + mcpv1.RegisterMcpAgentServiceServer(server, a) + + lis, err := net.Listen("tcp", cfg.Server.GRPCAddr) + if err != nil { + return fmt.Errorf("listen %q: %w", cfg.Server.GRPCAddr, err) + } + + logger.Info("agent starting", + "addr", cfg.Server.GRPCAddr, + "node", cfg.Agent.NodeName, + "runtime", cfg.Agent.ContainerRuntime, + ) + + // Graceful shutdown on signal. + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + errCh := make(chan error, 1) + go func() { + errCh <- server.Serve(lis) + }() + + select { + case <-ctx.Done(): + logger.Info("shutting down") + server.GracefulStop() + return nil + case err := <-errCh: + return fmt.Errorf("serve: %w", err) + } +} + +func parseLogLevel(level string) slog.Level { + switch level { + case "debug": + return slog.LevelDebug + case "warn": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return slog.LevelInfo + } +}