package registry

import (
	"database/sql"
	"fmt"
	"time"
)

// componentTimeLayout is the layout used to parse the created_at and
// updated_at columns of the components table.
const componentTimeLayout = "2006-01-02 15:04:05"

// Route represents a route entry for a component in the registry.
type Route struct {
	Name     string
	Port     int
	Mode     string
	Hostname string
	HostPort int // agent-assigned host port (0 = not yet allocated)
}

// Component represents a component in the registry.
//
// Ports, Volumes, Cmd and Routes are stored in separate tables
// (component_ports, component_volumes, component_cmd, component_routes)
// keyed by (Service, Name); the remaining fields map to columns of the
// components table.
type Component struct {
	Name          string
	Service       string
	Image         string
	Network       string
	UserSpec      string
	Restart       string
	DesiredState  string
	ObservedState string
	Version       string
	Ports         []string
	Volumes       []string
	Cmd           []string
	Routes        []Route
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

// CreateComponent creates a new component in the registry.
//
// The components row and all of its ports, volumes, command arguments and
// routes are written in a single transaction: either everything is stored or
// nothing is.
func CreateComponent(db *sql.DB, c *Component) error {
	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	// After a successful Commit this Rollback only reports sql.ErrTxDone,
	// which is deliberately ignored.
	defer tx.Rollback() //nolint:errcheck

	_, err = tx.Exec(`
		INSERT INTO components (name, service, image, network, user_spec, restart, desired_state, observed_state, version)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
		c.Name, c.Service, c.Image, c.Network, c.UserSpec, c.Restart,
		c.DesiredState, c.ObservedState, c.Version,
	)
	if err != nil {
		return fmt.Errorf("insert component %q/%q: %w", c.Service, c.Name, err)
	}

	if err := replaceComponentChildren(tx, c); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit component %q/%q: %w", c.Service, c.Name, err)
	}
	return nil
}

// GetComponent retrieves a component by service and name, including its
// ports, volumes, command arguments and routes.
//
// When no matching row exists the returned error wraps sql.ErrNoRows.
func GetComponent(db *sql.DB, service, name string) (*Component, error) {
	row := db.QueryRow(`
		SELECT name, service, image, network, user_spec, restart, desired_state, observed_state, version, created_at, updated_at
		FROM components
		WHERE service = ? AND name = ?`,
		service, name,
	)
	c, err := scanComponent(row.Scan)
	if err != nil {
		return nil, fmt.Errorf("get component %q/%q: %w", service, name, err)
	}
	if err := loadComponentChildren(db, &c); err != nil {
		return nil, err
	}
	return &c, nil
}

// ListComponents returns all components for a service, ordered by name.
//
// The components rows are read and the cursor is closed before the per-component
// child tables are queried, so the listing never needs a second pooled
// connection while a cursor is still open. Any failure to load ports,
// volumes, command arguments or routes is returned rather than yielding a
// silently incomplete component.
func ListComponents(db *sql.DB, service string) ([]Component, error) {
	components, err := queryComponentRows(db, service)
	if err != nil {
		return nil, err
	}
	for i := range components {
		if err := loadComponentChildren(db, &components[i]); err != nil {
			return nil, err
		}
	}
	return components, nil
}

// queryComponentRows reads the components rows for a service, ordered by
// name, without loading ports, volumes, command arguments or routes.
func queryComponentRows(db *sql.DB, service string) ([]Component, error) {
	rows, err := db.Query(`
		SELECT name, service, image, network, user_spec, restart, desired_state, observed_state, version, created_at, updated_at
		FROM components
		WHERE service = ?
		ORDER BY name`,
		service,
	)
	if err != nil {
		return nil, fmt.Errorf("list components for %q: %w", service, err)
	}
	defer func() { _ = rows.Close() }()

	var components []Component
	for rows.Next() {
		c, err := scanComponent(rows.Scan)
		if err != nil {
			return nil, fmt.Errorf("scan component: %w", err)
		}
		components = append(components, c)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("list components for %q: %w", service, err)
	}
	return components, nil
}

// scanComponent reads one components row through scan, which is the Scan
// method of either a *sql.Row or a *sql.Rows. The selected columns must be
// name, service, image, network, user_spec, restart, desired_state,
// observed_state, version, created_at, updated_at, in that order.
func scanComponent(scan func(dest ...interface{}) error) (Component, error) {
	var (
		c                    Component
		createdAt, updatedAt string
	)
	err := scan(
		&c.Name, &c.Service, &c.Image, &c.Network, &c.UserSpec, &c.Restart,
		&c.DesiredState, &c.ObservedState, &c.Version, &createdAt, &updatedAt,
	)
	if err != nil {
		return Component{}, err
	}
	// Timestamps are best-effort: a value that does not match the layout
	// leaves the field at the zero time.Time instead of failing the read.
	c.CreatedAt, _ = time.Parse(componentTimeLayout, createdAt)
	c.UpdatedAt, _ = time.Parse(componentTimeLayout, updatedAt)
	return c, nil
}

// loadComponentChildren fills c.Ports, c.Volumes, c.Cmd and c.Routes from
// their tables, using c.Service and c.Name as the key.
func loadComponentChildren(db *sql.DB, c *Component) error {
	var err error
	if c.Ports, err = getPorts(db, c.Service, c.Name); err != nil {
		return err
	}
	if c.Volumes, err = getVolumes(db, c.Service, c.Name); err != nil {
		return err
	}
	if c.Cmd, err = getCmd(db, c.Service, c.Name); err != nil {
		return err
	}
	if c.Routes, err = getRoutes(db, c.Service, c.Name); err != nil {
		return err
	}
	return nil
}

// replaceComponentChildren rewrites the ports, volumes, command arguments and
// routes stored for c inside tx.
func replaceComponentChildren(tx *sql.Tx, c *Component) error {
	if err := setPorts(tx, c.Service, c.Name, c.Ports); err != nil {
		return err
	}
	if err := setVolumes(tx, c.Service, c.Name, c.Volumes); err != nil {
		return err
	}
	if err := setCmd(tx, c.Service, c.Name, c.Cmd); err != nil {
		return err
	}
	return setRoutes(tx, c.Service, c.Name, c.Routes)
}

// ensureRowAffected returns an error wrapping sql.ErrNoRows when res reports
// that no row was changed. op is the message prefix describing the operation.
func ensureRowAffected(res sql.Result, op string) error {
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("%s: rows affected: %w", op, err)
	}
	if n == 0 {
		return fmt.Errorf("%s: %w", op, sql.ErrNoRows)
	}
	return nil
}

// UpdateComponentState updates desired and/or observed state.
//
// An empty desiredState or observedState leaves the corresponding column
// unchanged. updated_at is refreshed in either case. When no component
// matches service and name the returned error wraps sql.ErrNoRows.
func UpdateComponentState(db *sql.DB, service, name, desiredState, observedState string) error {
	op := fmt.Sprintf("update state %q/%q", service, name)
	// Each state value is bound twice: once for the emptiness test and once
	// as the replacement value.
	res, err := db.Exec(`
		UPDATE components
		SET desired_state = CASE WHEN ? = '' THEN desired_state ELSE ? END,
		    observed_state = CASE WHEN ? = '' THEN observed_state ELSE ? END,
		    updated_at = datetime('now')
		WHERE service = ? AND name = ?`,
		desiredState, desiredState, observedState, observedState, service, name,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", op, err)
	}
	return ensureRowAffected(res, op)
}

// UpdateComponentSpec updates the full spec for an existing component.
//
// Image, network, user spec, restart policy and version are overwritten and
// the component's ports, volumes, command arguments and routes are replaced
// with those in c, all in one transaction. Desired and observed state are
// not touched. When no component matches c.Service and c.Name nothing is
// written and the returned error wraps sql.ErrNoRows.
func UpdateComponentSpec(db *sql.DB, c *Component) error {
	op := fmt.Sprintf("update component %q/%q", c.Service, c.Name)

	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	// After a successful Commit this Rollback only reports sql.ErrTxDone,
	// which is deliberately ignored.
	defer tx.Rollback() //nolint:errcheck

	res, err := tx.Exec(`
		UPDATE components
		SET image = ?, network = ?, user_spec = ?, restart = ?, version = ?, updated_at = datetime('now')
		WHERE service = ? AND name = ?`,
		c.Image, c.Network, c.UserSpec, c.Restart, c.Version, c.Service, c.Name,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", op, err)
	}
	// Stop before touching the child tables if the component does not exist,
	// so no ports/volumes/cmd/routes are stored for a missing component.
	if err := ensureRowAffected(res, op); err != nil {
		return err
	}

	if err := replaceComponentChildren(tx, c); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit component %q/%q: %w", c.Service, c.Name, err)
	}
	return nil
}

// DeleteComponent removes a component.
//
// When no component matches service and name the returned error wraps
// sql.ErrNoRows.
//
// NOTE(review): only the components row is deleted here; rows in the
// component_ports/volumes/cmd/routes tables are presumably removed by a
// cascading foreign key — confirm against the schema.
func DeleteComponent(db *sql.DB, service, name string) error {
	op := fmt.Sprintf("delete component %q/%q", service, name)
	res, err := db.Exec("DELETE FROM components WHERE service = ? AND name = ?", service, name)
	if err != nil {
		return fmt.Errorf("%s: %w", op, err)
	}
	return ensureRowAffected(res, op)
}

// setPorts replaces the port mappings stored for a component: existing rows
// are deleted and one row is inserted per entry of ports.
func setPorts(tx *sql.Tx, service, component string, ports []string) error {
	if _, err := tx.Exec("DELETE FROM component_ports WHERE service = ? AND component = ?", service, component); err != nil {
		return fmt.Errorf("clear ports %q/%q: %w", service, component, err)
	}
	for _, p := range ports {
		if _, err := tx.Exec("INSERT INTO component_ports (service, component, mapping) VALUES (?, ?, ?)", service, component, p); err != nil {
			return fmt.Errorf("insert port %q/%q %q: %w", service, component, p, err)
		}
	}
	return nil
}

// getPorts returns the port mappings stored for a component, ordered by
// mapping. The result is nil when there are none.
func getPorts(db *sql.DB, service, component string) ([]string, error) {
	rows, err := db.Query("SELECT mapping FROM component_ports WHERE service = ? AND component = ? ORDER BY mapping", service, component)
	if err != nil {
		return nil, fmt.Errorf("get ports %q/%q: %w", service, component, err)
	}
	defer func() { _ = rows.Close() }()

	var ports []string
	for rows.Next() {
		var p string
		if err := rows.Scan(&p); err != nil {
			return nil, fmt.Errorf("scan port %q/%q: %w", service, component, err)
		}
		ports = append(ports, p)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("get ports %q/%q: %w", service, component, err)
	}
	return ports, nil
}

// setVolumes replaces the volume mappings stored for a component: existing
// rows are deleted and one row is inserted per entry of volumes.
func setVolumes(tx *sql.Tx, service, component string, volumes []string) error {
	if _, err := tx.Exec("DELETE FROM component_volumes WHERE service = ? AND component = ?", service, component); err != nil {
		return fmt.Errorf("clear volumes %q/%q: %w", service, component, err)
	}
	for _, v := range volumes {
		if _, err := tx.Exec("INSERT INTO component_volumes (service, component, mapping) VALUES (?, ?, ?)", service, component, v); err != nil {
			return fmt.Errorf("insert volume %q/%q %q: %w", service, component, v, err)
		}
	}
	return nil
}

// getVolumes returns the volume mappings stored for a component, ordered by
// mapping. The result is nil when there are none.
func getVolumes(db *sql.DB, service, component string) ([]string, error) {
	rows, err := db.Query("SELECT mapping FROM component_volumes WHERE service = ? AND component = ? ORDER BY mapping", service, component)
	if err != nil {
		return nil, fmt.Errorf("get volumes %q/%q: %w", service, component, err)
	}
	defer func() { _ = rows.Close() }()

	var volumes []string
	for rows.Next() {
		var v string
		if err := rows.Scan(&v); err != nil {
			return nil, fmt.Errorf("scan volume %q/%q: %w", service, component, err)
		}
		volumes = append(volumes, v)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("get volumes %q/%q: %w", service, component, err)
	}
	return volumes, nil
}

// setCmd replaces the command arguments stored for a component. Each
// argument is stored with its slice index as position so the original order
// can be restored on read.
func setCmd(tx *sql.Tx, service, component string, cmd []string) error {
	if _, err := tx.Exec("DELETE FROM component_cmd WHERE service = ? AND component = ?", service, component); err != nil {
		return fmt.Errorf("clear cmd %q/%q: %w", service, component, err)
	}
	for i, arg := range cmd {
		if _, err := tx.Exec("INSERT INTO component_cmd (service, component, position, arg) VALUES (?, ?, ?, ?)", service, component, i, arg); err != nil {
			return fmt.Errorf("insert cmd %q/%q [%d]: %w", service, component, i, err)
		}
	}
	return nil
}

// getCmd returns the command arguments stored for a component, ordered by
// position. The result is nil when there are none.
func getCmd(db *sql.DB, service, component string) ([]string, error) {
	rows, err := db.Query("SELECT arg FROM component_cmd WHERE service = ? AND component = ? ORDER BY position", service, component)
	if err != nil {
		return nil, fmt.Errorf("get cmd %q/%q: %w", service, component, err)
	}
	defer func() { _ = rows.Close() }()

	var cmd []string
	for rows.Next() {
		var arg string
		if err := rows.Scan(&arg); err != nil {
			return nil, fmt.Errorf("scan cmd %q/%q: %w", service, component, err)
		}
		cmd = append(cmd, arg)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("get cmd %q/%q: %w", service, component, err)
	}
	return cmd, nil
}

// setRoutes replaces the route definitions stored for a component (delete +
// re-insert). A route with an empty Mode is stored with mode "l4" and a
// route with an empty Name is stored with name "default". The HostPort of
// each route is written as given, replacing any previously stored value.
func setRoutes(tx *sql.Tx, service, component string, routes []Route) error {
	if _, err := tx.Exec("DELETE FROM component_routes WHERE service = ? AND component = ?", service, component); err != nil {
		return fmt.Errorf("clear routes %q/%q: %w", service, component, err)
	}
	for _, r := range routes {
		mode := r.Mode
		if mode == "" {
			mode = "l4"
		}
		name := r.Name
		if name == "" {
			name = "default"
		}
		if _, err := tx.Exec(
			"INSERT INTO component_routes (service, component, name, port, mode, hostname, host_port) VALUES (?, ?, ?, ?, ?, ?, ?)",
			service, component, name, r.Port, mode, r.Hostname, r.HostPort,
		); err != nil {
			return fmt.Errorf("insert route %q/%q %q: %w", service, component, name, err)
		}
	}
	return nil
}

// getRoutes returns the routes stored for a component, ordered by name. The
// result is nil when there are none.
func getRoutes(db *sql.DB, service, component string) ([]Route, error) {
	rows, err := db.Query(
		"SELECT name, port, mode, hostname, host_port FROM component_routes WHERE service = ? AND component = ? ORDER BY name",
		service, component,
	)
	if err != nil {
		return nil, fmt.Errorf("get routes %q/%q: %w", service, component, err)
	}
	defer func() { _ = rows.Close() }()

	var routes []Route
	for rows.Next() {
		var r Route
		if err := rows.Scan(&r.Name, &r.Port, &r.Mode, &r.Hostname, &r.HostPort); err != nil {
			return nil, fmt.Errorf("scan route %q/%q: %w", service, component, err)
		}
		routes = append(routes, r)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("get routes %q/%q: %w", service, component, err)
	}
	return routes, nil
}

// UpdateRouteHostPort updates the agent-assigned host port for a specific route.
//
// When no route matches service, component and routeName the returned error
// wraps sql.ErrNoRows.
func UpdateRouteHostPort(db *sql.DB, service, component, routeName string, hostPort int) error {
	op := fmt.Sprintf("update route host_port %q/%q/%q", service, component, routeName)
	res, err := db.Exec(
		"UPDATE component_routes SET host_port = ? WHERE service = ? AND component = ? AND name = ?",
		hostPort, service, component, routeName,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", op, err)
	}
	return ensureRowAffected(res, op)
}

// GetRouteHostPorts returns a map of route name to assigned host port for a component.
//
// The map is always non-nil; it is empty when the component has no routes.
func GetRouteHostPorts(db *sql.DB, service, component string) (map[string]int, error) {
	rows, err := db.Query(
		"SELECT name, host_port FROM component_routes WHERE service = ? AND component = ?",
		service, component,
	)
	if err != nil {
		return nil, fmt.Errorf("get route host ports %q/%q: %w", service, component, err)
	}
	defer func() { _ = rows.Close() }()

	result := make(map[string]int)
	for rows.Next() {
		var (
			name string
			port int
		)
		if err := rows.Scan(&name, &port); err != nil {
			return nil, fmt.Errorf("scan route host port %q/%q: %w", service, component, err)
		}
		result[name] = port
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("get route host ports %q/%q: %w", service, component, err)
	}
	return result, nil
}