Add mcp logs command for streaming container logs
New server-streaming Logs RPC streams container output to the CLI. Supports --tail/-n, --follow/-f, --timestamps/-t, --since. Detects journald log driver and falls back to journalctl (podman logs can't read journald outside the originating user session). New containers default to k8s-file via mcp user's containers.conf. Also adds stream auth interceptor for the agent gRPC server (required for streaming RPCs). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -100,6 +100,9 @@ func Run(cfg *config.AgentConfig) error {
|
||||
grpc.ChainUnaryInterceptor(
|
||||
auth.AuthInterceptor(validator),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
auth.StreamAuthInterceptor(validator),
|
||||
),
|
||||
)
|
||||
mcpv1.RegisterMcpAgentServiceServer(server, a)
|
||||
|
||||
|
||||
75
internal/agent/logs.go
Normal file
75
internal/agent/logs.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// Logs streams container logs for a service component.
|
||||
func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsServer) error {
|
||||
if req.GetService() == "" {
|
||||
return status.Error(codes.InvalidArgument, "service name is required")
|
||||
}
|
||||
|
||||
// Resolve component name.
|
||||
component := req.GetComponent()
|
||||
if component == "" {
|
||||
components, err := registry.ListComponents(a.DB, req.GetService())
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "list components: %v", err)
|
||||
}
|
||||
if len(components) == 0 {
|
||||
return status.Error(codes.NotFound, "no components found for service")
|
||||
}
|
||||
component = components[0].Name
|
||||
}
|
||||
|
||||
containerName := ContainerNameFor(req.GetService(), component)
|
||||
|
||||
podman, ok := a.Runtime.(*runtime.Podman)
|
||||
if !ok {
|
||||
return status.Error(codes.Internal, "logs requires podman runtime")
|
||||
}
|
||||
|
||||
cmd := podman.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince())
|
||||
|
||||
a.Logger.Info("running podman logs", "container", containerName, "args", cmd.Args)
|
||||
|
||||
// Podman writes container stdout to its stdout and container stderr
|
||||
// to its stderr. Merge both into a single pipe.
|
||||
pr, pw := io.Pipe()
|
||||
cmd.Stdout = pw
|
||||
cmd.Stderr = pw
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
pw.Close()
|
||||
return status.Errorf(codes.Internal, "start podman logs: %v", err)
|
||||
}
|
||||
|
||||
// Close the write end when the command exits so the scanner finishes.
|
||||
go func() {
|
||||
err := cmd.Wait()
|
||||
if err != nil {
|
||||
a.Logger.Warn("podman logs exited", "container", containerName, "error", err)
|
||||
}
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(pr)
|
||||
for scanner.Scan() {
|
||||
if err := stream.Send(&mcpv1.LogsResponse{
|
||||
Data: append(scanner.Bytes(), '\n'),
|
||||
}); err != nil {
|
||||
_ = cmd.Process.Kill()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -255,6 +255,52 @@ func AuthInterceptor(validator TokenValidator) grpc.UnaryServerInterceptor {
|
||||
}
|
||||
}
|
||||
|
||||
// StreamAuthInterceptor returns a gRPC stream server interceptor with
|
||||
// the same authentication rules as AuthInterceptor.
|
||||
func StreamAuthInterceptor(validator TokenValidator) grpc.StreamServerInterceptor {
|
||||
return func(
|
||||
srv any,
|
||||
ss grpc.ServerStream,
|
||||
info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler,
|
||||
) error {
|
||||
md, ok := metadata.FromIncomingContext(ss.Context())
|
||||
if !ok {
|
||||
return status.Error(codes.Unauthenticated, "missing metadata")
|
||||
}
|
||||
|
||||
authValues := md.Get("authorization")
|
||||
if len(authValues) == 0 {
|
||||
return status.Error(codes.Unauthenticated, "missing authorization header")
|
||||
}
|
||||
|
||||
authHeader := authValues[0]
|
||||
if !strings.HasPrefix(authHeader, "Bearer ") {
|
||||
return status.Error(codes.Unauthenticated, "malformed authorization header")
|
||||
}
|
||||
token := strings.TrimPrefix(authHeader, "Bearer ")
|
||||
|
||||
tokenInfo, err := validator.ValidateToken(ss.Context(), token)
|
||||
if err != nil {
|
||||
slog.Error("token validation failed", "method", info.FullMethod, "error", err)
|
||||
return status.Error(codes.Unauthenticated, "token validation failed")
|
||||
}
|
||||
|
||||
if !tokenInfo.Valid {
|
||||
return status.Error(codes.Unauthenticated, "invalid token")
|
||||
}
|
||||
|
||||
if tokenInfo.HasRole("guest") {
|
||||
slog.Warn("guest access denied", "method", info.FullMethod, "user", tokenInfo.Username)
|
||||
return status.Error(codes.PermissionDenied, "guest access not permitted")
|
||||
}
|
||||
|
||||
slog.Info("rpc", "method", info.FullMethod, "user", tokenInfo.Username, "account_type", tokenInfo.AccountType)
|
||||
|
||||
return handler(srv, ss)
|
||||
}
|
||||
}
|
||||
|
||||
// Login authenticates with MCIAS and returns a bearer token.
|
||||
func Login(serverURL, caCertPath, username, password string) (string, error) {
|
||||
client, err := newHTTPClient(caCertPath)
|
||||
|
||||
@@ -178,6 +178,49 @@ func (p *Podman) Inspect(ctx context.Context, name string) (ContainerInfo, error
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// Logs returns an exec.Cmd that streams container logs. For containers
|
||||
// using the journald log driver, it uses journalctl (podman logs can't
|
||||
// read journald outside the originating user session). For k8s-file or
|
||||
// other drivers, it uses podman logs directly.
|
||||
func (p *Podman) Logs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
|
||||
// Check if this container uses the journald log driver.
|
||||
inspectCmd := exec.CommandContext(ctx, p.command(), "inspect", "--format", "{{.HostConfig.LogConfig.Type}}", containerName) //nolint:gosec
|
||||
if out, err := inspectCmd.Output(); err == nil && strings.TrimSpace(string(out)) == "journald" {
|
||||
return p.journalLogs(ctx, containerName, tail, follow, since)
|
||||
}
|
||||
|
||||
args := []string{"logs"}
|
||||
if tail > 0 {
|
||||
args = append(args, "--tail", fmt.Sprintf("%d", tail))
|
||||
}
|
||||
if follow {
|
||||
args = append(args, "--follow")
|
||||
}
|
||||
if timestamps {
|
||||
args = append(args, "--timestamps")
|
||||
}
|
||||
if since != "" {
|
||||
args = append(args, "--since", since)
|
||||
}
|
||||
args = append(args, containerName)
|
||||
return exec.CommandContext(ctx, p.command(), args...) //nolint:gosec // args built programmatically
|
||||
}
|
||||
|
||||
// journalLogs returns a journalctl command filtered by container name.
|
||||
func (p *Podman) journalLogs(ctx context.Context, containerName string, tail int, follow bool, since string) *exec.Cmd {
|
||||
args := []string{"--no-pager", "--output", "cat", "CONTAINER_NAME=" + containerName}
|
||||
if tail > 0 {
|
||||
args = append(args, "--lines", fmt.Sprintf("%d", tail))
|
||||
}
|
||||
if follow {
|
||||
args = append(args, "--follow")
|
||||
}
|
||||
if since != "" {
|
||||
args = append(args, "--since", since)
|
||||
}
|
||||
return exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec // args built programmatically
|
||||
}
|
||||
|
||||
// Login authenticates to a container registry using the given token as
|
||||
// the password. This enables non-interactive push with service account
|
||||
// tokens (MCR accepts MCIAS JWTs as passwords).
|
||||
|
||||
Reference in New Issue
Block a user