Files
mcp/internal/registry/components.go
Kyle Isom 6122123064 P1.1: Registry package with full CRUD and tests
SQLite schema (services, components, ports, volumes, cmd, events),
migrations, and complete CRUD operations. 7 tests covering:
idempotent migration, service CRUD, duplicate name rejection,
component CRUD with ports/volumes/cmd, composite PK enforcement,
cascade delete, and event insert/query/count/prune.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 11:18:35 -07:00

277 lines
8.6 KiB
Go

package registry
import (
"database/sql"
"fmt"
"time"
)
// Component represents a component in the registry.
type Component struct {
Name string
Service string
Image string
Network string
UserSpec string
Restart string
DesiredState string
ObservedState string
Version string
Ports []string
Volumes []string
Cmd []string
CreatedAt time.Time
UpdatedAt time.Time
}
// CreateComponent creates a new component in the registry.
func CreateComponent(db *sql.DB, c *Component) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback() //nolint:errcheck
_, err = tx.Exec(`
INSERT INTO components (name, service, image, network, user_spec, restart, desired_state, observed_state, version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
c.Name, c.Service, c.Image, c.Network, c.UserSpec, c.Restart,
c.DesiredState, c.ObservedState, c.Version,
)
if err != nil {
return fmt.Errorf("insert component %q/%q: %w", c.Service, c.Name, err)
}
if err := setPorts(tx, c.Service, c.Name, c.Ports); err != nil {
return err
}
if err := setVolumes(tx, c.Service, c.Name, c.Volumes); err != nil {
return err
}
if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
return err
}
return tx.Commit()
}
// GetComponent retrieves a component by service and name.
func GetComponent(db *sql.DB, service, name string) (*Component, error) {
c := &Component{}
var createdAt, updatedAt string
err := db.QueryRow(`
SELECT name, service, image, network, user_spec, restart,
desired_state, observed_state, version, created_at, updated_at
FROM components WHERE service = ? AND name = ?`,
service, name,
).Scan(&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart,
&c.DesiredState, &c.ObservedState, &c.Version, &createdAt, &updatedAt)
if err != nil {
return nil, fmt.Errorf("get component %q/%q: %w", service, name, err)
}
c.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
c.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
c.Ports, err = getPorts(db, service, name)
if err != nil {
return nil, err
}
c.Volumes, err = getVolumes(db, service, name)
if err != nil {
return nil, err
}
c.Cmd, err = getCmd(db, service, name)
if err != nil {
return nil, err
}
return c, nil
}
// ListComponents returns all components for a service.
func ListComponents(db *sql.DB, service string) ([]Component, error) {
rows, err := db.Query(`
SELECT name, service, image, network, user_spec, restart,
desired_state, observed_state, version, created_at, updated_at
FROM components WHERE service = ? ORDER BY name`,
service,
)
if err != nil {
return nil, fmt.Errorf("list components for %q: %w", service, err)
}
defer func() { _ = rows.Close() }()
var components []Component
for rows.Next() {
var c Component
var createdAt, updatedAt string
if err := rows.Scan(&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart,
&c.DesiredState, &c.ObservedState, &c.Version, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("scan component: %w", err)
}
c.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
c.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
c.Ports, _ = getPorts(db, c.Service, c.Name)
c.Volumes, _ = getVolumes(db, c.Service, c.Name)
c.Cmd, _ = getCmd(db, c.Service, c.Name)
components = append(components, c)
}
return components, rows.Err()
}
// UpdateComponentState updates desired and/or observed state.
func UpdateComponentState(db *sql.DB, service, name, desiredState, observedState string) error {
res, err := db.Exec(`
UPDATE components
SET desired_state = CASE WHEN ? = '' THEN desired_state ELSE ? END,
observed_state = CASE WHEN ? = '' THEN observed_state ELSE ? END,
updated_at = datetime('now')
WHERE service = ? AND name = ?`,
desiredState, desiredState, observedState, observedState, service, name,
)
if err != nil {
return fmt.Errorf("update state %q/%q: %w", service, name, err)
}
n, _ := res.RowsAffected()
if n == 0 {
return fmt.Errorf("update state %q/%q: %w", service, name, sql.ErrNoRows)
}
return nil
}
// UpdateComponentSpec updates the full spec for an existing component.
func UpdateComponentSpec(db *sql.DB, c *Component) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback() //nolint:errcheck
_, err = tx.Exec(`
UPDATE components
SET image = ?, network = ?, user_spec = ?, restart = ?, version = ?, updated_at = datetime('now')
WHERE service = ? AND name = ?`,
c.Image, c.Network, c.UserSpec, c.Restart, c.Version, c.Service, c.Name,
)
if err != nil {
return fmt.Errorf("update component %q/%q: %w", c.Service, c.Name, err)
}
if err := setPorts(tx, c.Service, c.Name, c.Ports); err != nil {
return err
}
if err := setVolumes(tx, c.Service, c.Name, c.Volumes); err != nil {
return err
}
if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
return err
}
return tx.Commit()
}
// DeleteComponent removes a component.
func DeleteComponent(db *sql.DB, service, name string) error {
res, err := db.Exec("DELETE FROM components WHERE service = ? AND name = ?", service, name)
if err != nil {
return fmt.Errorf("delete component %q/%q: %w", service, name, err)
}
n, _ := res.RowsAffected()
if n == 0 {
return fmt.Errorf("delete component %q/%q: %w", service, name, sql.ErrNoRows)
}
return nil
}
// helper: set port mappings (delete + re-insert)
func setPorts(tx *sql.Tx, service, component string, ports []string) error {
if _, err := tx.Exec("DELETE FROM component_ports WHERE service = ? AND component = ?", service, component); err != nil {
return fmt.Errorf("clear ports %q/%q: %w", service, component, err)
}
for _, p := range ports {
if _, err := tx.Exec("INSERT INTO component_ports (service, component, mapping) VALUES (?, ?, ?)", service, component, p); err != nil {
return fmt.Errorf("insert port %q/%q %q: %w", service, component, p, err)
}
}
return nil
}
func getPorts(db *sql.DB, service, component string) ([]string, error) {
rows, err := db.Query("SELECT mapping FROM component_ports WHERE service = ? AND component = ? ORDER BY mapping", service, component)
if err != nil {
return nil, fmt.Errorf("get ports %q/%q: %w", service, component, err)
}
defer func() { _ = rows.Close() }()
var ports []string
for rows.Next() {
var p string
if err := rows.Scan(&p); err != nil {
return nil, err
}
ports = append(ports, p)
}
return ports, rows.Err()
}
// helper: set volume mappings
func setVolumes(tx *sql.Tx, service, component string, volumes []string) error {
if _, err := tx.Exec("DELETE FROM component_volumes WHERE service = ? AND component = ?", service, component); err != nil {
return fmt.Errorf("clear volumes %q/%q: %w", service, component, err)
}
for _, v := range volumes {
if _, err := tx.Exec("INSERT INTO component_volumes (service, component, mapping) VALUES (?, ?, ?)", service, component, v); err != nil {
return fmt.Errorf("insert volume %q/%q %q: %w", service, component, v, err)
}
}
return nil
}
func getVolumes(db *sql.DB, service, component string) ([]string, error) {
rows, err := db.Query("SELECT mapping FROM component_volumes WHERE service = ? AND component = ? ORDER BY mapping", service, component)
if err != nil {
return nil, fmt.Errorf("get volumes %q/%q: %w", service, component, err)
}
defer func() { _ = rows.Close() }()
var volumes []string
for rows.Next() {
var v string
if err := rows.Scan(&v); err != nil {
return nil, err
}
volumes = append(volumes, v)
}
return volumes, rows.Err()
}
// helper: set command args
func setCmd(tx *sql.Tx, service, component string, cmd []string) error {
if _, err := tx.Exec("DELETE FROM component_cmd WHERE service = ? AND component = ?", service, component); err != nil {
return fmt.Errorf("clear cmd %q/%q: %w", service, component, err)
}
for i, arg := range cmd {
if _, err := tx.Exec("INSERT INTO component_cmd (service, component, position, arg) VALUES (?, ?, ?, ?)", service, component, i, arg); err != nil {
return fmt.Errorf("insert cmd %q/%q [%d]: %w", service, component, i, err)
}
}
return nil
}
func getCmd(db *sql.DB, service, component string) ([]string, error) {
rows, err := db.Query("SELECT arg FROM component_cmd WHERE service = ? AND component = ? ORDER BY position", service, component)
if err != nil {
return nil, fmt.Errorf("get cmd %q/%q: %w", service, component, err)
}
defer func() { _ = rows.Close() }()
var cmd []string
for rows.Next() {
var arg string
if err := rows.Scan(&arg); err != nil {
return nil, err
}
cmd = append(cmd, arg)
}
return cmd, rows.Err()
}