Files
mcp/internal/masterdb/nodes.go
Kyle Isom 3c0b55f9f8 Add master database with nodes, placements, and edge_routes
New internal/masterdb/ package for mcp-master cluster state. Separate
from the agent's registry because the schemas are fundamentally
different (cluster-wide placement vs node-local containers).

Tables: nodes, placements, edge_routes. Full CRUD with tests.
Follows the same Open/migrate pattern as internal/registry/.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 15:26:04 -07:00

104 lines
3.0 KiB
Go

package masterdb
import (
"database/sql"
"fmt"
"time"
)
// Node is one cluster member as read back from the nodes table.
type Node struct {
	// Text columns, in the order the queries in this file select them.
	// Role is matched against 'worker', 'master', and 'edge' by the
	// List*Nodes helpers.
	Name, Address, Role, Arch, Status string

	// Containers holds the value of the containers column.
	Containers int

	// LastHeartbeat is nil when the last_heartbeat column is NULL.
	LastHeartbeat *time.Time
}
// UpsertNode writes a row to the nodes table keyed by name. A name that is
// not present yet is inserted; a name that already exists has its address,
// role, and arch overwritten. In both cases updated_at is set to
// datetime('now'). A failing statement is returned wrapped with the node name.
func UpsertNode(db *sql.DB, name, address, role, arch string) error {
	const stmt = `
INSERT INTO nodes (name, address, role, arch, updated_at)
VALUES (?, ?, ?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE SET
address = excluded.address,
role = excluded.role,
arch = excluded.arch,
updated_at = datetime('now')
`
	if _, execErr := db.Exec(stmt, name, address, role, arch); execErr != nil {
		return fmt.Errorf("upsert node %s: %w", name, execErr)
	}
	return nil
}
// GetNode looks up a single node by name.
//
// It returns (nil, nil) when no row matches, so callers must check the
// returned node for nil before using it. A NULL last_heartbeat leaves
// Node.LastHeartbeat nil. A non-NULL last_heartbeat must be in
// "2006-01-02 15:04:05" form; it is parsed as UTC because the layout carries
// no zone. A value in any other form is reported as an error rather than
// being silently replaced by the zero time.
func GetNode(db *sql.DB, name string) (*Node, error) {
	var (
		n      Node
		lastHB sql.NullString
	)
	err := db.QueryRow(`
SELECT name, address, role, arch, status, containers, last_heartbeat
FROM nodes WHERE name = ?
`, name).Scan(&n.Name, &n.Address, &n.Role, &n.Arch, &n.Status, &n.Containers, &lastHB)
	if err == sql.ErrNoRows {
		// "Not found" is a normal outcome here, not a failure.
		return nil, nil
	}
	if err != nil {
		return nil, fmt.Errorf("get node %s: %w", name, err)
	}
	if lastHB.Valid {
		hb, parseErr := time.Parse("2006-01-02 15:04:05", lastHB.String)
		if parseErr != nil {
			// A pointer to the zero time would look like a real heartbeat
			// to callers, so surface the malformed value instead.
			return nil, fmt.Errorf("get node %s: parse last_heartbeat %q: %w", name, lastHB.String, parseErr)
		}
		n.LastHeartbeat = &hb
	}
	return &n, nil
}
// ListNodes returns every row of the nodes table, ordered by name.
func ListNodes(db *sql.DB) ([]*Node, error) {
	const allNodes = `SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes ORDER BY name`
	return queryNodes(db, allNodes)
}
// ListWorkerNodes returns the nodes whose role is "worker" or "master",
// ordered by name. Masters are included because a master also acts as a
// worker.
func ListWorkerNodes(db *sql.DB) ([]*Node, error) {
	const workerNodes = `SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes WHERE role IN ('worker', 'master') ORDER BY name`
	return queryNodes(db, workerNodes)
}
// ListEdgeNodes returns the nodes whose role is "edge", ordered by name.
func ListEdgeNodes(db *sql.DB) ([]*Node, error) {
	const edgeNodes = `SELECT name, address, role, arch, status, containers, last_heartbeat FROM nodes WHERE role = 'edge' ORDER BY name`
	return queryNodes(db, edgeNodes)
}
// queryNodes runs query and converts each result row into a Node.
//
// The query must yield exactly the columns name, address, role, arch, status,
// containers, last_heartbeat, in that order, because rows are scanned
// positionally. A NULL last_heartbeat leaves Node.LastHeartbeat nil; a
// non-NULL value must be in "2006-01-02 15:04:05" form (parsed as UTC, since
// the layout carries no zone). The returned slice is nil when there are no
// rows and on every error path.
func queryNodes(db *sql.DB, query string) ([]*Node, error) {
	rows, err := db.Query(query)
	if err != nil {
		return nil, fmt.Errorf("query nodes: %w", err)
	}
	// Close errors are deliberately ignored; failures that happen while
	// iterating are reported through rows.Err below.
	defer func() { _ = rows.Close() }()

	var nodes []*Node
	for rows.Next() {
		var (
			n      Node
			lastHB sql.NullString
		)
		if err := rows.Scan(&n.Name, &n.Address, &n.Role, &n.Arch, &n.Status, &n.Containers, &lastHB); err != nil {
			return nil, fmt.Errorf("scan node: %w", err)
		}
		if lastHB.Valid {
			hb, parseErr := time.Parse("2006-01-02 15:04:05", lastHB.String)
			if parseErr != nil {
				// A pointer to the zero time would look like a real
				// heartbeat to callers, so surface the malformed value.
				return nil, fmt.Errorf("parse last_heartbeat %q for node %s: %w", lastHB.String, n.Name, parseErr)
			}
			n.LastHeartbeat = &hb
		}
		nodes = append(nodes, &n)
	}
	if err := rows.Err(); err != nil {
		// Do not hand back a partially filled slice together with an error.
		return nil, fmt.Errorf("iterate nodes: %w", err)
	}
	return nodes, nil
}
// UpdateNodeStatus sets the status column of the node called name and sets
// updated_at to datetime('now'). The statement's result is not inspected, so
// a name that matches no row is not reported as an error. A failing statement
// is returned wrapped with the node name.
func UpdateNodeStatus(db *sql.DB, name, status string) error {
	const stmt = `UPDATE nodes SET status = ?, updated_at = datetime('now') WHERE name = ?`
	if _, execErr := db.Exec(stmt, status, name); execErr != nil {
		return fmt.Errorf("update node status %s: %w", name, execErr)
	}
	return nil
}