Build the complete artifact pillar with five packages: - artifacts: Artifact, Snapshot, Citation, Publisher types with Get/Store DB methods, tag/category management, metadata ops, YAML import - blob: content-addressable store (SHA256, hierarchical dir layout) - proto: protobuf definitions (common.proto, artifacts.proto) with buf linting and code generation - server: gRPC ArtifactService implementation (create/get artifacts, store/retrieve blobs, manage tags/categories, search by tag) All FK insertion ordering is correct (parent rows before children). Full test coverage across artifacts, blob, and server packages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
435 lines
12 KiB
Go
435 lines
12 KiB
Go
// Package server implements the gRPC service for the exo system.
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"git.wntrmute.dev/kyle/exo/artifacts"
|
|
"git.wntrmute.dev/kyle/exo/blob"
|
|
"git.wntrmute.dev/kyle/exo/core"
|
|
"git.wntrmute.dev/kyle/exo/db"
|
|
pb "git.wntrmute.dev/kyle/exo/proto/exo/v1"
|
|
)
|
|
|
|
// ArtifactServer implements the ArtifactService gRPC service.
|
|
type ArtifactServer struct {
|
|
pb.UnimplementedArtifactServiceServer
|
|
database *sql.DB
|
|
blobs *blob.Store
|
|
}
|
|
|
|
// NewArtifactServer creates a new ArtifactServer.
|
|
func NewArtifactServer(database *sql.DB, blobs *blob.Store) *ArtifactServer {
|
|
return &ArtifactServer{database: database, blobs: blobs}
|
|
}
|
|
|
|
func (s *ArtifactServer) CreateArtifact(ctx context.Context, req *pb.CreateArtifactRequest) (*pb.CreateArtifactResponse, error) {
|
|
if req.Artifact == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "artifact is required")
|
|
}
|
|
|
|
art, snaps, err := protoToArtifact(req.Artifact)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "invalid artifact: %v", err)
|
|
}
|
|
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
// Create tags and categories idempotently.
|
|
for tag := range art.Tags {
|
|
if err := artifacts.CreateTag(ctx, tx, tag); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to create tag: %v", err)
|
|
}
|
|
}
|
|
for cat := range art.Categories {
|
|
if err := artifacts.CreateCategory(ctx, tx, cat); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to create category: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := art.Store(ctx, tx); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to store artifact: %v", err)
|
|
}
|
|
|
|
for _, snap := range snaps {
|
|
if err := snap.Store(ctx, tx, s.blobs); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to store snapshot: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.CreateArtifactResponse{Id: art.ID}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) GetArtifact(ctx context.Context, req *pb.GetArtifactRequest) (*pb.GetArtifactResponse, error) {
|
|
if req.Id == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "id is required")
|
|
}
|
|
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
art := &artifacts.Artifact{ID: req.Id}
|
|
if err := art.Get(ctx, tx); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.NotFound, "artifact not found: %v", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.GetArtifactResponse{Artifact: artifactToProto(art)}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) DeleteArtifact(_ context.Context, _ *pb.DeleteArtifactRequest) (*pb.DeleteArtifactResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "delete not yet implemented")
|
|
}
|
|
|
|
func (s *ArtifactServer) StoreBlob(ctx context.Context, req *pb.StoreBlobRequest) (*pb.StoreBlobResponse, error) {
|
|
if len(req.Data) == 0 {
|
|
return nil, status.Error(codes.InvalidArgument, "data is required")
|
|
}
|
|
|
|
id, err := s.blobs.Write(req.Data)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to write blob: %v", err)
|
|
}
|
|
|
|
if req.SnapshotId != "" {
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
b := &artifacts.BlobRef{
|
|
SnapshotID: req.SnapshotId,
|
|
ID: id,
|
|
Format: artifacts.MIME(req.Format),
|
|
}
|
|
if err := b.Store(ctx, tx, nil); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to store blob ref: %v", err)
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
}
|
|
|
|
return &pb.StoreBlobResponse{Id: id}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) GetBlob(_ context.Context, req *pb.GetBlobRequest) (*pb.GetBlobResponse, error) {
|
|
if req.Id == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "id is required")
|
|
}
|
|
|
|
data, err := s.blobs.Read(req.Id)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.NotFound, "blob not found: %v", err)
|
|
}
|
|
|
|
return &pb.GetBlobResponse{Data: data}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) ListTags(ctx context.Context, _ *pb.ListTagsRequest) (*pb.ListTagsResponse, error) {
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
tags, err := artifacts.GetAllTags(ctx, tx)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to get tags: %v", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.ListTagsResponse{Tags: tags}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) CreateTag(ctx context.Context, req *pb.CreateTagRequest) (*pb.CreateTagResponse, error) {
|
|
if req.Tag == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "tag is required")
|
|
}
|
|
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
if err := artifacts.CreateTag(ctx, tx, req.Tag); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to create tag: %v", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.CreateTagResponse{}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) ListCategories(ctx context.Context, _ *pb.ListCategoriesRequest) (*pb.ListCategoriesResponse, error) {
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
cats, err := artifacts.GetAllCategories(ctx, tx)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to get categories: %v", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.ListCategoriesResponse{Categories: cats}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) CreateCategory(ctx context.Context, req *pb.CreateCategoryRequest) (*pb.CreateCategoryResponse, error) {
|
|
if req.Category == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "category is required")
|
|
}
|
|
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
if err := artifacts.CreateCategory(ctx, tx, req.Category); err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to create category: %v", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.CreateCategoryResponse{}, nil
|
|
}
|
|
|
|
func (s *ArtifactServer) SearchByTag(ctx context.Context, req *pb.SearchByTagRequest) (*pb.SearchByTagResponse, error) {
|
|
if req.Tag == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "tag is required")
|
|
}
|
|
|
|
tx, err := db.StartTX(ctx, s.database)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to start transaction: %v", err)
|
|
}
|
|
|
|
ids, err := artifacts.GetArtifactIDsForTag(ctx, tx, req.Tag)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return nil, status.Errorf(codes.Internal, "failed to search by tag: %v", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
|
}
|
|
|
|
return &pb.SearchByTagResponse{ArtifactIds: ids}, nil
|
|
}
|
|
|
|
// --- Conversion helpers ---
|
|
|
|
func protoToArtifact(p *pb.Artifact) (*artifacts.Artifact, []*artifacts.Snapshot, error) {
|
|
if p.Id == "" {
|
|
p.Id = core.NewUUID()
|
|
}
|
|
|
|
cite := protoCitationToDomain(p.Citation)
|
|
|
|
art := &artifacts.Artifact{
|
|
ID: p.Id,
|
|
Type: artifacts.ArtifactType(p.Type),
|
|
Citation: cite,
|
|
History: map[time.Time]string{},
|
|
Tags: core.MapFromList(p.Tags),
|
|
Categories: core.MapFromList(p.Categories),
|
|
Metadata: protoMetadataToDomain(p.Metadata),
|
|
}
|
|
|
|
if p.Latest != "" {
|
|
var err error
|
|
art.Latest, err = db.FromDBTime(p.Latest, nil)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid latest time: %w", err)
|
|
}
|
|
} else {
|
|
art.Latest = time.Now().UTC()
|
|
}
|
|
|
|
var snaps []*artifacts.Snapshot
|
|
for _, sp := range p.Snapshots {
|
|
snap := protoSnapshotToDomain(sp, p.Id, cite)
|
|
art.History[snap.Datetime] = snap.ID
|
|
snaps = append(snaps, snap)
|
|
}
|
|
|
|
return art, snaps, nil
|
|
}
|
|
|
|
func protoCitationToDomain(p *pb.Citation) *artifacts.Citation {
|
|
if p == nil {
|
|
return &artifacts.Citation{
|
|
ID: core.NewUUID(),
|
|
Metadata: core.Metadata{},
|
|
}
|
|
}
|
|
|
|
cite := &artifacts.Citation{
|
|
ID: p.Id,
|
|
DOI: p.Doi,
|
|
Title: p.Title,
|
|
Year: int(p.Year),
|
|
Authors: p.Authors,
|
|
Source: p.Source,
|
|
Abstract: p.Abstract,
|
|
Metadata: protoMetadataToDomain(p.Metadata),
|
|
}
|
|
if cite.ID == "" {
|
|
cite.ID = core.NewUUID()
|
|
}
|
|
|
|
if p.Published != "" {
|
|
t, err := db.FromDBTime(p.Published, nil)
|
|
if err == nil {
|
|
cite.Published = t
|
|
}
|
|
}
|
|
|
|
if p.Publisher != nil {
|
|
cite.Publisher = &artifacts.Publisher{
|
|
ID: p.Publisher.Id,
|
|
Name: p.Publisher.Name,
|
|
Address: p.Publisher.Address,
|
|
}
|
|
}
|
|
|
|
return cite
|
|
}
|
|
|
|
func protoSnapshotToDomain(p *pb.Snapshot, artifactID string, parentCite *artifacts.Citation) *artifacts.Snapshot {
|
|
snap := &artifacts.Snapshot{
|
|
ArtifactID: artifactID,
|
|
ID: p.Id,
|
|
StoreDate: time.Unix(p.StoredAt, 0),
|
|
Source: p.Source,
|
|
Blobs: map[artifacts.MIME]*artifacts.BlobRef{},
|
|
Metadata: protoMetadataToDomain(p.Metadata),
|
|
}
|
|
if snap.ID == "" {
|
|
snap.ID = core.NewUUID()
|
|
}
|
|
|
|
if p.Datetime != "" {
|
|
t, err := db.FromDBTime(p.Datetime, nil)
|
|
if err == nil {
|
|
snap.Datetime = t
|
|
}
|
|
}
|
|
|
|
if p.Citation != nil {
|
|
snap.Citation = protoCitationToDomain(p.Citation)
|
|
} else {
|
|
snap.Citation = parentCite
|
|
}
|
|
|
|
for _, b := range p.Blobs {
|
|
ref := &artifacts.BlobRef{
|
|
SnapshotID: snap.ID,
|
|
ID: b.Id,
|
|
Format: artifacts.MIME(b.Format),
|
|
}
|
|
snap.Blobs[ref.Format] = ref
|
|
}
|
|
|
|
return snap
|
|
}
|
|
|
|
func artifactToProto(art *artifacts.Artifact) *pb.Artifact {
|
|
p := &pb.Artifact{
|
|
Id: art.ID,
|
|
Type: string(art.Type),
|
|
Latest: db.ToDBTime(art.Latest),
|
|
Tags: core.ListFromMap(art.Tags),
|
|
Categories: core.ListFromMap(art.Categories),
|
|
Metadata: domainMetadataToProto(art.Metadata),
|
|
}
|
|
|
|
if art.Citation != nil {
|
|
p.Citation = domainCitationToProto(art.Citation)
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func domainCitationToProto(c *artifacts.Citation) *pb.Citation {
|
|
p := &pb.Citation{
|
|
Id: c.ID,
|
|
Doi: c.DOI,
|
|
Title: c.Title,
|
|
Year: int32(c.Year), //nolint:gosec // year values are always small
|
|
Published: db.ToDBTime(c.Published),
|
|
Authors: c.Authors,
|
|
Source: c.Source,
|
|
Abstract: c.Abstract,
|
|
Metadata: domainMetadataToProto(c.Metadata),
|
|
}
|
|
|
|
if c.Publisher != nil {
|
|
p.Publisher = &pb.Publisher{
|
|
Id: c.Publisher.ID,
|
|
Name: c.Publisher.Name,
|
|
Address: c.Publisher.Address,
|
|
}
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func protoMetadataToDomain(entries []*pb.MetadataEntry) core.Metadata {
|
|
m := core.Metadata{}
|
|
for _, e := range entries {
|
|
if e.Value != nil {
|
|
m[e.Key] = core.Value{Contents: e.Value.Contents, Type: e.Value.Type}
|
|
}
|
|
}
|
|
return m
|
|
}
|
|
|
|
func domainMetadataToProto(m core.Metadata) []*pb.MetadataEntry {
|
|
entries := make([]*pb.MetadataEntry, 0, len(m))
|
|
for k, v := range m {
|
|
entries = append(entries, &pb.MetadataEntry{
|
|
Key: k,
|
|
Value: &pb.Value{Contents: v.Contents, Type: v.Type},
|
|
})
|
|
}
|
|
return entries
|
|
}
|