diff --git a/internal/masterdb/db.go b/internal/masterdb/db.go
new file mode 100644
index 0000000..af00ce4
--- /dev/null
+++ b/internal/masterdb/db.go
@@ -0,0 +1,146 @@
+// Package masterdb provides the SQLite database for the mcp-master daemon.
+// It stores the cluster-wide node registry, service placements, and edge routes.
+// This is separate from the agent's registry (internal/registry/) because the
+// master and agent have fundamentally different schemas.
+package masterdb
+
+import (
+	"database/sql"
+	"fmt"
+	"strings"
+
+	_ "modernc.org/sqlite"
+)
+
+// connPragmas is the DSN query string that makes the driver run these PRAGMAs
+// on every connection it opens. foreign_keys and busy_timeout are
+// per-connection settings, so issuing them once through db.Exec would
+// configure only a single connection of the database/sql pool.
+const connPragmas = "_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)&_pragma=foreign_keys(1)"
+
+// Open opens the master database at the given path and runs migrations.
+//
+// Every pooled connection gets WAL journaling, foreign-key enforcement, and a
+// 5-second busy timeout. On failure the handle is closed and a wrapped error
+// is returned.
+func Open(path string) (*sql.DB, error) {
+	db, err := sql.Open("sqlite", pragmaDSN(path))
+	if err != nil {
+		return nil, fmt.Errorf("open database: %w", err)
+	}
+
+	// sql.Open may not connect at all; Ping forces a real connection so a bad
+	// path or rejected pragma is reported here instead of by the first query.
+	if err := db.Ping(); err != nil {
+		_ = db.Close()
+		return nil, fmt.Errorf("open database: %w", err)
+	}
+
+	if err := migrate(db); err != nil {
+		_ = db.Close()
+		return nil, fmt.Errorf("migrate: %w", err)
+	}
+
+	return db, nil
+}
+
+// pragmaDSN appends connPragmas to path as DSN query parameters, extending an
+// existing query string when path already carries one.
+func pragmaDSN(path string) string {
+	sep := "?"
+	if strings.Contains(path, "?") {
+		sep = "&"
+	}
+	return path + sep + connPragmas
+}
+
+// migrate brings the schema up to date: it applies, in order, every entry of
+// migrations whose 1-based version is not yet recorded in schema_migrations.
+func migrate(db *sql.DB) error {
+	_, err := db.Exec(`
+		CREATE TABLE IF NOT EXISTS schema_migrations (
+			version    INTEGER PRIMARY KEY,
+			applied_at TEXT NOT NULL DEFAULT (datetime('now'))
+		);
+	`)
+	if err != nil {
+		return fmt.Errorf("create migrations table: %w", err)
+	}
+
+	for i, m := range migrations {
+		version := i + 1
+		var count int
+		if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations WHERE version = ?", version).Scan(&count); err != nil {
+			return fmt.Errorf("check migration %d: %w", version, err)
+		}
+		if count > 0 {
+			continue
+		}
+		if err := applyMigration(db, version, m); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// applyMigration runs one migration script and records its version inside a
+// single transaction, so a failure leaves neither a half-applied script nor an
+// unrecorded one that would be re-run on the next start.
+func applyMigration(db *sql.DB, version int, script string) error {
+	tx, err := db.Begin()
+	if err != nil {
+		return fmt.Errorf("begin migration %d: %w", version, err)
+	}
+	// After a successful Commit this Rollback does nothing but return ErrTxDone.
+	defer func() { _ = tx.Rollback() }()
+
+	if _, err := tx.Exec(script); err != nil {
+		return fmt.Errorf("run migration %d: %w", version, err)
+	}
+	if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES (?)", version); err != nil {
+		return fmt.Errorf("record migration %d: %w", version, err)
+	}
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("commit migration %d: %w", version, err)
+	}
+	return nil
+}
+
+// migrations holds the schema scripts in application order; the script at
+// index i is recorded as version i+1. Append new scripts, never edit old ones.
+var migrations = []string{
+	// Migration 1: cluster state
+	`
+	CREATE TABLE IF NOT EXISTS nodes (
+		name TEXT PRIMARY KEY,
+		address TEXT NOT NULL,
+		role TEXT NOT NULL DEFAULT 'worker',
+		arch TEXT NOT NULL DEFAULT 'amd64',
+		status TEXT NOT NULL DEFAULT 'unknown',
+		containers INTEGER NOT NULL DEFAULT 0,
+		last_heartbeat TEXT,
+		created_at TEXT NOT NULL DEFAULT (datetime('now')),
+		updated_at TEXT NOT NULL DEFAULT (datetime('now'))
+	);
+
+	CREATE TABLE IF NOT EXISTS placements (
+		service_name TEXT PRIMARY KEY,
+		node TEXT NOT NULL REFERENCES nodes(name),
+		tier TEXT NOT NULL DEFAULT 'worker',
+		deployed_at TEXT NOT NULL DEFAULT (datetime('now'))
+	);
+
+	CREATE TABLE IF NOT EXISTS edge_routes (
+		hostname TEXT PRIMARY KEY,
+		service_name TEXT NOT NULL,
+		edge_node TEXT NOT NULL REFERENCES nodes(name),
+		backend_hostname TEXT NOT NULL,
+		backend_port INTEGER NOT NULL,
+		created_at TEXT NOT NULL DEFAULT (datetime('now'))
+	);
+
+	CREATE INDEX IF NOT EXISTS idx_edge_routes_service
+		ON edge_routes(service_name);
+	`,
+}
diff --git a/internal/masterdb/db_test.go b/internal/masterdb/db_test.go
new file mode 100644
index 0000000..7a7f14c
--- /dev/null
+++ b/internal/masterdb/db_test.go
@@ -0,0 +1,227 @@
+package masterdb
+
+import (
+	"database/sql"
+	"path/filepath"
+	"testing"
+)
+
+// openTestDB opens a fresh database in a per-test temp directory and closes
+// it automatically when the test finishes.
+func openTestDB(t *testing.T) *sql.DB {
+	t.Helper()
+	path := filepath.Join(t.TempDir(), "test.db")
+	db, err := Open(path)
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	t.Cleanup(func() { _ = db.Close() })
+	return db
+}
+
+// mustUpsertNode registers a node and aborts the test if that fails, so later
+// assertions never run against a half-built fixture.
+func mustUpsertNode(t *testing.T, db *sql.DB, name, address, role, arch string) {
+	t.Helper()
+	if err := UpsertNode(db, name, address, role, arch); err != nil {
+		t.Fatalf("UpsertNode(%s): %v", name, err)
+	}
+}
+
+// TestOpenAndMigrate opens the same file twice: the first Open applies the
+// migrations, the second must find them recorded and skip them.
+func TestOpenAndMigrate(t *testing.T) {
+	path := filepath.Join(t.TempDir(), "test.db")
+	for attempt := 1; attempt <= 2; attempt++ {
+		db, err := Open(path)
+		if err != nil {
+			t.Fatalf("Open (attempt %d): %v", attempt, err)
+		}
+		if err := db.Close(); err != nil {
+			t.Fatalf("Close (attempt %d): %v", attempt, err)
+		}
+	}
+}
+
+// TestForeignKeysEnforced checks that a placement cannot reference a node
+// that was never registered.
+func TestForeignKeysEnforced(t *testing.T) {
+	db := openTestDB(t)
+	if err := CreatePlacement(db, "mcq", "ghost", "worker"); err == nil {
+		t.Errorf("CreatePlacement on unknown node: expected foreign-key error, got nil")
+	}
+}
+
+func TestNodeCRUD(t *testing.T) {
+	db := openTestDB(t)
+
+	mustUpsertNode(t, db, "rift", "100.95.252.120:9444", "master", "amd64")
+	mustUpsertNode(t, db, "svc", "100.106.232.4:9555", "edge", "amd64")
+	mustUpsertNode(t, db, "orion", "100.1.2.3:9444", "worker", "amd64")
+
+	// Get.
+	n, err := GetNode(db, "rift")
+	if err != nil {
+		t.Fatalf("GetNode: %v", err)
+	}
+	if n == nil || n.Address != "100.95.252.120:9444" {
+		t.Errorf("GetNode(rift) = %+v", n)
+	}
+
+	// Get nonexistent.
+	n, err = GetNode(db, "nonexistent")
+	if err != nil {
+		t.Fatalf("GetNode: %v", err)
+	}
+	if n != nil {
+		t.Errorf("expected nil for nonexistent node")
+	}
+
+	// List all.
+	nodes, err := ListNodes(db)
+	if err != nil {
+		t.Fatalf("ListNodes: %v", err)
+	}
+	if len(nodes) != 3 {
+		t.Errorf("ListNodes: got %d, want 3", len(nodes))
+	}
+
+	// List workers (includes master role).
+	workers, err := ListWorkerNodes(db)
+	if err != nil {
+		t.Fatalf("ListWorkerNodes: %v", err)
+	}
+	if len(workers) != 2 {
+		t.Errorf("ListWorkerNodes: got %d, want 2 (rift+orion)", len(workers))
+	}
+
+	// List edge.
+	edges, err := ListEdgeNodes(db)
+	if err != nil {
+		t.Fatalf("ListEdgeNodes: %v", err)
+	}
+	if len(edges) != 1 || edges[0].Name != "svc" {
+		t.Errorf("ListEdgeNodes: got %v", edges)
+	}
+
+	// Update status.
+	if err := UpdateNodeStatus(db, "rift", "healthy"); err != nil {
+		t.Fatalf("UpdateNodeStatus: %v", err)
+	}
+	n, err = GetNode(db, "rift")
+	if err != nil || n == nil {
+		t.Fatalf("GetNode after status update = %+v, %v", n, err)
+	}
+	if n.Status != "healthy" {
+		t.Errorf("status = %q, want healthy", n.Status)
+	}
+}
+
+func TestPlacementCRUD(t *testing.T) {
+	db := openTestDB(t)
+	mustUpsertNode(t, db, "rift", "100.95.252.120:9444", "master", "amd64")
+	mustUpsertNode(t, db, "orion", "100.1.2.3:9444", "worker", "amd64")
+
+	if err := CreatePlacement(db, "mcq", "rift", "worker"); err != nil {
+		t.Fatalf("CreatePlacement: %v", err)
+	}
+	if err := CreatePlacement(db, "mcdoc", "orion", "worker"); err != nil {
+		t.Fatalf("CreatePlacement: %v", err)
+	}
+
+	p, err := GetPlacement(db, "mcq")
+	if err != nil {
+		t.Fatalf("GetPlacement: %v", err)
+	}
+	if p == nil || p.Node != "rift" {
+		t.Errorf("GetPlacement(mcq) = %+v", p)
+	}
+
+	p, err = GetPlacement(db, "nonexistent")
+	if err != nil {
+		t.Fatalf("GetPlacement(nonexistent): %v", err)
+	}
+	if p != nil {
+		t.Errorf("expected nil for nonexistent placement")
+	}
+
+	counts, err := CountPlacementsPerNode(db)
+	if err != nil {
+		t.Fatalf("CountPlacementsPerNode: %v", err)
+	}
+	if counts["rift"] != 1 || counts["orion"] != 1 {
+		t.Errorf("counts = %v", counts)
+	}
+
+	placements, err := ListPlacements(db)
+	if err != nil {
+		t.Fatalf("ListPlacements: %v", err)
+	}
+	if len(placements) != 2 {
+		t.Errorf("ListPlacements: got %d", len(placements))
+	}
+
+	if err := DeletePlacement(db, "mcq"); err != nil {
+		t.Fatalf("DeletePlacement: %v", err)
+	}
+	p, err = GetPlacement(db, "mcq")
+	if err != nil {
+		t.Fatalf("GetPlacement after delete: %v", err)
+	}
+	if p != nil {
+		t.Errorf("expected nil after delete")
+	}
+}
+
+func TestEdgeRouteCRUD(t *testing.T) {
+	db := openTestDB(t)
+	mustUpsertNode(t, db, "svc", "100.106.232.4:9555", "edge", "amd64")
+
+	if err := CreateEdgeRoute(db, "mcq.metacircular.net", "mcq", "svc", "mcq.svc.mcp.metacircular.net", 8443); err != nil {
+		t.Fatalf("CreateEdgeRoute: %v", err)
+	}
+	if err := CreateEdgeRoute(db, "docs.metacircular.net", "mcdoc", "svc", "mcdoc.svc.mcp.metacircular.net", 443); err != nil {
+		t.Fatalf("CreateEdgeRoute: %v", err)
+	}
+
+	routes, err := ListEdgeRoutes(db)
+	if err != nil {
+		t.Fatalf("ListEdgeRoutes: %v", err)
+	}
+	if len(routes) != 2 {
+		t.Errorf("ListEdgeRoutes: got %d", len(routes))
+	}
+
+	routes, err = ListEdgeRoutesForService(db, "mcq")
+	if err != nil {
+		t.Fatalf("ListEdgeRoutesForService: %v", err)
+	}
+	if len(routes) != 1 || routes[0].Hostname != "mcq.metacircular.net" {
+		t.Errorf("ListEdgeRoutesForService(mcq) = %v", routes)
+	}
+
+	if err := DeleteEdgeRoute(db, "mcq.metacircular.net"); err != nil {
+		t.Fatalf("DeleteEdgeRoute: %v", err)
+	}
+	routes, err = ListEdgeRoutes(db)
+	if err != nil {
+		t.Fatalf("ListEdgeRoutes after delete: %v", err)
+	}
+	if len(routes) != 1 {
+		t.Errorf("expected 1 route after delete, got %d", len(routes))
+	}
+
+	if err := CreateEdgeRoute(db, "docs2.metacircular.net", "mcdoc", "svc", "mcdoc.svc.mcp.metacircular.net", 443); err != nil {
+		t.Fatalf("CreateEdgeRoute: %v", err)
+	}
+	if err := DeleteEdgeRoutesForService(db, "mcdoc"); err != nil {
+		t.Fatalf("DeleteEdgeRoutesForService: %v", err)
+	}
+	routes, err = ListEdgeRoutes(db)
+	if err != nil {
+		t.Fatalf("ListEdgeRoutes after service delete: %v", err)
+	}
+	if len(routes) != 0 {
+		t.Errorf("expected 0 routes after service delete, got %d", len(routes))
+	}
+}
diff --git a/internal/masterdb/edge_routes.go b/internal/masterdb/edge_routes.go
new file mode 100644
index 0000000..b086118
--- /dev/null
+++ b/internal/masterdb/edge_routes.go
@@ -0,0 +1,95 @@
+package masterdb
+
+import (
+	"database/sql"
+	"fmt"
+	"time"
+)
+
+// EdgeRoute records a public route managed by the master.
+type EdgeRoute struct {
+	Hostname        string
+	ServiceName     string
+	EdgeNode        string
+	BackendHostname string
+	BackendPort     int
+	CreatedAt       time.Time
+}
+
+// CreateEdgeRoute inserts an edge route, or updates the existing route for hostname (created_at is kept).
+func CreateEdgeRoute(db *sql.DB, hostname, serviceName, edgeNode, backendHostname string, backendPort int) error {
+	// Upsert keyed on hostname: a conflicting row has every column except
+	// created_at overwritten with the new values.
+	const upsert = `
+		INSERT INTO edge_routes (hostname, service_name, edge_node, backend_hostname, backend_port, created_at)
+		VALUES (?, ?, ?, ?, ?, datetime('now'))
+		ON CONFLICT(hostname) DO UPDATE SET
+			service_name = excluded.service_name,
+			edge_node = excluded.edge_node,
+			backend_hostname = excluded.backend_hostname,
+			backend_port = excluded.backend_port
+	`
+	if _, execErr := db.Exec(upsert, hostname, serviceName, edgeNode, backendHostname, backendPort); execErr != nil {
+		return fmt.Errorf("create edge route %s: %w", hostname, execErr)
+	}
+	return nil
+}
+
+// ListEdgeRoutes returns every edge route, ordered by hostname.
+func ListEdgeRoutes(db *sql.DB) ([]*EdgeRoute, error) {
+	const selectAll = `SELECT hostname, service_name, edge_node, backend_hostname, backend_port, created_at FROM edge_routes ORDER BY hostname`
+	return queryEdgeRoutes(db, selectAll)
+}
+
+// ListEdgeRoutesForService returns the edge routes whose service_name equals
+// serviceName, ordered by hostname.
+func ListEdgeRoutesForService(db *sql.DB, serviceName string) ([]*EdgeRoute, error) {
+	const selectByService = `
+		SELECT hostname, service_name, edge_node, backend_hostname, backend_port, created_at
+		FROM edge_routes WHERE service_name = ? ORDER BY hostname
+	`
+	result, queryErr := db.Query(selectByService, serviceName)
+	if queryErr != nil {
+		return nil, fmt.Errorf("list edge routes for %s: %w", serviceName, queryErr)
+	}
+	defer func() { _ = result.Close() }()
+
+	return scanEdgeRoutes(result)
+}
+
+// DeleteEdgeRoute removes the edge route registered for hostname, if any.
+func DeleteEdgeRoute(db *sql.DB, hostname string) error {
+	if _, execErr := db.Exec(`DELETE FROM edge_routes WHERE hostname = ?`, hostname); execErr != nil {
+		return fmt.Errorf("delete edge route %s: %w", hostname, execErr)
+	}
+	return nil
+}
+
+// DeleteEdgeRoutesForService removes all edge routes for a service.
+func DeleteEdgeRoutesForService(db *sql.DB, serviceName string) error {
+	_, err := db.Exec(`DELETE FROM edge_routes WHERE service_name = ?`, serviceName)
+	if err != nil {
+		return fmt.Errorf("delete edge routes for %s: %w", serviceName, err)
+	}
+	return nil
+}
+
+// queryEdgeRoutes runs a parameterless SELECT whose result columns match the
+// order scanEdgeRoutes expects, and returns the decoded rows.
+func queryEdgeRoutes(db *sql.DB, query string) ([]*EdgeRoute, error) {
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, fmt.Errorf("query edge routes: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	return scanEdgeRoutes(rows)
+}
+
+// scanEdgeRoutes decodes every remaining row of rows into an EdgeRoute. The
+// columns must be hostname, service_name, edge_node, backend_hostname,
+// backend_port, created_at, in that order. It does not close rows.
+//
+// Any scan, timestamp-parse, or iteration failure aborts the whole read and
+// returns a nil slice, so callers never act on a partial route table.
+func scanEdgeRoutes(rows *sql.Rows) ([]*EdgeRoute, error) {
+	var routes []*EdgeRoute
+	for rows.Next() {
+		var r EdgeRoute
+		var createdAt string
+		if err := rows.Scan(&r.Hostname, &r.ServiceName, &r.EdgeNode, &r.BackendHostname, &r.BackendPort, &createdAt); err != nil {
+			return nil, fmt.Errorf("scan edge route: %w", err)
+		}
+		// created_at is written by SQLite's datetime('now'), i.e. "YYYY-MM-DD HH:MM:SS".
+		ts, err := time.Parse("2006-01-02 15:04:05", createdAt)
+		if err != nil {
+			return nil, fmt.Errorf("parse created_at %q for edge route %s: %w", createdAt, r.Hostname, err)
+		}
+		r.CreatedAt = ts
+		routes = append(routes, &r)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate edge routes: %w", err)
+	}
+	return routes, nil
+}
diff --git a/internal/masterdb/nodes.go b/internal/masterdb/nodes.go
new file mode 100644
index 0000000..3c4e4b6
--- /dev/null
+++ b/internal/masterdb/nodes.go
@@ -0,0 +1,103 @@
+package masterdb
+
+import (
+	"database/sql"
+	"fmt"
+	"time"
+)
+
+// Node represents a registered node in the cluster (one row of the nodes table).
+type Node struct {
+	Name          string
+	Address       string
+	Role          string // "worker", "master", and "edge" are the roles queried in this package
+	Arch          string
+	Status        string
+	Containers    int
+	LastHeartbeat *time.Time // nil while the last_heartbeat column is NULL
+}
+
+// UpsertNode registers a node, or refreshes address, role, and arch of an existing one.
+func UpsertNode(db *sql.DB, name, address, role, arch string) error {
+	_, err := db.Exec(`
+		INSERT INTO nodes (name, address, role, arch, updated_at)
+		VALUES (?, ?, ?, ?, datetime('now'))
+		ON CONFLICT(name) DO UPDATE SET
+			address = excluded.address,
+			role = excluded.role,
+			arch = excluded.arch,
+			updated_at = datetime('now')
+	`, name, address, role, arch)
+	if err != nil {
+		return fmt.Errorf("upsert node %s: %w", name, err)
+	}
+	return nil
+}
+
+// GetNode returns a single node by name, or (nil, nil) when no such node exists.
+func GetNode(db *sql.DB, name string) (*Node, error) {
+	n, err := scanNode(db.QueryRow(selectNodes+" WHERE name = ?", name))
+	if err == sql.ErrNoRows {
+		// A missing node is an expected outcome, not a failure.
+		return nil, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("get node %s: %w", name, err)
+	}
+	return n, nil
+}
+
+// selectNodes is the shared SELECT prefix; its column order is the order
+// scanNode reads, so every node query must start from it.
+const selectNodes = "SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes"
+
+// ListNodes returns all nodes, ordered by name.
+func ListNodes(db *sql.DB) ([]*Node, error) {
+	return queryNodes(db, selectNodes+" ORDER BY name")
+}
+
+// ListWorkerNodes returns nodes with role "worker" or "master" (master is also a worker).
+func ListWorkerNodes(db *sql.DB) ([]*Node, error) {
+	return queryNodes(db, selectNodes+" WHERE role IN ('worker', 'master') ORDER BY name")
+}
+
+// ListEdgeNodes returns nodes with role "edge".
+func ListEdgeNodes(db *sql.DB) ([]*Node, error) {
+	return queryNodes(db, selectNodes+" WHERE role = 'edge' ORDER BY name")
+}
+
+// queryNodes runs a parameterless query built on selectNodes and decodes
+// every row. On any failure it returns a nil slice rather than partial results.
+func queryNodes(db *sql.DB, query string) ([]*Node, error) {
+	rows, err := db.Query(query)
+	if err != nil {
+		return nil, fmt.Errorf("query nodes: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	var nodes []*Node
+	for rows.Next() {
+		n, err := scanNode(rows)
+		if err != nil {
+			return nil, fmt.Errorf("scan node: %w", err)
+		}
+		nodes = append(nodes, n)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate nodes: %w", err)
+	}
+	return nodes, nil
+}
+
+// scanNode decodes one selectNodes row from src, which is a *sql.Row or a
+// *sql.Rows positioned on a row. The Scan error is returned unwrapped so
+// callers can still compare it with sql.ErrNoRows.
+//
+// A NULL last_heartbeat leaves LastHeartbeat nil. A non-NULL value that does
+// not parse is an error; it is never turned into a pointer to the zero time.
+func scanNode(src interface{ Scan(dest ...any) error }) (*Node, error) {
+	var n Node
+	var lastHB sql.NullString
+	if err := src.Scan(&n.Name, &n.Address, &n.Role, &n.Arch, &n.Status, &n.Containers, &lastHB); err != nil {
+		return nil, err
+	}
+	if lastHB.Valid {
+		t, err := time.Parse("2006-01-02 15:04:05", lastHB.String)
+		if err != nil {
+			return nil, fmt.Errorf("parse last_heartbeat %q for node %s: %w", lastHB.String, n.Name, err)
+		}
+		n.LastHeartbeat = &t
+	}
+	return &n, nil
+}
+
+// UpdateNodeStatus updates a node's status field.
+func UpdateNodeStatus(db *sql.DB, name, status string) error {
+	// The UPDATE also refreshes updated_at. A name that matches no row changes
+	// nothing and is not reported as an error.
+	_, err := db.Exec(`UPDATE nodes SET status = ?, updated_at = datetime('now') WHERE name = ?`, status, name)
+	if err != nil {
+		return fmt.Errorf("update node status %s: %w", name, err)
+	}
+	return nil
+}
diff --git a/internal/masterdb/placements.go b/internal/masterdb/placements.go
new file mode 100644
index 0000000..36e444a
--- /dev/null
+++ b/internal/masterdb/placements.go
@@ -0,0 +1,107 @@
+package masterdb
+
+import (
+	"database/sql"
+	"errors"
+	"fmt"
+	"time"
+)
+
+// Placement records which node hosts which service.
+type Placement struct {
+	ServiceName string
+	Node        string
+	Tier        string
+	DeployedAt  time.Time
+}
+
+// CreatePlacement inserts a placement for serviceName or, if one already
+// exists, moves it to the given node and tier and resets deployed_at.
+func CreatePlacement(db *sql.DB, serviceName, node, tier string) error {
+	_, err := db.Exec(`
+		INSERT INTO placements (service_name, node, tier, deployed_at)
+		VALUES (?, ?, ?, datetime('now'))
+		ON CONFLICT(service_name) DO UPDATE SET
+			node = excluded.node,
+			tier = excluded.tier,
+			deployed_at = datetime('now')
+	`, serviceName, node, tier)
+	if err != nil {
+		return fmt.Errorf("create placement %s: %w", serviceName, err)
+	}
+	return nil
+}
+
+// GetPlacement returns the placement for a service, or (nil, nil) when the
+// service has no placement. A stored deployed_at that does not parse is
+// reported as an error rather than silently becoming the zero time.
+func GetPlacement(db *sql.DB, serviceName string) (*Placement, error) {
+	var p Placement
+	var deployedAt string
+	err := db.QueryRow(`
+		SELECT service_name, node, tier, deployed_at
+		FROM placements WHERE service_name = ?
+	`, serviceName).Scan(&p.ServiceName, &p.Node, &p.Tier, &deployedAt)
+	if errors.Is(err, sql.ErrNoRows) {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("get placement %s: %w", serviceName, err)
+	}
+	// deployed_at is written by SQLite's datetime('now'), i.e. "YYYY-MM-DD HH:MM:SS".
+	p.DeployedAt, err = time.Parse("2006-01-02 15:04:05", deployedAt)
+	if err != nil {
+		return nil, fmt.Errorf("get placement %s: parse deployed_at %q: %w", serviceName, deployedAt, err)
+	}
+	return &p, nil
+}
+
+// ListPlacements returns all placements, ordered by service name.
+func ListPlacements(db *sql.DB) ([]*Placement, error) {
+	rows, err := db.Query(`SELECT service_name, node, tier, deployed_at FROM placements ORDER BY service_name`)
+	if err != nil {
+		return nil, fmt.Errorf("list placements: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	var placements []*Placement
+	for rows.Next() {
+		var p Placement
+		var deployedAt string
+		if err := rows.Scan(&p.ServiceName, &p.Node, &p.Tier, &deployedAt); err != nil {
+			return nil, fmt.Errorf("scan placement: %w", err)
+		}
+		// An unparseable timestamp fails the listing instead of silently
+		// yielding a zero DeployedAt.
+		ts, err := time.Parse("2006-01-02 15:04:05", deployedAt)
+		if err != nil {
+			return nil, fmt.Errorf("parse deployed_at %q for placement %s: %w", deployedAt, p.ServiceName, err)
+		}
+		p.DeployedAt = ts
+		placements = append(placements, &p)
+	}
+	// Return nil rather than a partial list if iteration stopped early.
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate placements: %w", err)
+	}
+	return placements, nil
+}
+
+// DeletePlacement removes the placement record for serviceName. Deleting a
+// service that has no placement is not an error.
+func DeletePlacement(db *sql.DB, serviceName string) error {
+	_, err := db.Exec(`DELETE FROM placements WHERE service_name = ?`, serviceName)
+	if err != nil {
+		return fmt.Errorf("delete placement %s: %w", serviceName, err)
+	}
+	return nil
+}
+
+// CountPlacementsPerNode returns a map of node name → number of placed services.
+// Nodes without any placement have no entry, so a lookup for them yields 0.
+// On any failure the returned map is nil.
+func CountPlacementsPerNode(db *sql.DB) (map[string]int, error) {
+	rows, err := db.Query(`SELECT node, COUNT(*) FROM placements GROUP BY node`)
+	if err != nil {
+		return nil, fmt.Errorf("count placements: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	counts := make(map[string]int)
+	for rows.Next() {
+		var node string
+		var count int
+		if err := rows.Scan(&node, &count); err != nil {
+			return nil, fmt.Errorf("scan count: %w", err)
+		}
+		counts[node] = count
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate placement counts: %w", err)
+	}
+	return counts, nil
+}