Files
mcp/internal/agent/sync.go
Kyle Isom 76247978c2 Fix protoToComponent to include routes in synced components
Routes from the proto ComponentSpec were dropped during sync, causing
the deploy flow to see empty regRoutes and skip cert provisioning,
route registration, and DNS registration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 14:39:26 -07:00

203 lines
6.4 KiB
Go

package agent
import (
"context"
"fmt"
"strings"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// SyncDesiredState reconciles the agent's registry with the declared service
// specs. It creates or updates services and components, then discovers
// untracked containers and adds them with desired_state "ignore".
func (a *Agent) SyncDesiredState(ctx context.Context, req *mcpv1.SyncDesiredStateRequest) (*mcpv1.SyncDesiredStateResponse, error) {
a.Logger.Info("SyncDesiredState", "services", len(req.GetServices()))
known := make(map[string]bool)
var results []*mcpv1.ServiceSyncResult
for _, spec := range req.GetServices() {
if spec.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required")
}
r, names, err := a.syncService(ctx, spec)
if err != nil {
return nil, err
}
for _, n := range names {
known[n] = true
}
results = append(results, r)
}
// Reconciliation: find containers not in the registry and add them.
if err := a.reconcileUntracked(ctx, known); err != nil {
a.Logger.Info("reconcile untracked containers failed", "error", err)
}
return &mcpv1.SyncDesiredStateResponse{Results: results}, nil
}
// syncService creates or updates a single service and its components.
// It returns the sync result and a list of container names that belong to
// this service.
func (a *Agent) syncService(_ context.Context, spec *mcpv1.ServiceSpec) (*mcpv1.ServiceSyncResult, []string, error) {
result := &mcpv1.ServiceSyncResult{Name: spec.GetName()}
var changes []string
var containerNames []string
desiredState := "running"
if !spec.GetActive() {
desiredState = "stopped"
}
// Create or update the service record.
existing, err := registry.GetService(a.DB, spec.GetName())
if err != nil {
// Service does not exist; create it.
if err := registry.CreateService(a.DB, spec.GetName(), spec.GetActive()); err != nil {
return nil, nil, status.Errorf(codes.Internal, "create service %q: %v", spec.GetName(), err)
}
changes = append(changes, "created service")
} else if existing.Active != spec.GetActive() {
if err := registry.UpdateServiceActive(a.DB, spec.GetName(), spec.GetActive()); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update service %q: %v", spec.GetName(), err)
}
changes = append(changes, fmt.Sprintf("active: %v -> %v", existing.Active, spec.GetActive()))
}
// Create or update each component.
for _, cs := range spec.GetComponents() {
containerName := spec.GetName() + "-" + cs.GetName()
containerNames = append(containerNames, containerName)
comp := protoToComponent(spec.GetName(), cs, desiredState)
if componentExists(a.DB, spec.GetName(), cs.GetName()) {
if err := registry.UpdateComponentSpec(a.DB, comp); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update component %q/%q: %v", spec.GetName(), cs.GetName(), err)
}
if err := registry.UpdateComponentState(a.DB, spec.GetName(), cs.GetName(), desiredState, ""); err != nil {
return nil, nil, status.Errorf(codes.Internal, "update component state %q/%q: %v", spec.GetName(), cs.GetName(), err)
}
changes = append(changes, fmt.Sprintf("updated %s", cs.GetName()))
} else {
if err := registry.CreateComponent(a.DB, comp); err != nil {
return nil, nil, status.Errorf(codes.Internal, "create component %q/%q: %v", spec.GetName(), cs.GetName(), err)
}
changes = append(changes, fmt.Sprintf("created %s", cs.GetName()))
}
}
result.Changed = len(changes) > 0
result.Summary = strings.Join(changes, "; ")
if !result.Changed {
result.Summary = "no changes"
}
a.Logger.Info("sync service", "service", spec.GetName(), "changed", result.Changed, "summary", result.Summary)
return result, containerNames, nil
}
// reconcileUntracked lists all containers from the runtime and adds any that
// are not already tracked in the registry with desired_state "ignore".
func (a *Agent) reconcileUntracked(ctx context.Context, known map[string]bool) error {
containers, err := a.Runtime.List(ctx)
if err != nil {
return fmt.Errorf("list containers: %w", err)
}
for _, c := range containers {
if known[c.Name] {
continue
}
service, component, ok := parseContainerName(c.Name)
if !ok {
continue
}
if componentExists(a.DB, service, component) {
continue
}
if _, err := registry.GetService(a.DB, service); err != nil {
if err := registry.CreateService(a.DB, service, true); err != nil {
a.Logger.Info("reconcile: create service failed", "service", service, "error", err)
continue
}
}
comp := &registry.Component{
Name: component,
Service: service,
Image: c.Image,
Network: c.Network,
UserSpec: c.User,
Restart: c.Restart,
DesiredState: "ignore",
ObservedState: c.State,
Version: runtime.ExtractVersion(c.Image),
}
if err := registry.CreateComponent(a.DB, comp); err != nil {
a.Logger.Info("reconcile: create component failed", "container", c.Name, "error", err)
continue
}
a.Logger.Info("reconcile: adopted untracked container", "container", c.Name, "desired_state", "ignore")
}
return nil
}
// protoToComponent converts a proto ComponentSpec to a registry Component.
func protoToComponent(service string, cs *mcpv1.ComponentSpec, desiredState string) *registry.Component {
var routes []registry.Route
for _, r := range cs.GetRoutes() {
mode := r.GetMode()
if mode == "" {
mode = "l4"
}
name := r.GetName()
if name == "" {
name = "default"
}
routes = append(routes, registry.Route{
Name: name,
Port: int(r.GetPort()),
Mode: mode,
Hostname: r.GetHostname(),
})
}
return &registry.Component{
Name: cs.GetName(),
Service: service,
Image: cs.GetImage(),
Network: cs.GetNetwork(),
UserSpec: cs.GetUser(),
Restart: cs.GetRestart(),
Ports: cs.GetPorts(),
Volumes: cs.GetVolumes(),
Cmd: cs.GetCmd(),
Routes: routes,
DesiredState: desiredState,
Version: runtime.ExtractVersion(cs.GetImage()),
}
}
// parseContainerName splits "service-component" into its parts. Returns false
// if the name does not contain a hyphen.
func parseContainerName(name string) (service, component string, ok bool) {
i := strings.IndexByte(name, '-')
if i < 0 {
return "", "", false
}
return name[:i], name[i+1:], true
}