Every 500 response in the OCI package silently discarded the actual error, making production debugging impossible. Add slog.Error before each 500 response with the error and relevant context (repo, digest, tag, uuid). Add slog.Info for state-mutating successes (manifest push, blob upload complete, deletions). Logger is injected into the OCI Handler via constructor, falling back to slog.Default() if nil. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
257 lines
7.9 KiB
Go
257 lines
7.9 KiB
Go
package oci
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"git.wntrmute.dev/kyle/mcr/internal/db"
|
|
"git.wntrmute.dev/kyle/mcr/internal/policy"
|
|
"git.wntrmute.dev/kyle/mcr/internal/storage"
|
|
)
|
|
|
|
// uploadManager tracks in-progress blob writers by UUID.
|
|
type uploadManager struct {
|
|
mu sync.Mutex
|
|
writers map[string]*storage.BlobWriter
|
|
}
|
|
|
|
func newUploadManager() *uploadManager {
|
|
return &uploadManager{writers: make(map[string]*storage.BlobWriter)}
|
|
}
|
|
|
|
func (m *uploadManager) set(uuid string, bw *storage.BlobWriter) {
|
|
m.mu.Lock()
|
|
m.writers[uuid] = bw
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func (m *uploadManager) get(uuid string) (*storage.BlobWriter, bool) {
|
|
m.mu.Lock()
|
|
bw, ok := m.writers[uuid]
|
|
m.mu.Unlock()
|
|
return bw, ok
|
|
}
|
|
|
|
func (m *uploadManager) remove(uuid string) {
|
|
m.mu.Lock()
|
|
delete(m.writers, uuid)
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
// generateUUID creates a random UUID (v4) string.
|
|
func generateUUID() (string, error) {
|
|
var buf [16]byte
|
|
if _, err := rand.Read(buf[:]); err != nil {
|
|
return "", fmt.Errorf("oci: generate uuid: %w", err)
|
|
}
|
|
// Set version 4 and variant bits.
|
|
buf[6] = (buf[6] & 0x0f) | 0x40
|
|
buf[8] = (buf[8] & 0x3f) | 0x80
|
|
return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
|
|
buf[0:4], buf[4:6], buf[6:8], buf[8:10], buf[10:16]), nil
|
|
}
|
|
|
|
// handleUploadInitiate handles POST /v2/<name>/blobs/uploads/
|
|
func (h *Handler) handleUploadInitiate(w http.ResponseWriter, r *http.Request, repo string) {
|
|
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
|
return
|
|
}
|
|
|
|
// Create repository if it doesn't exist (implicit creation).
|
|
repoID, err := h.db.GetOrCreateRepository(repo)
|
|
if err != nil {
|
|
h.log.Error("upload initiate: get or create repository", "error", err, "repo", repo)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
uuid, err := generateUUID()
|
|
if err != nil {
|
|
h.log.Error("upload initiate: generate uuid", "error", err, "repo", repo)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
// Insert upload row in DB.
|
|
if err := h.db.CreateUpload(uuid, repoID); err != nil {
|
|
h.log.Error("upload initiate: create upload in database", "error", err, "repo", repo, "uuid", uuid)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
// Create temp file via storage.
|
|
bw, err := h.blobs.StartUpload(uuid)
|
|
if err != nil {
|
|
// Clean up DB row on storage failure.
|
|
_ = h.db.DeleteUpload(uuid)
|
|
h.log.Error("upload initiate: start upload in storage", "error", err, "repo", repo, "uuid", uuid)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
h.uploads.set(uuid, bw)
|
|
|
|
h.log.Info("upload initiated", "repo", repo, "uuid", uuid)
|
|
|
|
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
|
|
w.Header().Set("Docker-Upload-UUID", uuid)
|
|
w.Header().Set("Range", "0-0")
|
|
w.WriteHeader(http.StatusAccepted)
|
|
}
|
|
|
|
// handleUploadChunk handles PATCH /v2/<name>/blobs/uploads/<uuid>
|
|
func (h *Handler) handleUploadChunk(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
|
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
|
return
|
|
}
|
|
|
|
bw, ok := h.uploads.get(uuid)
|
|
if !ok {
|
|
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
|
return
|
|
}
|
|
|
|
// Append request body to upload file.
|
|
n, err := io.Copy(bw, r.Body)
|
|
if err != nil {
|
|
h.log.Error("upload chunk: write to storage", "error", err, "repo", repo, "uuid", uuid)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "write failed")
|
|
return
|
|
}
|
|
|
|
// Update offset in DB.
|
|
newOffset := bw.BytesWritten()
|
|
if err := h.db.UpdateUploadOffset(uuid, newOffset); err != nil {
|
|
h.log.Error("upload chunk: update offset in database", "error", err, "repo", repo, "uuid", uuid, "offset", newOffset)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
_ = n // bytes written this chunk
|
|
|
|
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
|
|
w.Header().Set("Docker-Upload-UUID", uuid)
|
|
w.Header().Set("Range", fmt.Sprintf("0-%d", newOffset))
|
|
w.WriteHeader(http.StatusAccepted)
|
|
}
|
|
|
|
// handleUploadComplete handles PUT /v2/<name>/blobs/uploads/<uuid>?digest=<digest>
|
|
func (h *Handler) handleUploadComplete(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
|
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
|
return
|
|
}
|
|
|
|
digest := r.URL.Query().Get("digest")
|
|
if digest == "" {
|
|
writeOCIError(w, "DIGEST_INVALID", http.StatusBadRequest, "digest parameter required")
|
|
return
|
|
}
|
|
|
|
bw, ok := h.uploads.get(uuid)
|
|
if !ok {
|
|
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
|
return
|
|
}
|
|
|
|
// If request body is non-empty, append it first (monolithic upload).
|
|
if r.ContentLength != 0 {
|
|
if _, err := io.Copy(bw, r.Body); err != nil {
|
|
h.log.Error("upload complete: write final chunk to storage", "error", err, "repo", repo, "uuid", uuid)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "write failed")
|
|
return
|
|
}
|
|
}
|
|
|
|
// Commit the blob: verify digest, move to final location.
|
|
size := bw.BytesWritten()
|
|
_, err := bw.Commit(digest)
|
|
if err != nil {
|
|
h.uploads.remove(uuid)
|
|
if errors.Is(err, storage.ErrDigestMismatch) {
|
|
_ = h.db.DeleteUpload(uuid)
|
|
writeOCIError(w, "DIGEST_INVALID", http.StatusBadRequest, "digest mismatch")
|
|
return
|
|
}
|
|
if errors.Is(err, storage.ErrInvalidDigest) {
|
|
_ = h.db.DeleteUpload(uuid)
|
|
writeOCIError(w, "DIGEST_INVALID", http.StatusBadRequest, "invalid digest format")
|
|
return
|
|
}
|
|
h.log.Error("upload complete: commit blob", "error", err, "repo", repo, "uuid", uuid, "digest", digest)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "commit failed")
|
|
return
|
|
}
|
|
|
|
h.uploads.remove(uuid)
|
|
|
|
// Insert blob row (no-op if already exists — content-addressed dedup).
|
|
if err := h.db.InsertBlob(digest, size); err != nil {
|
|
h.log.Error("upload complete: insert blob in database", "error", err, "repo", repo, "digest", digest, "size", size)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
// Delete upload row.
|
|
_ = h.db.DeleteUpload(uuid)
|
|
|
|
h.audit(r, "blob_uploaded", repo, digest)
|
|
|
|
h.log.Info("blob upload complete", "repo", repo, "digest", digest, "size", size)
|
|
|
|
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", repo, digest))
|
|
w.Header().Set("Docker-Content-Digest", digest)
|
|
w.WriteHeader(http.StatusCreated)
|
|
}
|
|
|
|
// handleUploadStatus handles GET /v2/<name>/blobs/uploads/<uuid>
|
|
func (h *Handler) handleUploadStatus(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
|
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
|
return
|
|
}
|
|
|
|
upload, err := h.db.GetUpload(uuid)
|
|
if err != nil {
|
|
if errors.Is(err, db.ErrUploadNotFound) {
|
|
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
|
return
|
|
}
|
|
h.log.Error("upload status: lookup upload", "error", err, "repo", repo, "uuid", uuid)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
|
|
w.Header().Set("Docker-Upload-UUID", uuid)
|
|
w.Header().Set("Range", fmt.Sprintf("0-%d", upload.ByteOffset))
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// handleUploadCancel handles DELETE /v2/<name>/blobs/uploads/<uuid>
|
|
func (h *Handler) handleUploadCancel(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
|
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
|
return
|
|
}
|
|
|
|
bw, ok := h.uploads.get(uuid)
|
|
if ok {
|
|
_ = bw.Cancel()
|
|
h.uploads.remove(uuid)
|
|
}
|
|
|
|
if err := h.db.DeleteUpload(uuid); err != nil {
|
|
if errors.Is(err, db.ErrUploadNotFound) {
|
|
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
|
return
|
|
}
|
|
h.log.Error("upload cancel: delete upload from database", "error", err, "repo", repo, "uuid", uuid)
|
|
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|