package grpcserver import ( "context" "errors" "fmt" "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/google/uuid" mcdslauth "git.wntrmute.dev/kyle/mcdsl/auth" pb "git.wntrmute.dev/kyle/mcr/gen/mcr/v1" "git.wntrmute.dev/kyle/mcr/internal/db" "git.wntrmute.dev/kyle/mcr/internal/gc" ) // registryService implements pb.RegistryServiceServer. type registryService struct { pb.UnimplementedRegistryServiceServer db *db.DB collector *gc.Collector gcStatus *GCStatus auditFn AuditFunc } func (s *registryService) ListRepositories(_ context.Context, req *pb.ListRepositoriesRequest) (*pb.ListRepositoriesResponse, error) { limit := int32(50) offset := int32(0) if req.GetPagination() != nil { if req.Pagination.Limit > 0 { limit = req.Pagination.Limit } if req.Pagination.Offset >= 0 { offset = req.Pagination.Offset } } repos, err := s.db.ListRepositoriesWithMetadata(int(limit), int(offset)) if err != nil { return nil, status.Errorf(codes.Internal, "internal error") } var result []*pb.RepositoryMetadata for _, r := range repos { result = append(result, &pb.RepositoryMetadata{ Name: r.Name, TagCount: int32(r.TagCount), //nolint:gosec // tag count fits int32 ManifestCount: int32(r.ManifestCount), //nolint:gosec // manifest count fits int32 TotalSize: r.TotalSize, CreatedAt: r.CreatedAt, }) } return &pb.ListRepositoriesResponse{Repositories: result}, nil } func (s *registryService) GetRepository(_ context.Context, req *pb.GetRepositoryRequest) (*pb.GetRepositoryResponse, error) { if req.GetName() == "" { return nil, status.Errorf(codes.InvalidArgument, "repository name required") } detail, err := s.db.GetRepositoryDetail(req.Name) if err != nil { if errors.Is(err, db.ErrRepoNotFound) { return nil, status.Errorf(codes.NotFound, "repository not found") } return nil, status.Errorf(codes.Internal, "internal error") } resp := &pb.GetRepositoryResponse{ Name: detail.Name, TotalSize: detail.TotalSize, CreatedAt: detail.CreatedAt, } for _, t := range detail.Tags { resp.Tags = append(resp.Tags, &pb.TagInfo{ Name: t.Name, Digest: t.Digest, }) } for _, m := range detail.Manifests { resp.Manifests = append(resp.Manifests, &pb.ManifestInfo{ Digest: m.Digest, MediaType: m.MediaType, Size: m.Size, CreatedAt: m.CreatedAt, }) } return resp, nil } func (s *registryService) DeleteRepository(ctx context.Context, req *pb.DeleteRepositoryRequest) (*pb.DeleteRepositoryResponse, error) { if req.GetName() == "" { return nil, status.Errorf(codes.InvalidArgument, "repository name required") } if err := s.db.DeleteRepository(req.Name); err != nil { if errors.Is(err, db.ErrRepoNotFound) { return nil, status.Errorf(codes.NotFound, "repository not found") } return nil, status.Errorf(codes.Internal, "internal error") } if s.auditFn != nil { info := mcdslauth.TokenInfoFromContext(ctx) actorID := "" if info != nil { actorID = info.Username } s.auditFn("repo_deleted", actorID, req.Name, "", "", nil) } return &pb.DeleteRepositoryResponse{}, nil } func (s *registryService) GarbageCollect(_ context.Context, _ *pb.GarbageCollectRequest) (*pb.GarbageCollectResponse, error) { s.gcStatus.mu.Lock() if s.gcStatus.running { s.gcStatus.mu.Unlock() return nil, status.Errorf(codes.AlreadyExists, "garbage collection already running") } s.gcStatus.running = true s.gcStatus.mu.Unlock() gcID := uuid.New().String() // Run GC asynchronously like the REST handler. GC is a long-running // background operation that must not be tied to the request context, // so we intentionally use context.Background() inside runGC. go s.runGC(gcID) //nolint:gosec // G118: GC must outlive the triggering RPC return &pb.GarbageCollectResponse{Id: gcID}, nil } // runGC executes garbage collection in the background. It uses // context.Background() because GC must not be cancelled when the // triggering RPC completes. func (s *registryService) runGC(gcID string) { startedAt := time.Now().UTC().Format(time.RFC3339) if s.auditFn != nil { s.auditFn("gc_started", "", "", "", "", map[string]string{ "gc_id": gcID, }) } var blobsRemoved int var bytesFreed int64 var gcErr error if s.collector != nil { r, err := s.collector.Run(context.Background()) //nolint:gosec // GC is intentionally background, not request-scoped if err != nil { gcErr = err } if r != nil { blobsRemoved = r.BlobsRemoved bytesFreed = r.BytesFreed } } completedAt := time.Now().UTC().Format(time.RFC3339) s.gcStatus.mu.Lock() s.gcStatus.running = false s.gcStatus.lastRun = &gcLastRun{ StartedAt: startedAt, CompletedAt: completedAt, BlobsRemoved: blobsRemoved, BytesFreed: bytesFreed, } s.gcStatus.mu.Unlock() if s.auditFn != nil && gcErr == nil { details := map[string]string{ "gc_id": gcID, "blobs_removed": fmt.Sprintf("%d", blobsRemoved), "bytes_freed": fmt.Sprintf("%d", bytesFreed), } s.auditFn("gc_completed", "", "", "", "", details) } } func (s *registryService) GetGCStatus(_ context.Context, _ *pb.GetGCStatusRequest) (*pb.GetGCStatusResponse, error) { s.gcStatus.mu.Lock() resp := &pb.GetGCStatusResponse{ Running: s.gcStatus.running, } if s.gcStatus.lastRun != nil { resp.LastRun = &pb.GCLastRun{ StartedAt: s.gcStatus.lastRun.StartedAt, CompletedAt: s.gcStatus.lastRun.CompletedAt, BlobsRemoved: int32(s.gcStatus.lastRun.BlobsRemoved), //nolint:gosec // blob count fits int32 BytesFreed: s.gcStatus.lastRun.BytesFreed, } } s.gcStatus.mu.Unlock() return resp, nil }