Phase 9: two-phase garbage collection engine
GC engine (internal/gc/): Collector.Run() implements the two-phase algorithm — Phase 1 finds unreferenced blobs and deletes DB rows in a single transaction, Phase 2 deletes blob files from storage. Registry-wide mutex blocks concurrent GC runs. Collector.Reconcile() scans filesystem for orphaned files with no DB row (crash recovery). Wired into admin_gc.go: POST /v1/gc now launches the real collector in a goroutine with gc_started/gc_completed audit events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
77
internal/db/gc.go
Normal file
77
internal/db/gc.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.wntrmute.dev/kyle/mcr/internal/gc"
|
||||
)
|
||||
|
||||
// FindAndDeleteUnreferencedBlobs finds all blob rows with no manifest_blobs
|
||||
// entries, deletes them in a single transaction, and returns the digests
|
||||
// and sizes of the deleted blobs.
|
||||
func (d *DB) FindAndDeleteUnreferencedBlobs() ([]gc.UnreferencedBlob, error) {
|
||||
tx, err := d.Begin()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("db: begin gc transaction: %w", err)
|
||||
}
|
||||
|
||||
// Find unreferenced blobs.
|
||||
rows, err := tx.Query(
|
||||
`SELECT b.id, b.digest, b.size FROM blobs b
|
||||
LEFT JOIN manifest_blobs mb ON mb.blob_id = b.id
|
||||
WHERE mb.manifest_id IS NULL`,
|
||||
)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return nil, fmt.Errorf("db: find unreferenced blobs: %w", err)
|
||||
}
|
||||
|
||||
var unreferenced []gc.UnreferencedBlob
|
||||
var ids []int64
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var blob gc.UnreferencedBlob
|
||||
if err := rows.Scan(&id, &blob.Digest, &blob.Size); err != nil {
|
||||
_ = rows.Close()
|
||||
_ = tx.Rollback()
|
||||
return nil, fmt.Errorf("db: scan unreferenced blob: %w", err)
|
||||
}
|
||||
unreferenced = append(unreferenced, blob)
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
_ = rows.Close()
|
||||
_ = tx.Rollback()
|
||||
return nil, fmt.Errorf("db: iterate unreferenced blobs: %w", err)
|
||||
}
|
||||
_ = rows.Close()
|
||||
|
||||
// Delete the unreferenced blob rows.
|
||||
for _, id := range ids {
|
||||
if _, err := tx.Exec(`DELETE FROM blobs WHERE id = ?`, id); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return nil, fmt.Errorf("db: delete blob %d: %w", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("db: commit gc transaction: %w", err)
|
||||
}
|
||||
|
||||
return unreferenced, nil
|
||||
}
|
||||
|
||||
// BlobExistsByDigest checks whether a blob row exists for the given digest.
|
||||
func (d *DB) BlobExistsByDigest(digest string) (bool, error) {
|
||||
var count int
|
||||
err := d.QueryRow(`SELECT COUNT(*) FROM blobs WHERE digest = ?`, digest).Scan(&count)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("db: blob exists by digest: %w", err)
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
111
internal/db/gc_test.go
Normal file
111
internal/db/gc_test.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package db
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestFindAndDeleteUnreferencedBlobs(t *testing.T) {
|
||||
d := openTestDB(t)
|
||||
if err := d.Migrate(); err != nil {
|
||||
t.Fatalf("Migrate: %v", err)
|
||||
}
|
||||
|
||||
// Setup: repo, manifest, two blobs. One referenced, one not.
|
||||
_, err := d.Exec(`INSERT INTO repositories (name) VALUES ('testrepo')`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert repo: %v", err)
|
||||
}
|
||||
_, err = d.Exec(`INSERT INTO manifests (repository_id, digest, media_type, content, size)
|
||||
VALUES (1, 'sha256:m1', 'application/vnd.oci.image.manifest.v1+json', '{}', 2)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert manifest: %v", err)
|
||||
}
|
||||
// Referenced blob.
|
||||
_, err = d.Exec(`INSERT INTO blobs (digest, size) VALUES ('sha256:referenced', 100)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert referenced blob: %v", err)
|
||||
}
|
||||
_, err = d.Exec(`INSERT INTO manifest_blobs (manifest_id, blob_id) VALUES (1, 1)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert manifest_blob: %v", err)
|
||||
}
|
||||
// Unreferenced blob.
|
||||
_, err = d.Exec(`INSERT INTO blobs (digest, size) VALUES ('sha256:unreferenced', 500)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert unreferenced blob: %v", err)
|
||||
}
|
||||
|
||||
unreferenced, err := d.FindAndDeleteUnreferencedBlobs()
|
||||
if err != nil {
|
||||
t.Fatalf("FindAndDeleteUnreferencedBlobs: %v", err)
|
||||
}
|
||||
|
||||
if len(unreferenced) != 1 {
|
||||
t.Fatalf("unreferenced count: got %d, want 1", len(unreferenced))
|
||||
}
|
||||
if unreferenced[0].Digest != "sha256:unreferenced" {
|
||||
t.Fatalf("unreferenced digest: got %q", unreferenced[0].Digest)
|
||||
}
|
||||
if unreferenced[0].Size != 500 {
|
||||
t.Fatalf("unreferenced size: got %d, want 500", unreferenced[0].Size)
|
||||
}
|
||||
|
||||
// Verify unreferenced blob row was deleted.
|
||||
exists, err := d.BlobExists("sha256:unreferenced")
|
||||
if err != nil {
|
||||
t.Fatalf("BlobExists: %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Fatal("unreferenced blob should have been deleted from DB")
|
||||
}
|
||||
|
||||
// Verify referenced blob still exists.
|
||||
exists, err = d.BlobExists("sha256:referenced")
|
||||
if err != nil {
|
||||
t.Fatalf("BlobExists: %v", err)
|
||||
}
|
||||
if !exists {
|
||||
t.Fatal("referenced blob should still exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindAndDeleteUnreferencedBlobsNone(t *testing.T) {
|
||||
d := openTestDB(t)
|
||||
if err := d.Migrate(); err != nil {
|
||||
t.Fatalf("Migrate: %v", err)
|
||||
}
|
||||
|
||||
unreferenced, err := d.FindAndDeleteUnreferencedBlobs()
|
||||
if err != nil {
|
||||
t.Fatalf("FindAndDeleteUnreferencedBlobs: %v", err)
|
||||
}
|
||||
if unreferenced != nil {
|
||||
t.Fatalf("expected nil, got %v", unreferenced)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlobExistsByDigest(t *testing.T) {
|
||||
d := openTestDB(t)
|
||||
if err := d.Migrate(); err != nil {
|
||||
t.Fatalf("Migrate: %v", err)
|
||||
}
|
||||
|
||||
_, err := d.Exec(`INSERT INTO blobs (digest, size) VALUES ('sha256:exists', 100)`)
|
||||
if err != nil {
|
||||
t.Fatalf("insert blob: %v", err)
|
||||
}
|
||||
|
||||
exists, err := d.BlobExistsByDigest("sha256:exists")
|
||||
if err != nil {
|
||||
t.Fatalf("BlobExistsByDigest: %v", err)
|
||||
}
|
||||
if !exists {
|
||||
t.Fatal("expected blob to exist")
|
||||
}
|
||||
|
||||
exists, err = d.BlobExistsByDigest("sha256:nope")
|
||||
if err != nil {
|
||||
t.Fatalf("BlobExistsByDigest (nope): %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Fatal("expected blob to not exist")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user