diff --git a/cmd/mcp/main.go b/cmd/mcp/main.go index 67b3477..7e4d81b 100644 --- a/cmd/mcp/main.go +++ b/cmd/mcp/main.go @@ -47,6 +47,7 @@ func main() { root.AddCommand(pushCmd()) root.AddCommand(pullCmd()) root.AddCommand(nodeCmd()) + root.AddCommand(purgeCmd()) if err := root.Execute(); err != nil { log.Fatal(err) diff --git a/cmd/mcp/purge.go b/cmd/mcp/purge.go new file mode 100644 index 0000000..4af14d8 --- /dev/null +++ b/cmd/mcp/purge.go @@ -0,0 +1,119 @@ +package main + +import ( + "context" + "fmt" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/config" + "git.wntrmute.dev/kyle/mcp/internal/servicedef" + + "github.com/spf13/cobra" +) + +func purgeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "purge [service[/component]]", + Short: "Remove stale registry entries for gone, undefined components", + Long: `Purge removes registry entries that are both unwanted (not in any +current service definition) and gone (no corresponding container in the +runtime). It never stops or removes running containers. + +Use --dry-run to preview what would be purged.`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := config.LoadCLIConfig(cfgPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + + dryRun, _ := cmd.Flags().GetBool("dry-run") + + var service, component string + if len(args) == 1 { + service, component = parseServiceArg(args[0]) + } + + // Load all local service definitions to build the set of + // currently-defined service/component pairs. + definedComponents := buildDefinedComponents(cfg) + + // Build node address lookup. + nodeAddr := make(map[string]string, len(cfg.Nodes)) + for _, n := range cfg.Nodes { + nodeAddr[n.Name] = n.Address + } + + // If a specific service was given and we can find its node, + // only talk to that node. Otherwise, talk to all nodes. 
+ targetNodes := cfg.Nodes + if service != "" { + if nodeName, nodeAddr, err := findServiceNode(cfg, service); err == nil { + targetNodes = []config.NodeConfig{{Name: nodeName, Address: nodeAddr}} + } + } + + anyResults := false + for _, node := range targetNodes { + client, conn, err := dialAgent(node.Address, cfg) + if err != nil { + return fmt.Errorf("dial %s: %w", node.Name, err) + } + defer func() { _ = conn.Close() }() + + resp, err := client.PurgeComponent(context.Background(), &mcpv1.PurgeRequest{ + Service: service, + Component: component, + DryRun: dryRun, + DefinedComponents: definedComponents, + }) + if err != nil { + return fmt.Errorf("purge on %s: %w", node.Name, err) + } + + for _, r := range resp.GetResults() { + anyResults = true + if r.GetPurged() { + if dryRun { + fmt.Printf("would purge %s/%s (%s)\n", r.GetService(), r.GetComponent(), r.GetReason()) + } else { + fmt.Printf("purged %s/%s (%s)\n", r.GetService(), r.GetComponent(), r.GetReason()) + } + } else { + fmt.Printf("skipped %s/%s (%s)\n", r.GetService(), r.GetComponent(), r.GetReason()) + } + } + } + + if !anyResults { + fmt.Println("nothing to purge") + } + + return nil + }, + } + + cmd.Flags().Bool("dry-run", false, "preview what would be purged without modifying the registry") + + return cmd +} + +// buildDefinedComponents reads all local service definition files and returns +// a list of "service/component" strings for every defined component. +func buildDefinedComponents(cfg *config.CLIConfig) []string { + defs, err := servicedef.LoadAll(cfg.Services.Dir) + if err != nil { + // If we can't read service definitions, return an empty list. + // The agent will treat every component as undefined, which is the + // most conservative behavior (everything eligible gets purged). 
+ return nil + } + + var defined []string + for _, def := range defs { + for _, comp := range def.Components { + defined = append(defined, def.Name+"/"+comp.Name) + } + } + return defined +} diff --git a/gen/mcp/v1/mcp.pb.go b/gen/mcp/v1/mcp.pb.go index 92b1e63..4b16694 100644 --- a/gen/mcp/v1/mcp.pb.go +++ b/gen/mcp/v1/mcp.pb.go @@ -1865,6 +1865,193 @@ func (x *NodeStatusResponse) GetUptimeSince() *timestamppb.Timestamp { return nil } +type PurgeRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Service name (empty = all services). + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + // Component name (empty = all eligible in service). + Component string `protobuf:"bytes,2,opt,name=component,proto3" json:"component,omitempty"` + // Preview only, do not modify registry. + DryRun bool `protobuf:"varint,3,opt,name=dry_run,json=dryRun,proto3" json:"dry_run,omitempty"` + // Currently-defined service/component pairs (e.g., "mcns/mcns"). + // The agent uses this to determine what is "not in any service definition". + DefinedComponents []string `protobuf:"bytes,4,rep,name=defined_components,json=definedComponents,proto3" json:"defined_components,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PurgeRequest) Reset() { + *x = PurgeRequest{} + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[33] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PurgeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PurgeRequest) ProtoMessage() {} + +func (x *PurgeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[33] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PurgeRequest.ProtoReflect.Descriptor instead. 
+func (*PurgeRequest) Descriptor() ([]byte, []int) { + return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{33} +} + +func (x *PurgeRequest) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +func (x *PurgeRequest) GetComponent() string { + if x != nil { + return x.Component + } + return "" +} + +func (x *PurgeRequest) GetDryRun() bool { + if x != nil { + return x.DryRun + } + return false +} + +func (x *PurgeRequest) GetDefinedComponents() []string { + if x != nil { + return x.DefinedComponents + } + return nil +} + +type PurgeResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Results []*PurgeResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PurgeResponse) Reset() { + *x = PurgeResponse{} + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[34] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PurgeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PurgeResponse) ProtoMessage() {} + +func (x *PurgeResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[34] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PurgeResponse.ProtoReflect.Descriptor instead. 
+func (*PurgeResponse) Descriptor() ([]byte, []int) { + return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{34} +} + +func (x *PurgeResponse) GetResults() []*PurgeResult { + if x != nil { + return x.Results + } + return nil +} + +type PurgeResult struct { + state protoimpl.MessageState `protogen:"open.v1"` + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + Component string `protobuf:"bytes,2,opt,name=component,proto3" json:"component,omitempty"` + // true if removed (or would be, in dry-run). + Purged bool `protobuf:"varint,3,opt,name=purged,proto3" json:"purged,omitempty"` + // Why eligible, or why refused. + Reason string `protobuf:"bytes,4,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PurgeResult) Reset() { + *x = PurgeResult{} + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[35] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PurgeResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PurgeResult) ProtoMessage() {} + +func (x *PurgeResult) ProtoReflect() protoreflect.Message { + mi := &file_proto_mcp_v1_mcp_proto_msgTypes[35] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PurgeResult.ProtoReflect.Descriptor instead. 
+func (*PurgeResult) Descriptor() ([]byte, []int) { + return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{35} +} + +func (x *PurgeResult) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +func (x *PurgeResult) GetComponent() string { + if x != nil { + return x.Component + } + return "" +} + +func (x *PurgeResult) GetPurged() bool { + if x != nil { + return x.Purged + } + return false +} + +func (x *PurgeResult) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + var File_proto_mcp_v1_mcp_proto protoreflect.FileDescriptor const file_proto_mcp_v1_mcp_proto_rawDesc = "" + @@ -1988,7 +2175,19 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" + "\x11memory_free_bytes\x18\t \x01(\x04R\x0fmemoryFreeBytes\x12*\n" + "\x11cpu_usage_percent\x18\n" + " \x01(\x01R\x0fcpuUsagePercent\x12=\n" + - "\fuptime_since\x18\v \x01(\v2\x1a.google.protobuf.TimestampR\vuptimeSince2\x80\a\n" + + "\fuptime_since\x18\v \x01(\v2\x1a.google.protobuf.TimestampR\vuptimeSince\"\x8e\x01\n" + + "\fPurgeRequest\x12\x18\n" + + "\aservice\x18\x01 \x01(\tR\aservice\x12\x1c\n" + + "\tcomponent\x18\x02 \x01(\tR\tcomponent\x12\x17\n" + + "\adry_run\x18\x03 \x01(\bR\x06dryRun\x12-\n" + + "\x12defined_components\x18\x04 \x03(\tR\x11definedComponents\">\n" + + "\rPurgeResponse\x12-\n" + + "\aresults\x18\x01 \x03(\v2\x13.mcp.v1.PurgeResultR\aresults\"u\n" + + "\vPurgeResult\x12\x18\n" + + "\aservice\x18\x01 \x01(\tR\aservice\x12\x1c\n" + + "\tcomponent\x18\x02 \x01(\tR\tcomponent\x12\x16\n" + + "\x06purged\x18\x03 \x01(\bR\x06purged\x12\x16\n" + + "\x06reason\x18\x04 \x01(\tR\x06reason2\xbf\a\n" + "\x0fMcpAgentService\x127\n" + "\x06Deploy\x12\x15.mcp.v1.DeployRequest\x1a\x16.mcp.v1.DeployResponse\x12F\n" + "\vStopService\x12\x1a.mcp.v1.StopServiceRequest\x1a\x1b.mcp.v1.StopServiceResponse\x12I\n" + @@ -1999,6 +2198,7 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" + "\x10GetServiceStatus\x12\x1f.mcp.v1.GetServiceStatusRequest\x1a 
.mcp.v1.GetServiceStatusResponse\x12@\n" + "\tLiveCheck\x12\x18.mcp.v1.LiveCheckRequest\x1a\x19.mcp.v1.LiveCheckResponse\x12R\n" + "\x0fAdoptContainers\x12\x1e.mcp.v1.AdoptContainersRequest\x1a\x1f.mcp.v1.AdoptContainersResponse\x12=\n" + + "\x0ePurgeComponent\x12\x14.mcp.v1.PurgeRequest\x1a\x15.mcp.v1.PurgeResponse\x12=\n" + "\bPushFile\x12\x17.mcp.v1.PushFileRequest\x1a\x18.mcp.v1.PushFileResponse\x12=\n" + "\bPullFile\x12\x17.mcp.v1.PullFileRequest\x1a\x18.mcp.v1.PullFileResponse\x12C\n" + "\n" + @@ -2016,7 +2216,7 @@ func file_proto_mcp_v1_mcp_proto_rawDescGZIP() []byte { return file_proto_mcp_v1_mcp_proto_rawDescData } -var file_proto_mcp_v1_mcp_proto_msgTypes = make([]protoimpl.MessageInfo, 33) +var file_proto_mcp_v1_mcp_proto_msgTypes = make([]protoimpl.MessageInfo, 36) var file_proto_mcp_v1_mcp_proto_goTypes = []any{ (*ComponentSpec)(nil), // 0: mcp.v1.ComponentSpec (*ServiceSpec)(nil), // 1: mcp.v1.ServiceSpec @@ -2051,7 +2251,10 @@ var file_proto_mcp_v1_mcp_proto_goTypes = []any{ (*PullFileResponse)(nil), // 30: mcp.v1.PullFileResponse (*NodeStatusRequest)(nil), // 31: mcp.v1.NodeStatusRequest (*NodeStatusResponse)(nil), // 32: mcp.v1.NodeStatusResponse - (*timestamppb.Timestamp)(nil), // 33: google.protobuf.Timestamp + (*PurgeRequest)(nil), // 33: mcp.v1.PurgeRequest + (*PurgeResponse)(nil), // 34: mcp.v1.PurgeResponse + (*PurgeResult)(nil), // 35: mcp.v1.PurgeResult + (*timestamppb.Timestamp)(nil), // 36: google.protobuf.Timestamp } var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{ 0, // 0: mcp.v1.ServiceSpec.components:type_name -> mcp.v1.ComponentSpec @@ -2063,44 +2266,47 @@ var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{ 1, // 6: mcp.v1.SyncDesiredStateRequest.services:type_name -> mcp.v1.ServiceSpec 13, // 7: mcp.v1.SyncDesiredStateResponse.results:type_name -> mcp.v1.ServiceSyncResult 16, // 8: mcp.v1.ServiceInfo.components:type_name -> mcp.v1.ComponentInfo - 33, // 9: mcp.v1.ComponentInfo.started:type_name -> google.protobuf.Timestamp + 36, // 
9: mcp.v1.ComponentInfo.started:type_name -> google.protobuf.Timestamp 15, // 10: mcp.v1.ListServicesResponse.services:type_name -> mcp.v1.ServiceInfo - 33, // 11: mcp.v1.EventInfo.timestamp:type_name -> google.protobuf.Timestamp + 36, // 11: mcp.v1.EventInfo.timestamp:type_name -> google.protobuf.Timestamp 15, // 12: mcp.v1.GetServiceStatusResponse.services:type_name -> mcp.v1.ServiceInfo 19, // 13: mcp.v1.GetServiceStatusResponse.drift:type_name -> mcp.v1.DriftInfo 20, // 14: mcp.v1.GetServiceStatusResponse.recent_events:type_name -> mcp.v1.EventInfo 15, // 15: mcp.v1.LiveCheckResponse.services:type_name -> mcp.v1.ServiceInfo 25, // 16: mcp.v1.AdoptContainersResponse.results:type_name -> mcp.v1.AdoptResult - 33, // 17: mcp.v1.NodeStatusResponse.uptime_since:type_name -> google.protobuf.Timestamp - 2, // 18: mcp.v1.McpAgentService.Deploy:input_type -> mcp.v1.DeployRequest - 5, // 19: mcp.v1.McpAgentService.StopService:input_type -> mcp.v1.StopServiceRequest - 7, // 20: mcp.v1.McpAgentService.StartService:input_type -> mcp.v1.StartServiceRequest - 9, // 21: mcp.v1.McpAgentService.RestartService:input_type -> mcp.v1.RestartServiceRequest - 11, // 22: mcp.v1.McpAgentService.SyncDesiredState:input_type -> mcp.v1.SyncDesiredStateRequest - 14, // 23: mcp.v1.McpAgentService.ListServices:input_type -> mcp.v1.ListServicesRequest - 18, // 24: mcp.v1.McpAgentService.GetServiceStatus:input_type -> mcp.v1.GetServiceStatusRequest - 22, // 25: mcp.v1.McpAgentService.LiveCheck:input_type -> mcp.v1.LiveCheckRequest - 24, // 26: mcp.v1.McpAgentService.AdoptContainers:input_type -> mcp.v1.AdoptContainersRequest - 27, // 27: mcp.v1.McpAgentService.PushFile:input_type -> mcp.v1.PushFileRequest - 29, // 28: mcp.v1.McpAgentService.PullFile:input_type -> mcp.v1.PullFileRequest - 31, // 29: mcp.v1.McpAgentService.NodeStatus:input_type -> mcp.v1.NodeStatusRequest - 3, // 30: mcp.v1.McpAgentService.Deploy:output_type -> mcp.v1.DeployResponse - 6, // 31: 
mcp.v1.McpAgentService.StopService:output_type -> mcp.v1.StopServiceResponse - 8, // 32: mcp.v1.McpAgentService.StartService:output_type -> mcp.v1.StartServiceResponse - 10, // 33: mcp.v1.McpAgentService.RestartService:output_type -> mcp.v1.RestartServiceResponse - 12, // 34: mcp.v1.McpAgentService.SyncDesiredState:output_type -> mcp.v1.SyncDesiredStateResponse - 17, // 35: mcp.v1.McpAgentService.ListServices:output_type -> mcp.v1.ListServicesResponse - 21, // 36: mcp.v1.McpAgentService.GetServiceStatus:output_type -> mcp.v1.GetServiceStatusResponse - 23, // 37: mcp.v1.McpAgentService.LiveCheck:output_type -> mcp.v1.LiveCheckResponse - 26, // 38: mcp.v1.McpAgentService.AdoptContainers:output_type -> mcp.v1.AdoptContainersResponse - 28, // 39: mcp.v1.McpAgentService.PushFile:output_type -> mcp.v1.PushFileResponse - 30, // 40: mcp.v1.McpAgentService.PullFile:output_type -> mcp.v1.PullFileResponse - 32, // 41: mcp.v1.McpAgentService.NodeStatus:output_type -> mcp.v1.NodeStatusResponse - 30, // [30:42] is the sub-list for method output_type - 18, // [18:30] is the sub-list for method input_type - 18, // [18:18] is the sub-list for extension type_name - 18, // [18:18] is the sub-list for extension extendee - 0, // [0:18] is the sub-list for field type_name + 36, // 17: mcp.v1.NodeStatusResponse.uptime_since:type_name -> google.protobuf.Timestamp + 35, // 18: mcp.v1.PurgeResponse.results:type_name -> mcp.v1.PurgeResult + 2, // 19: mcp.v1.McpAgentService.Deploy:input_type -> mcp.v1.DeployRequest + 5, // 20: mcp.v1.McpAgentService.StopService:input_type -> mcp.v1.StopServiceRequest + 7, // 21: mcp.v1.McpAgentService.StartService:input_type -> mcp.v1.StartServiceRequest + 9, // 22: mcp.v1.McpAgentService.RestartService:input_type -> mcp.v1.RestartServiceRequest + 11, // 23: mcp.v1.McpAgentService.SyncDesiredState:input_type -> mcp.v1.SyncDesiredStateRequest + 14, // 24: mcp.v1.McpAgentService.ListServices:input_type -> mcp.v1.ListServicesRequest + 18, // 25: 
mcp.v1.McpAgentService.GetServiceStatus:input_type -> mcp.v1.GetServiceStatusRequest + 22, // 26: mcp.v1.McpAgentService.LiveCheck:input_type -> mcp.v1.LiveCheckRequest + 24, // 27: mcp.v1.McpAgentService.AdoptContainers:input_type -> mcp.v1.AdoptContainersRequest + 33, // 28: mcp.v1.McpAgentService.PurgeComponent:input_type -> mcp.v1.PurgeRequest + 27, // 29: mcp.v1.McpAgentService.PushFile:input_type -> mcp.v1.PushFileRequest + 29, // 30: mcp.v1.McpAgentService.PullFile:input_type -> mcp.v1.PullFileRequest + 31, // 31: mcp.v1.McpAgentService.NodeStatus:input_type -> mcp.v1.NodeStatusRequest + 3, // 32: mcp.v1.McpAgentService.Deploy:output_type -> mcp.v1.DeployResponse + 6, // 33: mcp.v1.McpAgentService.StopService:output_type -> mcp.v1.StopServiceResponse + 8, // 34: mcp.v1.McpAgentService.StartService:output_type -> mcp.v1.StartServiceResponse + 10, // 35: mcp.v1.McpAgentService.RestartService:output_type -> mcp.v1.RestartServiceResponse + 12, // 36: mcp.v1.McpAgentService.SyncDesiredState:output_type -> mcp.v1.SyncDesiredStateResponse + 17, // 37: mcp.v1.McpAgentService.ListServices:output_type -> mcp.v1.ListServicesResponse + 21, // 38: mcp.v1.McpAgentService.GetServiceStatus:output_type -> mcp.v1.GetServiceStatusResponse + 23, // 39: mcp.v1.McpAgentService.LiveCheck:output_type -> mcp.v1.LiveCheckResponse + 26, // 40: mcp.v1.McpAgentService.AdoptContainers:output_type -> mcp.v1.AdoptContainersResponse + 34, // 41: mcp.v1.McpAgentService.PurgeComponent:output_type -> mcp.v1.PurgeResponse + 28, // 42: mcp.v1.McpAgentService.PushFile:output_type -> mcp.v1.PushFileResponse + 30, // 43: mcp.v1.McpAgentService.PullFile:output_type -> mcp.v1.PullFileResponse + 32, // 44: mcp.v1.McpAgentService.NodeStatus:output_type -> mcp.v1.NodeStatusResponse + 32, // [32:45] is the sub-list for method output_type + 19, // [19:32] is the sub-list for method input_type + 19, // [19:19] is the sub-list for extension type_name + 19, // [19:19] is the sub-list for extension extendee + 
0, // [0:19] is the sub-list for field type_name } func init() { file_proto_mcp_v1_mcp_proto_init() } @@ -2114,7 +2320,7 @@ func file_proto_mcp_v1_mcp_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_mcp_v1_mcp_proto_rawDesc), len(file_proto_mcp_v1_mcp_proto_rawDesc)), NumEnums: 0, - NumMessages: 33, + NumMessages: 36, NumExtensions: 0, NumServices: 1, }, diff --git a/gen/mcp/v1/mcp_grpc.pb.go b/gen/mcp/v1/mcp_grpc.pb.go index 1ba6d41..83fa016 100644 --- a/gen/mcp/v1/mcp_grpc.pb.go +++ b/gen/mcp/v1/mcp_grpc.pb.go @@ -28,6 +28,7 @@ const ( McpAgentService_GetServiceStatus_FullMethodName = "/mcp.v1.McpAgentService/GetServiceStatus" McpAgentService_LiveCheck_FullMethodName = "/mcp.v1.McpAgentService/LiveCheck" McpAgentService_AdoptContainers_FullMethodName = "/mcp.v1.McpAgentService/AdoptContainers" + McpAgentService_PurgeComponent_FullMethodName = "/mcp.v1.McpAgentService/PurgeComponent" McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile" McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile" McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus" @@ -50,6 +51,8 @@ type McpAgentServiceClient interface { LiveCheck(ctx context.Context, in *LiveCheckRequest, opts ...grpc.CallOption) (*LiveCheckResponse, error) // Adopt AdoptContainers(ctx context.Context, in *AdoptContainersRequest, opts ...grpc.CallOption) (*AdoptContainersResponse, error) + // Purge + PurgeComponent(ctx context.Context, in *PurgeRequest, opts ...grpc.CallOption) (*PurgeResponse, error) // File transfer PushFile(ctx context.Context, in *PushFileRequest, opts ...grpc.CallOption) (*PushFileResponse, error) PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error) @@ -155,6 +158,16 @@ func (c *mcpAgentServiceClient) AdoptContainers(ctx context.Context, in *AdoptCo return out, nil } +func (c *mcpAgentServiceClient) 
PurgeComponent(ctx context.Context, in *PurgeRequest, opts ...grpc.CallOption) (*PurgeResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(PurgeResponse) + err := c.cc.Invoke(ctx, McpAgentService_PurgeComponent_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *mcpAgentServiceClient) PushFile(ctx context.Context, in *PushFileRequest, opts ...grpc.CallOption) (*PushFileResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(PushFileResponse) @@ -202,6 +215,8 @@ type McpAgentServiceServer interface { LiveCheck(context.Context, *LiveCheckRequest) (*LiveCheckResponse, error) // Adopt AdoptContainers(context.Context, *AdoptContainersRequest) (*AdoptContainersResponse, error) + // Purge + PurgeComponent(context.Context, *PurgeRequest) (*PurgeResponse, error) // File transfer PushFile(context.Context, *PushFileRequest) (*PushFileResponse, error) PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error) @@ -244,6 +259,9 @@ func (UnimplementedMcpAgentServiceServer) LiveCheck(context.Context, *LiveCheckR func (UnimplementedMcpAgentServiceServer) AdoptContainers(context.Context, *AdoptContainersRequest) (*AdoptContainersResponse, error) { return nil, status.Error(codes.Unimplemented, "method AdoptContainers not implemented") } +func (UnimplementedMcpAgentServiceServer) PurgeComponent(context.Context, *PurgeRequest) (*PurgeResponse, error) { + return nil, status.Error(codes.Unimplemented, "method PurgeComponent not implemented") +} func (UnimplementedMcpAgentServiceServer) PushFile(context.Context, *PushFileRequest) (*PushFileResponse, error) { return nil, status.Error(codes.Unimplemented, "method PushFile not implemented") } @@ -436,6 +454,24 @@ func _McpAgentService_AdoptContainers_Handler(srv interface{}, ctx context.Conte return interceptor(ctx, in, info, handler) } +func _McpAgentService_PurgeComponent_Handler(srv 
interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PurgeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(McpAgentServiceServer).PurgeComponent(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: McpAgentService_PurgeComponent_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(McpAgentServiceServer).PurgeComponent(ctx, req.(*PurgeRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _McpAgentService_PushFile_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PushFileRequest) if err := dec(in); err != nil { @@ -533,6 +569,10 @@ var McpAgentService_ServiceDesc = grpc.ServiceDesc{ MethodName: "AdoptContainers", Handler: _McpAgentService_AdoptContainers_Handler, }, + { + MethodName: "PurgeComponent", + Handler: _McpAgentService_PurgeComponent_Handler, + }, { MethodName: "PushFile", Handler: _McpAgentService_PushFile_Handler, diff --git a/internal/agent/purge.go b/internal/agent/purge.go new file mode 100644 index 0000000..a18ac5d --- /dev/null +++ b/internal/agent/purge.go @@ -0,0 +1,155 @@ +package agent + +import ( + "context" + "fmt" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" +) + +// PurgeComponent removes stale registry entries for components that are both +// gone (observed state is removed/unknown/exited) and unwanted (not in any +// current service definition). It never touches running containers. 
+func (a *Agent) PurgeComponent(ctx context.Context, req *mcpv1.PurgeRequest) (*mcpv1.PurgeResponse, error) { + a.Logger.Info("PurgeComponent", + "service", req.GetService(), + "component", req.GetComponent(), + "dry_run", req.GetDryRun(), + ) + + // Build a set of defined service/component pairs for quick lookup. + defined := make(map[string]bool, len(req.GetDefinedComponents())) + for _, dc := range req.GetDefinedComponents() { + defined[dc] = true + } + + // Determine which services to examine. + var services []registry.Service + if req.GetService() != "" { + svc, err := registry.GetService(a.DB, req.GetService()) + if err != nil { + return nil, fmt.Errorf("get service %q: %w", req.GetService(), err) + } + services = []registry.Service{*svc} + } else { + var err error + services, err = registry.ListServices(a.DB) + if err != nil { + return nil, fmt.Errorf("list services: %w", err) + } + } + + var results []*mcpv1.PurgeResult + + for _, svc := range services { + components, err := registry.ListComponents(a.DB, svc.Name) + if err != nil { + return nil, fmt.Errorf("list components for %q: %w", svc.Name, err) + } + + // If a specific component was requested, filter to just that one. + if req.GetComponent() != "" { + var filtered []registry.Component + for _, c := range components { + if c.Name == req.GetComponent() { + filtered = append(filtered, c) + } + } + components = filtered + } + + for _, comp := range components { + result := a.evaluatePurge(svc.Name, &comp, defined, req.GetDryRun()) + results = append(results, result) + } + + // If all components of this service were purged (not dry-run), + // check if the service should be cleaned up too. 
+ if !req.GetDryRun() { + remaining, err := registry.ListComponents(a.DB, svc.Name) + if err != nil { + a.Logger.Warn("failed to check remaining components", "service", svc.Name, "err", err) + continue + } + if len(remaining) == 0 { + if err := registry.DeleteService(a.DB, svc.Name); err != nil { + a.Logger.Warn("failed to delete empty service", "service", svc.Name, "err", err) + } else { + a.Logger.Info("purged empty service", "service", svc.Name) + } + } + } + } + + return &mcpv1.PurgeResponse{Results: results}, nil +} + +// purgeableStates are observed states that indicate a component's container +// is gone and the registry entry can be safely removed. +var purgeableStates = map[string]bool{ + "removed": true, + "unknown": true, + "exited": true, +} + +// evaluatePurge checks whether a single component is eligible for purge and, +// if not in dry-run mode, deletes it. +func (a *Agent) evaluatePurge(service string, comp *registry.Component, defined map[string]bool, dryRun bool) *mcpv1.PurgeResult { + key := service + "/" + comp.Name + + // Safety: refuse to purge components with a live container. + if !purgeableStates[comp.ObservedState] { + return &mcpv1.PurgeResult{ + Service: service, + Component: comp.Name, + Purged: false, + Reason: fmt.Sprintf("observed=%s, container still exists", comp.ObservedState), + } + } + + // Don't purge components that are still in service definitions. + if defined[key] { + return &mcpv1.PurgeResult{ + Service: service, + Component: comp.Name, + Purged: false, + Reason: "still in service definitions", + } + } + + reason := fmt.Sprintf("observed=%s, not in service definitions", comp.ObservedState) + + if dryRun { + return &mcpv1.PurgeResult{ + Service: service, + Component: comp.Name, + Purged: true, + Reason: reason, + } + } + + // Delete events first (events table has no FK to components). 
+ if err := registry.DeleteComponentEvents(a.DB, service, comp.Name); err != nil { + a.Logger.Warn("failed to delete events during purge", "service", service, "component", comp.Name, "err", err) + } + + // Delete the component (CASCADE handles ports, volumes, cmd). + if err := registry.DeleteComponent(a.DB, service, comp.Name); err != nil { + return &mcpv1.PurgeResult{ + Service: service, + Component: comp.Name, + Purged: false, + Reason: fmt.Sprintf("delete failed: %v", err), + } + } + + a.Logger.Info("purged component", "service", service, "component", comp.Name, "reason", reason) + + return &mcpv1.PurgeResult{ + Service: service, + Component: comp.Name, + Purged: true, + Reason: reason, + } +} diff --git a/internal/agent/purge_test.go b/internal/agent/purge_test.go new file mode 100644 index 0000000..079b5e2 --- /dev/null +++ b/internal/agent/purge_test.go @@ -0,0 +1,405 @@ +package agent + +import ( + "context" + "testing" + + mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1" + "git.wntrmute.dev/kyle/mcp/internal/registry" +) + +func TestPurgeComponentRemoved(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + // Set up a service with a stale component. + if err := registry.CreateService(a.DB, "mcns", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "coredns", + Service: "mcns", + Image: "coredns:latest", + DesiredState: "running", + ObservedState: "removed", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + // Insert an event for this component. 
+ if err := registry.InsertEvent(a.DB, "mcns", "coredns", "running", "removed"); err != nil { + t.Fatalf("insert event: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{ + DefinedComponents: []string{"mcns/mcns"}, + }) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + + r := resp.Results[0] + if !r.Purged { + t.Fatalf("expected purged=true, got reason: %s", r.Reason) + } + if r.Service != "mcns" || r.Component != "coredns" { + t.Fatalf("unexpected result: %s/%s", r.Service, r.Component) + } + + // Verify component was deleted. + _, err = registry.GetComponent(a.DB, "mcns", "coredns") + if err == nil { + t.Fatal("component should have been deleted") + } + + // Service should also be deleted since it has no remaining components. + _, err = registry.GetService(a.DB, "mcns") + if err == nil { + t.Fatal("service should have been deleted (no remaining components)") + } +} + +func TestPurgeRefusesRunning(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "mcr", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "api", + Service: "mcr", + Image: "mcr:latest", + DesiredState: "running", + ObservedState: "running", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{ + Service: "mcr", + Component: "api", + }) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + if resp.Results[0].Purged { + t.Fatal("should not purge a running component") + } + + // Verify component still exists. 
+ _, err = registry.GetComponent(a.DB, "mcr", "api") + if err != nil { + t.Fatalf("component should still exist: %v", err) + } +} + +func TestPurgeRefusesStopped(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "mcr", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "api", + Service: "mcr", + Image: "mcr:latest", + DesiredState: "stopped", + ObservedState: "stopped", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{ + Service: "mcr", + Component: "api", + }) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if resp.Results[0].Purged { + t.Fatal("should not purge a stopped component") + } +} + +func TestPurgeSkipsDefinedComponent(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "mcns", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, ®istry.Component{ + Name: "mcns", + Service: "mcns", + Image: "mcns:latest", + DesiredState: "running", + ObservedState: "exited", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{ + DefinedComponents: []string{"mcns/mcns"}, + }) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + if resp.Results[0].Purged { + t.Fatal("should not purge a component that is still in service definitions") + } + if resp.Results[0].Reason != "still in service definitions" { + t.Fatalf("unexpected reason: %s", resp.Results[0].Reason) + } +} + +func TestPurgeDryRun(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := 
registry.CreateService(a.DB, "mcns", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "coredns", + Service: "mcns", + Image: "coredns:latest", + DesiredState: "running", + ObservedState: "removed", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{ + DryRun: true, + DefinedComponents: []string{"mcns/mcns"}, + }) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + if !resp.Results[0].Purged { + t.Fatal("dry run should report purged=true for eligible components") + } + + // Verify component was NOT deleted (dry run). + _, err = registry.GetComponent(a.DB, "mcns", "coredns") + if err != nil { + t.Fatalf("component should still exist after dry run: %v", err) + } +} + +func TestPurgeServiceFilter(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + // Create two services. + if err := registry.CreateService(a.DB, "mcns", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "coredns", Service: "mcns", Image: "coredns:latest", + DesiredState: "running", ObservedState: "removed", + }); err != nil { + t.Fatalf("create component: %v", err) + } + if err := registry.CreateService(a.DB, "mcr", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "old", Service: "mcr", Image: "old:latest", + DesiredState: "running", ObservedState: "removed", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + // Purge only mcns. 
+ resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{ + Service: "mcns", + }) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp.Results)) + } + if resp.Results[0].Service != "mcns" { + t.Fatalf("expected mcns, got %s", resp.Results[0].Service) + } + + // mcr/old should still exist. + _, err = registry.GetComponent(a.DB, "mcr", "old") + if err != nil { + t.Fatalf("mcr/old should still exist: %v", err) + } +} + +func TestPurgeServiceDeletedWhenEmpty(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "mcns", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "coredns", Service: "mcns", Image: "coredns:latest", + DesiredState: "running", ObservedState: "removed", + }); err != nil { + t.Fatalf("create component: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "old-thing", Service: "mcns", Image: "old:latest", + DesiredState: "stopped", ObservedState: "unknown", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{}) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + // Both components should be purged. + if len(resp.Results) != 2 { + t.Fatalf("expected 2 results, got %d", len(resp.Results)) + } + for _, r := range resp.Results { + if !r.Purged { + t.Fatalf("expected purged=true for %s/%s: %s", r.Service, r.Component, r.Reason) + } + } + + // Service should be deleted. 
+ _, err = registry.GetService(a.DB, "mcns") + if err == nil { + t.Fatal("service should have been deleted") + } +} + +func TestPurgeServiceKeptWhenComponentsRemain(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "mcns", true); err != nil { + t.Fatalf("create service: %v", err) + } + // Stale component (will be purged). + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "coredns", Service: "mcns", Image: "coredns:latest", + DesiredState: "running", ObservedState: "removed", + }); err != nil { + t.Fatalf("create component: %v", err) + } + // Live component (will not be purged). + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "mcns", Service: "mcns", Image: "mcns:latest", + DesiredState: "running", ObservedState: "running", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{}) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 2 { + t.Fatalf("expected 2 results, got %d", len(resp.Results)) + } + + // coredns should be purged, mcns should not. + purged := 0 + for _, r := range resp.Results { + if r.Purged { + purged++ + if r.Component != "coredns" { + t.Fatalf("expected coredns to be purged, got %s", r.Component) + } + } + } + if purged != 1 { + t.Fatalf("expected 1 purged, got %d", purged) + } + + // Service should still exist. 
+ _, err = registry.GetService(a.DB, "mcns") + if err != nil { + t.Fatalf("service should still exist: %v", err) + } +} + +func TestPurgeExitedState(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "test", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "old", Service: "test", Image: "old:latest", + DesiredState: "stopped", ObservedState: "exited", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{}) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 || !resp.Results[0].Purged { + t.Fatalf("exited component should be purgeable") + } +} + +func TestPurgeUnknownState(t *testing.T) { + rt := &fakeRuntime{} + a := newTestAgent(t, rt) + ctx := context.Background() + + if err := registry.CreateService(a.DB, "test", true); err != nil { + t.Fatalf("create service: %v", err) + } + if err := registry.CreateComponent(a.DB, &registry.Component{ + Name: "ghost", Service: "test", Image: "ghost:latest", + DesiredState: "running", ObservedState: "unknown", + }); err != nil { + t.Fatalf("create component: %v", err) + } + + resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{}) + if err != nil { + t.Fatalf("PurgeComponent: %v", err) + } + + if len(resp.Results) != 1 || !resp.Results[0].Purged { + t.Fatalf("unknown component should be purgeable") + } +} diff --git a/internal/registry/events.go b/internal/registry/events.go index 18cfb92..f46831f 100644 --- a/internal/registry/events.go +++ b/internal/registry/events.go @@ -83,6 +83,15 @@ func CountEvents(db *sql.DB, service, component string, since time.Time) (int, e return count, nil } +// DeleteComponentEvents deletes all events for a specific component. 
+func DeleteComponentEvents(db *sql.DB, service, component string) error { + _, err := db.Exec("DELETE FROM events WHERE service = ? AND component = ?", service, component) + if err != nil { + return fmt.Errorf("delete events %q/%q: %w", service, component, err) + } + return nil +} + // PruneEvents deletes events older than the given time. func PruneEvents(db *sql.DB, before time.Time) (int64, error) { res, err := db.Exec( diff --git a/proto/mcp/v1/mcp.proto b/proto/mcp/v1/mcp.proto index 8603a48..93c6de7 100644 --- a/proto/mcp/v1/mcp.proto +++ b/proto/mcp/v1/mcp.proto @@ -23,6 +23,9 @@ service McpAgentService { // Adopt rpc AdoptContainers(AdoptContainersRequest) returns (AdoptContainersResponse); + // Purge + rpc PurgeComponent(PurgeRequest) returns (PurgeResponse); + // File transfer rpc PushFile(PushFileRequest) returns (PushFileResponse); rpc PullFile(PullFileRequest) returns (PullFileResponse); @@ -234,3 +237,30 @@ message NodeStatusResponse { double cpu_usage_percent = 10; google.protobuf.Timestamp uptime_since = 11; } + +// --- Purge --- + +message PurgeRequest { + // Service name (empty = all services). + string service = 1; + // Component name (empty = all eligible in service). + string component = 2; + // Preview only, do not modify registry. + bool dry_run = 3; + // Currently-defined service/component pairs (e.g., "mcns/mcns"). + // The agent uses this to determine what is "not in any service definition". + repeated string defined_components = 4; +} + +message PurgeResponse { + repeated PurgeResult results = 1; +} + +message PurgeResult { + string service = 1; + string component = 2; + // true if removed (or would be, in dry-run). + bool purged = 3; + // Why eligible, or why refused. + string reason = 4; +}