Add master database with nodes, placements, and edge_routes

New internal/masterdb/ package for mcp-master cluster state. Separate
from the agent's registry because the schemas are fundamentally
different (cluster-wide placement vs node-local containers).

Tables: nodes, placements, edge_routes. Full CRUD with tests.
Follows the same Open/migrate pattern as internal/registry/.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-02 15:26:04 -07:00
parent 78890ed76a
commit 3c0b55f9f8
5 changed files with 588 additions and 0 deletions

106
internal/masterdb/db.go Normal file
View File

@@ -0,0 +1,106 @@
// Package masterdb provides the SQLite database for the mcp-master daemon.
// It stores the cluster-wide node registry, service placements, and edge routes.
// This is separate from the agent's registry (internal/registry/) because the
// master and agent have fundamentally different schemas.
package masterdb
import (
"database/sql"
"fmt"
_ "modernc.org/sqlite"
)
// Open opens the master database at the given path and runs migrations.
func Open(path string) (*sql.DB, error) {
db, err := sql.Open("sqlite", path)
if err != nil {
return nil, fmt.Errorf("open database: %w", err)
}
for _, pragma := range []string{
"PRAGMA journal_mode = WAL",
"PRAGMA foreign_keys = ON",
"PRAGMA busy_timeout = 5000",
} {
if _, err := db.Exec(pragma); err != nil {
_ = db.Close()
return nil, fmt.Errorf("exec %q: %w", pragma, err)
}
}
if err := migrate(db); err != nil {
_ = db.Close()
return nil, fmt.Errorf("migrate: %w", err)
}
return db, nil
}
func migrate(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`)
if err != nil {
return fmt.Errorf("create migrations table: %w", err)
}
for i, m := range migrations {
version := i + 1
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations WHERE version = ?", version).Scan(&count); err != nil {
return fmt.Errorf("check migration %d: %w", version, err)
}
if count > 0 {
continue
}
if _, err := db.Exec(m); err != nil {
return fmt.Errorf("run migration %d: %w", version, err)
}
if _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", version); err != nil {
return fmt.Errorf("record migration %d: %w", version, err)
}
}
return nil
}
var migrations = []string{
// Migration 1: cluster state
`
CREATE TABLE IF NOT EXISTS nodes (
name TEXT PRIMARY KEY,
address TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'worker',
arch TEXT NOT NULL DEFAULT 'amd64',
status TEXT NOT NULL DEFAULT 'unknown',
containers INTEGER NOT NULL DEFAULT 0,
last_heartbeat TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS placements (
service_name TEXT PRIMARY KEY,
node TEXT NOT NULL REFERENCES nodes(name),
tier TEXT NOT NULL DEFAULT 'worker',
deployed_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS edge_routes (
hostname TEXT PRIMARY KEY,
service_name TEXT NOT NULL,
edge_node TEXT NOT NULL REFERENCES nodes(name),
backend_hostname TEXT NOT NULL,
backend_port INTEGER NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_edge_routes_service
ON edge_routes(service_name);
`,
}

View File

@@ -0,0 +1,185 @@
package masterdb
import (
"database/sql"
"path/filepath"
"testing"
)
func openTestDB(t *testing.T) *sql.DB {
t.Helper()
path := filepath.Join(t.TempDir(), "test.db")
db, err := Open(path)
if err != nil {
t.Fatalf("Open: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
return db
}
func TestOpenAndMigrate(t *testing.T) {
openTestDB(t)
}
func TestNodeCRUD(t *testing.T) {
db := openTestDB(t)
if err := UpsertNode(db, "rift", "100.95.252.120:9444", "master", "amd64"); err != nil {
t.Fatalf("UpsertNode: %v", err)
}
if err := UpsertNode(db, "svc", "100.106.232.4:9555", "edge", "amd64"); err != nil {
t.Fatalf("UpsertNode: %v", err)
}
if err := UpsertNode(db, "orion", "100.1.2.3:9444", "worker", "amd64"); err != nil {
t.Fatalf("UpsertNode: %v", err)
}
// Get.
n, err := GetNode(db, "rift")
if err != nil {
t.Fatalf("GetNode: %v", err)
}
if n == nil || n.Address != "100.95.252.120:9444" {
t.Errorf("GetNode(rift) = %+v", n)
}
// Get nonexistent.
n, err = GetNode(db, "nonexistent")
if err != nil {
t.Fatalf("GetNode: %v", err)
}
if n != nil {
t.Errorf("expected nil for nonexistent node")
}
// List all.
nodes, err := ListNodes(db)
if err != nil {
t.Fatalf("ListNodes: %v", err)
}
if len(nodes) != 3 {
t.Errorf("ListNodes: got %d, want 3", len(nodes))
}
// List workers (includes master role).
workers, err := ListWorkerNodes(db)
if err != nil {
t.Fatalf("ListWorkerNodes: %v", err)
}
if len(workers) != 2 {
t.Errorf("ListWorkerNodes: got %d, want 2 (rift+orion)", len(workers))
}
// List edge.
edges, err := ListEdgeNodes(db)
if err != nil {
t.Fatalf("ListEdgeNodes: %v", err)
}
if len(edges) != 1 || edges[0].Name != "svc" {
t.Errorf("ListEdgeNodes: got %v", edges)
}
// Update status.
if err := UpdateNodeStatus(db, "rift", "healthy"); err != nil {
t.Fatalf("UpdateNodeStatus: %v", err)
}
n, _ = GetNode(db, "rift")
if n.Status != "healthy" {
t.Errorf("status = %q, want healthy", n.Status)
}
}
func TestPlacementCRUD(t *testing.T) {
db := openTestDB(t)
_ = UpsertNode(db, "rift", "100.95.252.120:9444", "master", "amd64")
_ = UpsertNode(db, "orion", "100.1.2.3:9444", "worker", "amd64")
if err := CreatePlacement(db, "mcq", "rift", "worker"); err != nil {
t.Fatalf("CreatePlacement: %v", err)
}
if err := CreatePlacement(db, "mcdoc", "orion", "worker"); err != nil {
t.Fatalf("CreatePlacement: %v", err)
}
p, err := GetPlacement(db, "mcq")
if err != nil {
t.Fatalf("GetPlacement: %v", err)
}
if p == nil || p.Node != "rift" {
t.Errorf("GetPlacement(mcq) = %+v", p)
}
p, _ = GetPlacement(db, "nonexistent")
if p != nil {
t.Errorf("expected nil for nonexistent placement")
}
counts, err := CountPlacementsPerNode(db)
if err != nil {
t.Fatalf("CountPlacementsPerNode: %v", err)
}
if counts["rift"] != 1 || counts["orion"] != 1 {
t.Errorf("counts = %v", counts)
}
placements, err := ListPlacements(db)
if err != nil {
t.Fatalf("ListPlacements: %v", err)
}
if len(placements) != 2 {
t.Errorf("ListPlacements: got %d", len(placements))
}
if err := DeletePlacement(db, "mcq"); err != nil {
t.Fatalf("DeletePlacement: %v", err)
}
p, _ = GetPlacement(db, "mcq")
if p != nil {
t.Errorf("expected nil after delete")
}
}
func TestEdgeRouteCRUD(t *testing.T) {
db := openTestDB(t)
_ = UpsertNode(db, "svc", "100.106.232.4:9555", "edge", "amd64")
if err := CreateEdgeRoute(db, "mcq.metacircular.net", "mcq", "svc", "mcq.svc.mcp.metacircular.net", 8443); err != nil {
t.Fatalf("CreateEdgeRoute: %v", err)
}
if err := CreateEdgeRoute(db, "docs.metacircular.net", "mcdoc", "svc", "mcdoc.svc.mcp.metacircular.net", 443); err != nil {
t.Fatalf("CreateEdgeRoute: %v", err)
}
routes, err := ListEdgeRoutes(db)
if err != nil {
t.Fatalf("ListEdgeRoutes: %v", err)
}
if len(routes) != 2 {
t.Errorf("ListEdgeRoutes: got %d", len(routes))
}
routes, err = ListEdgeRoutesForService(db, "mcq")
if err != nil {
t.Fatalf("ListEdgeRoutesForService: %v", err)
}
if len(routes) != 1 || routes[0].Hostname != "mcq.metacircular.net" {
t.Errorf("ListEdgeRoutesForService(mcq) = %v", routes)
}
if err := DeleteEdgeRoute(db, "mcq.metacircular.net"); err != nil {
t.Fatalf("DeleteEdgeRoute: %v", err)
}
routes, _ = ListEdgeRoutes(db)
if len(routes) != 1 {
t.Errorf("expected 1 route after delete, got %d", len(routes))
}
_ = CreateEdgeRoute(db, "docs2.metacircular.net", "mcdoc", "svc", "mcdoc.svc.mcp.metacircular.net", 443)
if err := DeleteEdgeRoutesForService(db, "mcdoc"); err != nil {
t.Fatalf("DeleteEdgeRoutesForService: %v", err)
}
routes, _ = ListEdgeRoutes(db)
if len(routes) != 0 {
t.Errorf("expected 0 routes after service delete, got %d", len(routes))
}
}

View File

@@ -0,0 +1,95 @@
package masterdb
import (
"database/sql"
"fmt"
"time"
)
// EdgeRoute records a public route managed by the master.
type EdgeRoute struct {
Hostname string
ServiceName string
EdgeNode string
BackendHostname string
BackendPort int
CreatedAt time.Time
}
// CreateEdgeRoute inserts or replaces an edge route record.
func CreateEdgeRoute(db *sql.DB, hostname, serviceName, edgeNode, backendHostname string, backendPort int) error {
_, err := db.Exec(`
INSERT INTO edge_routes (hostname, service_name, edge_node, backend_hostname, backend_port, created_at)
VALUES (?, ?, ?, ?, ?, datetime('now'))
ON CONFLICT(hostname) DO UPDATE SET
service_name = excluded.service_name,
edge_node = excluded.edge_node,
backend_hostname = excluded.backend_hostname,
backend_port = excluded.backend_port
`, hostname, serviceName, edgeNode, backendHostname, backendPort)
if err != nil {
return fmt.Errorf("create edge route %s: %w", hostname, err)
}
return nil
}
// ListEdgeRoutes returns all edge routes.
func ListEdgeRoutes(db *sql.DB) ([]*EdgeRoute, error) {
return queryEdgeRoutes(db, `SELECT hostname, service_name, edge_node, backend_hostname, backend_port, created_at FROM edge_routes ORDER BY hostname`)
}
// ListEdgeRoutesForService returns edge routes for a specific service.
func ListEdgeRoutesForService(db *sql.DB, serviceName string) ([]*EdgeRoute, error) {
rows, err := db.Query(`
SELECT hostname, service_name, edge_node, backend_hostname, backend_port, created_at
FROM edge_routes WHERE service_name = ? ORDER BY hostname
`, serviceName)
if err != nil {
return nil, fmt.Errorf("list edge routes for %s: %w", serviceName, err)
}
defer func() { _ = rows.Close() }()
return scanEdgeRoutes(rows)
}
// DeleteEdgeRoute removes a single edge route by hostname.
func DeleteEdgeRoute(db *sql.DB, hostname string) error {
_, err := db.Exec(`DELETE FROM edge_routes WHERE hostname = ?`, hostname)
if err != nil {
return fmt.Errorf("delete edge route %s: %w", hostname, err)
}
return nil
}
// DeleteEdgeRoutesForService removes all edge routes for a service.
func DeleteEdgeRoutesForService(db *sql.DB, serviceName string) error {
_, err := db.Exec(`DELETE FROM edge_routes WHERE service_name = ?`, serviceName)
if err != nil {
return fmt.Errorf("delete edge routes for %s: %w", serviceName, err)
}
return nil
}
func queryEdgeRoutes(db *sql.DB, query string) ([]*EdgeRoute, error) {
rows, err := db.Query(query)
if err != nil {
return nil, fmt.Errorf("query edge routes: %w", err)
}
defer func() { _ = rows.Close() }()
return scanEdgeRoutes(rows)
}
func scanEdgeRoutes(rows *sql.Rows) ([]*EdgeRoute, error) {
var routes []*EdgeRoute
for rows.Next() {
var r EdgeRoute
var createdAt string
if err := rows.Scan(&r.Hostname, &r.ServiceName, &r.EdgeNode, &r.BackendHostname, &r.BackendPort, &createdAt); err != nil {
return nil, fmt.Errorf("scan edge route: %w", err)
}
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
routes = append(routes, &r)
}
return routes, rows.Err()
}

103
internal/masterdb/nodes.go Normal file
View File

@@ -0,0 +1,103 @@
package masterdb
import (
"database/sql"
"fmt"
"time"
)
// Node represents a registered node in the cluster.
type Node struct {
Name string
Address string
Role string
Arch string
Status string
Containers int
LastHeartbeat *time.Time
}
// UpsertNode inserts or updates a node in the registry.
func UpsertNode(db *sql.DB, name, address, role, arch string) error {
_, err := db.Exec(`
INSERT INTO nodes (name, address, role, arch, updated_at)
VALUES (?, ?, ?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE SET
address = excluded.address,
role = excluded.role,
arch = excluded.arch,
updated_at = datetime('now')
`, name, address, role, arch)
if err != nil {
return fmt.Errorf("upsert node %s: %w", name, err)
}
return nil
}
// GetNode returns a single node by name.
func GetNode(db *sql.DB, name string) (*Node, error) {
var n Node
var lastHB sql.NullString
err := db.QueryRow(`
SELECT name, address, role, arch, status, containers, last_heartbeat
FROM nodes WHERE name = ?
`, name).Scan(&n.Name, &n.Address, &n.Role, &n.Arch, &n.Status, &n.Containers, &lastHB)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("get node %s: %w", name, err)
}
if lastHB.Valid {
t, _ := time.Parse("2006-01-02 15:04:05", lastHB.String)
n.LastHeartbeat = &t
}
return &n, nil
}
// ListNodes returns all nodes.
func ListNodes(db *sql.DB) ([]*Node, error) {
return queryNodes(db, `SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes ORDER BY name`)
}
// ListWorkerNodes returns nodes with role "worker" or "master" (master is also a worker).
func ListWorkerNodes(db *sql.DB) ([]*Node, error) {
return queryNodes(db, `SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes WHERE role IN ('worker', 'master') ORDER BY name`)
}
// ListEdgeNodes returns nodes with role "edge".
func ListEdgeNodes(db *sql.DB) ([]*Node, error) {
return queryNodes(db, `SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes WHERE role = 'edge' ORDER BY name`)
}
func queryNodes(db *sql.DB, query string) ([]*Node, error) {
rows, err := db.Query(query)
if err != nil {
return nil, fmt.Errorf("query nodes: %w", err)
}
defer func() { _ = rows.Close() }()
var nodes []*Node
for rows.Next() {
var n Node
var lastHB sql.NullString
if err := rows.Scan(&n.Name, &n.Address, &n.Role, &n.Arch, &n.Status, &n.Containers, &lastHB); err != nil {
return nil, fmt.Errorf("scan node: %w", err)
}
if lastHB.Valid {
t, _ := time.Parse("2006-01-02 15:04:05", lastHB.String)
n.LastHeartbeat = &t
}
nodes = append(nodes, &n)
}
return nodes, rows.Err()
}
// UpdateNodeStatus updates a node's status field.
func UpdateNodeStatus(db *sql.DB, name, status string) error {
_, err := db.Exec(`UPDATE nodes SET status = ?, updated_at = datetime('now') WHERE name = ?`, status, name)
if err != nil {
return fmt.Errorf("update node status %s: %w", name, err)
}
return nil
}

View File

@@ -0,0 +1,99 @@
package masterdb
import (
"database/sql"
"fmt"
"time"
)
// Placement records which node hosts which service.
type Placement struct {
ServiceName string
Node string
Tier string
DeployedAt time.Time
}
// CreatePlacement inserts or replaces a placement record.
func CreatePlacement(db *sql.DB, serviceName, node, tier string) error {
_, err := db.Exec(`
INSERT INTO placements (service_name, node, tier, deployed_at)
VALUES (?, ?, ?, datetime('now'))
ON CONFLICT(service_name) DO UPDATE SET
node = excluded.node,
tier = excluded.tier,
deployed_at = datetime('now')
`, serviceName, node, tier)
if err != nil {
return fmt.Errorf("create placement %s: %w", serviceName, err)
}
return nil
}
// GetPlacement returns the placement for a service.
func GetPlacement(db *sql.DB, serviceName string) (*Placement, error) {
var p Placement
var deployedAt string
err := db.QueryRow(`
SELECT service_name, node, tier, deployed_at
FROM placements WHERE service_name = ?
`, serviceName).Scan(&p.ServiceName, &p.Node, &p.Tier, &deployedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("get placement %s: %w", serviceName, err)
}
p.DeployedAt, _ = time.Parse("2006-01-02 15:04:05", deployedAt)
return &p, nil
}
// ListPlacements returns all placements.
func ListPlacements(db *sql.DB) ([]*Placement, error) {
rows, err := db.Query(`SELECT service_name, node, tier, deployed_at FROM placements ORDER BY service_name`)
if err != nil {
return nil, fmt.Errorf("list placements: %w", err)
}
defer func() { _ = rows.Close() }()
var placements []*Placement
for rows.Next() {
var p Placement
var deployedAt string
if err := rows.Scan(&p.ServiceName, &p.Node, &p.Tier, &deployedAt); err != nil {
return nil, fmt.Errorf("scan placement: %w", err)
}
p.DeployedAt, _ = time.Parse("2006-01-02 15:04:05", deployedAt)
placements = append(placements, &p)
}
return placements, rows.Err()
}
// DeletePlacement removes a placement record.
func DeletePlacement(db *sql.DB, serviceName string) error {
_, err := db.Exec(`DELETE FROM placements WHERE service_name = ?`, serviceName)
if err != nil {
return fmt.Errorf("delete placement %s: %w", serviceName, err)
}
return nil
}
// CountPlacementsPerNode returns a map of node name → number of placed services.
func CountPlacementsPerNode(db *sql.DB) (map[string]int, error) {
rows, err := db.Query(`SELECT node, COUNT(*) FROM placements GROUP BY node`)
if err != nil {
return nil, fmt.Errorf("count placements: %w", err)
}
defer func() { _ = rows.Close() }()
counts := make(map[string]int)
for rows.Next() {
var node string
var count int
if err := rows.Scan(&node, &count); err != nil {
return nil, fmt.Errorf("scan count: %w", err)
}
counts[node] = count
}
return counts, rows.Err()
}