Phases 5, 6, 8: OCI pull/push paths and admin REST API
Phase 5 (OCI pull): internal/oci/ package with manifest GET/HEAD by tag/digest, blob GET/HEAD with repo membership check, tag listing with OCI pagination, catalog listing. Multi-segment repo names via parseOCIPath() right-split routing. DB query layer in internal/db/repository.go. Phase 6 (OCI push): blob uploads (monolithic and chunked) with uploadManager tracking in-progress BlobWriters, manifest push implementing full ARCHITECTURE.md §5 flow in a single SQLite transaction (create repo, upsert manifest, populate manifest_blobs, atomic tag move). Digest verification on both blob commit and manifest push-by-digest. Phase 8 (admin REST): /v1 endpoints for auth (login/logout/health), repository management (list/detail/delete), policy CRUD with engine reload, audit log listing with filters, GC trigger/status stubs. RequireAdmin middleware, platform-standard error format. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
241
internal/oci/upload.go
Normal file
241
internal/oci/upload.go
Normal file
@@ -0,0 +1,241 @@
|
||||
package oci
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"git.wntrmute.dev/kyle/mcr/internal/db"
|
||||
"git.wntrmute.dev/kyle/mcr/internal/policy"
|
||||
"git.wntrmute.dev/kyle/mcr/internal/storage"
|
||||
)
|
||||
|
||||
// uploadManager tracks in-progress blob writers by UUID.
|
||||
type uploadManager struct {
|
||||
mu sync.Mutex
|
||||
writers map[string]*storage.BlobWriter
|
||||
}
|
||||
|
||||
func newUploadManager() *uploadManager {
|
||||
return &uploadManager{writers: make(map[string]*storage.BlobWriter)}
|
||||
}
|
||||
|
||||
func (m *uploadManager) set(uuid string, bw *storage.BlobWriter) {
|
||||
m.mu.Lock()
|
||||
m.writers[uuid] = bw
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *uploadManager) get(uuid string) (*storage.BlobWriter, bool) {
|
||||
m.mu.Lock()
|
||||
bw, ok := m.writers[uuid]
|
||||
m.mu.Unlock()
|
||||
return bw, ok
|
||||
}
|
||||
|
||||
func (m *uploadManager) remove(uuid string) {
|
||||
m.mu.Lock()
|
||||
delete(m.writers, uuid)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// generateUUID creates a random UUID (v4) string.
|
||||
func generateUUID() (string, error) {
|
||||
var buf [16]byte
|
||||
if _, err := rand.Read(buf[:]); err != nil {
|
||||
return "", fmt.Errorf("oci: generate uuid: %w", err)
|
||||
}
|
||||
// Set version 4 and variant bits.
|
||||
buf[6] = (buf[6] & 0x0f) | 0x40
|
||||
buf[8] = (buf[8] & 0x3f) | 0x80
|
||||
return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x",
|
||||
buf[0:4], buf[4:6], buf[6:8], buf[8:10], buf[10:16]), nil
|
||||
}
|
||||
|
||||
// handleUploadInitiate handles POST /v2/<name>/blobs/uploads/
|
||||
func (h *Handler) handleUploadInitiate(w http.ResponseWriter, r *http.Request, repo string) {
|
||||
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
||||
return
|
||||
}
|
||||
|
||||
// Create repository if it doesn't exist (implicit creation).
|
||||
repoID, err := h.db.GetOrCreateRepository(repo)
|
||||
if err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
uuid, err := generateUUID()
|
||||
if err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
// Insert upload row in DB.
|
||||
if err := h.db.CreateUpload(uuid, repoID); err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
// Create temp file via storage.
|
||||
bw, err := h.blobs.StartUpload(uuid)
|
||||
if err != nil {
|
||||
// Clean up DB row on storage failure.
|
||||
_ = h.db.DeleteUpload(uuid)
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
h.uploads.set(uuid, bw)
|
||||
|
||||
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
|
||||
w.Header().Set("Docker-Upload-UUID", uuid)
|
||||
w.Header().Set("Range", "0-0")
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
// handleUploadChunk handles PATCH /v2/<name>/blobs/uploads/<uuid>
|
||||
func (h *Handler) handleUploadChunk(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
||||
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
||||
return
|
||||
}
|
||||
|
||||
bw, ok := h.uploads.get(uuid)
|
||||
if !ok {
|
||||
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
||||
return
|
||||
}
|
||||
|
||||
// Append request body to upload file.
|
||||
n, err := io.Copy(bw, r.Body)
|
||||
if err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "write failed")
|
||||
return
|
||||
}
|
||||
|
||||
// Update offset in DB.
|
||||
newOffset := bw.BytesWritten()
|
||||
if err := h.db.UpdateUploadOffset(uuid, newOffset); err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
_ = n // bytes written this chunk
|
||||
|
||||
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
|
||||
w.Header().Set("Docker-Upload-UUID", uuid)
|
||||
w.Header().Set("Range", fmt.Sprintf("0-%d", newOffset))
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
// handleUploadComplete handles PUT /v2/<name>/blobs/uploads/<uuid>?digest=<digest>
|
||||
func (h *Handler) handleUploadComplete(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
||||
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
||||
return
|
||||
}
|
||||
|
||||
digest := r.URL.Query().Get("digest")
|
||||
if digest == "" {
|
||||
writeOCIError(w, "DIGEST_INVALID", http.StatusBadRequest, "digest parameter required")
|
||||
return
|
||||
}
|
||||
|
||||
bw, ok := h.uploads.get(uuid)
|
||||
if !ok {
|
||||
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
||||
return
|
||||
}
|
||||
|
||||
// If request body is non-empty, append it first (monolithic upload).
|
||||
if r.ContentLength != 0 {
|
||||
if _, err := io.Copy(bw, r.Body); err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "write failed")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the blob: verify digest, move to final location.
|
||||
size := bw.BytesWritten()
|
||||
_, err := bw.Commit(digest)
|
||||
if err != nil {
|
||||
h.uploads.remove(uuid)
|
||||
if errors.Is(err, storage.ErrDigestMismatch) {
|
||||
_ = h.db.DeleteUpload(uuid)
|
||||
writeOCIError(w, "DIGEST_INVALID", http.StatusBadRequest, "digest mismatch")
|
||||
return
|
||||
}
|
||||
if errors.Is(err, storage.ErrInvalidDigest) {
|
||||
_ = h.db.DeleteUpload(uuid)
|
||||
writeOCIError(w, "DIGEST_INVALID", http.StatusBadRequest, "invalid digest format")
|
||||
return
|
||||
}
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "commit failed")
|
||||
return
|
||||
}
|
||||
|
||||
h.uploads.remove(uuid)
|
||||
|
||||
// Insert blob row (no-op if already exists — content-addressed dedup).
|
||||
if err := h.db.InsertBlob(digest, size); err != nil {
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
// Delete upload row.
|
||||
_ = h.db.DeleteUpload(uuid)
|
||||
|
||||
h.audit(r, "blob_uploaded", repo, digest)
|
||||
|
||||
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", repo, digest))
|
||||
w.Header().Set("Docker-Content-Digest", digest)
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
// handleUploadStatus handles GET /v2/<name>/blobs/uploads/<uuid>
|
||||
func (h *Handler) handleUploadStatus(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
||||
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
||||
return
|
||||
}
|
||||
|
||||
upload, err := h.db.GetUpload(uuid)
|
||||
if err != nil {
|
||||
if errors.Is(err, db.ErrUploadNotFound) {
|
||||
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
||||
return
|
||||
}
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
|
||||
w.Header().Set("Docker-Upload-UUID", uuid)
|
||||
w.Header().Set("Range", fmt.Sprintf("0-%d", upload.ByteOffset))
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// handleUploadCancel handles DELETE /v2/<name>/blobs/uploads/<uuid>
|
||||
func (h *Handler) handleUploadCancel(w http.ResponseWriter, r *http.Request, repo, uuid string) {
|
||||
if !h.checkPolicy(w, r, policy.ActionPush, repo) {
|
||||
return
|
||||
}
|
||||
|
||||
bw, ok := h.uploads.get(uuid)
|
||||
if ok {
|
||||
_ = bw.Cancel()
|
||||
h.uploads.remove(uuid)
|
||||
}
|
||||
|
||||
if err := h.db.DeleteUpload(uuid); err != nil {
|
||||
if errors.Is(err, db.ErrUploadNotFound) {
|
||||
writeOCIError(w, "BLOB_UPLOAD_UNKNOWN", http.StatusNotFound, "upload not found")
|
||||
return
|
||||
}
|
||||
writeOCIError(w, "UNKNOWN", http.StatusInternalServerError, "internal error")
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
Reference in New Issue
Block a user