Implement mcp purge command for registry cleanup

Add PurgeComponent RPC to the agent service that removes stale registry
entries for components that are both gone (observed state is removed,
unknown, or exited) and unwanted (not in any current service definition).
Refuses to purge components with running or stopped containers. When all
components of a service are purged, the service row is deleted too.
Supports --dry-run to preview without modifying the database.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-26 22:30:45 -07:00
parent 1afbf5e1f6
commit 1e58dcce27
8 changed files with 1001 additions and 36 deletions

View File

@@ -47,6 +47,7 @@ func main() {
root.AddCommand(pushCmd())
root.AddCommand(pullCmd())
root.AddCommand(nodeCmd())
root.AddCommand(purgeCmd())
if err := root.Execute(); err != nil {
log.Fatal(err)

119
cmd/mcp/purge.go Normal file
View File

@@ -0,0 +1,119 @@
package main
import (
"context"
"fmt"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/config"
"git.wntrmute.dev/kyle/mcp/internal/servicedef"
"github.com/spf13/cobra"
)
func purgeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "purge [service[/component]]",
Short: "Remove stale registry entries for gone, undefined components",
Long: `Purge removes registry entries that are both unwanted (not in any
current service definition) and gone (no corresponding container in the
runtime). It never stops or removes running containers.
Use --dry-run to preview what would be purged.`,
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
dryRun, _ := cmd.Flags().GetBool("dry-run")
var service, component string
if len(args) == 1 {
service, component = parseServiceArg(args[0])
}
// Load all local service definitions to build the set of
// currently-defined service/component pairs.
definedComponents := buildDefinedComponents(cfg)
// Build node address lookup.
nodeAddr := make(map[string]string, len(cfg.Nodes))
for _, n := range cfg.Nodes {
nodeAddr[n.Name] = n.Address
}
// If a specific service was given and we can find its node,
// only talk to that node. Otherwise, talk to all nodes.
targetNodes := cfg.Nodes
if service != "" {
if nodeName, nodeAddr, err := findServiceNode(cfg, service); err == nil {
targetNodes = []config.NodeConfig{{Name: nodeName, Address: nodeAddr}}
}
}
anyResults := false
for _, node := range targetNodes {
client, conn, err := dialAgent(node.Address, cfg)
if err != nil {
return fmt.Errorf("dial %s: %w", node.Name, err)
}
defer func() { _ = conn.Close() }()
resp, err := client.PurgeComponent(context.Background(), &mcpv1.PurgeRequest{
Service: service,
Component: component,
DryRun: dryRun,
DefinedComponents: definedComponents,
})
if err != nil {
return fmt.Errorf("purge on %s: %w", node.Name, err)
}
for _, r := range resp.GetResults() {
anyResults = true
if r.GetPurged() {
if dryRun {
fmt.Printf("would purge %s/%s (%s)\n", r.GetService(), r.GetComponent(), r.GetReason())
} else {
fmt.Printf("purged %s/%s (%s)\n", r.GetService(), r.GetComponent(), r.GetReason())
}
} else {
fmt.Printf("skipped %s/%s (%s)\n", r.GetService(), r.GetComponent(), r.GetReason())
}
}
}
if !anyResults {
fmt.Println("nothing to purge")
}
return nil
},
}
cmd.Flags().Bool("dry-run", false, "preview what would be purged without modifying the registry")
return cmd
}
// buildDefinedComponents reads all local service definition files and returns
// a list of "service/component" strings for every defined component.
func buildDefinedComponents(cfg *config.CLIConfig) []string {
defs, err := servicedef.LoadAll(cfg.Services.Dir)
if err != nil {
// If we can't read service definitions, return an empty list.
// The agent will treat every component as undefined, which is the
// most conservative behavior (everything eligible gets purged).
return nil
}
var defined []string
for _, def := range defs {
for _, comp := range def.Components {
defined = append(defined, def.Name+"/"+comp.Name)
}
}
return defined
}

View File

@@ -1865,6 +1865,193 @@ func (x *NodeStatusResponse) GetUptimeSince() *timestamppb.Timestamp {
return nil
}
type PurgeRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Service name (empty = all services).
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
// Component name (empty = all eligible in service).
Component string `protobuf:"bytes,2,opt,name=component,proto3" json:"component,omitempty"`
// Preview only, do not modify registry.
DryRun bool `protobuf:"varint,3,opt,name=dry_run,json=dryRun,proto3" json:"dry_run,omitempty"`
// Currently-defined service/component pairs (e.g., "mcns/mcns").
// The agent uses this to determine what is "not in any service definition".
DefinedComponents []string `protobuf:"bytes,4,rep,name=defined_components,json=definedComponents,proto3" json:"defined_components,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PurgeRequest) Reset() {
*x = PurgeRequest{}
mi := &file_proto_mcp_v1_mcp_proto_msgTypes[33]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PurgeRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PurgeRequest) ProtoMessage() {}
func (x *PurgeRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_mcp_v1_mcp_proto_msgTypes[33]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PurgeRequest.ProtoReflect.Descriptor instead.
func (*PurgeRequest) Descriptor() ([]byte, []int) {
return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{33}
}
func (x *PurgeRequest) GetService() string {
if x != nil {
return x.Service
}
return ""
}
func (x *PurgeRequest) GetComponent() string {
if x != nil {
return x.Component
}
return ""
}
func (x *PurgeRequest) GetDryRun() bool {
if x != nil {
return x.DryRun
}
return false
}
func (x *PurgeRequest) GetDefinedComponents() []string {
if x != nil {
return x.DefinedComponents
}
return nil
}
type PurgeResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Results []*PurgeResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PurgeResponse) Reset() {
*x = PurgeResponse{}
mi := &file_proto_mcp_v1_mcp_proto_msgTypes[34]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PurgeResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PurgeResponse) ProtoMessage() {}
func (x *PurgeResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_mcp_v1_mcp_proto_msgTypes[34]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PurgeResponse.ProtoReflect.Descriptor instead.
func (*PurgeResponse) Descriptor() ([]byte, []int) {
return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{34}
}
func (x *PurgeResponse) GetResults() []*PurgeResult {
if x != nil {
return x.Results
}
return nil
}
type PurgeResult struct {
state protoimpl.MessageState `protogen:"open.v1"`
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
Component string `protobuf:"bytes,2,opt,name=component,proto3" json:"component,omitempty"`
// true if removed (or would be, in dry-run).
Purged bool `protobuf:"varint,3,opt,name=purged,proto3" json:"purged,omitempty"`
// Why eligible, or why refused.
Reason string `protobuf:"bytes,4,opt,name=reason,proto3" json:"reason,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PurgeResult) Reset() {
*x = PurgeResult{}
mi := &file_proto_mcp_v1_mcp_proto_msgTypes[35]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PurgeResult) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PurgeResult) ProtoMessage() {}
func (x *PurgeResult) ProtoReflect() protoreflect.Message {
mi := &file_proto_mcp_v1_mcp_proto_msgTypes[35]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PurgeResult.ProtoReflect.Descriptor instead.
func (*PurgeResult) Descriptor() ([]byte, []int) {
return file_proto_mcp_v1_mcp_proto_rawDescGZIP(), []int{35}
}
func (x *PurgeResult) GetService() string {
if x != nil {
return x.Service
}
return ""
}
func (x *PurgeResult) GetComponent() string {
if x != nil {
return x.Component
}
return ""
}
func (x *PurgeResult) GetPurged() bool {
if x != nil {
return x.Purged
}
return false
}
func (x *PurgeResult) GetReason() string {
if x != nil {
return x.Reason
}
return ""
}
var File_proto_mcp_v1_mcp_proto protoreflect.FileDescriptor
const file_proto_mcp_v1_mcp_proto_rawDesc = "" +
@@ -1988,7 +2175,19 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" +
"\x11memory_free_bytes\x18\t \x01(\x04R\x0fmemoryFreeBytes\x12*\n" +
"\x11cpu_usage_percent\x18\n" +
" \x01(\x01R\x0fcpuUsagePercent\x12=\n" +
"\fuptime_since\x18\v \x01(\v2\x1a.google.protobuf.TimestampR\vuptimeSince2\x80\a\n" +
"\fuptime_since\x18\v \x01(\v2\x1a.google.protobuf.TimestampR\vuptimeSince\"\x8e\x01\n" +
"\fPurgeRequest\x12\x18\n" +
"\aservice\x18\x01 \x01(\tR\aservice\x12\x1c\n" +
"\tcomponent\x18\x02 \x01(\tR\tcomponent\x12\x17\n" +
"\adry_run\x18\x03 \x01(\bR\x06dryRun\x12-\n" +
"\x12defined_components\x18\x04 \x03(\tR\x11definedComponents\">\n" +
"\rPurgeResponse\x12-\n" +
"\aresults\x18\x01 \x03(\v2\x13.mcp.v1.PurgeResultR\aresults\"u\n" +
"\vPurgeResult\x12\x18\n" +
"\aservice\x18\x01 \x01(\tR\aservice\x12\x1c\n" +
"\tcomponent\x18\x02 \x01(\tR\tcomponent\x12\x16\n" +
"\x06purged\x18\x03 \x01(\bR\x06purged\x12\x16\n" +
"\x06reason\x18\x04 \x01(\tR\x06reason2\xbf\a\n" +
"\x0fMcpAgentService\x127\n" +
"\x06Deploy\x12\x15.mcp.v1.DeployRequest\x1a\x16.mcp.v1.DeployResponse\x12F\n" +
"\vStopService\x12\x1a.mcp.v1.StopServiceRequest\x1a\x1b.mcp.v1.StopServiceResponse\x12I\n" +
@@ -1999,6 +2198,7 @@ const file_proto_mcp_v1_mcp_proto_rawDesc = "" +
"\x10GetServiceStatus\x12\x1f.mcp.v1.GetServiceStatusRequest\x1a .mcp.v1.GetServiceStatusResponse\x12@\n" +
"\tLiveCheck\x12\x18.mcp.v1.LiveCheckRequest\x1a\x19.mcp.v1.LiveCheckResponse\x12R\n" +
"\x0fAdoptContainers\x12\x1e.mcp.v1.AdoptContainersRequest\x1a\x1f.mcp.v1.AdoptContainersResponse\x12=\n" +
"\x0ePurgeComponent\x12\x14.mcp.v1.PurgeRequest\x1a\x15.mcp.v1.PurgeResponse\x12=\n" +
"\bPushFile\x12\x17.mcp.v1.PushFileRequest\x1a\x18.mcp.v1.PushFileResponse\x12=\n" +
"\bPullFile\x12\x17.mcp.v1.PullFileRequest\x1a\x18.mcp.v1.PullFileResponse\x12C\n" +
"\n" +
@@ -2016,7 +2216,7 @@ func file_proto_mcp_v1_mcp_proto_rawDescGZIP() []byte {
return file_proto_mcp_v1_mcp_proto_rawDescData
}
var file_proto_mcp_v1_mcp_proto_msgTypes = make([]protoimpl.MessageInfo, 33)
var file_proto_mcp_v1_mcp_proto_msgTypes = make([]protoimpl.MessageInfo, 36)
var file_proto_mcp_v1_mcp_proto_goTypes = []any{
(*ComponentSpec)(nil), // 0: mcp.v1.ComponentSpec
(*ServiceSpec)(nil), // 1: mcp.v1.ServiceSpec
@@ -2051,7 +2251,10 @@ var file_proto_mcp_v1_mcp_proto_goTypes = []any{
(*PullFileResponse)(nil), // 30: mcp.v1.PullFileResponse
(*NodeStatusRequest)(nil), // 31: mcp.v1.NodeStatusRequest
(*NodeStatusResponse)(nil), // 32: mcp.v1.NodeStatusResponse
(*timestamppb.Timestamp)(nil), // 33: google.protobuf.Timestamp
(*PurgeRequest)(nil), // 33: mcp.v1.PurgeRequest
(*PurgeResponse)(nil), // 34: mcp.v1.PurgeResponse
(*PurgeResult)(nil), // 35: mcp.v1.PurgeResult
(*timestamppb.Timestamp)(nil), // 36: google.protobuf.Timestamp
}
var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{
0, // 0: mcp.v1.ServiceSpec.components:type_name -> mcp.v1.ComponentSpec
@@ -2063,44 +2266,47 @@ var file_proto_mcp_v1_mcp_proto_depIdxs = []int32{
1, // 6: mcp.v1.SyncDesiredStateRequest.services:type_name -> mcp.v1.ServiceSpec
13, // 7: mcp.v1.SyncDesiredStateResponse.results:type_name -> mcp.v1.ServiceSyncResult
16, // 8: mcp.v1.ServiceInfo.components:type_name -> mcp.v1.ComponentInfo
33, // 9: mcp.v1.ComponentInfo.started:type_name -> google.protobuf.Timestamp
36, // 9: mcp.v1.ComponentInfo.started:type_name -> google.protobuf.Timestamp
15, // 10: mcp.v1.ListServicesResponse.services:type_name -> mcp.v1.ServiceInfo
33, // 11: mcp.v1.EventInfo.timestamp:type_name -> google.protobuf.Timestamp
36, // 11: mcp.v1.EventInfo.timestamp:type_name -> google.protobuf.Timestamp
15, // 12: mcp.v1.GetServiceStatusResponse.services:type_name -> mcp.v1.ServiceInfo
19, // 13: mcp.v1.GetServiceStatusResponse.drift:type_name -> mcp.v1.DriftInfo
20, // 14: mcp.v1.GetServiceStatusResponse.recent_events:type_name -> mcp.v1.EventInfo
15, // 15: mcp.v1.LiveCheckResponse.services:type_name -> mcp.v1.ServiceInfo
25, // 16: mcp.v1.AdoptContainersResponse.results:type_name -> mcp.v1.AdoptResult
33, // 17: mcp.v1.NodeStatusResponse.uptime_since:type_name -> google.protobuf.Timestamp
2, // 18: mcp.v1.McpAgentService.Deploy:input_type -> mcp.v1.DeployRequest
5, // 19: mcp.v1.McpAgentService.StopService:input_type -> mcp.v1.StopServiceRequest
7, // 20: mcp.v1.McpAgentService.StartService:input_type -> mcp.v1.StartServiceRequest
9, // 21: mcp.v1.McpAgentService.RestartService:input_type -> mcp.v1.RestartServiceRequest
11, // 22: mcp.v1.McpAgentService.SyncDesiredState:input_type -> mcp.v1.SyncDesiredStateRequest
14, // 23: mcp.v1.McpAgentService.ListServices:input_type -> mcp.v1.ListServicesRequest
18, // 24: mcp.v1.McpAgentService.GetServiceStatus:input_type -> mcp.v1.GetServiceStatusRequest
22, // 25: mcp.v1.McpAgentService.LiveCheck:input_type -> mcp.v1.LiveCheckRequest
24, // 26: mcp.v1.McpAgentService.AdoptContainers:input_type -> mcp.v1.AdoptContainersRequest
27, // 27: mcp.v1.McpAgentService.PushFile:input_type -> mcp.v1.PushFileRequest
29, // 28: mcp.v1.McpAgentService.PullFile:input_type -> mcp.v1.PullFileRequest
31, // 29: mcp.v1.McpAgentService.NodeStatus:input_type -> mcp.v1.NodeStatusRequest
3, // 30: mcp.v1.McpAgentService.Deploy:output_type -> mcp.v1.DeployResponse
6, // 31: mcp.v1.McpAgentService.StopService:output_type -> mcp.v1.StopServiceResponse
8, // 32: mcp.v1.McpAgentService.StartService:output_type -> mcp.v1.StartServiceResponse
10, // 33: mcp.v1.McpAgentService.RestartService:output_type -> mcp.v1.RestartServiceResponse
12, // 34: mcp.v1.McpAgentService.SyncDesiredState:output_type -> mcp.v1.SyncDesiredStateResponse
17, // 35: mcp.v1.McpAgentService.ListServices:output_type -> mcp.v1.ListServicesResponse
21, // 36: mcp.v1.McpAgentService.GetServiceStatus:output_type -> mcp.v1.GetServiceStatusResponse
23, // 37: mcp.v1.McpAgentService.LiveCheck:output_type -> mcp.v1.LiveCheckResponse
26, // 38: mcp.v1.McpAgentService.AdoptContainers:output_type -> mcp.v1.AdoptContainersResponse
28, // 39: mcp.v1.McpAgentService.PushFile:output_type -> mcp.v1.PushFileResponse
30, // 40: mcp.v1.McpAgentService.PullFile:output_type -> mcp.v1.PullFileResponse
32, // 41: mcp.v1.McpAgentService.NodeStatus:output_type -> mcp.v1.NodeStatusResponse
30, // [30:42] is the sub-list for method output_type
18, // [18:30] is the sub-list for method input_type
18, // [18:18] is the sub-list for extension type_name
18, // [18:18] is the sub-list for extension extendee
0, // [0:18] is the sub-list for field type_name
36, // 17: mcp.v1.NodeStatusResponse.uptime_since:type_name -> google.protobuf.Timestamp
35, // 18: mcp.v1.PurgeResponse.results:type_name -> mcp.v1.PurgeResult
2, // 19: mcp.v1.McpAgentService.Deploy:input_type -> mcp.v1.DeployRequest
5, // 20: mcp.v1.McpAgentService.StopService:input_type -> mcp.v1.StopServiceRequest
7, // 21: mcp.v1.McpAgentService.StartService:input_type -> mcp.v1.StartServiceRequest
9, // 22: mcp.v1.McpAgentService.RestartService:input_type -> mcp.v1.RestartServiceRequest
11, // 23: mcp.v1.McpAgentService.SyncDesiredState:input_type -> mcp.v1.SyncDesiredStateRequest
14, // 24: mcp.v1.McpAgentService.ListServices:input_type -> mcp.v1.ListServicesRequest
18, // 25: mcp.v1.McpAgentService.GetServiceStatus:input_type -> mcp.v1.GetServiceStatusRequest
22, // 26: mcp.v1.McpAgentService.LiveCheck:input_type -> mcp.v1.LiveCheckRequest
24, // 27: mcp.v1.McpAgentService.AdoptContainers:input_type -> mcp.v1.AdoptContainersRequest
33, // 28: mcp.v1.McpAgentService.PurgeComponent:input_type -> mcp.v1.PurgeRequest
27, // 29: mcp.v1.McpAgentService.PushFile:input_type -> mcp.v1.PushFileRequest
29, // 30: mcp.v1.McpAgentService.PullFile:input_type -> mcp.v1.PullFileRequest
31, // 31: mcp.v1.McpAgentService.NodeStatus:input_type -> mcp.v1.NodeStatusRequest
3, // 32: mcp.v1.McpAgentService.Deploy:output_type -> mcp.v1.DeployResponse
6, // 33: mcp.v1.McpAgentService.StopService:output_type -> mcp.v1.StopServiceResponse
8, // 34: mcp.v1.McpAgentService.StartService:output_type -> mcp.v1.StartServiceResponse
10, // 35: mcp.v1.McpAgentService.RestartService:output_type -> mcp.v1.RestartServiceResponse
12, // 36: mcp.v1.McpAgentService.SyncDesiredState:output_type -> mcp.v1.SyncDesiredStateResponse
17, // 37: mcp.v1.McpAgentService.ListServices:output_type -> mcp.v1.ListServicesResponse
21, // 38: mcp.v1.McpAgentService.GetServiceStatus:output_type -> mcp.v1.GetServiceStatusResponse
23, // 39: mcp.v1.McpAgentService.LiveCheck:output_type -> mcp.v1.LiveCheckResponse
26, // 40: mcp.v1.McpAgentService.AdoptContainers:output_type -> mcp.v1.AdoptContainersResponse
34, // 41: mcp.v1.McpAgentService.PurgeComponent:output_type -> mcp.v1.PurgeResponse
28, // 42: mcp.v1.McpAgentService.PushFile:output_type -> mcp.v1.PushFileResponse
30, // 43: mcp.v1.McpAgentService.PullFile:output_type -> mcp.v1.PullFileResponse
32, // 44: mcp.v1.McpAgentService.NodeStatus:output_type -> mcp.v1.NodeStatusResponse
32, // [32:45] is the sub-list for method output_type
19, // [19:32] is the sub-list for method input_type
19, // [19:19] is the sub-list for extension type_name
19, // [19:19] is the sub-list for extension extendee
0, // [0:19] is the sub-list for field type_name
}
func init() { file_proto_mcp_v1_mcp_proto_init() }
@@ -2114,7 +2320,7 @@ func file_proto_mcp_v1_mcp_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_mcp_v1_mcp_proto_rawDesc), len(file_proto_mcp_v1_mcp_proto_rawDesc)),
NumEnums: 0,
NumMessages: 33,
NumMessages: 36,
NumExtensions: 0,
NumServices: 1,
},

View File

@@ -28,6 +28,7 @@ const (
McpAgentService_GetServiceStatus_FullMethodName = "/mcp.v1.McpAgentService/GetServiceStatus"
McpAgentService_LiveCheck_FullMethodName = "/mcp.v1.McpAgentService/LiveCheck"
McpAgentService_AdoptContainers_FullMethodName = "/mcp.v1.McpAgentService/AdoptContainers"
McpAgentService_PurgeComponent_FullMethodName = "/mcp.v1.McpAgentService/PurgeComponent"
McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile"
McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile"
McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus"
@@ -50,6 +51,8 @@ type McpAgentServiceClient interface {
LiveCheck(ctx context.Context, in *LiveCheckRequest, opts ...grpc.CallOption) (*LiveCheckResponse, error)
// Adopt
AdoptContainers(ctx context.Context, in *AdoptContainersRequest, opts ...grpc.CallOption) (*AdoptContainersResponse, error)
// Purge
PurgeComponent(ctx context.Context, in *PurgeRequest, opts ...grpc.CallOption) (*PurgeResponse, error)
// File transfer
PushFile(ctx context.Context, in *PushFileRequest, opts ...grpc.CallOption) (*PushFileResponse, error)
PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error)
@@ -155,6 +158,16 @@ func (c *mcpAgentServiceClient) AdoptContainers(ctx context.Context, in *AdoptCo
return out, nil
}
func (c *mcpAgentServiceClient) PurgeComponent(ctx context.Context, in *PurgeRequest, opts ...grpc.CallOption) (*PurgeResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(PurgeResponse)
err := c.cc.Invoke(ctx, McpAgentService_PurgeComponent_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) PushFile(ctx context.Context, in *PushFileRequest, opts ...grpc.CallOption) (*PushFileResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(PushFileResponse)
@@ -202,6 +215,8 @@ type McpAgentServiceServer interface {
LiveCheck(context.Context, *LiveCheckRequest) (*LiveCheckResponse, error)
// Adopt
AdoptContainers(context.Context, *AdoptContainersRequest) (*AdoptContainersResponse, error)
// Purge
PurgeComponent(context.Context, *PurgeRequest) (*PurgeResponse, error)
// File transfer
PushFile(context.Context, *PushFileRequest) (*PushFileResponse, error)
PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error)
@@ -244,6 +259,9 @@ func (UnimplementedMcpAgentServiceServer) LiveCheck(context.Context, *LiveCheckR
func (UnimplementedMcpAgentServiceServer) AdoptContainers(context.Context, *AdoptContainersRequest) (*AdoptContainersResponse, error) {
return nil, status.Error(codes.Unimplemented, "method AdoptContainers not implemented")
}
func (UnimplementedMcpAgentServiceServer) PurgeComponent(context.Context, *PurgeRequest) (*PurgeResponse, error) {
return nil, status.Error(codes.Unimplemented, "method PurgeComponent not implemented")
}
func (UnimplementedMcpAgentServiceServer) PushFile(context.Context, *PushFileRequest) (*PushFileResponse, error) {
return nil, status.Error(codes.Unimplemented, "method PushFile not implemented")
}
@@ -436,6 +454,24 @@ func _McpAgentService_AdoptContainers_Handler(srv interface{}, ctx context.Conte
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_PurgeComponent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PurgeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).PurgeComponent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_PurgeComponent_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).PurgeComponent(ctx, req.(*PurgeRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_PushFile_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PushFileRequest)
if err := dec(in); err != nil {
@@ -533,6 +569,10 @@ var McpAgentService_ServiceDesc = grpc.ServiceDesc{
MethodName: "AdoptContainers",
Handler: _McpAgentService_AdoptContainers_Handler,
},
{
MethodName: "PurgeComponent",
Handler: _McpAgentService_PurgeComponent_Handler,
},
{
MethodName: "PushFile",
Handler: _McpAgentService_PushFile_Handler,

155
internal/agent/purge.go Normal file
View File

@@ -0,0 +1,155 @@
package agent
import (
"context"
"fmt"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
)
// PurgeComponent removes stale registry entries for components that are both
// gone (observed state is removed/unknown/exited) and unwanted (not in any
// current service definition). It never touches running containers.
func (a *Agent) PurgeComponent(ctx context.Context, req *mcpv1.PurgeRequest) (*mcpv1.PurgeResponse, error) {
a.Logger.Info("PurgeComponent",
"service", req.GetService(),
"component", req.GetComponent(),
"dry_run", req.GetDryRun(),
)
// Build a set of defined service/component pairs for quick lookup.
defined := make(map[string]bool, len(req.GetDefinedComponents()))
for _, dc := range req.GetDefinedComponents() {
defined[dc] = true
}
// Determine which services to examine.
var services []registry.Service
if req.GetService() != "" {
svc, err := registry.GetService(a.DB, req.GetService())
if err != nil {
return nil, fmt.Errorf("get service %q: %w", req.GetService(), err)
}
services = []registry.Service{*svc}
} else {
var err error
services, err = registry.ListServices(a.DB)
if err != nil {
return nil, fmt.Errorf("list services: %w", err)
}
}
var results []*mcpv1.PurgeResult
for _, svc := range services {
components, err := registry.ListComponents(a.DB, svc.Name)
if err != nil {
return nil, fmt.Errorf("list components for %q: %w", svc.Name, err)
}
// If a specific component was requested, filter to just that one.
if req.GetComponent() != "" {
var filtered []registry.Component
for _, c := range components {
if c.Name == req.GetComponent() {
filtered = append(filtered, c)
}
}
components = filtered
}
for _, comp := range components {
result := a.evaluatePurge(svc.Name, &comp, defined, req.GetDryRun())
results = append(results, result)
}
// If all components of this service were purged (not dry-run),
// check if the service should be cleaned up too.
if !req.GetDryRun() {
remaining, err := registry.ListComponents(a.DB, svc.Name)
if err != nil {
a.Logger.Warn("failed to check remaining components", "service", svc.Name, "err", err)
continue
}
if len(remaining) == 0 {
if err := registry.DeleteService(a.DB, svc.Name); err != nil {
a.Logger.Warn("failed to delete empty service", "service", svc.Name, "err", err)
} else {
a.Logger.Info("purged empty service", "service", svc.Name)
}
}
}
}
return &mcpv1.PurgeResponse{Results: results}, nil
}
// purgeableStates are observed states that indicate a component's container
// is gone and the registry entry can be safely removed.
var purgeableStates = map[string]bool{
"removed": true,
"unknown": true,
"exited": true,
}
// evaluatePurge checks whether a single component is eligible for purge and,
// if not in dry-run mode, deletes it.
func (a *Agent) evaluatePurge(service string, comp *registry.Component, defined map[string]bool, dryRun bool) *mcpv1.PurgeResult {
key := service + "/" + comp.Name
// Safety: refuse to purge components with a live container.
if !purgeableStates[comp.ObservedState] {
return &mcpv1.PurgeResult{
Service: service,
Component: comp.Name,
Purged: false,
Reason: fmt.Sprintf("observed=%s, container still exists", comp.ObservedState),
}
}
// Don't purge components that are still in service definitions.
if defined[key] {
return &mcpv1.PurgeResult{
Service: service,
Component: comp.Name,
Purged: false,
Reason: "still in service definitions",
}
}
reason := fmt.Sprintf("observed=%s, not in service definitions", comp.ObservedState)
if dryRun {
return &mcpv1.PurgeResult{
Service: service,
Component: comp.Name,
Purged: true,
Reason: reason,
}
}
// Delete events first (events table has no FK to components).
if err := registry.DeleteComponentEvents(a.DB, service, comp.Name); err != nil {
a.Logger.Warn("failed to delete events during purge", "service", service, "component", comp.Name, "err", err)
}
// Delete the component (CASCADE handles ports, volumes, cmd).
if err := registry.DeleteComponent(a.DB, service, comp.Name); err != nil {
return &mcpv1.PurgeResult{
Service: service,
Component: comp.Name,
Purged: false,
Reason: fmt.Sprintf("delete failed: %v", err),
}
}
a.Logger.Info("purged component", "service", service, "component", comp.Name, "reason", reason)
return &mcpv1.PurgeResult{
Service: service,
Component: comp.Name,
Purged: true,
Reason: reason,
}
}

View File

@@ -0,0 +1,405 @@
package agent
import (
"context"
"testing"
mcpv1 "git.wntrmute.dev/kyle/mcp/gen/mcp/v1"
"git.wntrmute.dev/kyle/mcp/internal/registry"
)
func TestPurgeComponentRemoved(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
// Set up a service with a stale component.
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "coredns",
Service: "mcns",
Image: "coredns:latest",
DesiredState: "running",
ObservedState: "removed",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// Insert an event for this component.
if err := registry.InsertEvent(a.DB, "mcns", "coredns", "running", "removed"); err != nil {
t.Fatalf("insert event: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{
DefinedComponents: []string{"mcns/mcns"},
})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
r := resp.Results[0]
if !r.Purged {
t.Fatalf("expected purged=true, got reason: %s", r.Reason)
}
if r.Service != "mcns" || r.Component != "coredns" {
t.Fatalf("unexpected result: %s/%s", r.Service, r.Component)
}
// Verify component was deleted.
_, err = registry.GetComponent(a.DB, "mcns", "coredns")
if err == nil {
t.Fatal("component should have been deleted")
}
// Service should also be deleted since it has no remaining components.
_, err = registry.GetService(a.DB, "mcns")
if err == nil {
t.Fatal("service should have been deleted (no remaining components)")
}
}
func TestPurgeRefusesRunning(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcr", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "api",
Service: "mcr",
Image: "mcr:latest",
DesiredState: "running",
ObservedState: "running",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{
Service: "mcr",
Component: "api",
})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
if resp.Results[0].Purged {
t.Fatal("should not purge a running component")
}
// Verify component still exists.
_, err = registry.GetComponent(a.DB, "mcr", "api")
if err != nil {
t.Fatalf("component should still exist: %v", err)
}
}
func TestPurgeRefusesStopped(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcr", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "api",
Service: "mcr",
Image: "mcr:latest",
DesiredState: "stopped",
ObservedState: "stopped",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{
Service: "mcr",
Component: "api",
})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if resp.Results[0].Purged {
t.Fatal("should not purge a stopped component")
}
}
func TestPurgeSkipsDefinedComponent(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "mcns",
Service: "mcns",
Image: "mcns:latest",
DesiredState: "running",
ObservedState: "exited",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{
DefinedComponents: []string{"mcns/mcns"},
})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
if resp.Results[0].Purged {
t.Fatal("should not purge a component that is still in service definitions")
}
if resp.Results[0].Reason != "still in service definitions" {
t.Fatalf("unexpected reason: %s", resp.Results[0].Reason)
}
}
func TestPurgeDryRun(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "coredns",
Service: "mcns",
Image: "coredns:latest",
DesiredState: "running",
ObservedState: "removed",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{
DryRun: true,
DefinedComponents: []string{"mcns/mcns"},
})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
if !resp.Results[0].Purged {
t.Fatal("dry run should report purged=true for eligible components")
}
// Verify component was NOT deleted (dry run).
_, err = registry.GetComponent(a.DB, "mcns", "coredns")
if err != nil {
t.Fatalf("component should still exist after dry run: %v", err)
}
}
func TestPurgeServiceFilter(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
// Create two services.
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "coredns", Service: "mcns", Image: "coredns:latest",
DesiredState: "running", ObservedState: "removed",
}); err != nil {
t.Fatalf("create component: %v", err)
}
if err := registry.CreateService(a.DB, "mcr", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "old", Service: "mcr", Image: "old:latest",
DesiredState: "running", ObservedState: "removed",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// Purge only mcns.
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{
Service: "mcns",
})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 {
t.Fatalf("expected 1 result, got %d", len(resp.Results))
}
if resp.Results[0].Service != "mcns" {
t.Fatalf("expected mcns, got %s", resp.Results[0].Service)
}
// mcr/old should still exist.
_, err = registry.GetComponent(a.DB, "mcr", "old")
if err != nil {
t.Fatalf("mcr/old should still exist: %v", err)
}
}
func TestPurgeServiceDeletedWhenEmpty(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "coredns", Service: "mcns", Image: "coredns:latest",
DesiredState: "running", ObservedState: "removed",
}); err != nil {
t.Fatalf("create component: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "old-thing", Service: "mcns", Image: "old:latest",
DesiredState: "stopped", ObservedState: "unknown",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
// Both components should be purged.
if len(resp.Results) != 2 {
t.Fatalf("expected 2 results, got %d", len(resp.Results))
}
for _, r := range resp.Results {
if !r.Purged {
t.Fatalf("expected purged=true for %s/%s: %s", r.Service, r.Component, r.Reason)
}
}
// Service should be deleted.
_, err = registry.GetService(a.DB, "mcns")
if err == nil {
t.Fatal("service should have been deleted")
}
}
func TestPurgeServiceKeptWhenComponentsRemain(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "mcns", true); err != nil {
t.Fatalf("create service: %v", err)
}
// Stale component (will be purged).
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "coredns", Service: "mcns", Image: "coredns:latest",
DesiredState: "running", ObservedState: "removed",
}); err != nil {
t.Fatalf("create component: %v", err)
}
// Live component (will not be purged).
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "mcns", Service: "mcns", Image: "mcns:latest",
DesiredState: "running", ObservedState: "running",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 2 {
t.Fatalf("expected 2 results, got %d", len(resp.Results))
}
// coredns should be purged, mcns should not.
purged := 0
for _, r := range resp.Results {
if r.Purged {
purged++
if r.Component != "coredns" {
t.Fatalf("expected coredns to be purged, got %s", r.Component)
}
}
}
if purged != 1 {
t.Fatalf("expected 1 purged, got %d", purged)
}
// Service should still exist.
_, err = registry.GetService(a.DB, "mcns")
if err != nil {
t.Fatalf("service should still exist: %v", err)
}
}
func TestPurgeExitedState(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "test", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "old", Service: "test", Image: "old:latest",
DesiredState: "stopped", ObservedState: "exited",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 || !resp.Results[0].Purged {
t.Fatalf("exited component should be purgeable")
}
}
func TestPurgeUnknownState(t *testing.T) {
rt := &fakeRuntime{}
a := newTestAgent(t, rt)
ctx := context.Background()
if err := registry.CreateService(a.DB, "test", true); err != nil {
t.Fatalf("create service: %v", err)
}
if err := registry.CreateComponent(a.DB, &registry.Component{
Name: "ghost", Service: "test", Image: "ghost:latest",
DesiredState: "running", ObservedState: "unknown",
}); err != nil {
t.Fatalf("create component: %v", err)
}
resp, err := a.PurgeComponent(ctx, &mcpv1.PurgeRequest{})
if err != nil {
t.Fatalf("PurgeComponent: %v", err)
}
if len(resp.Results) != 1 || !resp.Results[0].Purged {
t.Fatalf("unknown component should be purgeable")
}
}

View File

@@ -83,6 +83,15 @@ func CountEvents(db *sql.DB, service, component string, since time.Time) (int, e
return count, nil
}
// DeleteComponentEvents deletes all events for a specific component.
func DeleteComponentEvents(db *sql.DB, service, component string) error {
_, err := db.Exec("DELETE FROM events WHERE service = ? AND component = ?", service, component)
if err != nil {
return fmt.Errorf("delete events %q/%q: %w", service, component, err)
}
return nil
}
// PruneEvents deletes events older than the given time.
func PruneEvents(db *sql.DB, before time.Time) (int64, error) {
res, err := db.Exec(

View File

@@ -23,6 +23,9 @@ service McpAgentService {
// Adopt
rpc AdoptContainers(AdoptContainersRequest) returns (AdoptContainersResponse);
// Purge
rpc PurgeComponent(PurgeRequest) returns (PurgeResponse);
// File transfer
rpc PushFile(PushFileRequest) returns (PushFileResponse);
rpc PullFile(PullFileRequest) returns (PullFileResponse);
@@ -234,3 +237,30 @@ message NodeStatusResponse {
double cpu_usage_percent = 10;
google.protobuf.Timestamp uptime_since = 11;
}
// --- Purge ---
message PurgeRequest {
// Service name (empty = all services).
string service = 1;
// Component name (empty = all eligible in service).
string component = 2;
// Preview only, do not modify registry.
bool dry_run = 3;
// Currently-defined service/component pairs (e.g., "mcns/mcns").
// The agent uses this to determine what is "not in any service definition".
repeated string defined_components = 4;
}
message PurgeResponse {
repeated PurgeResult results = 1;
}
message PurgeResult {
string service = 1;
string component = 2;
// true if removed (or would be, in dry-run).
bool purged = 3;
// Why eligible, or why refused.
string reason = 4;
}