Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d543998dc | |||
| f9f6f339f4 | |||
| 5da307cab5 | |||
| 22a836812f | |||
| 9918859705 |
22
Dockerfile.master
Normal file
22
Dockerfile.master
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# ---- Build stage: compile the mcp-master binary ----
FROM golang:1.25-alpine AS builder

# Version string injected into main.version at link time (defaults to "dev").
ARG VERSION=dev

WORKDIR /build

# Copy the module files first so the dependency download layer is cached
# independently of source changes.
COPY go.mod go.sum ./
RUN go mod download

COPY . .

# cgo is disabled; -s -w omit the symbol table and DWARF debug information.
RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.version=${VERSION}" \
    -o /mcp-master ./cmd/mcp-master

# ---- Runtime stage ----
FROM alpine:3.21

# CA certificates and time zone data for the runtime image.
RUN apk add --no-cache ca-certificates tzdata

COPY --from=builder /mcp-master /usr/local/bin/mcp-master

WORKDIR /srv/mcp-master
EXPOSE 9555

# Default invocation: "mcp-master server --config /srv/mcp-master/mcp-master.toml".
ENTRYPOINT ["mcp-master"]
CMD ["server", "--config", "/srv/mcp-master/mcp-master.toml"]
|
||||||
5
Makefile
5
Makefile
@@ -32,6 +32,11 @@ proto-lint:
|
|||||||
buf lint
|
buf lint
|
||||||
buf breaking --against '.git#branch=master,subdir=proto'
|
buf breaking --against '.git#branch=master,subdir=proto'
|
||||||
|
|
||||||
|
# docker-master builds the mcp-master image from Dockerfile.master with podman.
# The output of `git describe --tags --always --dirty` is used both as the
# VERSION build argument and as the image tag.
docker-master:
	podman build -f Dockerfile.master \
		--build-arg VERSION=$(shell git describe --tags --always --dirty) \
		-t mcr.svc.mcp.metacircular.net:8443/mcp-master:$(shell git describe --tags --always --dirty) .
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -f mcp mcp-agent mcp-master
|
rm -f mcp mcp-agent mcp-master
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
root.AddCommand(snapshotCmd())
|
root.AddCommand(snapshotCmd())
|
||||||
|
root.AddCommand(recoverCmd())
|
||||||
|
|
||||||
if err := root.Execute(); err != nil {
|
if err := root.Execute(); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|||||||
68
cmd/mcp-agent/recover.go
Normal file
68
cmd/mcp-agent/recover.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/agent"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func recoverCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "recover",
|
||||||
|
Short: "Recreate containers from the agent registry",
|
||||||
|
Long: `Recover recreates containers from the agent's SQLite registry for all
|
||||||
|
services whose desired state is "running" but which don't have a running
|
||||||
|
container in podman.
|
||||||
|
|
||||||
|
This is the recovery path after a podman database loss (e.g., after a
|
||||||
|
UID change, podman reset, or reboot that cleared container state).
|
||||||
|
|
||||||
|
Images must be cached locally — recover does not pull from MCR.`,
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
cfg, err := config.LoadAgentConfig(cfgPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("load config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
|
||||||
|
Level: slog.LevelInfo,
|
||||||
|
}))
|
||||||
|
|
||||||
|
db, err := registry.Open(cfg.Database.Path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open registry: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = db.Close() }()
|
||||||
|
|
||||||
|
proxy, err := agent.NewProxyRouter(cfg.MCProxy.Socket, cfg.MCProxy.CertDir, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("mc-proxy not available, routes will not be registered", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
certs, err := agent.NewCertProvisioner(cfg.Metacrypt, cfg.MCProxy.CertDir, logger)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("cert provisioner not available", "err", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &agent.Agent{
|
||||||
|
Config: cfg,
|
||||||
|
DB: db,
|
||||||
|
Runtime: &runtime.Podman{},
|
||||||
|
Logger: logger,
|
||||||
|
PortAlloc: agent.NewPortAllocator(),
|
||||||
|
Proxy: proxy,
|
||||||
|
Certs: certs,
|
||||||
|
Version: version,
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.Recover(context.Background())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||||
@@ -52,6 +53,38 @@ func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClie
|
|||||||
return mcpv1.NewMcpAgentServiceClient(conn), conn, nil
|
return mcpv1.NewMcpAgentServiceClient(conn), conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dialAgentMulti tries each address in order and returns the first successful
|
||||||
|
// connection. Provides resilience when Tailscale DNS is down or a node is
|
||||||
|
// reachable via LAN but not Tailnet.
|
||||||
|
func dialAgentMulti(addresses []string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClient, *grpc.ClientConn, error) {
|
||||||
|
if len(addresses) == 0 {
|
||||||
|
return nil, nil, fmt.Errorf("no addresses to dial")
|
||||||
|
}
|
||||||
|
if len(addresses) == 1 {
|
||||||
|
return dialAgent(addresses[0], cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
for _, addr := range addresses {
|
||||||
|
client, conn, err := dialAgent(addr, cfg)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Quick health check to verify the connection actually works.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
_, err = client.NodeStatus(ctx, &mcpv1.NodeStatusRequest{})
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
_ = conn.Close()
|
||||||
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return client, conn, nil
|
||||||
|
}
|
||||||
|
return nil, nil, fmt.Errorf("all addresses failed, last error: %w", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
// dialMaster connects to the master at the given address and returns a gRPC
|
// dialMaster connects to the master at the given address and returns a gRPC
|
||||||
// client for the McpMasterService.
|
// client for the McpMasterService.
|
||||||
func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) {
|
func dialMaster(address string, cfg *config.CLIConfig) (mcpv1.McpMasterServiceClient, *grpc.ClientConn, error) {
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// findNodeAddress looks up a node by name in the CLI config and returns
|
// findNodeAddress looks up a node by name in the CLI config and returns
|
||||||
// its address.
|
// its primary address.
|
||||||
func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
|
func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
|
||||||
for _, n := range cfg.Nodes {
|
for _, n := range cfg.Nodes {
|
||||||
if n.Name == nodeName {
|
if n.Name == nodeName {
|
||||||
@@ -19,6 +19,16 @@ func findNodeAddress(cfg *config.CLIConfig, nodeName string) (string, error) {
|
|||||||
return "", fmt.Errorf("node %q not found in config", nodeName)
|
return "", fmt.Errorf("node %q not found in config", nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// findNode looks up a node by name in the CLI config.
|
||||||
|
func findNode(cfg *config.CLIConfig, nodeName string) (*config.NodeConfig, error) {
|
||||||
|
for i := range cfg.Nodes {
|
||||||
|
if cfg.Nodes[i].Name == nodeName {
|
||||||
|
return &cfg.Nodes[i], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("node %q not found in config", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
// printComponentResults prints the result of each component operation.
|
// printComponentResults prints the result of each component operation.
|
||||||
func printComponentResults(results []*mcpv1.ComponentResult) {
|
func printComponentResults(results []*mcpv1.ComponentResult) {
|
||||||
for _, r := range results {
|
for _, r := range results {
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func forEachNode(fn func(node config.NodeConfig, client mcpv1.McpAgentServiceCli
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, node := range cfg.Nodes {
|
for _, node := range cfg.Nodes {
|
||||||
client, conn, err := dialAgent(node.Address, cfg)
|
client, conn, err := dialAgentMulti(node.AllAddresses(), cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
|
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
|
||||||
continue
|
continue
|
||||||
@@ -85,7 +85,7 @@ func psCmd() *cobra.Command {
|
|||||||
Short: "Live check: query runtime on all agents",
|
Short: "Live check: query runtime on all agents",
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
w := newTable()
|
w := newTable()
|
||||||
_, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME")
|
_, _ = fmt.Fprintln(w, "SERVICE\tCOMPONENT\tNODE\tSTATE\tVERSION\tUPTIME\t")
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
|
if err := forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
|
||||||
@@ -96,19 +96,25 @@ func psCmd() *cobra.Command {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, svc := range resp.GetServices() {
|
for _, svc := range resp.GetServices() {
|
||||||
|
comment := svc.GetComment()
|
||||||
for _, comp := range svc.GetComponents() {
|
for _, comp := range svc.GetComponents() {
|
||||||
uptime := "-"
|
uptime := "-"
|
||||||
if comp.GetStarted() != nil {
|
if comp.GetStarted() != nil {
|
||||||
d := now.Sub(comp.GetStarted().AsTime())
|
d := now.Sub(comp.GetStarted().AsTime())
|
||||||
uptime = formatDuration(d)
|
uptime = formatDuration(d)
|
||||||
}
|
}
|
||||||
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n",
|
col7 := ""
|
||||||
|
if comment != "" {
|
||||||
|
col7 = "# " + comment
|
||||||
|
}
|
||||||
|
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
|
||||||
svc.GetName(),
|
svc.GetName(),
|
||||||
comp.GetName(),
|
comp.GetName(),
|
||||||
node.Name,
|
node.Name,
|
||||||
comp.GetObservedState(),
|
comp.GetObservedState(),
|
||||||
comp.GetVersion(),
|
comp.GetVersion(),
|
||||||
uptime,
|
uptime,
|
||||||
|
col7,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
139
internal/agent/recover.go
Normal file
139
internal/agent/recover.go
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||||
|
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recover recreates containers from the agent's registry for all services
|
||||||
|
// whose desired state is "running" but which don't have a running container
|
||||||
|
// in podman. This is the recovery path after a podman database loss (e.g.,
|
||||||
|
// after a UID change or podman reset).
|
||||||
|
//
|
||||||
|
// Recover does NOT pull images — it assumes the images are cached locally.
|
||||||
|
// If an image is missing, that component is skipped with a warning.
|
||||||
|
func (a *Agent) Recover(ctx context.Context) error {
|
||||||
|
services, err := registry.ListServices(a.DB)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list services: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the list of currently running containers from podman.
|
||||||
|
running, err := a.Runtime.List(ctx)
|
||||||
|
if err != nil {
|
||||||
|
a.Logger.Warn("cannot list containers, assuming none running", "err", err)
|
||||||
|
running = nil
|
||||||
|
}
|
||||||
|
runningSet := make(map[string]bool)
|
||||||
|
for _, c := range running {
|
||||||
|
runningSet[c.Name] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var recovered, skipped, already int
|
||||||
|
|
||||||
|
for _, svc := range services {
|
||||||
|
if !svc.Active {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
comps, err := registry.ListComponents(a.DB, svc.Name)
|
||||||
|
if err != nil {
|
||||||
|
a.Logger.Warn("list components", "service", svc.Name, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, comp := range comps {
|
||||||
|
if comp.DesiredState != "running" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
containerName := svc.Name + "-" + comp.Name
|
||||||
|
if comp.Name == svc.Name {
|
||||||
|
containerName = svc.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if container is already running.
|
||||||
|
if runningSet[containerName] {
|
||||||
|
already++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("recovering container",
|
||||||
|
"service", svc.Name,
|
||||||
|
"component", comp.Name,
|
||||||
|
"image", comp.Image,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Remove any stale container with the same name.
|
||||||
|
_ = a.Runtime.Remove(ctx, containerName)
|
||||||
|
|
||||||
|
// Build the container spec from the registry.
|
||||||
|
spec := runtime.ContainerSpec{
|
||||||
|
Name: containerName,
|
||||||
|
Image: comp.Image,
|
||||||
|
Network: comp.Network,
|
||||||
|
User: comp.UserSpec,
|
||||||
|
Restart: comp.Restart,
|
||||||
|
Volumes: comp.Volumes,
|
||||||
|
Cmd: comp.Cmd,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate ports from routes if the component has routes.
|
||||||
|
if len(comp.Routes) > 0 && a.PortAlloc != nil {
|
||||||
|
ports, env, allocErr := a.allocateRoutePorts(svc.Name, comp.Name, comp.Routes)
|
||||||
|
if allocErr != nil {
|
||||||
|
a.Logger.Warn("allocate route ports", "container", containerName, "err", allocErr)
|
||||||
|
spec.Ports = comp.Ports
|
||||||
|
} else {
|
||||||
|
spec.Ports = append(comp.Ports, ports...)
|
||||||
|
spec.Env = append(spec.Env, env...)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
spec.Ports = comp.Ports
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.Runtime.Run(ctx, spec); err != nil {
|
||||||
|
a.Logger.Error("recover container failed",
|
||||||
|
"container", containerName,
|
||||||
|
"err", err,
|
||||||
|
)
|
||||||
|
skipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-register mc-proxy routes.
|
||||||
|
if a.Proxy != nil && len(comp.Routes) > 0 {
|
||||||
|
hostPorts, hpErr := registry.GetRouteHostPorts(a.DB, svc.Name, comp.Name)
|
||||||
|
if hpErr == nil {
|
||||||
|
if proxyErr := a.Proxy.RegisterRoutes(ctx, svc.Name, comp.Routes, hostPorts); proxyErr != nil {
|
||||||
|
a.Logger.Warn("re-register routes", "service", svc.Name, "err", proxyErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provision TLS certs if needed.
|
||||||
|
if a.Certs != nil && hasL7Routes(comp.Routes) {
|
||||||
|
hostnames := l7Hostnames(svc.Name, comp.Routes)
|
||||||
|
if certErr := a.Certs.EnsureCert(ctx, svc.Name, hostnames); certErr != nil {
|
||||||
|
a.Logger.Warn("cert provisioning", "service", svc.Name, "err", certErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
recovered++
|
||||||
|
a.Logger.Info("container recovered", "container", containerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Logger.Info("recovery complete",
|
||||||
|
"recovered", recovered,
|
||||||
|
"skipped", skipped,
|
||||||
|
"already_running", already,
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasL7Routes and l7Hostnames are defined in deploy.go.
|
||||||
@@ -50,9 +50,28 @@ type AuthConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NodeConfig defines a managed node that the CLI connects to.
// Address is the primary address. Addresses is an optional list of
// fallback addresses tried in order if the primary fails. This
// provides resilience when Tailscale DNS is down or a node is
// reachable via LAN but not Tailnet.
type NodeConfig struct {
	Name      string   `toml:"name"`
	Address   string   `toml:"address"`
	Addresses []string `toml:"addresses,omitempty"`
}

// AllAddresses lists every address configured for the node: the primary
// address first, then the fallbacks in their configured order. Empty strings
// are dropped and each address appears at most once. The result is nil when
// the node has no non-empty address.
func (n NodeConfig) AllAddresses() []string {
	var (
		addrs []string
		seen  = make(map[string]bool)
	)
	// add appends addr unless it is empty or was already emitted.
	add := func(addr string) {
		if addr == "" || seen[addr] {
			return
		}
		seen[addr] = true
		addrs = append(addrs, addr)
	}

	add(n.Address)
	for _, fallback := range n.Addresses {
		add(fallback)
	}
	return addrs
}
|
||||||
|
|
||||||
// LoadCLIConfig reads and validates a CLI configuration file.
|
// LoadCLIConfig reads and validates a CLI configuration file.
|
||||||
|
|||||||
@@ -64,9 +64,24 @@ type TimeoutsConfig struct {
|
|||||||
type MasterNodeConfig struct {
	Name      string   `toml:"name"`
	Address   string   `toml:"address"`             // primary address
	Addresses []string `toml:"addresses,omitempty"` // optional fallback addresses
	Role      string   `toml:"role"`                // "worker", "edge", or "master"
}

// AllAddresses lists the node's primary address followed by its fallback
// addresses in configured order. Empty strings are dropped and duplicates are
// removed, keeping the first occurrence. The result is nil when the node has
// no non-empty address.
func (n MasterNodeConfig) AllAddresses() []string {
	// Seeding the set with the empty string means blank entries are treated
	// as already seen and therefore never emitted.
	seen := map[string]bool{"": true}

	candidates := make([]string, 0, 1+len(n.Addresses))
	candidates = append(candidates, n.Address)
	candidates = append(candidates, n.Addresses...)

	var addrs []string
	for _, addr := range candidates {
		if seen[addr] {
			continue
		}
		seen[addr] = true
		addrs = append(addrs, addr)
	}
	return addrs
}
|
||||||
|
|
||||||
// LoadMasterConfig reads and validates a master configuration file.
|
// LoadMasterConfig reads and validates a master configuration file.
|
||||||
func LoadMasterConfig(path string) (*MasterConfig, error) {
|
func LoadMasterConfig(path string) (*MasterConfig, error) {
|
||||||
data, err := os.ReadFile(path) //nolint:gosec // config path from trusted CLI flag
|
data, err := os.ReadFile(path) //nolint:gosec // config path from trusted CLI flag
|
||||||
|
|||||||
@@ -140,22 +140,31 @@ func NewAgentPool(caCertPath, token string) *AgentPool {
|
|||||||
|
|
||||||
// AddNode dials an agent and adds it to the pool.
|
// AddNode dials an agent and adds it to the pool.
|
||||||
func (p *AgentPool) AddNode(name, address string) error {
|
func (p *AgentPool) AddNode(name, address string) error {
|
||||||
client, err := DialAgent(address, p.caCert, p.token)
|
return p.AddNodeMulti(name, []string{address})
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddNodeMulti tries each address in order and adds the first successful
|
||||||
|
// connection to the pool.
|
||||||
|
func (p *AgentPool) AddNodeMulti(name string, addresses []string) error {
|
||||||
|
var lastErr error
|
||||||
|
for _, addr := range addresses {
|
||||||
|
client, err := DialAgent(addr, p.caCert, p.token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("add node %s: %w", name, err)
|
lastErr = fmt.Errorf("%s: %w", addr, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
client.Node = name
|
client.Node = name
|
||||||
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
|
||||||
|
|
||||||
// Close existing connection if re-adding.
|
|
||||||
if old, ok := p.clients[name]; ok {
|
if old, ok := p.clients[name]; ok {
|
||||||
_ = old.Close()
|
_ = old.Close()
|
||||||
}
|
}
|
||||||
p.clients[name] = client
|
p.clients[name] = client
|
||||||
|
p.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return fmt.Errorf("add node %s: all addresses failed: %w", name, lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns the agent client for a node.
|
// Get returns the agent client for a node.
|
||||||
func (p *AgentPool) Get(name string) (*AgentClient, error) {
|
func (p *AgentPool) Get(name string) (*AgentClient, error) {
|
||||||
|
|||||||
@@ -51,12 +51,23 @@ func (m *Master) Deploy(ctx context.Context, req *mcpv1.MasterDeployRequest) (*m
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the node's Tailnet IP from its address (host:port).
|
// Resolve the node's address to an IP for DNS registration.
|
||||||
|
// Node addresses may be Tailscale DNS names (e.g., rift.scylla-hammerhead.ts.net:9444)
|
||||||
|
// but MCNS needs an IP address for A records.
|
||||||
nodeHost, _, err := net.SplitHostPort(node.Address)
|
nodeHost, _, err := net.SplitHostPort(node.Address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Error = fmt.Sprintf("invalid node address %q: %v", node.Address, err)
|
resp.Error = fmt.Sprintf("invalid node address %q: %v", node.Address, err)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
// If nodeHost is not an IP, resolve it.
|
||||||
|
if net.ParseIP(nodeHost) == nil {
|
||||||
|
ips, lookupErr := net.LookupHost(nodeHost)
|
||||||
|
if lookupErr != nil || len(ips) == 0 {
|
||||||
|
m.Logger.Warn("cannot resolve node address", "host", nodeHost, "err", lookupErr)
|
||||||
|
} else {
|
||||||
|
nodeHost = ips[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Step 2: Forward deploy to the agent.
|
// Step 2: Forward deploy to the agent.
|
||||||
client, err := m.Pool.Get(nodeName)
|
client, err := m.Pool.Get(nodeName)
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ func Run(cfg *config.MasterConfig, version string) error {
|
|||||||
// Create agent connection pool.
|
// Create agent connection pool.
|
||||||
pool := NewAgentPool(cfg.Master.CACert, token)
|
pool := NewAgentPool(cfg.Master.CACert, token)
|
||||||
for _, n := range cfg.Nodes {
|
for _, n := range cfg.Nodes {
|
||||||
if addErr := pool.AddNode(n.Name, n.Address); addErr != nil {
|
if addErr := pool.AddNodeMulti(n.Name, n.AllAddresses()); addErr != nil {
|
||||||
logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
|
logger.Warn("failed to connect to agent", "node", n.Name, "err", addErr)
|
||||||
// Non-fatal: the node may come up later.
|
// Non-fatal: the node may come up later.
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,8 +16,10 @@ import (
|
|||||||
// ServiceDef is the top-level TOML structure for a service definition file.
|
// ServiceDef is the top-level TOML structure for a service definition file.
|
||||||
type ServiceDef struct {
|
type ServiceDef struct {
|
||||||
Name string `toml:"name"`
|
Name string `toml:"name"`
|
||||||
Node string `toml:"node"`
|
Node string `toml:"node,omitempty"`
|
||||||
|
Tier string `toml:"tier,omitempty"`
|
||||||
Active *bool `toml:"active,omitempty"`
|
Active *bool `toml:"active,omitempty"`
|
||||||
|
Comment string `toml:"comment,omitempty"`
|
||||||
Path string `toml:"path,omitempty"`
|
Path string `toml:"path,omitempty"`
|
||||||
Build *BuildDef `toml:"build,omitempty"`
|
Build *BuildDef `toml:"build,omitempty"`
|
||||||
Components []ComponentDef `toml:"components"`
|
Components []ComponentDef `toml:"components"`
|
||||||
@@ -36,6 +38,7 @@ type RouteDef struct {
|
|||||||
Port int `toml:"port"`
|
Port int `toml:"port"`
|
||||||
Mode string `toml:"mode,omitempty"`
|
Mode string `toml:"mode,omitempty"`
|
||||||
Hostname string `toml:"hostname,omitempty"`
|
Hostname string `toml:"hostname,omitempty"`
|
||||||
|
Public bool `toml:"public,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ComponentDef describes a single container component within a service.
|
// ComponentDef describes a single container component within a service.
|
||||||
@@ -129,8 +132,9 @@ func validate(def *ServiceDef) error {
|
|||||||
if def.Name == "" {
|
if def.Name == "" {
|
||||||
return fmt.Errorf("service name is required")
|
return fmt.Errorf("service name is required")
|
||||||
}
|
}
|
||||||
if def.Node == "" {
|
// v2: either node or tier must be set. Tier defaults to "worker" if both empty.
|
||||||
return fmt.Errorf("service node is required")
|
if def.Node == "" && def.Tier == "" {
|
||||||
|
def.Tier = "worker"
|
||||||
}
|
}
|
||||||
if len(def.Components) == 0 {
|
if len(def.Components) == 0 {
|
||||||
return fmt.Errorf("service %q must have at least one component", def.Name)
|
return fmt.Errorf("service %q must have at least one component", def.Name)
|
||||||
@@ -193,6 +197,9 @@ func ToProto(def *ServiceDef) *mcpv1.ServiceSpec {
|
|||||||
spec := &mcpv1.ServiceSpec{
|
spec := &mcpv1.ServiceSpec{
|
||||||
Name: def.Name,
|
Name: def.Name,
|
||||||
Active: def.Active != nil && *def.Active,
|
Active: def.Active != nil && *def.Active,
|
||||||
|
Comment: def.Comment,
|
||||||
|
Tier: def.Tier,
|
||||||
|
Node: def.Node,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range def.Components {
|
for _, c := range def.Components {
|
||||||
@@ -213,6 +220,7 @@ func ToProto(def *ServiceDef) *mcpv1.ServiceSpec {
|
|||||||
Port: int32(r.Port), //nolint:gosec // port range validated
|
Port: int32(r.Port), //nolint:gosec // port range validated
|
||||||
Mode: r.Mode,
|
Mode: r.Mode,
|
||||||
Hostname: r.Hostname,
|
Hostname: r.Hostname,
|
||||||
|
Public: r.Public,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
spec.Components = append(spec.Components, cs)
|
spec.Components = append(spec.Components, cs)
|
||||||
@@ -229,7 +237,9 @@ func FromProto(spec *mcpv1.ServiceSpec, node string) *ServiceDef {
|
|||||||
def := &ServiceDef{
|
def := &ServiceDef{
|
||||||
Name: spec.GetName(),
|
Name: spec.GetName(),
|
||||||
Node: node,
|
Node: node,
|
||||||
|
Tier: spec.GetTier(),
|
||||||
Active: &active,
|
Active: &active,
|
||||||
|
Comment: spec.GetComment(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range spec.GetComponents() {
|
for _, c := range spec.GetComponents() {
|
||||||
@@ -250,6 +260,7 @@ func FromProto(spec *mcpv1.ServiceSpec, node string) *ServiceDef {
|
|||||||
Port: int(r.GetPort()),
|
Port: int(r.GetPort()),
|
||||||
Mode: r.GetMode(),
|
Mode: r.GetMode(),
|
||||||
Hostname: r.GetHostname(),
|
Hostname: r.GetHostname(),
|
||||||
|
Public: r.GetPublic(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
def.Components = append(def.Components, cd)
|
def.Components = append(def.Components, cd)
|
||||||
|
|||||||
@@ -119,14 +119,8 @@ func TestValidation(t *testing.T) {
|
|||||||
},
|
},
|
||||||
wantErr: "service name is required",
|
wantErr: "service name is required",
|
||||||
},
|
},
|
||||||
{
|
// v2: missing node no longer errors — defaults to tier=worker.
|
||||||
name: "missing node",
|
// Tested separately in TestValidationNodeTierDefault.
|
||||||
def: &ServiceDef{
|
|
||||||
Name: "svc",
|
|
||||||
Components: []ComponentDef{{Name: "api", Image: "img:v1"}},
|
|
||||||
},
|
|
||||||
wantErr: "service node is required",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "empty components",
|
name: "empty components",
|
||||||
def: &ServiceDef{
|
def: &ServiceDef{
|
||||||
|
|||||||
Reference in New Issue
Block a user