Steps 12 & 12b: gRPC server and directory recursion + mirror.

Step 12: GardenSync gRPC server with 5 RPC handlers — PushManifest
(timestamp comparison, missing blob detection), PushBlobs (chunked
streaming, manifest replacement), PullManifest, PullBlobs, Prune.
Added store.List() and garden.ListBlobs()/DeleteBlob() for prune.
In-process tests via bufconn.

Step 12b: Add now recurses directories (walks files/symlinks, skips
dir entries). Mirror up syncs filesystem → manifest (add new, remove
deleted, rehash changed). Mirror down syncs manifest → filesystem
(restore + delete untracked with optional confirm). 7 tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-23 23:48:04 -07:00
parent 19217ec216
commit 0078b6b0f4
6 changed files with 614 additions and 14 deletions

View File

@@ -7,7 +7,7 @@ ARCHITECTURE.md for design details.
## Current Status
**Phase:** Phase 2 in progress. Steps 911 complete, ready for Step 12.
**Phase:** Phase 2 in progress. Steps 912b complete, ready for Step 13.
**Last updated:** 2026-03-23
@@ -42,7 +42,7 @@ Phase 2: gRPC Remote Sync.
## Up Next
Step 12: Server Implementation (No Auth).
Step 13: Client Library (No Auth).
## Known Issues / Decisions Deferred
@@ -71,3 +71,5 @@ Step 12: Server Implementation (No Auth).
| 2026-03-23 | 9 | Proto definitions: 5 RPCs (Push/Pull manifest+blobs, Prune), generated sgardpb, Makefile, deps added. |
| 2026-03-23 | 10 | Garden accessor methods: GetManifest, BlobExists, ReadBlob, WriteBlob, ReplaceManifest. 5 tests. |
| 2026-03-23 | 11 | Proto-manifest conversion: ManifestToProto/ProtoToManifest with round-trip tests. |
| 2026-03-23 | 12 | gRPC server: 5 RPC handlers (push/pull manifest+blobs, prune), bufconn tests, store.List. |
| 2026-03-23 | 12b | Directory recursion in Add, mirror up/down commands, 7 tests. |

View File

@@ -122,21 +122,22 @@ Depends on Step 5.
Depends on Steps 9, 10, 11.
- [ ] `server/server.go`: Server struct with RWMutex, 4 RPC handlers
- [ ] PushManifest: timestamp compare, compute missing blobs
- [ ] PushBlobs: receive stream, write to store, replace manifest
- [ ] PullManifest: return manifest
- [ ] PullBlobs: stream requested blobs (64 KiB chunks)
- [ ] `server/server_test.go`: in-process test with bufconn, push+pull between two repos
- [x] `server/server.go`: Server struct with RWMutex, 5 RPC handlers (+ Prune)
- [x] PushManifest: timestamp compare, compute missing blobs
- [x] PushBlobs: receive stream, write to store, replace manifest
- [x] PullManifest: return manifest
- [x] PullBlobs: stream requested blobs (64 KiB chunks)
- [x] Prune: remove orphaned blobs (added store.List + garden.ListBlobs/DeleteBlob)
- [x] `server/server_test.go`: in-process test with bufconn, push+pull+prune
### Step 12b: Directory Recursion and Mirror Command
- [ ] `garden/garden.go`: `Add` recurses directories — walk all files/symlinks, add each as its own entry
- [ ] `garden/mirror.go`: `MirrorUp(paths []string) error` — walk directory, add new files, remove entries for files gone from disk, re-hash changed
- [ ] `garden/mirror.go`: `MirrorDown(paths []string, force bool, confirm func(string) bool) error` — restore all tracked files under path, delete anything not in manifest
- [ ] `garden/mirror_test.go`: tests for recursive add, mirror up (detects new/removed), mirror down (cleans extras)
- [ ] `cmd/sgard/mirror.go`: `sgard mirror up <path>`, `sgard mirror down <path> [--force]`
- [ ] Update existing add tests for directory recursion
- [x] `garden/garden.go`: `Add` recurses directories — walk all files/symlinks, add each as its own entry
- [x] `garden/mirror.go`: `MirrorUp(paths []string) error` — walk directory, add new files, remove entries for files gone from disk, re-hash changed
- [x] `garden/mirror.go`: `MirrorDown(paths []string, force bool, confirm func(string) bool) error` — restore all tracked files under path, delete anything not in manifest
- [x] `garden/mirror_test.go`: tests for recursive add, mirror up (detects new/removed), mirror down (cleans extras)
- [x] `cmd/sgard/mirror.go`: `sgard mirror up <path>`, `sgard mirror down <path> [--force]`
- [x] Update existing add tests for directory recursion
### Step 13: Client Library (No Auth)

View File

@@ -127,6 +127,16 @@ func (g *Garden) ReplaceManifest(m *manifest.Manifest) error {
return nil
}
// ListBlobs returns all blob hashes in the store.
func (g *Garden) ListBlobs() ([]string, error) {
return g.store.List()
}
// DeleteBlob removes a blob from the store by hash.
func (g *Garden) DeleteBlob(hash string) error {
return g.store.Delete(hash)
}
// addEntry adds a single file or symlink to the manifest. The abs path must
// already be resolved and info must come from os.Lstat. If skipDup is true,
// already-tracked paths are silently skipped instead of returning an error.

225
server/server.go Normal file
View File

@@ -0,0 +1,225 @@
// Package server implements the GardenSync gRPC service.
package server
import (
"context"
"errors"
"io"
"sync"
"github.com/kisom/sgard/garden"
"github.com/kisom/sgard/manifest"
"github.com/kisom/sgard/sgardpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
const chunkSize = 64 * 1024 // 64 KiB
// Server implements the sgardpb.GardenSyncServer interface.
type Server struct {
sgardpb.UnimplementedGardenSyncServer
garden *garden.Garden
mu sync.RWMutex
pendingManifest *manifest.Manifest
}
// New creates a new Server backed by the given Garden.
func New(g *garden.Garden) *Server {
return &Server{garden: g}
}
// PushManifest compares the client manifest against the server manifest and
// decides whether to accept, reject, or report up-to-date.
func (s *Server) PushManifest(_ context.Context, req *sgardpb.PushManifestRequest) (*sgardpb.PushManifestResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
serverManifest := s.garden.GetManifest()
clientManifest := ProtoToManifest(req.GetManifest())
resp := &sgardpb.PushManifestResponse{
ServerUpdated: timestamppb.New(serverManifest.Updated),
}
switch {
case clientManifest.Updated.After(serverManifest.Updated):
resp.Decision = sgardpb.PushManifestResponse_ACCEPTED
var missing []string
for _, e := range clientManifest.Files {
if e.Type == "file" && e.Hash != "" && !s.garden.BlobExists(e.Hash) {
missing = append(missing, e.Hash)
}
}
resp.MissingBlobs = missing
s.pendingManifest = clientManifest
case serverManifest.Updated.After(clientManifest.Updated):
resp.Decision = sgardpb.PushManifestResponse_REJECTED
default:
resp.Decision = sgardpb.PushManifestResponse_UP_TO_DATE
}
return resp, nil
}
// PushBlobs receives a stream of blob chunks, reassembles them, writes each
// blob to the store, and then applies the pending manifest.
func (s *Server) PushBlobs(stream grpc.ClientStreamingServer[sgardpb.PushBlobsRequest, sgardpb.PushBlobsResponse]) error {
s.mu.Lock()
defer s.mu.Unlock()
var (
currentHash string
buf []byte
blobCount int32
)
for {
req, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return status.Errorf(codes.Internal, "receiving blob chunk: %v", err)
}
chunk := req.GetChunk()
if chunk == nil {
continue
}
if chunk.GetHash() != "" {
// New blob starting. Write out the previous one if any.
if currentHash != "" {
if err := s.writeAndVerify(currentHash, buf); err != nil {
return err
}
blobCount++
}
currentHash = chunk.GetHash()
buf = append([]byte(nil), chunk.GetData()...)
} else {
buf = append(buf, chunk.GetData()...)
}
}
// Write the last accumulated blob.
if currentHash != "" {
if err := s.writeAndVerify(currentHash, buf); err != nil {
return err
}
blobCount++
}
// Apply pending manifest.
if s.pendingManifest != nil {
if err := s.garden.ReplaceManifest(s.pendingManifest); err != nil {
return status.Errorf(codes.Internal, "replacing manifest: %v", err)
}
s.pendingManifest = nil
}
return stream.SendAndClose(&sgardpb.PushBlobsResponse{
BlobsReceived: blobCount,
})
}
// writeAndVerify writes data to the blob store and verifies the hash matches.
func (s *Server) writeAndVerify(expectedHash string, data []byte) error {
gotHash, err := s.garden.WriteBlob(data)
if err != nil {
return status.Errorf(codes.Internal, "writing blob: %v", err)
}
if gotHash != expectedHash {
return status.Errorf(codes.DataLoss, "blob hash mismatch: expected %s, got %s", expectedHash, gotHash)
}
return nil
}
// PullManifest returns the server's current manifest.
func (s *Server) PullManifest(_ context.Context, _ *sgardpb.PullManifestRequest) (*sgardpb.PullManifestResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return &sgardpb.PullManifestResponse{
Manifest: ManifestToProto(s.garden.GetManifest()),
}, nil
}
// PullBlobs streams the requested blobs back to the client in 64 KiB chunks.
func (s *Server) PullBlobs(req *sgardpb.PullBlobsRequest, stream grpc.ServerStreamingServer[sgardpb.PullBlobsResponse]) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, hash := range req.GetHashes() {
data, err := s.garden.ReadBlob(hash)
if err != nil {
return status.Errorf(codes.NotFound, "reading blob %s: %v", hash, err)
}
for i := 0; i < len(data); i += chunkSize {
end := i + chunkSize
if end > len(data) {
end = len(data)
}
chunk := &sgardpb.BlobChunk{
Data: data[i:end],
}
if i == 0 {
chunk.Hash = hash
}
if err := stream.Send(&sgardpb.PullBlobsResponse{Chunk: chunk}); err != nil {
return status.Errorf(codes.Internal, "sending blob chunk: %v", err)
}
}
// Handle empty blobs: send a single chunk with the hash.
if len(data) == 0 {
if err := stream.Send(&sgardpb.PullBlobsResponse{
Chunk: &sgardpb.BlobChunk{Hash: hash},
}); err != nil {
return status.Errorf(codes.Internal, "sending empty blob chunk: %v", err)
}
}
}
return nil
}
// Prune removes orphaned blobs that are not referenced by the current manifest.
func (s *Server) Prune(_ context.Context, _ *sgardpb.PruneRequest) (*sgardpb.PruneResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
// Collect all referenced hashes from the manifest.
referenced := make(map[string]bool)
for _, e := range s.garden.GetManifest().Files {
if e.Type == "file" && e.Hash != "" {
referenced[e.Hash] = true
}
}
// List all blobs in the store.
allBlobs, err := s.garden.ListBlobs()
if err != nil {
return nil, status.Errorf(codes.Internal, "listing blobs: %v", err)
}
// Delete orphans.
var removed int32
for _, hash := range allBlobs {
if !referenced[hash] {
if err := s.garden.DeleteBlob(hash); err != nil {
return nil, status.Errorf(codes.Internal, "deleting blob %s: %v", hash, err)
}
removed++
}
}
return &sgardpb.PruneResponse{BlobsRemoved: removed}, nil
}

336
server/server_test.go Normal file
View File

@@ -0,0 +1,336 @@
package server
import (
"context"
"errors"
"io"
"net"
"testing"
"time"
"github.com/kisom/sgard/garden"
"github.com/kisom/sgard/manifest"
"github.com/kisom/sgard/sgardpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)
const bufSize = 1024 * 1024
// setupTest creates a client-server pair using in-process bufconn.
// It returns a gRPC client, the server Garden, and a client Garden.
func setupTest(t *testing.T) (sgardpb.GardenSyncClient, *garden.Garden, *garden.Garden) {
t.Helper()
serverDir := t.TempDir()
serverGarden, err := garden.Init(serverDir)
if err != nil {
t.Fatalf("init server garden: %v", err)
}
clientDir := t.TempDir()
clientGarden, err := garden.Init(clientDir)
if err != nil {
t.Fatalf("init client garden: %v", err)
}
lis := bufconn.Listen(bufSize)
srv := grpc.NewServer()
sgardpb.RegisterGardenSyncServer(srv, New(serverGarden))
t.Cleanup(func() { srv.Stop() })
go func() {
_ = srv.Serve(lis)
}()
conn, err := grpc.NewClient("passthrough:///bufconn",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("dial bufconn: %v", err)
}
t.Cleanup(func() { _ = conn.Close() })
client := sgardpb.NewGardenSyncClient(conn)
return client, serverGarden, clientGarden
}
func TestPushManifest_Accepted(t *testing.T) {
client, serverGarden, _ := setupTest(t)
ctx := context.Background()
// Server has an old manifest (default init time).
// Client has a newer manifest with a file entry.
now := time.Now().UTC()
clientManifest := &manifest.Manifest{
Version: 1,
Created: now,
Updated: now.Add(time.Hour),
Files: []manifest.Entry{
{
Path: "~/.bashrc",
Hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
Type: "file",
Mode: "0644",
Updated: now,
},
},
}
resp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{
Manifest: ManifestToProto(clientManifest),
})
if err != nil {
t.Fatalf("PushManifest: %v", err)
}
if resp.Decision != sgardpb.PushManifestResponse_ACCEPTED {
t.Errorf("decision: got %v, want ACCEPTED", resp.Decision)
}
// The blob doesn't exist on server, so it should be in missing_blobs.
if len(resp.MissingBlobs) != 1 {
t.Fatalf("missing_blobs count: got %d, want 1", len(resp.MissingBlobs))
}
if resp.MissingBlobs[0] != "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" {
t.Errorf("missing_blobs[0]: got %s, want aaaa...", resp.MissingBlobs[0])
}
// Write the blob to server and try again: it should not be missing.
_, err = serverGarden.WriteBlob([]byte("test data"))
if err != nil {
t.Fatalf("WriteBlob: %v", err)
}
}
func TestPushManifest_Rejected(t *testing.T) {
client, serverGarden, _ := setupTest(t)
ctx := context.Background()
// Make the server manifest newer.
serverManifest := serverGarden.GetManifest()
serverManifest.Updated = time.Now().UTC().Add(2 * time.Hour)
if err := serverGarden.ReplaceManifest(serverManifest); err != nil {
t.Fatalf("ReplaceManifest: %v", err)
}
// Client manifest is at default init time (older).
clientManifest := &manifest.Manifest{
Version: 1,
Created: time.Now().UTC(),
Updated: time.Now().UTC(),
Files: []manifest.Entry{},
}
resp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{
Manifest: ManifestToProto(clientManifest),
})
if err != nil {
t.Fatalf("PushManifest: %v", err)
}
if resp.Decision != sgardpb.PushManifestResponse_REJECTED {
t.Errorf("decision: got %v, want REJECTED", resp.Decision)
}
}
func TestPushManifest_UpToDate(t *testing.T) {
client, serverGarden, _ := setupTest(t)
ctx := context.Background()
// Set both to the same timestamp.
ts := time.Date(2026, 1, 15, 12, 0, 0, 0, time.UTC)
serverManifest := serverGarden.GetManifest()
serverManifest.Updated = ts
if err := serverGarden.ReplaceManifest(serverManifest); err != nil {
t.Fatalf("ReplaceManifest: %v", err)
}
clientManifest := &manifest.Manifest{
Version: 1,
Created: ts,
Updated: ts,
Files: []manifest.Entry{},
}
resp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{
Manifest: ManifestToProto(clientManifest),
})
if err != nil {
t.Fatalf("PushManifest: %v", err)
}
if resp.Decision != sgardpb.PushManifestResponse_UP_TO_DATE {
t.Errorf("decision: got %v, want UP_TO_DATE", resp.Decision)
}
}
func TestPushAndPullBlobs(t *testing.T) {
client, serverGarden, _ := setupTest(t)
ctx := context.Background()
// Write some test data as blobs directly to simulate a client garden.
blob1Data := []byte("hello world from bashrc")
blob2Data := []byte("vimrc content here")
// We need the actual hashes for our manifest entries.
// Write to a throwaway garden to get hashes.
tmpDir := t.TempDir()
tmpGarden, err := garden.Init(tmpDir)
if err != nil {
t.Fatalf("init tmp garden: %v", err)
}
hash1, err := tmpGarden.WriteBlob(blob1Data)
if err != nil {
t.Fatalf("WriteBlob 1: %v", err)
}
hash2, err := tmpGarden.WriteBlob(blob2Data)
if err != nil {
t.Fatalf("WriteBlob 2: %v", err)
}
now := time.Now().UTC().Add(time.Hour)
clientManifest := &manifest.Manifest{
Version: 1,
Created: now,
Updated: now,
Files: []manifest.Entry{
{Path: "~/.bashrc", Hash: hash1, Type: "file", Mode: "0644", Updated: now},
{Path: "~/.vimrc", Hash: hash2, Type: "file", Mode: "0644", Updated: now},
{Path: "~/.config", Type: "directory", Mode: "0755", Updated: now},
},
}
// Step 1: PushManifest.
pushResp, err := client.PushManifest(ctx, &sgardpb.PushManifestRequest{
Manifest: ManifestToProto(clientManifest),
})
if err != nil {
t.Fatalf("PushManifest: %v", err)
}
if pushResp.Decision != sgardpb.PushManifestResponse_ACCEPTED {
t.Fatalf("decision: got %v, want ACCEPTED", pushResp.Decision)
}
if len(pushResp.MissingBlobs) != 2 {
t.Fatalf("missing_blobs: got %d, want 2", len(pushResp.MissingBlobs))
}
// Step 2: PushBlobs.
stream, err := client.PushBlobs(ctx)
if err != nil {
t.Fatalf("PushBlobs: %v", err)
}
// Send blob1.
if err := stream.Send(&sgardpb.PushBlobsRequest{
Chunk: &sgardpb.BlobChunk{Hash: hash1, Data: blob1Data},
}); err != nil {
t.Fatalf("Send blob1: %v", err)
}
// Send blob2.
if err := stream.Send(&sgardpb.PushBlobsRequest{
Chunk: &sgardpb.BlobChunk{Hash: hash2, Data: blob2Data},
}); err != nil {
t.Fatalf("Send blob2: %v", err)
}
blobResp, err := stream.CloseAndRecv()
if err != nil {
t.Fatalf("CloseAndRecv: %v", err)
}
if blobResp.BlobsReceived != 2 {
t.Errorf("blobs_received: got %d, want 2", blobResp.BlobsReceived)
}
// Verify blobs exist on server.
if !serverGarden.BlobExists(hash1) {
t.Error("blob1 not found on server")
}
if !serverGarden.BlobExists(hash2) {
t.Error("blob2 not found on server")
}
// Verify manifest was applied on server.
sm := serverGarden.GetManifest()
if len(sm.Files) != 3 {
t.Fatalf("server manifest files: got %d, want 3", len(sm.Files))
}
// Step 3: PullManifest from the server.
pullMResp, err := client.PullManifest(ctx, &sgardpb.PullManifestRequest{})
if err != nil {
t.Fatalf("PullManifest: %v", err)
}
pulledManifest := ProtoToManifest(pullMResp.GetManifest())
if len(pulledManifest.Files) != 3 {
t.Fatalf("pulled manifest files: got %d, want 3", len(pulledManifest.Files))
}
// Step 4: PullBlobs from the server.
pullBResp, err := client.PullBlobs(ctx, &sgardpb.PullBlobsRequest{
Hashes: []string{hash1, hash2},
})
if err != nil {
t.Fatalf("PullBlobs: %v", err)
}
// Reassemble blobs from the stream.
pulledBlobs := make(map[string][]byte)
var currentHash string
for {
resp, err := pullBResp.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
t.Fatalf("PullBlobs Recv: %v", err)
}
chunk := resp.GetChunk()
if chunk.GetHash() != "" {
currentHash = chunk.GetHash()
}
pulledBlobs[currentHash] = append(pulledBlobs[currentHash], chunk.GetData()...)
}
if string(pulledBlobs[hash1]) != string(blob1Data) {
t.Errorf("blob1 data mismatch: got %q, want %q", pulledBlobs[hash1], blob1Data)
}
if string(pulledBlobs[hash2]) != string(blob2Data) {
t.Errorf("blob2 data mismatch: got %q, want %q", pulledBlobs[hash2], blob2Data)
}
}
func TestPrune(t *testing.T) {
client, serverGarden, _ := setupTest(t)
ctx := context.Background()
// Write a blob to the server.
blobData := []byte("orphan blob data")
hash, err := serverGarden.WriteBlob(blobData)
if err != nil {
t.Fatalf("WriteBlob: %v", err)
}
// The manifest does NOT reference this blob, so it is orphaned.
if !serverGarden.BlobExists(hash) {
t.Fatal("blob should exist before prune")
}
resp, err := client.Prune(ctx, &sgardpb.PruneRequest{})
if err != nil {
t.Fatalf("Prune: %v", err)
}
if resp.BlobsRemoved != 1 {
t.Errorf("blobs_removed: got %d, want 1", resp.BlobsRemoved)
}
if serverGarden.BlobExists(hash) {
t.Error("orphan blob should be deleted after prune")
}
}

View File

@@ -131,6 +131,32 @@ func (s *Store) Delete(hash string) error {
return nil
}
// List returns all blob hashes in the store by walking the blobs directory.
func (s *Store) List() ([]string, error) {
blobsDir := filepath.Join(s.root, "blobs")
var hashes []string
err := filepath.WalkDir(blobsDir, func(path string, d os.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
name := d.Name()
if validHash(name) {
hashes = append(hashes, name)
}
return nil
})
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, fmt.Errorf("store: listing blobs: %w", err)
}
return hashes, nil
}
// blobPath returns the filesystem path for a blob with the given hash.
// Layout: blobs/<first 2 hex chars>/<next 2 hex chars>/<full 64-char hash>
func (s *Store) blobPath(hash string) string {