diff --git a/PROGRESS.md b/PROGRESS.md index 328bc86..2464e13 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -7,7 +7,7 @@ ARCHITECTURE.md for design details. ## Current Status -**Phase:** Phase 2 in progress. Steps 9–11 complete, ready for Step 12. +**Phase:** Phase 2 in progress. Steps 9–12b complete, ready for Step 13. **Last updated:** 2026-03-23 @@ -42,7 +42,7 @@ Phase 2: gRPC Remote Sync. ## Up Next -Step 12: Server Implementation (No Auth). +Step 13: Client Library (No Auth). ## Known Issues / Decisions Deferred @@ -71,3 +71,5 @@ Step 12: Server Implementation (No Auth). | 2026-03-23 | 9 | Proto definitions: 5 RPCs (Push/Pull manifest+blobs, Prune), generated sgardpb, Makefile, deps added. | | 2026-03-23 | 10 | Garden accessor methods: GetManifest, BlobExists, ReadBlob, WriteBlob, ReplaceManifest. 5 tests. | | 2026-03-23 | 11 | Proto-manifest conversion: ManifestToProto/ProtoToManifest with round-trip tests. | +| 2026-03-23 | 12 | gRPC server: 5 RPC handlers (push/pull manifest+blobs, prune), bufconn tests, store.List. | +| 2026-03-23 | 12b | Directory recursion in Add, mirror up/down commands, 7 tests. | diff --git a/PROJECT_PLAN.md b/PROJECT_PLAN.md index 808aacd..463ec2b 100644 --- a/PROJECT_PLAN.md +++ b/PROJECT_PLAN.md @@ -122,21 +122,22 @@ Depends on Step 5. Depends on Steps 9, 10, 11. -- [ ] `server/server.go`: Server struct with RWMutex, 4 RPC handlers -- [ ] PushManifest: timestamp compare, compute missing blobs -- [ ] PushBlobs: receive stream, write to store, replace manifest -- [ ] PullManifest: return manifest -- [ ] PullBlobs: stream requested blobs (64 KiB chunks) -- [ ] `server/server_test.go`: in-process test with bufconn, push+pull between two repos +- [x] `server/server.go`: Server struct with RWMutex, 5 RPC handlers (+ Prune) +- [x] PushManifest: timestamp compare, compute missing blobs +- [x] PushBlobs: receive stream, write to store, replace manifest +- [x] PullManifest: return manifest +- [x] PullBlobs: stream requested blobs (64 KiB chunks) +- [x] Prune: remove orphaned blobs (added store.List + garden.ListBlobs/DeleteBlob) +- [x] `server/server_test.go`: in-process test with bufconn, push+pull+prune ### Step 12b: Directory Recursion and Mirror Command -- [ ] `garden/garden.go`: `Add` recurses directories — walk all files/symlinks, add each as its own entry -- [ ] `garden/mirror.go`: `MirrorUp(paths []string) error` — walk directory, add new files, remove entries for files gone from disk, re-hash changed -- [ ] `garden/mirror.go`: `MirrorDown(paths []string, force bool, confirm func(string) bool) error` — restore all tracked files under path, delete anything not in manifest -- [ ] `garden/mirror_test.go`: tests for recursive add, mirror up (detects new/removed), mirror down (cleans extras) -- [ ] `cmd/sgard/mirror.go`: `sgard mirror up `, `sgard mirror down [--force]` -- [ ] Update existing add tests for directory recursion +- [x] `garden/garden.go`: `Add` recurses directories — walk all files/symlinks, add each as its own entry +- [x] `garden/mirror.go`: `MirrorUp(paths []string) error` — walk directory, add new files, remove entries for files gone from disk, re-hash changed +- [x] `garden/mirror.go`: `MirrorDown(paths []string, force bool, confirm func(string) bool) error` — restore all tracked files under path, delete anything not in manifest +- [x] `garden/mirror_test.go`: tests for recursive add, mirror up (detects new/removed), mirror down (cleans extras) +- [x] `cmd/sgard/mirror.go`: `sgard mirror up `, `sgard mirror down [--force]` +- [x] Update existing add tests for directory recursion ### Step 13: Client Library (No Auth) diff --git a/garden/garden.go b/garden/garden.go index 6810f28..56f9ab1 100644 --- a/garden/garden.go +++ b/garden/garden.go @@ -127,6 +127,16 @@ func (g *Garden) ReplaceManifest(m *manifest.Manifest) error { return nil } +// ListBlobs returns all blob hashes in the store. +func (g *Garden) ListBlobs() ([]string, error) { + return g.store.List() +} + +// DeleteBlob removes a blob from the store by hash. +func (g *Garden) DeleteBlob(hash string) error { + return g.store.Delete(hash) +} + // addEntry adds a single file or symlink to the manifest. The abs path must // already be resolved and info must come from os.Lstat. If skipDup is true, // already-tracked paths are silently skipped instead of returning an error. diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..aedbefe --- /dev/null +++ b/server/server.go @@ -0,0 +1,225 @@ +// Package server implements the GardenSync gRPC service. +package server + +import ( + "context" + "errors" + "io" + "sync" + + "github.com/kisom/sgard/garden" + "github.com/kisom/sgard/manifest" + "github.com/kisom/sgard/sgardpb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const chunkSize = 64 * 1024 // 64 KiB + +// Server implements the sgardpb.GardenSyncServer interface. +type Server struct { + sgardpb.UnimplementedGardenSyncServer + garden *garden.Garden + mu sync.RWMutex + pendingManifest *manifest.Manifest +} + +// New creates a new Server backed by the given Garden. +func New(g *garden.Garden) *Server { + return &Server{garden: g} +} + +// PushManifest compares the client manifest against the server manifest and +// decides whether to accept, reject, or report up-to-date. +func (s *Server) PushManifest(_ context.Context, req *sgardpb.PushManifestRequest) (*sgardpb.PushManifestResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + serverManifest := s.garden.GetManifest() + clientManifest := ProtoToManifest(req.GetManifest()) + + resp := &sgardpb.PushManifestResponse{ + ServerUpdated: timestamppb.New(serverManifest.Updated), + } + + switch { + case clientManifest.Updated.After(serverManifest.Updated): + resp.Decision = sgardpb.PushManifestResponse_ACCEPTED + + var missing []string + for _, e := range clientManifest.Files { + if e.Type == "file" && e.Hash != "" && !s.garden.BlobExists(e.Hash) { + missing = append(missing, e.Hash) + } + } + resp.MissingBlobs = missing + s.pendingManifest = clientManifest + + case serverManifest.Updated.After(clientManifest.Updated): + resp.Decision = sgardpb.PushManifestResponse_REJECTED + + default: + resp.Decision = sgardpb.PushManifestResponse_UP_TO_DATE + } + + return resp, nil +} + +// PushBlobs receives a stream of blob chunks, reassembles them, writes each +// blob to the store, and then applies the pending manifest. +func (s *Server) PushBlobs(stream grpc.ClientStreamingServer[sgardpb.PushBlobsRequest, sgardpb.PushBlobsResponse]) error { + s.mu.Lock() + defer s.mu.Unlock() + + var ( + currentHash string + buf []byte + blobCount int32 + ) + + for { + req, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return status.Errorf(codes.Internal, "receiving blob chunk: %v", err) + } + + chunk := req.GetChunk() + if chunk == nil { + continue + } + + if chunk.GetHash() != "" { + // New blob starting. Write out the previous one if any. + if currentHash != "" { + if err := s.writeAndVerify(currentHash, buf); err != nil { + return err + } + blobCount++ + } + currentHash = chunk.GetHash() + buf = append([]byte(nil), chunk.GetData()...) + } else { + buf = append(buf, chunk.GetData()...) + } + } + + // Write the last accumulated blob. + if currentHash != "" { + if err := s.writeAndVerify(currentHash, buf); err != nil { + return err + } + blobCount++ + } + + // Apply pending manifest. + if s.pendingManifest != nil { + if err := s.garden.ReplaceManifest(s.pendingManifest); err != nil { + return status.Errorf(codes.Internal, "replacing manifest: %v", err) + } + s.pendingManifest = nil + } + + return stream.SendAndClose(&sgardpb.PushBlobsResponse{ + BlobsReceived: blobCount, + }) +} + +// writeAndVerify writes data to the blob store and verifies the hash matches. +func (s *Server) writeAndVerify(expectedHash string, data []byte) error { + gotHash, err := s.garden.WriteBlob(data) + if err != nil { + return status.Errorf(codes.Internal, "writing blob: %v", err) + } + if gotHash != expectedHash { + return status.Errorf(codes.DataLoss, "blob hash mismatch: expected %s, got %s", expectedHash, gotHash) + } + return nil +} + +// PullManifest returns the server's current manifest. +func (s *Server) PullManifest(_ context.Context, _ *sgardpb.PullManifestRequest) (*sgardpb.PullManifestResponse, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + return &sgardpb.PullManifestResponse{ + Manifest: ManifestToProto(s.garden.GetManifest()), + }, nil +} + +// PullBlobs streams the requested blobs back to the client in 64 KiB chunks. +func (s *Server) PullBlobs(req *sgardpb.PullBlobsRequest, stream grpc.ServerStreamingServer[sgardpb.PullBlobsResponse]) error { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, hash := range req.GetHashes() { + data, err := s.garden.ReadBlob(hash) + if err != nil { + return status.Errorf(codes.NotFound, "reading blob %s: %v", hash, err) + } + + for i := 0; i < len(data); i += chunkSize { + end := i + chunkSize + if end > len(data) { + end = len(data) + } + chunk := &sgardpb.BlobChunk{ + Data: data[i:end], + } + if i == 0 { + chunk.Hash = hash + } + if err := stream.Send(&sgardpb.PullBlobsResponse{Chunk: chunk}); err != nil { + return status.Errorf(codes.Internal, "sending blob chunk: %v", err) + } + } + + // Handle empty blobs: send a single chunk with the hash. + if len(data) == 0 { + if err := stream.Send(&sgardpb.PullBlobsResponse{ + Chunk: &sgardpb.BlobChunk{Hash: hash}, + }); err != nil { + return status.Errorf(codes.Internal, "sending empty blob chunk: %v", err) + } + } + } + + return nil +} + +// Prune removes orphaned blobs that are not referenced by the current manifest. +func (s *Server) Prune(_ context.Context, _ *sgardpb.PruneRequest) (*sgardpb.PruneResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // Collect all referenced hashes from the manifest. + referenced := make(map[string]bool) + for _, e := range s.garden.GetManifest().Files { + if e.Type == "file" && e.Hash != "" { + referenced[e.Hash] = true + } + } + + // List all blobs in the store. + allBlobs, err := s.garden.ListBlobs() + if err != nil { + return nil, status.Errorf(codes.Internal, "listing blobs: %v", err) + } + + // Delete orphans. + var removed int32 + for _, hash := range allBlobs { + if !referenced[hash] { + if err := s.garden.DeleteBlob(hash); err != nil { + return nil, status.Errorf(codes.Internal, "deleting blob %s: %v", hash, err) + } + removed++ + } + } + + return &sgardpb.PruneResponse{BlobsRemoved: removed}, nil +} diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 0000000..ef4e106 --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,336 @@ +package server + +import ( + "context" + "errors" + "io" + "net" + "testing" + "time" + + "github.com/kisom/sgard/garden" + "github.com/kisom/sgard/manifest" + "github.com/kisom/sgard/sgardpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +const bufSize = 1024 * 1024 + +// setupTest creates a client-server pair using in-process bufconn. +// It returns a gRPC client, the server Garden, and a client Garden. +func setupTest(t *testing.T) (sgardpb.GardenSyncClient, *garden.Garden, *garden.Garden) { + t.Helper() + + serverDir := t.TempDir() + serverGarden, err := garden.Init(serverDir) + if err != nil { + t.Fatalf("init server garden: %v", err) + } + + clientDir := t.TempDir() + clientGarden, err := garden.Init(clientDir) + if err != nil { + t.Fatalf("init client garden: %v", err) + } + + lis := bufconn.Listen(bufSize) + srv := grpc.NewServer() + sgardpb.RegisterGardenSyncServer(srv, New(serverGarden)) + t.Cleanup(func() { srv.Stop() }) + + go func() { + _ = srv.Serve(lis) + }() + + conn, err := grpc.NewClient("passthrough:///bufconn", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatalf("dial bufconn: %v", err) + } + t.Cleanup(func() { _ = conn.Close() }) + + client := sgardpb.NewGardenSyncClient(conn) + return client, serverGarden, clientGarden +} + +func TestPushManifest_Accepted(t *testing.T) { + client, serverGarden, _ := setupTest(t) + ctx := context.Background() + + // Server has an old manifest (default init time). + // Client has a newer manifest with a file entry. + now := time.Now().UTC() + clientManifest := &manifest.Manifest{ + Version: 1, + Created: now, + Updated: now.Add(time.Hour), + Files: []manifest.Entry{ + { + Path: "~/.bashrc", + Hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + Type: "file", + Mode: "0644", + Updated: now, + }, + }, + } + + resp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{ + Manifest: ManifestToProto(clientManifest), + }) + if err != nil { + t.Fatalf("PushManifest: %v", err) + } + + if resp.Decision != sgardpb.PushManifestResponse_ACCEPTED { + t.Errorf("decision: got %v, want ACCEPTED", resp.Decision) + } + + // The blob doesn't exist on server, so it should be in missing_blobs. + if len(resp.MissingBlobs) != 1 { + t.Fatalf("missing_blobs count: got %d, want 1", len(resp.MissingBlobs)) + } + if resp.MissingBlobs[0] != "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" { + t.Errorf("missing_blobs[0]: got %s, want aaaa...", resp.MissingBlobs[0]) + } + + // Write the blob to server and try again: it should not be missing. + _, err = serverGarden.WriteBlob([]byte("test data")) + if err != nil { + t.Fatalf("WriteBlob: %v", err) + } +} + +func TestPushManifest_Rejected(t *testing.T) { + client, serverGarden, _ := setupTest(t) + ctx := context.Background() + + // Make the server manifest newer. + serverManifest := serverGarden.GetManifest() + serverManifest.Updated = time.Now().UTC().Add(2 * time.Hour) + if err := serverGarden.ReplaceManifest(serverManifest); err != nil { + t.Fatalf("ReplaceManifest: %v", err) + } + + // Client manifest is at default init time (older). + clientManifest := &manifest.Manifest{ + Version: 1, + Created: time.Now().UTC(), + Updated: time.Now().UTC(), + Files: []manifest.Entry{}, + } + + resp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{ + Manifest: ManifestToProto(clientManifest), + }) + if err != nil { + t.Fatalf("PushManifest: %v", err) + } + + if resp.Decision != sgardpb.PushManifestResponse_REJECTED { + t.Errorf("decision: got %v, want REJECTED", resp.Decision) + } +} + +func TestPushManifest_UpToDate(t *testing.T) { + client, serverGarden, _ := setupTest(t) + ctx := context.Background() + + // Set both to the same timestamp. + ts := time.Date(2026, 1, 15, 12, 0, 0, 0, time.UTC) + serverManifest := serverGarden.GetManifest() + serverManifest.Updated = ts + if err := serverGarden.ReplaceManifest(serverManifest); err != nil { + t.Fatalf("ReplaceManifest: %v", err) + } + + clientManifest := &manifest.Manifest{ + Version: 1, + Created: ts, + Updated: ts, + Files: []manifest.Entry{}, + } + + resp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{ + Manifest: ManifestToProto(clientManifest), + }) + if err != nil { + t.Fatalf("PushManifest: %v", err) + } + + if resp.Decision != sgardpb.PushManifestResponse_UP_TO_DATE { + t.Errorf("decision: got %v, want UP_TO_DATE", resp.Decision) + } +} + +func TestPushAndPullBlobs(t *testing.T) { + client, serverGarden, _ := setupTest(t) + ctx := context.Background() + + // Write some test data as blobs directly to simulate a client garden. + blob1Data := []byte("hello world from bashrc") + blob2Data := []byte("vimrc content here") + + // We need the actual hashes for our manifest entries. + // Write to a throwaway garden to get hashes. + tmpDir := t.TempDir() + tmpGarden, err := garden.Init(tmpDir) + if err != nil { + t.Fatalf("init tmp garden: %v", err) + } + hash1, err := tmpGarden.WriteBlob(blob1Data) + if err != nil { + t.Fatalf("WriteBlob 1: %v", err) + } + hash2, err := tmpGarden.WriteBlob(blob2Data) + if err != nil { + t.Fatalf("WriteBlob 2: %v", err) + } + + now := time.Now().UTC().Add(time.Hour) + clientManifest := &manifest.Manifest{ + Version: 1, + Created: now, + Updated: now, + Files: []manifest.Entry{ + {Path: "~/.bashrc", Hash: hash1, Type: "file", Mode: "0644", Updated: now}, + {Path: "~/.vimrc", Hash: hash2, Type: "file", Mode: "0644", Updated: now}, + {Path: "~/.config", Type: "directory", Mode: "0755", Updated: now}, + }, + } + + // Step 1: PushManifest. + pushResp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{ + Manifest: ManifestToProto(clientManifest), + }) + if err != nil { + t.Fatalf("PushManifest: %v", err) + } + if pushResp.Decision != sgardpb.PushManifestResponse_ACCEPTED { + t.Fatalf("decision: got %v, want ACCEPTED", pushResp.Decision) + } + if len(pushResp.MissingBlobs) != 2 { + t.Fatalf("missing_blobs: got %d, want 2", len(pushResp.MissingBlobs)) + } + + // Step 2: PushBlobs. + stream, err := client.PushBlobs(ctx) + if err != nil { + t.Fatalf("PushBlobs: %v", err) + } + + // Send blob1. + if err := stream.Send(&sgardpb.PushBlobsRequest{ + Chunk: &sgardpb.BlobChunk{Hash: hash1, Data: blob1Data}, + }); err != nil { + t.Fatalf("Send blob1: %v", err) + } + + // Send blob2. + if err := stream.Send(&sgardpb.PushBlobsRequest{ + Chunk: &sgardpb.BlobChunk{Hash: hash2, Data: blob2Data}, + }); err != nil { + t.Fatalf("Send blob2: %v", err) + } + + blobResp, err := stream.CloseAndRecv() + if err != nil { + t.Fatalf("CloseAndRecv: %v", err) + } + if blobResp.BlobsReceived != 2 { + t.Errorf("blobs_received: got %d, want 2", blobResp.BlobsReceived) + } + + // Verify blobs exist on server. + if !serverGarden.BlobExists(hash1) { + t.Error("blob1 not found on server") + } + if !serverGarden.BlobExists(hash2) { + t.Error("blob2 not found on server") + } + + // Verify manifest was applied on server. + sm := serverGarden.GetManifest() + if len(sm.Files) != 3 { + t.Fatalf("server manifest files: got %d, want 3", len(sm.Files)) + } + + // Step 3: PullManifest from the server. + pullMResp, err := client.PullManifest(ctx, &sgardpb.PullManifestRequest{}) + if err != nil { + t.Fatalf("PullManifest: %v", err) + } + pulledManifest := ProtoToManifest(pullMResp.GetManifest()) + if len(pulledManifest.Files) != 3 { + t.Fatalf("pulled manifest files: got %d, want 3", len(pulledManifest.Files)) + } + + // Step 4: PullBlobs from the server. + pullBResp, err := client.PullBlobs(ctx, &sgardpb.PullBlobsRequest{ + Hashes: []string{hash1, hash2}, + }) + if err != nil { + t.Fatalf("PullBlobs: %v", err) + } + + // Reassemble blobs from the stream. + pulledBlobs := make(map[string][]byte) + var currentHash string + for { + resp, err := pullBResp.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("PullBlobs Recv: %v", err) + } + chunk := resp.GetChunk() + if chunk.GetHash() != "" { + currentHash = chunk.GetHash() + } + pulledBlobs[currentHash] = append(pulledBlobs[currentHash], chunk.GetData()...) + } + + if string(pulledBlobs[hash1]) != string(blob1Data) { + t.Errorf("blob1 data mismatch: got %q, want %q", pulledBlobs[hash1], blob1Data) + } + if string(pulledBlobs[hash2]) != string(blob2Data) { + t.Errorf("blob2 data mismatch: got %q, want %q", pulledBlobs[hash2], blob2Data) + } +} + +func TestPrune(t *testing.T) { + client, serverGarden, _ := setupTest(t) + ctx := context.Background() + + // Write a blob to the server. + blobData := []byte("orphan blob data") + hash, err := serverGarden.WriteBlob(blobData) + if err != nil { + t.Fatalf("WriteBlob: %v", err) + } + + // The manifest does NOT reference this blob, so it is orphaned. + if !serverGarden.BlobExists(hash) { + t.Fatal("blob should exist before prune") + } + + resp, err := client.Prune(ctx, &sgardpb.PruneRequest{}) + if err != nil { + t.Fatalf("Prune: %v", err) + } + + if resp.BlobsRemoved != 1 { + t.Errorf("blobs_removed: got %d, want 1", resp.BlobsRemoved) + } + + if serverGarden.BlobExists(hash) { + t.Error("orphan blob should be deleted after prune") + } +} diff --git a/store/store.go b/store/store.go index 90eef86..6a456e1 100644 --- a/store/store.go +++ b/store/store.go @@ -131,6 +131,32 @@ func (s *Store) Delete(hash string) error { return nil } +// List returns all blob hashes in the store by walking the blobs directory. +func (s *Store) List() ([]string, error) { + blobsDir := filepath.Join(s.root, "blobs") + var hashes []string + err := filepath.WalkDir(blobsDir, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + name := d.Name() + if validHash(name) { + hashes = append(hashes, name) + } + return nil + }) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, fmt.Errorf("store: listing blobs: %w", err) + } + return hashes, nil +} + // blobPath returns the filesystem path for a blob with the given hash. // Layout: blobs/// func (s *Store) blobPath(hash string) string {