GC engine (internal/gc/): Collector.Run() implements the two-phase algorithm — Phase 1 finds unreferenced blobs and deletes DB rows in a single transaction, Phase 2 deletes blob files from storage. Registry-wide mutex blocks concurrent GC runs. Collector.Reconcile() scans filesystem for orphaned files with no DB row (crash recovery). Wired into admin_gc.go: POST /v1/gc now launches the real collector in a goroutine with gc_started/gc_completed audit events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
161 lines
4.1 KiB
Go
161 lines
4.1 KiB
Go
package gc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// DB provides the database operations needed by GC.
|
|
type DB interface {
|
|
// FindUnreferencedBlobs returns digests and sizes of blobs with no
|
|
// manifest_blobs entries, deletes those blob rows in a transaction,
|
|
// and returns the results.
|
|
FindAndDeleteUnreferencedBlobs() ([]UnreferencedBlob, error)
|
|
// BlobExistsByDigest checks whether a blob row exists for the given digest.
|
|
BlobExistsByDigest(digest string) (bool, error)
|
|
}
|
|
|
|
// Storage provides filesystem operations for blob cleanup.
|
|
type Storage interface {
|
|
Delete(digest string) error
|
|
ListBlobDigests() ([]string, error)
|
|
}
|
|
|
|
// UnreferencedBlob is a blob that has no manifest references.
|
|
type UnreferencedBlob struct {
|
|
Digest string
|
|
Size int64
|
|
}
|
|
|
|
// Result records the outcome of a GC run.
|
|
type Result struct {
|
|
BlobsRemoved int
|
|
BytesFreed int64
|
|
Duration time.Duration
|
|
}
|
|
|
|
// Collector performs garbage collection of unreferenced blobs.
|
|
type Collector struct {
|
|
db DB
|
|
storage Storage
|
|
mu sync.Mutex // registry-wide GC lock
|
|
}
|
|
|
|
// New creates a new garbage collector.
|
|
func New(db DB, storage Storage) *Collector {
|
|
return &Collector{db: db, storage: storage}
|
|
}
|
|
|
|
// Run executes the two-phase GC algorithm per ARCHITECTURE.md §9.
|
|
// Phase 1 (DB): find unreferenced blobs, delete rows in a transaction.
|
|
// Phase 2 (filesystem): delete blob files, clean up empty dirs.
|
|
// Returns ErrGCRunning if another GC run is already in progress.
|
|
func (c *Collector) Run(ctx context.Context) (*Result, error) {
|
|
if !c.mu.TryLock() {
|
|
return nil, ErrGCRunning
|
|
}
|
|
defer c.mu.Unlock()
|
|
|
|
start := time.Now()
|
|
|
|
// Check for cancellation.
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, fmt.Errorf("gc: %w", err)
|
|
}
|
|
|
|
// Phase 1: Mark and sweep in DB.
|
|
unreferenced, err := c.db.FindAndDeleteUnreferencedBlobs()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("gc: phase 1: %w", err)
|
|
}
|
|
|
|
// Phase 2: Delete files from storage.
|
|
var bytesFreed int64
|
|
for _, blob := range unreferenced {
|
|
if err := ctx.Err(); err != nil {
|
|
// Return partial result on cancellation.
|
|
return &Result{
|
|
BlobsRemoved: len(unreferenced),
|
|
BytesFreed: bytesFreed,
|
|
Duration: time.Since(start),
|
|
}, fmt.Errorf("gc: phase 2 interrupted: %w", err)
|
|
}
|
|
// Best-effort file deletion. If the file is already gone (e.g.,
|
|
// crash recovery), that's fine.
|
|
if err := c.storage.Delete(blob.Digest); err != nil {
|
|
// Log but continue — orphaned files are harmless and will
|
|
// be caught by reconcile.
|
|
continue
|
|
}
|
|
bytesFreed += blob.Size
|
|
}
|
|
|
|
return &Result{
|
|
BlobsRemoved: len(unreferenced),
|
|
BytesFreed: bytesFreed,
|
|
Duration: time.Since(start),
|
|
}, nil
|
|
}
|
|
|
|
// Reconcile scans the filesystem for blob files with no matching DB row
|
|
// and deletes them. This handles crash recovery — files left behind when
|
|
// the process crashed after Phase 1 (DB cleanup) but before Phase 2
|
|
// (file cleanup) completed.
|
|
func (c *Collector) Reconcile(ctx context.Context) (*Result, error) {
|
|
if !c.mu.TryLock() {
|
|
return nil, ErrGCRunning
|
|
}
|
|
defer c.mu.Unlock()
|
|
|
|
start := time.Now()
|
|
|
|
digests, err := c.storage.ListBlobDigests()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("gc: list blob files: %w", err)
|
|
}
|
|
|
|
var removed int
|
|
var bytesFreed int64
|
|
for _, digest := range digests {
|
|
if err := ctx.Err(); err != nil {
|
|
return &Result{
|
|
BlobsRemoved: removed,
|
|
BytesFreed: bytesFreed,
|
|
Duration: time.Since(start),
|
|
}, fmt.Errorf("gc: reconcile interrupted: %w", err)
|
|
}
|
|
|
|
exists, err := c.db.BlobExistsByDigest(digest)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !exists {
|
|
if err := c.storage.Delete(digest); err != nil {
|
|
continue
|
|
}
|
|
removed++
|
|
}
|
|
}
|
|
|
|
return &Result{
|
|
BlobsRemoved: removed,
|
|
BytesFreed: bytesFreed,
|
|
Duration: time.Since(start),
|
|
}, nil
|
|
}
|
|
|
|
// Lock acquires the GC lock, blocking new blob uploads.
|
|
// Returns a function to release the lock.
|
|
func (c *Collector) Lock() func() {
|
|
c.mu.Lock()
|
|
return c.mu.Unlock
|
|
}
|
|
|
|
// TryLock attempts to acquire the GC lock without blocking.
|
|
// Returns true if the lock was acquired.
|
|
func (c *Collector) TryLock() bool {
|
|
return c.mu.TryLock()
|
|
}
|