Phase 10: gRPC admin API with interceptor chain
Proto definitions for 4 services (RegistryService, PolicyService, AuditService, AdminService) with hand-written Go stubs using JSON codec until protobuf tooling is available. Interceptor chain: logging (method, peer IP, duration, never logs auth metadata) → auth (bearer token via MCIAS, Health bypasses) → admin (role check for GC, policy, delete, audit RPCs). All RPCs share business logic with REST handlers via internal/db and internal/gc packages. TLS 1.3 minimum on gRPC listener. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
203
internal/grpcserver/registry.go
Normal file
203
internal/grpcserver/registry.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package grpcserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
pb "git.wntrmute.dev/kyle/mcr/gen/mcr/v1"
|
||||
"git.wntrmute.dev/kyle/mcr/internal/auth"
|
||||
"git.wntrmute.dev/kyle/mcr/internal/db"
|
||||
"git.wntrmute.dev/kyle/mcr/internal/gc"
|
||||
)
|
||||
|
||||
// registryService implements pb.RegistryServiceServer.
|
||||
type registryService struct {
|
||||
pb.UnimplementedRegistryServiceServer
|
||||
db *db.DB
|
||||
collector *gc.Collector
|
||||
gcStatus *GCStatus
|
||||
auditFn AuditFunc
|
||||
}
|
||||
|
||||
func (s *registryService) ListRepositories(_ context.Context, req *pb.ListRepositoriesRequest) (*pb.ListRepositoriesResponse, error) {
|
||||
limit := int32(50)
|
||||
offset := int32(0)
|
||||
if req.GetPagination() != nil {
|
||||
if req.Pagination.Limit > 0 {
|
||||
limit = req.Pagination.Limit
|
||||
}
|
||||
if req.Pagination.Offset >= 0 {
|
||||
offset = req.Pagination.Offset
|
||||
}
|
||||
}
|
||||
|
||||
repos, err := s.db.ListRepositoriesWithMetadata(int(limit), int(offset))
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "internal error")
|
||||
}
|
||||
|
||||
var result []*pb.RepositoryMetadata
|
||||
for _, r := range repos {
|
||||
result = append(result, &pb.RepositoryMetadata{
|
||||
Name: r.Name,
|
||||
TagCount: int32(r.TagCount), //nolint:gosec // tag count fits int32
|
||||
ManifestCount: int32(r.ManifestCount), //nolint:gosec // manifest count fits int32
|
||||
TotalSize: r.TotalSize,
|
||||
CreatedAt: r.CreatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
return &pb.ListRepositoriesResponse{Repositories: result}, nil
|
||||
}
|
||||
|
||||
func (s *registryService) GetRepository(_ context.Context, req *pb.GetRepositoryRequest) (*pb.GetRepositoryResponse, error) {
|
||||
if req.GetName() == "" {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "repository name required")
|
||||
}
|
||||
|
||||
detail, err := s.db.GetRepositoryDetail(req.Name)
|
||||
if err != nil {
|
||||
if errors.Is(err, db.ErrRepoNotFound) {
|
||||
return nil, status.Errorf(codes.NotFound, "repository not found")
|
||||
}
|
||||
return nil, status.Errorf(codes.Internal, "internal error")
|
||||
}
|
||||
|
||||
resp := &pb.GetRepositoryResponse{
|
||||
Name: detail.Name,
|
||||
TotalSize: detail.TotalSize,
|
||||
CreatedAt: detail.CreatedAt,
|
||||
}
|
||||
for _, t := range detail.Tags {
|
||||
resp.Tags = append(resp.Tags, &pb.TagInfo{
|
||||
Name: t.Name,
|
||||
Digest: t.Digest,
|
||||
})
|
||||
}
|
||||
for _, m := range detail.Manifests {
|
||||
resp.Manifests = append(resp.Manifests, &pb.ManifestInfo{
|
||||
Digest: m.Digest,
|
||||
MediaType: m.MediaType,
|
||||
Size: m.Size,
|
||||
CreatedAt: m.CreatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *registryService) DeleteRepository(ctx context.Context, req *pb.DeleteRepositoryRequest) (*pb.DeleteRepositoryResponse, error) {
|
||||
if req.GetName() == "" {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "repository name required")
|
||||
}
|
||||
|
||||
if err := s.db.DeleteRepository(req.Name); err != nil {
|
||||
if errors.Is(err, db.ErrRepoNotFound) {
|
||||
return nil, status.Errorf(codes.NotFound, "repository not found")
|
||||
}
|
||||
return nil, status.Errorf(codes.Internal, "internal error")
|
||||
}
|
||||
|
||||
if s.auditFn != nil {
|
||||
claims := auth.ClaimsFromContext(ctx)
|
||||
actorID := ""
|
||||
if claims != nil {
|
||||
actorID = claims.Subject
|
||||
}
|
||||
s.auditFn("repo_deleted", actorID, req.Name, "", "", nil)
|
||||
}
|
||||
|
||||
return &pb.DeleteRepositoryResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *registryService) GarbageCollect(_ context.Context, _ *pb.GarbageCollectRequest) (*pb.GarbageCollectResponse, error) {
|
||||
s.gcStatus.mu.Lock()
|
||||
if s.gcStatus.running {
|
||||
s.gcStatus.mu.Unlock()
|
||||
return nil, status.Errorf(codes.AlreadyExists, "garbage collection already running")
|
||||
}
|
||||
s.gcStatus.running = true
|
||||
s.gcStatus.mu.Unlock()
|
||||
|
||||
gcID := uuid.New().String()
|
||||
|
||||
// Run GC asynchronously like the REST handler. GC is a long-running
|
||||
// background operation that must not be tied to the request context,
|
||||
// so we intentionally use context.Background() inside runGC.
|
||||
go s.runGC(gcID) //nolint:gosec // G118: GC must outlive the triggering RPC
|
||||
|
||||
return &pb.GarbageCollectResponse{Id: gcID}, nil
|
||||
}
|
||||
|
||||
// runGC executes garbage collection in the background. It uses
|
||||
// context.Background() because GC must not be cancelled when the
|
||||
// triggering RPC completes.
|
||||
func (s *registryService) runGC(gcID string) {
|
||||
startedAt := time.Now().UTC().Format(time.RFC3339)
|
||||
|
||||
if s.auditFn != nil {
|
||||
s.auditFn("gc_started", "", "", "", "", map[string]string{
|
||||
"gc_id": gcID,
|
||||
})
|
||||
}
|
||||
|
||||
var blobsRemoved int
|
||||
var bytesFreed int64
|
||||
var gcErr error
|
||||
if s.collector != nil {
|
||||
r, err := s.collector.Run(context.Background()) //nolint:gosec // GC is intentionally background, not request-scoped
|
||||
if err != nil {
|
||||
gcErr = err
|
||||
}
|
||||
if r != nil {
|
||||
blobsRemoved = r.BlobsRemoved
|
||||
bytesFreed = r.BytesFreed
|
||||
}
|
||||
}
|
||||
|
||||
completedAt := time.Now().UTC().Format(time.RFC3339)
|
||||
|
||||
s.gcStatus.mu.Lock()
|
||||
s.gcStatus.running = false
|
||||
s.gcStatus.lastRun = &gcLastRun{
|
||||
StartedAt: startedAt,
|
||||
CompletedAt: completedAt,
|
||||
BlobsRemoved: blobsRemoved,
|
||||
BytesFreed: bytesFreed,
|
||||
}
|
||||
s.gcStatus.mu.Unlock()
|
||||
|
||||
if s.auditFn != nil && gcErr == nil {
|
||||
details := map[string]string{
|
||||
"gc_id": gcID,
|
||||
"blobs_removed": fmt.Sprintf("%d", blobsRemoved),
|
||||
"bytes_freed": fmt.Sprintf("%d", bytesFreed),
|
||||
}
|
||||
s.auditFn("gc_completed", "", "", "", "", details)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *registryService) GetGCStatus(_ context.Context, _ *pb.GetGCStatusRequest) (*pb.GetGCStatusResponse, error) {
|
||||
s.gcStatus.mu.Lock()
|
||||
resp := &pb.GetGCStatusResponse{
|
||||
Running: s.gcStatus.running,
|
||||
}
|
||||
if s.gcStatus.lastRun != nil {
|
||||
resp.LastRun = &pb.GCLastRun{
|
||||
StartedAt: s.gcStatus.lastRun.StartedAt,
|
||||
CompletedAt: s.gcStatus.lastRun.CompletedAt,
|
||||
BlobsRemoved: int32(s.gcStatus.lastRun.BlobsRemoved), //nolint:gosec // blob count fits int32
|
||||
BytesFreed: s.gcStatus.lastRun.BytesFreed,
|
||||
}
|
||||
}
|
||||
s.gcStatus.mu.Unlock()
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
Reference in New Issue
Block a user