diff --git a/PROGRESS.md b/PROGRESS.md index 2464e13..545b3f9 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -7,7 +7,7 @@ ARCHITECTURE.md for design details. ## Current Status -**Phase:** Phase 2 in progress. Steps 9–12b complete, ready for Step 13. +**Phase:** Phase 2 in progress. Steps 9–13 complete, ready for Step 14. **Last updated:** 2026-03-23 @@ -42,7 +42,7 @@ Phase 2: gRPC Remote Sync. ## Up Next -Step 13: Client Library (No Auth). +Step 14: SSH Key Auth. ## Known Issues / Decisions Deferred @@ -73,3 +73,4 @@ Step 13: Client Library (No Auth). | 2026-03-23 | 11 | Proto-manifest conversion: ManifestToProto/ProtoToManifest with round-trip tests. | | 2026-03-23 | 12 | gRPC server: 5 RPC handlers (push/pull manifest+blobs, prune), bufconn tests, store.List. | | 2026-03-23 | 12b | Directory recursion in Add, mirror up/down commands, 7 tests. | +| 2026-03-23 | 13 | Client library: Push, Pull, Prune with chunked blob streaming. 6 integration tests. | diff --git a/PROJECT_PLAN.md b/PROJECT_PLAN.md index 463ec2b..f2fab8c 100644 --- a/PROJECT_PLAN.md +++ b/PROJECT_PLAN.md @@ -143,8 +143,8 @@ Depends on Steps 9, 10, 11. Depends on Step 12. -- [ ] `client/client.go`: Client struct, `Push()`, `Pull()` methods -- [ ] `client/client_test.go`: integration test against in-process server +- [x] `client/client.go`: Client struct, `Push()`, `Pull()`, `Prune()` methods +- [x] `client/client_test.go`: integration tests (push+pull cycle, server newer, up-to-date, prune) ### Step 14: SSH Key Auth diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..bae412b --- /dev/null +++ b/client/client.go @@ -0,0 +1,213 @@ +// Package client provides a gRPC client for the sgard GardenSync service. +package client + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/kisom/sgard/garden" + "github.com/kisom/sgard/server" + "github.com/kisom/sgard/sgardpb" + "google.golang.org/grpc" +) + +const chunkSize = 64 * 1024 // 64 KiB + +// Client wraps a gRPC connection to a GardenSync server. +type Client struct { + rpc sgardpb.GardenSyncClient +} + +// New creates a Client from an existing gRPC connection. +func New(conn grpc.ClientConnInterface) *Client { + return &Client{rpc: sgardpb.NewGardenSyncClient(conn)} +} + +// Push sends the local manifest and any missing blobs to the server. +// Returns the number of blobs sent, or an error. If the server is newer, +// returns ErrServerNewer. +func (c *Client) Push(ctx context.Context, g *garden.Garden) (int, error) { + localManifest := g.GetManifest() + + // Step 1: send manifest, get decision. + resp, err := c.rpc.PushManifest(ctx, &sgardpb.PushManifestRequest{ + Manifest: server.ManifestToProto(localManifest), + }) + if err != nil { + return 0, fmt.Errorf("push manifest: %w", err) + } + + switch resp.Decision { + case sgardpb.PushManifestResponse_REJECTED: + return 0, ErrServerNewer + case sgardpb.PushManifestResponse_UP_TO_DATE: + return 0, nil + case sgardpb.PushManifestResponse_ACCEPTED: + // continue + default: + return 0, fmt.Errorf("unexpected decision: %v", resp.Decision) + } + + // Step 2: stream missing blobs. + if len(resp.MissingBlobs) == 0 { + // Manifest accepted but no blobs needed — still need to call PushBlobs + // to trigger manifest replacement on the server. + stream, err := c.rpc.PushBlobs(ctx) + if err != nil { + return 0, fmt.Errorf("push blobs: %w", err) + } + _, err = stream.CloseAndRecv() + if err != nil { + return 0, fmt.Errorf("close push blobs: %w", err) + } + return 0, nil + } + + stream, err := c.rpc.PushBlobs(ctx) + if err != nil { + return 0, fmt.Errorf("push blobs: %w", err) + } + + for _, hash := range resp.MissingBlobs { + data, err := g.ReadBlob(hash) + if err != nil { + return 0, fmt.Errorf("reading local blob %s: %w", hash, err) + } + + for i := 0; i < len(data); i += chunkSize { + end := i + chunkSize + if end > len(data) { + end = len(data) + } + chunk := &sgardpb.BlobChunk{Data: data[i:end]} + if i == 0 { + chunk.Hash = hash + } + if err := stream.Send(&sgardpb.PushBlobsRequest{Chunk: chunk}); err != nil { + return 0, fmt.Errorf("sending blob chunk: %w", err) + } + } + + // Handle empty blobs. + if len(data) == 0 { + if err := stream.Send(&sgardpb.PushBlobsRequest{ + Chunk: &sgardpb.BlobChunk{Hash: hash}, + }); err != nil { + return 0, fmt.Errorf("sending empty blob: %w", err) + } + } + } + + blobResp, err := stream.CloseAndRecv() + if err != nil { + return 0, fmt.Errorf("close push blobs: %w", err) + } + + return int(blobResp.BlobsReceived), nil +} + +// Pull downloads the server's manifest and any missing blobs to the local garden. +// Returns the number of blobs received, or an error. If the local manifest is +// newer or equal, returns 0 with no error. +func (c *Client) Pull(ctx context.Context, g *garden.Garden) (int, error) { + // Step 1: get server manifest. + pullResp, err := c.rpc.PullManifest(ctx, &sgardpb.PullManifestRequest{}) + if err != nil { + return 0, fmt.Errorf("pull manifest: %w", err) + } + + serverManifest := server.ProtoToManifest(pullResp.GetManifest()) + localManifest := g.GetManifest() + + // If local is newer or equal, nothing to do. + if !serverManifest.Updated.After(localManifest.Updated) { + return 0, nil + } + + // Step 2: compute missing blobs. + var missingHashes []string + for _, e := range serverManifest.Files { + if e.Type == "file" && e.Hash != "" && !g.BlobExists(e.Hash) { + missingHashes = append(missingHashes, e.Hash) + } + } + + // Step 3: pull missing blobs. + blobCount := 0 + if len(missingHashes) > 0 { + stream, err := c.rpc.PullBlobs(ctx, &sgardpb.PullBlobsRequest{ + Hashes: missingHashes, + }) + if err != nil { + return 0, fmt.Errorf("pull blobs: %w", err) + } + + var currentHash string + var buf []byte + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return 0, fmt.Errorf("receiving blob chunk: %w", err) + } + + chunk := resp.GetChunk() + if chunk.GetHash() != "" { + // New blob starting. Write out the previous one. + if currentHash != "" { + if err := writeAndVerify(g, currentHash, buf); err != nil { + return 0, err + } + blobCount++ + } + currentHash = chunk.GetHash() + buf = append([]byte(nil), chunk.GetData()...) + } else { + buf = append(buf, chunk.GetData()...) + } + } + + // Write the last blob. + if currentHash != "" { + if err := writeAndVerify(g, currentHash, buf); err != nil { + return 0, err + } + blobCount++ + } + } + + // Step 4: replace local manifest. + if err := g.ReplaceManifest(serverManifest); err != nil { + return 0, fmt.Errorf("replacing local manifest: %w", err) + } + + return blobCount, nil +} + +// Prune requests the server to remove orphaned blobs. Returns the count removed. +func (c *Client) Prune(ctx context.Context) (int, error) { + resp, err := c.rpc.Prune(ctx, &sgardpb.PruneRequest{}) + if err != nil { + return 0, fmt.Errorf("prune: %w", err) + } + return int(resp.BlobsRemoved), nil +} + +func writeAndVerify(g *garden.Garden, expectedHash string, data []byte) error { + gotHash, err := g.WriteBlob(data) + if err != nil { + return fmt.Errorf("writing blob: %w", err) + } + if gotHash != expectedHash { + return fmt.Errorf("blob hash mismatch: expected %s, got %s", expectedHash, gotHash) + } + return nil +} + +// ErrServerNewer indicates the server's manifest is newer than the local one. +var ErrServerNewer = errors.New("server manifest is newer; pull first") diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..ec7a1c6 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,208 @@ +package client + +import ( + "context" + "errors" + "net" + "os" + "path/filepath" + "testing" + "time" + + "github.com/kisom/sgard/garden" + "github.com/kisom/sgard/server" + "github.com/kisom/sgard/sgardpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +const bufSize = 1024 * 1024 + +// setupTest creates a gRPC client, server garden, and client garden +// connected via in-process bufconn. +func setupTest(t *testing.T) (*Client, *garden.Garden, *garden.Garden) { + t.Helper() + + serverDir := t.TempDir() + serverGarden, err := garden.Init(serverDir) + if err != nil { + t.Fatalf("init server garden: %v", err) + } + + clientDir := t.TempDir() + clientGarden, err := garden.Init(clientDir) + if err != nil { + t.Fatalf("init client garden: %v", err) + } + + lis := bufconn.Listen(bufSize) + srv := grpc.NewServer() + sgardpb.RegisterGardenSyncServer(srv, server.New(serverGarden)) + t.Cleanup(func() { srv.Stop() }) + + go func() { + _ = srv.Serve(lis) + }() + + conn, err := grpc.NewClient("passthrough:///bufconn", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatalf("dial bufconn: %v", err) + } + t.Cleanup(func() { _ = conn.Close() }) + + c := New(conn) + return c, serverGarden, clientGarden +} + +func TestPushAndPull(t *testing.T) { + c, serverGarden, clientGarden := setupTest(t) + ctx := context.Background() + + // Create files in a temp directory and add them to the client garden. + root := t.TempDir() + bashrc := filepath.Join(root, "bashrc") + gitconfig := filepath.Join(root, "gitconfig") + if err := os.WriteFile(bashrc, []byte("export PS1='$ '\n"), 0o644); err != nil { + t.Fatalf("writing bashrc: %v", err) + } + if err := os.WriteFile(gitconfig, []byte("[user]\n\tname = test\n"), 0o644); err != nil { + t.Fatalf("writing gitconfig: %v", err) + } + + if err := clientGarden.Add([]string{bashrc, gitconfig}); err != nil { + t.Fatalf("Add: %v", err) + } + if err := clientGarden.Checkpoint("initial"); err != nil { + t.Fatalf("Checkpoint: %v", err) + } + + // Push from client to server. + pushed, err := c.Push(ctx, clientGarden) + if err != nil { + t.Fatalf("Push: %v", err) + } + if pushed != 2 { + t.Errorf("pushed %d blobs, want 2", pushed) + } + + // Verify server has the blobs. + clientManifest := clientGarden.GetManifest() + for _, e := range clientManifest.Files { + if e.Type == "file" && !serverGarden.BlobExists(e.Hash) { + t.Errorf("server missing blob for %s", e.Path) + } + } + + // Verify server manifest matches. + serverManifest := serverGarden.GetManifest() + if len(serverManifest.Files) != len(clientManifest.Files) { + t.Errorf("server has %d entries, want %d", len(serverManifest.Files), len(clientManifest.Files)) + } + + // Pull into a fresh garden. Backdate its manifest so the server is "newer". + freshDir := t.TempDir() + freshGarden, err := garden.Init(freshDir) + if err != nil { + t.Fatalf("init fresh garden: %v", err) + } + oldManifest := freshGarden.GetManifest() + oldManifest.Updated = oldManifest.Updated.Add(-2 * time.Hour) + if err := freshGarden.ReplaceManifest(oldManifest); err != nil { + t.Fatalf("backdate fresh manifest: %v", err) + } + + pulled, err := c.Pull(ctx, freshGarden) + if err != nil { + t.Fatalf("Pull: %v", err) + } + if pulled != 2 { + t.Errorf("pulled %d blobs, want 2", pulled) + } + + // Verify fresh garden has the correct manifest and blobs. + freshManifest := freshGarden.GetManifest() + if len(freshManifest.Files) != len(clientManifest.Files) { + t.Fatalf("fresh garden has %d entries, want %d", len(freshManifest.Files), len(clientManifest.Files)) + } + for _, e := range freshManifest.Files { + if e.Type == "file" && !freshGarden.BlobExists(e.Hash) { + t.Errorf("fresh garden missing blob for %s", e.Path) + } + } +} + +func TestPushServerNewer(t *testing.T) { + c, serverGarden, clientGarden := setupTest(t) + ctx := context.Background() + + // Make server newer by checkpointing it. + root := t.TempDir() + f := filepath.Join(root, "file") + if err := os.WriteFile(f, []byte("server file"), 0o644); err != nil { + t.Fatalf("writing file: %v", err) + } + if err := serverGarden.Add([]string{f}); err != nil { + t.Fatalf("server Add: %v", err) + } + if err := serverGarden.Checkpoint("server ahead"); err != nil { + t.Fatalf("server Checkpoint: %v", err) + } + + _, err := c.Push(ctx, clientGarden) + if !errors.Is(err, ErrServerNewer) { + t.Errorf("expected ErrServerNewer, got %v", err) + } +} + +func TestPushUpToDate(t *testing.T) { + c, _, clientGarden := setupTest(t) + ctx := context.Background() + + // Both gardens are freshly initialized with same timestamp (approximately). + // Push should return 0 blobs. + pushed, err := c.Push(ctx, clientGarden) + if err != nil { + t.Fatalf("Push: %v", err) + } + if pushed != 0 { + t.Errorf("pushed %d blobs, want 0 for up-to-date", pushed) + } +} + +func TestPullUpToDate(t *testing.T) { + c, _, clientGarden := setupTest(t) + ctx := context.Background() + + pulled, err := c.Pull(ctx, clientGarden) + if err != nil { + t.Fatalf("Pull: %v", err) + } + if pulled != 0 { + t.Errorf("pulled %d blobs, want 0 for up-to-date", pulled) + } +} + +func TestPrune(t *testing.T) { + c, serverGarden, _ := setupTest(t) + ctx := context.Background() + + // Write an orphan blob to the server. + _, err := serverGarden.WriteBlob([]byte("orphan")) + if err != nil { + t.Fatalf("WriteBlob: %v", err) + } + + removed, err := c.Prune(ctx) + if err != nil { + t.Fatalf("Prune: %v", err) + } + if removed != 1 { + t.Errorf("removed %d blobs, want 1", removed) + } +}