Files
mcr/internal/grpcserver/registry.go
Kyle Isom d5580f01f2 Migrate module path from kyle/ to mc/ org
All import paths updated to git.wntrmute.dev/mc/. Bumps mcdsl to v1.2.0.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 02:05:59 -07:00

206 lines
5.6 KiB
Go

package grpcserver
import (
"context"
"errors"
"fmt"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/google/uuid"
mcdslauth "git.wntrmute.dev/mc/mcdsl/auth"
pb "git.wntrmute.dev/mc/mcr/gen/mcr/v1"
"git.wntrmute.dev/mc/mcr/internal/db"
"git.wntrmute.dev/mc/mcr/internal/gc"
)
// registryService implements pb.RegistryServiceServer.
//
// Its methods serve repository listing, inspection and deletion from the
// database, and start and report on background garbage collection.
type registryService struct {
// Embedded generated type; presumably supplies default implementations for
// RPCs this type does not define — TODO confirm against the generated code.
pb.UnimplementedRegistryServiceServer
// db is the store queried and modified by the repository RPCs.
db *db.DB
// collector performs garbage collection. runGC skips the collection step
// when it is nil.
collector *gc.Collector
// gcStatus holds the running flag and the record of the last run; the
// methods here access those fields only while holding gcStatus.mu.
gcStatus *GCStatus
// auditFn records audit events. It is optional: every call site in this
// file checks it for nil before calling.
auditFn AuditFunc
}
// ListRepositories returns one page of repositories together with their
// metadata (tag count, manifest count, total size, creation time).
//
// Paging defaults to a limit of 50 and an offset of 0. A positive limit or a
// non-negative offset in the request's pagination replaces the corresponding
// default; any other value is ignored. A database failure is reported as
// codes.Internal with a generic message so that no detail leaks to the client.
func (s *registryService) ListRepositories(_ context.Context, req *pb.ListRepositoriesRequest) (*pb.ListRepositoriesResponse, error) {
	var (
		pageSize  int32 = 50
		pageStart int32
	)
	if page := req.GetPagination(); page != nil {
		if page.Limit > 0 {
			pageSize = page.Limit
		}
		if page.Offset >= 0 {
			pageStart = page.Offset
		}
	}

	rows, err := s.db.ListRepositoriesWithMetadata(int(pageSize), int(pageStart))
	if err != nil {
		return nil, status.Error(codes.Internal, "internal error")
	}

	// Left nil when there are no rows, so the response carries no entries.
	var out []*pb.RepositoryMetadata
	for _, row := range rows {
		meta := &pb.RepositoryMetadata{
			Name:          row.Name,
			TagCount:      int32(row.TagCount),      //nolint:gosec // tag count fits int32
			ManifestCount: int32(row.ManifestCount), //nolint:gosec // manifest count fits int32
			TotalSize:     row.TotalSize,
			CreatedAt:     row.CreatedAt,
		}
		out = append(out, meta)
	}

	return &pb.ListRepositoriesResponse{Repositories: out}, nil
}
// GetRepository returns the named repository's total size and creation time
// along with its tags and manifests.
//
// Errors:
//   - codes.InvalidArgument when the request carries no repository name.
//   - codes.NotFound when the database reports db.ErrRepoNotFound.
//   - codes.Internal (generic message) for any other database error.
func (s *registryService) GetRepository(_ context.Context, req *pb.GetRepositoryRequest) (*pb.GetRepositoryResponse, error) {
	name := req.GetName()
	if name == "" {
		return nil, status.Error(codes.InvalidArgument, "repository name required")
	}

	detail, err := s.db.GetRepositoryDetail(name)
	switch {
	case err == nil:
		// Found; fall through to build the response.
	case errors.Is(err, db.ErrRepoNotFound):
		return nil, status.Error(codes.NotFound, "repository not found")
	default:
		return nil, status.Error(codes.Internal, "internal error")
	}

	var tags []*pb.TagInfo
	for _, tag := range detail.Tags {
		tags = append(tags, &pb.TagInfo{
			Name:      tag.Name,
			Digest:    tag.Digest,
			UpdatedAt: tag.UpdatedAt,
		})
	}

	var manifests []*pb.ManifestInfo
	for _, manifest := range detail.Manifests {
		manifests = append(manifests, &pb.ManifestInfo{
			Digest:    manifest.Digest,
			MediaType: manifest.MediaType,
			Size:      manifest.Size,
			CreatedAt: manifest.CreatedAt,
		})
	}

	return &pb.GetRepositoryResponse{
		Name:      detail.Name,
		TotalSize: detail.TotalSize,
		CreatedAt: detail.CreatedAt,
		Tags:      tags,
		Manifests: manifests,
	}, nil
}
// DeleteRepository removes the named repository from the database and, when
// an audit function is configured, records a "repo_deleted" event.
//
// The audit actor is the username from the token info attached to ctx, or the
// empty string when the context carries none.
//
// Errors:
//   - codes.InvalidArgument when the request carries no repository name.
//   - codes.NotFound when the database reports db.ErrRepoNotFound.
//   - codes.Internal (generic message) for any other database error.
func (s *registryService) DeleteRepository(ctx context.Context, req *pb.DeleteRepositoryRequest) (*pb.DeleteRepositoryResponse, error) {
	name := req.GetName()
	if name == "" {
		return nil, status.Error(codes.InvalidArgument, "repository name required")
	}

	err := s.db.DeleteRepository(name)
	switch {
	case err == nil:
		// Deleted; continue to auditing.
	case errors.Is(err, db.ErrRepoNotFound):
		return nil, status.Error(codes.NotFound, "repository not found")
	default:
		return nil, status.Error(codes.Internal, "internal error")
	}

	// Auditing is optional; without a sink there is nothing more to do.
	if s.auditFn == nil {
		return &pb.DeleteRepositoryResponse{}, nil
	}

	var actor string
	if info := mcdslauth.TokenInfoFromContext(ctx); info != nil {
		actor = info.Username
	}
	s.auditFn("repo_deleted", actor, name, "", "", nil)

	return &pb.DeleteRepositoryResponse{}, nil
}
// GarbageCollect starts a garbage-collection run in the background and
// returns its freshly generated ID without waiting for it to finish.
//
// Only one run may be in flight: if the running flag is already set the call
// fails with codes.AlreadyExists. Otherwise the flag is set here, under the
// status mutex, before the goroutine is launched.
func (s *registryService) GarbageCollect(_ context.Context, _ *pb.GarbageCollectRequest) (*pb.GarbageCollectResponse, error) {
	st := s.gcStatus

	// Test and set the running flag in a single critical section.
	st.mu.Lock()
	busy := st.running
	if !busy {
		st.running = true
	}
	st.mu.Unlock()

	if busy {
		return nil, status.Error(codes.AlreadyExists, "garbage collection already running")
	}

	id := uuid.New().String()

	// GC runs asynchronously, like the REST handler. It is a long-running
	// background operation that must not be tied to the request context,
	// so runGC deliberately uses context.Background().
	go s.runGC(id) //nolint:gosec // G118: GC must outlive the triggering RPC

	return &pb.GarbageCollectResponse{Id: id}, nil
}
// runGC performs one garbage-collection run in the background. It runs the
// collector with context.Background() so the run is not cancelled when the
// RPC that triggered it completes.
//
// Sequence:
//  1. Emit a "gc_started" audit event (if an audit function is configured).
//  2. Run the collector, if one is configured, and capture its counters.
//  3. Under the status mutex, clear the running flag and store the last-run
//     record with start/completion times (UTC, RFC 3339) and counters.
//  4. Emit a "gc_completed" audit event, but only if the collector returned
//     no error.
//
// A collector error is not reported anywhere: the run is still recorded as
// the last run, using whatever counters the collector returned.
func (s *registryService) runGC(gcID string) {
	started := time.Now().UTC().Format(time.RFC3339)

	if s.auditFn != nil {
		s.auditFn("gc_started", "", "", "", "", map[string]string{
			"gc_id": gcID,
		})
	}

	var (
		removed int
		freed   int64
		runErr  error
	)
	if s.collector != nil {
		res, err := s.collector.Run(context.Background()) //nolint:gosec // GC is intentionally background, not request-scoped
		runErr = err
		// The result is consulted even when err is non-nil.
		if res != nil {
			removed = res.BlobsRemoved
			freed = res.BytesFreed
		}
	}

	finished := time.Now().UTC().Format(time.RFC3339)

	st := s.gcStatus
	st.mu.Lock()
	st.running = false
	st.lastRun = &gcLastRun{
		StartedAt:    started,
		CompletedAt:  finished,
		BlobsRemoved: removed,
		BytesFreed:   freed,
	}
	st.mu.Unlock()

	// The completion event is reserved for successful, audited runs.
	if runErr != nil || s.auditFn == nil {
		return
	}
	s.auditFn("gc_completed", "", "", "", "", map[string]string{
		"gc_id":         gcID,
		"blobs_removed": fmt.Sprint(removed),
		"bytes_freed":   fmt.Sprint(freed),
	})
}
// GetGCStatus reports whether a garbage-collection run is in progress and,
// if a run has been recorded, that run's start/completion times and counters.
//
// The status is read while holding the status mutex. LastRun is left unset
// when no run has been recorded. This method never returns an error.
func (s *registryService) GetGCStatus(_ context.Context, _ *pb.GetGCStatusRequest) (*pb.GetGCStatusResponse, error) {
	st := s.gcStatus
	st.mu.Lock()
	defer st.mu.Unlock()

	out := &pb.GetGCStatusResponse{Running: st.running}
	if last := st.lastRun; last != nil {
		out.LastRun = &pb.GCLastRun{
			StartedAt:    last.StartedAt,
			CompletedAt:  last.CompletedAt,
			BlobsRemoved: int32(last.BlobsRemoved), //nolint:gosec // blob count fits int32
			BytesFreed:   last.BytesFreed,
		}
	}
	return out, nil
}