diff --git a/cmd/mcp/dial.go b/cmd/mcp/dial.go index 22bbbf4..cb4db46 100644 --- a/cmd/mcp/dial.go +++ b/cmd/mcp/dial.go @@ -43,6 +43,7 @@ func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClie address, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), grpc.WithUnaryInterceptor(tokenInterceptor(token)), + grpc.WithStreamInterceptor(streamTokenInterceptor(token)), ) if err != nil { return nil, nil, fmt.Errorf("dial %q: %w", address, err) @@ -60,6 +61,15 @@ func tokenInterceptor(token string) grpc.UnaryClientInterceptor { } } +// streamTokenInterceptor returns a gRPC client stream interceptor that +// attaches the bearer token to outgoing stream metadata. +func streamTokenInterceptor(token string) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token) + return streamer(ctx, desc, cc, method, opts...) + } +} + // loadBearerToken reads the token from file or env var. func loadBearerToken(cfg *config.CLIConfig) (string, error) { if token := os.Getenv("MCP_TOKEN"); token != "" { diff --git a/cmd/mcp/logs.go b/cmd/mcp/logs.go new file mode 100644 index 0000000..d5d60ad --- /dev/null +++ b/cmd/mcp/logs.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "io" + "os" + + "github.com/spf13/cobra" + + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" + "git.wntrmute.dev/mc/mcp/internal/config" +) + +func logsCmd() *cobra.Command { + var ( + tail int + follow bool + timestamps bool + since string + ) + + cmd := &cobra.Command{ + Use: "logs [/]", + Short: "Show container logs", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + serviceName, component := parseServiceArg(args[0]) + + def, err := loadServiceDef(cmd, cfg, serviceName) + if err != nil { + return err + } + + address, err := findNodeAddress(cfg, def.Node) + if err != nil { + return err + } + + client, conn, err := dialAgent(address, cfg) + if err != nil { + return fmt.Errorf("dial agent: %w", err) + } + defer func() { _ = conn.Close() }() + + stream, err := client.Logs(cmd.Context(), &mcpv1.LogsRequest{ + Service: serviceName, + Component: component, + Tail: int32(tail), + Follow: follow, + Timestamps: timestamps, + Since: since, + }) + if err != nil { + return fmt.Errorf("logs: %w", err) + } + + for { + resp, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return fmt.Errorf("recv: %w", err) + } + _, _ = os.Stdout.Write(resp.Data) + } + }, + } + + cmd.Flags().IntVarP(&tail, "tail", "n", 0, "number of lines from end (0 = all)") + cmd.Flags().BoolVarP(&follow, "follow", "f", false, "follow log output") + cmd.Flags().BoolVarP(×tamps, "timestamps", "t", false, "show timestamps") + cmd.Flags().StringVar(&since, "since", "", "show logs since (e.g., 2h, 2026-03-28T00:00:00Z)") + + return cmd +} diff --git a/cmd/mcp/main.go b/cmd/mcp/main.go index 48650ee..0aeed9b 100644 --- a/cmd/mcp/main.go +++ b/cmd/mcp/main.go @@ -50,6 +50,7 @@ func main() { root.AddCommand(pullCmd()) root.AddCommand(nodeCmd()) root.AddCommand(purgeCmd()) + root.AddCommand(logsCmd()) if err := root.Execute(); err != nil { log.Fatal(err) diff --git a/gen/mcp/v1/mcp.pb.go b/gen/mcp/v1/mcp.pb.go index c00e3d4..45f7a7e 100644 --- a/gen/mcp/v1/mcp.pb.go +++ b/gen/mcp/v1/mcp.pb.go @@ -2224,6 +2224,134 @@ func (x *PurgeResult) GetReason() string { return "" } +type LogsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + Component string `protobuf:"bytes,2,opt,name=component,proto3" json:"component,omitempty"` // optional; defaults to first/only component + Tail int32 `protobuf:"varint,3,opt,name=tail,proto3" json:"tail,omitempty"` // number of lines from the end (0 = all) + Follow bool `protobuf:"varint,4,opt,name=follow,proto3" json:"follow,omitempty"` // stream new output + Timestamps bool `protobuf:"varint,5,opt,name=timestamps,proto3" json:"timestamps,omitempty"` // prepend timestamps + Since string `protobuf:"bytes,6,opt,name=since,proto3" json:"since,omitempty"` // show logs since (e.g., "2h", "2026-03-28T00:00:00Z") + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LogsRequest) Reset() { + *x = LogsRequest{} + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[39] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LogsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogsRequest) ProtoMessage() {} + +func (x *LogsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[39] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogsRequest.ProtoReflect.Descriptor instead. +func (*LogsRequest) Descriptor() ([]byte, []int) { + return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{39} +} + +func (x *LogsRequest) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +func (x *LogsRequest) GetComponent() string { + if x != nil { + return x.Component + } + return "" +} + +func (x *LogsRequest) GetTail() int32 { + if x != nil { + return x.Tail + } + return 0 +} + +func (x *LogsRequest) GetFollow() bool { + if x != nil { + return x.Follow + } + return false +} + +func (x *LogsRequest) GetTimestamps() bool { + if x != nil { + return x.Timestamps + } + return false +} + +func (x *LogsRequest) GetSince() string { + if x != nil { + return x.Since + } + return "" +} + +type LogsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LogsResponse) Reset() { + *x = LogsResponse{} + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[40] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LogsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogsResponse) ProtoMessage() {} + +func (x *LogsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[40] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogsResponse.ProtoReflect.Descriptor instead. +func (*LogsResponse) Descriptor() ([]byte, []int) { + return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{40} +} + +func (x *LogsResponse) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + var File_proto_mcp_v1_mcp_proto protoreflect.FileDescriptor const file_proto_mcp_v1_mcp_proto_rawDesc = "" + @@ -2371,7 +2499,18 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" + "\aservice\x18\x01 \x01(\tR\aservice\x12\x1c\n" + "\tcomponent\x18\x02 \x01(\tR\tcomponent\x12\x16\n" + "\x06purged\x18\x03 \x01(\bR\x06purged\x12\x16\n" + - "\x06reason\x18\x04 \x01(\tR\x06reason2\x93\b\n" + + "\x06reason\x18\x04 \x01(\tR\x06reason\"\xa7\x01\n" + + "\vLogsRequest\x12\x18\n" + + "\aservice\x18\x01 \x01(\tR\aservice\x12\x1c\n" + + "\tcomponent\x18\x02 \x01(\tR\tcomponent\x12\x12\n" + + "\x04tail\x18\x03 \x01(\x05R\x04tail\x12\x16\n" + + "\x06follow\x18\x04 \x01(\bR\x06follow\x12\x1e\n" + + "\n" + + "timestamps\x18\x05 \x01(\bR\n" + + "timestamps\x12\x14\n" + + "\x05since\x18\x06 \x01(\tR\x05since\"\"\n" + + "\fLogsResponse\x12\x12\n" + + "\x04data\x18\x01 \x01(\fR\x04data2\xc8\b\n" + "\x0fMcpAgentService\x127\n" + "\x06Deploy\x12\x15.mcp.v1.DeployRequest\x1a\x16.mcp.v1.DeployResponse\x12R\n" + "\x0fUndeployService\x12\x1e.mcp.v1.UndeployServiceRequest\x1a\x1f.mcp.v1.UndeployServiceResponse\x12F\n" + @@ -2387,7 +2526,8 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" + "\bPushFile\x12\x17.mcp.v1.PushFileRequest\x1a\x18.mcp.v1.PushFileResponse\x12=\n" + "\bPullFile\x12\x17.mcp.v1.PullFileRequest\x1a\x18.mcp.v1.PullFileResponse\x12C\n" + "\n" + - "NodeStatus\x12\x19.mcp.v1.NodeStatusRequest\x1a\x1a.mcp.v1.NodeStatusResponseB*Z(git.wntrmute.dev/mc/mcp/gen/mcp/v1;mcpv1b\x06proto3" + "NodeStatus\x12\x19.mcp.v1.NodeStatusRequest\x1a\x1a.mcp.v1.NodeStatusResponse\x123\n" + + "\x04Logs\x12\x13.mcp.v1.LogsRequest\x1a\x14.mcp.v1.LogsResponse0\x01B*Z(git.wntrmute.dev/mc/mcp/gen/mcp/v1;mcpv1b\x06proto3" var ( file_proto_mcp_v1_mcp_proto_rawDescOnce sync.Once @@ -2401,7 +2541,7 @@ func file_proto_mcp_v1_mcp_proto_rawDescGZIP() []byte { return file_proto_mcp_v1_mcp_proto_rawDescData } -var file_proto_mcp_v1_mcp_proto_msgTypes = make([]protoimpl.MessageInfo, 39) +var file_proto_mcp_v1_mcp_proto_msgTypes = make([]protoimpl.MessageInfo, 41) var file_proto_mcp_v1_mcp_proto_goTypes = []any{ (*RouteSpec)(nil), // 0: mcp.v1.RouteSpec (*ComponentSpec)(nil), // 1: mcp.v1.ComponentSpec @@ -2442,7 +2582,9 @@ var file_proto_mcp_v1_mcp_proto_goTypes = []any{ (*PurgeRequest)(nil), // 36: mcp.v1.PurgeRequest (*PurgeResponse)(nil), // 37: mcp.v1.PurgeResponse (*PurgeResult)(nil), // 38: mcp.v1.PurgeResult - (*timestamppb.Timestamp)(nil), // 39: google.protobuf.Timestamp + (*LogsRequest)(nil), // 39: mcp.v1.LogsRequest + (*LogsResponse)(nil), // 40: mcp.v1.LogsResponse + (*timestamppb.Timestamp)(nil), // 41: google.protobuf.Timestamp } var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{ 0, // 0: mcp.v1.ComponentSpec.routes:type_name -> mcp.v1.RouteSpec @@ -2456,15 +2598,15 @@ var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{ 2, // 8: mcp.v1.SyncDesiredStateRequest.services:type_name -> mcp.v1.ServiceSpec 16, // 9: mcp.v1.SyncDesiredStateResponse.results:type_name -> mcp.v1.ServiceSyncResult 19, // 10: mcp.v1.ServiceInfo.components:type_name -> mcp.v1.ComponentInfo - 39, // 11: mcp.v1.ComponentInfo.started:type_name -> google.protobuf.Timestamp + 41, // 11: mcp.v1.ComponentInfo.started:type_name -> google.protobuf.Timestamp 18, // 12: mcp.v1.ListServicesResponse.services:type_name -> mcp.v1.ServiceInfo - 39, // 13: mcp.v1.EventInfo.timestamp:type_name -> google.protobuf.Timestamp + 41, // 13: mcp.v1.EventInfo.timestamp:type_name -> google.protobuf.Timestamp 18, // 14: mcp.v1.GetServiceStatusResponse.services:type_name -> mcp.v1.ServiceInfo 22, // 15: mcp.v1.GetServiceStatusResponse.drift:type_name -> mcp.v1.DriftInfo 23, // 16: mcp.v1.GetServiceStatusResponse.recent_events:type_name -> mcp.v1.EventInfo 18, // 17: mcp.v1.LiveCheckResponse.services:type_name -> mcp.v1.ServiceInfo 28, // 18: mcp.v1.AdoptContainersResponse.results:type_name -> mcp.v1.AdoptResult - 39, // 19: mcp.v1.NodeStatusResponse.uptime_since:type_name -> google.protobuf.Timestamp + 41, // 19: mcp.v1.NodeStatusResponse.uptime_since:type_name -> google.protobuf.Timestamp 38, // 20: mcp.v1.PurgeResponse.results:type_name -> mcp.v1.PurgeResult 3, // 21: mcp.v1.McpAgentService.Deploy:input_type -> mcp.v1.DeployRequest 12, // 22: mcp.v1.McpAgentService.UndeployService:input_type -> mcp.v1.UndeployServiceRequest @@ -2480,22 +2622,24 @@ var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{ 30, // 32: mcp.v1.McpAgentService.PushFile:input_type -> mcp.v1.PushFileRequest 32, // 33: mcp.v1.McpAgentService.PullFile:input_type -> mcp.v1.PullFileRequest 34, // 34: mcp.v1.McpAgentService.NodeStatus:input_type -> mcp.v1.NodeStatusRequest - 4, // 35: mcp.v1.McpAgentService.Deploy:output_type -> mcp.v1.DeployResponse - 13, // 36: mcp.v1.McpAgentService.UndeployService:output_type -> mcp.v1.UndeployServiceResponse - 7, // 37: mcp.v1.McpAgentService.StopService:output_type -> mcp.v1.StopServiceResponse - 9, // 38: mcp.v1.McpAgentService.StartService:output_type -> mcp.v1.StartServiceResponse - 11, // 39: mcp.v1.McpAgentService.RestartService:output_type -> mcp.v1.RestartServiceResponse - 15, // 40: mcp.v1.McpAgentService.SyncDesiredState:output_type -> mcp.v1.SyncDesiredStateResponse - 20, // 41: mcp.v1.McpAgentService.ListServices:output_type -> mcp.v1.ListServicesResponse - 24, // 42: mcp.v1.McpAgentService.GetServiceStatus:output_type -> mcp.v1.GetServiceStatusResponse - 26, // 43: mcp.v1.McpAgentService.LiveCheck:output_type -> mcp.v1.LiveCheckResponse - 29, // 44: mcp.v1.McpAgentService.AdoptContainers:output_type -> mcp.v1.AdoptContainersResponse - 37, // 45: mcp.v1.McpAgentService.PurgeComponent:output_type -> mcp.v1.PurgeResponse - 31, // 46: mcp.v1.McpAgentService.PushFile:output_type -> mcp.v1.PushFileResponse - 33, // 47: mcp.v1.McpAgentService.PullFile:output_type -> mcp.v1.PullFileResponse - 35, // 48: mcp.v1.McpAgentService.NodeStatus:output_type -> mcp.v1.NodeStatusResponse - 35, // [35:49] is the sub-list for method output_type - 21, // [21:35] is the sub-list for method input_type + 39, // 35: mcp.v1.McpAgentService.Logs:input_type -> mcp.v1.LogsRequest + 4, // 36: mcp.v1.McpAgentService.Deploy:output_type -> mcp.v1.DeployResponse + 13, // 37: mcp.v1.McpAgentService.UndeployService:output_type -> mcp.v1.UndeployServiceResponse + 7, // 38: mcp.v1.McpAgentService.StopService:output_type -> mcp.v1.StopServiceResponse + 9, // 39: mcp.v1.McpAgentService.StartService:output_type -> mcp.v1.StartServiceResponse + 11, // 40: mcp.v1.McpAgentService.RestartService:output_type -> mcp.v1.RestartServiceResponse + 15, // 41: mcp.v1.McpAgentService.SyncDesiredState:output_type -> mcp.v1.SyncDesiredStateResponse + 20, // 42: mcp.v1.McpAgentService.ListServices:output_type -> mcp.v1.ListServicesResponse + 24, // 43: mcp.v1.McpAgentService.GetServiceStatus:output_type -> mcp.v1.GetServiceStatusResponse + 26, // 44: mcp.v1.McpAgentService.LiveCheck:output_type -> mcp.v1.LiveCheckResponse + 29, // 45: mcp.v1.McpAgentService.AdoptContainers:output_type -> mcp.v1.AdoptContainersResponse + 37, // 46: mcp.v1.McpAgentService.PurgeComponent:output_type -> mcp.v1.PurgeResponse + 31, // 47: mcp.v1.McpAgentService.PushFile:output_type -> mcp.v1.PushFileResponse + 33, // 48: mcp.v1.McpAgentService.PullFile:output_type -> mcp.v1.PullFileResponse + 35, // 49: mcp.v1.McpAgentService.NodeStatus:output_type -> mcp.v1.NodeStatusResponse + 40, // 50: mcp.v1.McpAgentService.Logs:output_type -> mcp.v1.LogsResponse + 36, // [36:51] is the sub-list for method output_type + 21, // [21:36] is the sub-list for method input_type 21, // [21:21] is the sub-list for extension type_name 21, // [21:21] is the sub-list for extension extendee 0, // [0:21] is the sub-list for field type_name @@ -2512,7 +2656,7 @@ func file_proto_mcp_v1_mcp_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_mcp_v1_mcp_proto_rawDesc), len(file_proto_mcp_v1_mcp_proto_rawDesc)), NumEnums: 0, - NumMessages: 39, + NumMessages: 41, NumExtensions: 0, NumServices: 1, }, diff --git a/gen/mcp/v1/mcp_grpc.pb.go b/gen/mcp/v1/mcp_grpc.pb.go index 2e83e1a..2f3c94d 100644 --- a/gen/mcp/v1/mcp_grpc.pb.go +++ b/gen/mcp/v1/mcp_grpc.pb.go @@ -33,6 +33,7 @@ const ( McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile" McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile" McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus" + McpAgentService_Logs_FullMethodName = "/mcp.v1.McpAgentService/Logs" ) // McpAgentServiceClient is the client API for McpAgentService service. @@ -60,6 +61,8 @@ type McpAgentServiceClient interface { PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error) // Node NodeStatus(ctx context.Context, in *NodeStatusRequest, opts ...grpc.CallOption) (*NodeStatusResponse, error) + // Logs + Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error) } type mcpAgentServiceClient struct { @@ -210,6 +213,25 @@ func (c *mcpAgentServiceClient) NodeStatus(ctx context.Context, in *NodeStatusRe return out, nil } +func (c *mcpAgentServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &McpAgentService_ServiceDesc.Streams[0], McpAgentService_Logs_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[LogsRequest, LogsResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type McpAgentService_LogsClient = grpc.ServerStreamingClient[LogsResponse] + // McpAgentServiceServer is the server API for McpAgentService service. // All implementations must embed UnimplementedMcpAgentServiceServer // for forward compatibility. @@ -235,6 +257,8 @@ type McpAgentServiceServer interface { PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error) // Node NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) + // Logs + Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error mustEmbedUnimplementedMcpAgentServiceServer() } @@ -287,6 +311,9 @@ func (UnimplementedMcpAgentServiceServer) PullFile(context.Context, *PullFileReq func (UnimplementedMcpAgentServiceServer) NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) { return nil, status.Error(codes.Unimplemented, "method NodeStatus not implemented") } +func (UnimplementedMcpAgentServiceServer) Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error { + return status.Error(codes.Unimplemented, "method Logs not implemented") +} func (UnimplementedMcpAgentServiceServer) mustEmbedUnimplementedMcpAgentServiceServer() {} func (UnimplementedMcpAgentServiceServer) testEmbeddedByValue() {} @@ -560,6 +587,17 @@ func _McpAgentService_NodeStatus_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _McpAgentService_Logs_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(LogsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(McpAgentServiceServer).Logs(m, &grpc.GenericServerStream[LogsRequest, LogsResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type McpAgentService_LogsServer = grpc.ServerStreamingServer[LogsResponse] + // McpAgentService_ServiceDesc is the grpc.ServiceDesc for McpAgentService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -624,6 +662,12 @@ var McpAgentService_ServiceDesc = grpc.ServiceDesc{ Handler: _McpAgentService_NodeStatus_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Logs", + Handler: _McpAgentService_Logs_Handler, + ServerStreams: true, + }, + }, Metadata: "proto/mcp/v1/mcp.proto", } diff --git a/internal/agent/agent.go b/internal/agent/agent.go index b504e3f..72c2307 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -100,6 +100,9 @@ func Run(cfg *config.AgentConfig) error { grpc.ChainUnaryInterceptor( auth.AuthInterceptor(validator), ), + grpc.ChainStreamInterceptor( + auth.StreamAuthInterceptor(validator), + ), ) mcpv1.RegisterMcpAgentServiceServer(server, a) diff --git a/internal/agent/logs.go b/internal/agent/logs.go new file mode 100644 index 0000000..f84377d --- /dev/null +++ b/internal/agent/logs.go @@ -0,0 +1,75 @@ +package agent + +import ( + "bufio" + "io" + + mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1" + "git.wntrmute.dev/mc/mcp/internal/registry" + "git.wntrmute.dev/mc/mcp/internal/runtime" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Logs streams container logs for a service component. +func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsServer) error { + if req.GetService() == "" { + return status.Error(codes.InvalidArgument, "service name is required") + } + + // Resolve component name. + component := req.GetComponent() + if component == "" { + components, err := registry.ListComponents(a.DB, req.GetService()) + if err != nil { + return status.Errorf(codes.Internal, "list components: %v", err) + } + if len(components) == 0 { + return status.Error(codes.NotFound, "no components found for service") + } + component = components[0].Name + } + + containerName := ContainerNameFor(req.GetService(), component) + + podman, ok := a.Runtime.(*runtime.Podman) + if !ok { + return status.Error(codes.Internal, "logs requires podman runtime") + } + + cmd := podman.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince()) + + a.Logger.Info("running podman logs", "container", containerName, "args", cmd.Args) + + // Podman writes container stdout to its stdout and container stderr + // to its stderr. Merge both into a single pipe. + pr, pw := io.Pipe() + cmd.Stdout = pw + cmd.Stderr = pw + + if err := cmd.Start(); err != nil { + pw.Close() + return status.Errorf(codes.Internal, "start podman logs: %v", err) + } + + // Close the write end when the command exits so the scanner finishes. + go func() { + err := cmd.Wait() + if err != nil { + a.Logger.Warn("podman logs exited", "container", containerName, "error", err) + } + pw.Close() + }() + + scanner := bufio.NewScanner(pr) + for scanner.Scan() { + if err := stream.Send(&mcpv1.LogsResponse{ + Data: append(scanner.Bytes(), '\n'), + }); err != nil { + _ = cmd.Process.Kill() + return err + } + } + + return nil +} diff --git a/internal/auth/auth.go b/internal/auth/auth.go index e13a36a..4335139 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -255,6 +255,52 @@ func AuthInterceptor(validator TokenValidator) grpc.UnaryServerInterceptor { } } +// StreamAuthInterceptor returns a gRPC stream server interceptor with +// the same authentication rules as AuthInterceptor. +func StreamAuthInterceptor(validator TokenValidator) grpc.StreamServerInterceptor { + return func( + srv any, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + md, ok := metadata.FromIncomingContext(ss.Context()) + if !ok { + return status.Error(codes.Unauthenticated, "missing metadata") + } + + authValues := md.Get("authorization") + if len(authValues) == 0 { + return status.Error(codes.Unauthenticated, "missing authorization header") + } + + authHeader := authValues[0] + if !strings.HasPrefix(authHeader, "Bearer ") { + return status.Error(codes.Unauthenticated, "malformed authorization header") + } + token := strings.TrimPrefix(authHeader, "Bearer ") + + tokenInfo, err := validator.ValidateToken(ss.Context(), token) + if err != nil { + slog.Error("token validation failed", "method", info.FullMethod, "error", err) + return status.Error(codes.Unauthenticated, "token validation failed") + } + + if !tokenInfo.Valid { + return status.Error(codes.Unauthenticated, "invalid token") + } + + if tokenInfo.HasRole("guest") { + slog.Warn("guest access denied", "method", info.FullMethod, "user", tokenInfo.Username) + return status.Error(codes.PermissionDenied, "guest access not permitted") + } + + slog.Info("rpc", "method", info.FullMethod, "user", tokenInfo.Username, "account_type", tokenInfo.AccountType) + + return handler(srv, ss) + } +} + // Login authenticates with MCIAS and returns a bearer token. func Login(serverURL, caCertPath, username, password string) (string, error) { client, err := newHTTPClient(caCertPath) diff --git a/internal/runtime/podman.go b/internal/runtime/podman.go index 9f41cec..7d0e822 100644 --- a/internal/runtime/podman.go +++ b/internal/runtime/podman.go @@ -178,6 +178,49 @@ func (p *Podman) Inspect(ctx context.Context, name string) (ContainerInfo, error return info, nil } +// Logs returns an exec.Cmd that streams container logs. For containers +// using the journald log driver, it uses journalctl (podman logs can't +// read journald outside the originating user session). For k8s-file or +// other drivers, it uses podman logs directly. +func (p *Podman) Logs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd { + // Check if this container uses the journald log driver. + inspectCmd := exec.CommandContext(ctx, p.command(), "inspect", "--format", "{{.HostConfig.LogConfig.Type}}", containerName) //nolint:gosec + if out, err := inspectCmd.Output(); err == nil && strings.TrimSpace(string(out)) == "journald" { + return p.journalLogs(ctx, containerName, tail, follow, since) + } + + args := []string{"logs"} + if tail > 0 { + args = append(args, "--tail", fmt.Sprintf("%d", tail)) + } + if follow { + args = append(args, "--follow") + } + if timestamps { + args = append(args, "--timestamps") + } + if since != "" { + args = append(args, "--since", since) + } + args = append(args, containerName) + return exec.CommandContext(ctx, p.command(), args...) //nolint:gosec // args built programmatically +} + +// journalLogs returns a journalctl command filtered by container name. +func (p *Podman) journalLogs(ctx context.Context, containerName string, tail int, follow bool, since string) *exec.Cmd { + args := []string{"--no-pager", "--output", "cat", "CONTAINER_NAME=" + containerName} + if tail > 0 { + args = append(args, "--lines", fmt.Sprintf("%d", tail)) + } + if follow { + args = append(args, "--follow") + } + if since != "" { + args = append(args, "--since", since) + } + return exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec // args built programmatically +} + // Login authenticates to a container registry using the given token as // the password. This enables non-interactive push with service account // tokens (MCR accepts MCIAS JWTs as passwords). diff --git a/proto/mcp/v1/mcp.proto b/proto/mcp/v1/mcp.proto index 18426bd..92e730b 100644 --- a/proto/mcp/v1/mcp.proto +++ b/proto/mcp/v1/mcp.proto @@ -33,6 +33,9 @@ service McpAgentService { // Node rpc NodeStatus(NodeStatusRequest) returns (NodeStatusResponse); + + // Logs + rpc Logs(LogsRequest) returns (stream LogsResponse); } // --- Service lifecycle --- @@ -282,3 +285,18 @@ message PurgeResult { // Why eligible, or why refused. string reason = 4; } + +// --- Logs --- + +message LogsRequest { + string service = 1; + string component = 2; // optional; defaults to first/only component + int32 tail = 3; // number of lines from the end (0 = all) + bool follow = 4; // stream new output + bool timestamps = 5; // prepend timestamps + string since = 6; // show logs since (e.g., "2h", "2026-03-28T00:00:00Z") +} + +message LogsResponse { + bytes data = 1; +}