20 Commits

Author SHA1 Message Date
714320c018 Add edge routing and health check RPCs (Phase 2)
New agent RPCs for v2 multi-node orchestration:

- SetupEdgeRoute: provisions TLS cert from Metacrypt, resolves backend
  hostname to Tailnet IP, validates it's in 100.64.0.0/10, registers
  L7 route in mc-proxy. Rejects backend_tls=false.
- RemoveEdgeRoute: removes mc-proxy route, cleans up TLS cert, removes
  registry entry.
- ListEdgeRoutes: returns all edge routes with cert serial/expiry.
- HealthCheck: returns agent health and container count.

New database table (migration 4): edge_routes stores hostname, backend
info, and cert paths for persistence across agent restarts.

ProxyRouter gains CertPath/KeyPath helpers for consistent cert path
construction.

Security:
- Backend hostname must resolve to a Tailnet IP (100.64.0.0/10)
- backend_tls=false is rejected (no cleartext to backends)
- Cert provisioning failure fails the setup (no route to missing cert)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 13:13:10 -07:00
fa8ba6fac1 Move ARCHITECTURE_V2.md to metacircular docs
The v2 architecture doc is platform-wide (covers master, agents,
edge routing, snapshots, migration across all nodes). Moved to
docs/architecture-v2.md in the metacircular workspace repo.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 11:09:09 -07:00
f66758b92b Hardcode version in flake.nix
The git fetcher doesn't provide gitDescribe, so the Nix build was
falling through to shortRev and producing commit-hash versions instead
of tag-based ones.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 17:46:12 -07:00
09d0d197c3 Add component-level targeting to start, stop, and restart
Allow start/stop/restart to target a single component via
<service>/<component> syntax, matching deploy/logs/purge. When a
component is specified, start/stop skip toggling the service-level
active flag. Agent-side filtering returns NotFound for unknown
components.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 17:26:05 -07:00
52914d50b0 Pass mode, backend-tls, and tls cert/key through route add
The --mode flag was defined but never wired through to the RPC.
Add tls_cert and tls_key fields to AddProxyRouteRequest so L7
routes can be created via mcp route add.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 20:44:44 -07:00
bb4bee51ba Add mono-repo consideration to ARCHITECTURE_V2.md open questions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 20:40:32 -07:00
4ac8a6d60b Add ARCHITECTURE_V2.md for multi-node master/agent topology
Documents the planned v2 architecture: mcp-master on straylight
coordinates deployments across worker (rift) and edge (svc) nodes.
Includes edge routing flow, agent RPCs, migration plan, and
operational issues from v1 that motivate the redesign.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 20:37:24 -07:00
d8f45ca520 Merge explicit ports with route-allocated ports during deploy
Previously, explicit port mappings from the service definition were
ignored when routes were present. Now both are included, allowing
services to have stable external port bindings alongside dynamic
route-allocated ports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 19:28:40 -07:00
95f86157b4 Add mcp route command for managing mc-proxy routes
New top-level command with list, add, remove subcommands. Supports
-n/--node to target a specific node. Adds AddProxyRoute and
RemoveProxyRoute RPCs to the agent. Moves route listing from
mcp node routes to mcp route list.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 19:10:11 -07:00
93e26d3789 Add mcp dns and mcp node routes commands
mcp dns queries MCNS via an agent to list all zones and DNS records.
mcp node routes queries mc-proxy on each node for listener/route status,
matching the mcproxyctl status output format.

New agent RPCs: ListDNSRecords, ListProxyRoutes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 18:51:53 -07:00
3d2edb7c26 Fall back to podman logs when journalctl is inaccessible
Probe journalctl with -n 0 before committing to it. When the journal
is not readable (e.g. rootless podman without user journal storage),
fall back to podman logs instead of streaming the permission error.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:54:14 -07:00
bf02935716 Add agent version to mcp node list
Thread the linker-injected version string into the Agent struct and
return it in the NodeStatus RPC. The CLI now dials each node and
displays the agent version alongside name and address.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:49:49 -07:00
c4f0d7be8e Fix mcp logs permission error for rootless podman journald driver
Rootless podman writes container logs to the user journal, but
journalctl without --user only reads the system journal. Add --user
when the agent is running as a non-root user.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 16:46:01 -07:00
4d900eafd1 Derive flake version from git rev instead of hardcoding
Eliminates the manual version bump in flake.nix on each release.
Uses self.shortRev (or dirtyShortRev) since self.gitDescribe is not
yet available in this Nix version. Makefile builds still get the full
git describe output via ldflags.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:48:44 -07:00
38f9070c24 Add top-level mcp edit command
Shortcut for mcp service edit — opens the service definition in $EDITOR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:44:19 -07:00
67d0ab1d9d Bump flake.nix version to 0.7.5
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 19:40:29 -07:00
7383b370f0 Fix mcp ps showing registry version instead of runtime, error on unknown component
mcp ps now uses the actual container image and version from the runtime
instead of the registry, which could be stale after a failed deploy.

Deploy now returns an error when the component filter matches nothing
instead of silently succeeding with zero results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 19:13:02 -07:00
4c847e6de9 Fix extraneous blank lines in mcp logs output
Skip empty lines from the scanner that result from double newlines
(application slog trailing newline + container runtime newline).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 18:22:38 -07:00
14b978861f Add mcp logs command for streaming container logs
New server-streaming Logs RPC streams container output to the CLI.
Supports --tail/-n, --follow/-f, --timestamps/-t, --since.

Detects journald log driver and falls back to journalctl (podman logs
can't read journald outside the originating user session). New containers
default to k8s-file via mcp user's containers.conf.

Also adds stream auth interceptor for the agent gRPC server (required
for streaming RPCs).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 17:54:48 -07:00
18365cc0a8 Document system account auth model in ARCHITECTURE.md
Replaces the "admin required for all operations" model with the new
three-tier identity model: human operators for CLI, mcp-agent system
account for infrastructure automation, admin reserved for MCIAS-level
administration. Documents agent-to-service token paths and per-service
authorization policies.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 16:11:08 -07:00
30 changed files with 3401 additions and 113 deletions

View File

@@ -121,9 +121,26 @@ option for future security hardening.
## Authentication and Authorization ## Authentication and Authorization
MCP follows the platform authentication model: all auth is delegated to MCP follows the platform authentication model: all auth is delegated to
MCIAS. MCIAS. The auth model separates three concerns: operator intent (CLI to
agent), infrastructure automation (agent to platform services), and
access control (who can do what).
### Agent Authentication ### Identity Model
| Identity | Type | Purpose |
|----------|------|---------|
| Human operator (e.g., `kyle`) | human | CLI operations: deploy, stop, start, build |
| `mcp-agent` | system | Agent-to-service automation: certs, DNS, routes, image pull |
| Per-service accounts (e.g., `mcq`) | system | Scoped self-management (own DNS records only) |
| `admin` role | role | MCIAS account management, policy changes, zone creation |
| `guest` role | role | Explicitly rejected by the agent |
The `admin` role is reserved for MCIAS-level administrative operations
(account creation, policy management, zone mutations). Routine MCP
operations (deploy, stop, start, build) do not require admin — any
authenticated non-guest user or system account is accepted.
### Agent Authentication (CLI → Agent)
The agent is a gRPC server with a unary interceptor that enforces The agent is a gRPC server with a unary interceptor that enforces
authentication on every RPC: authentication on every RPC:
@@ -132,10 +149,34 @@ authentication on every RPC:
(`authorization: Bearer <token>`). (`authorization: Bearer <token>`).
2. Agent extracts the token and validates it against MCIAS (cached 30s by 2. Agent extracts the token and validates it against MCIAS (cached 30s by
SHA-256 of the token, per platform convention). SHA-256 of the token, per platform convention).
3. Agent checks that the caller has the `admin` role. All MCP operations 3. Agent rejects guests (`guest` role → `PERMISSION_DENIED`). All other
require admin -- there is no unprivileged MCP access. authenticated users and system accounts are accepted.
4. If validation fails, the RPC returns `UNAUTHENTICATED` (invalid/expired 4. If validation fails, the RPC returns `UNAUTHENTICATED` (invalid/expired
token) or `PERMISSION_DENIED` (valid token, not admin). token) or `PERMISSION_DENIED` (guest).
### Agent Service Authentication (Agent → Platform Services)
The agent authenticates to platform services using a long-lived system
account token (`mcp-agent`). Each service has its own token file:
| Service | Token Path | Operations |
|---------|------------|------------|
| Metacrypt | `/srv/mcp/metacrypt-token` | TLS cert provisioning (PKI issue) |
| MCNS | `/srv/mcp/mcns-token` | DNS record create/delete (any name) |
| mc-proxy | Unix socket (no auth) | Route registration/removal |
| MCR | podman auth store | Image pull (JWT-as-password) |
These tokens are issued by MCIAS for the `mcp-agent` system account.
They carry no roles — authorization is handled by each service's policy
engine:
- **Metacrypt:** Policy rule grants `mcp-agent` write access to
`engine/pki/issue`.
- **MCNS:** Code-level authorization: system account `mcp-agent` can
manage any record; other system accounts can only manage records
matching their username.
- **MCR:** Default policy allows all authenticated users to push/pull.
MCR accepts MCIAS JWTs as passwords at the `/v2/token` endpoint.
### CLI Authentication ### CLI Authentication
@@ -148,6 +189,15 @@ obtained by:
The stored token is used for all subsequent agent RPCs until it expires. The stored token is used for all subsequent agent RPCs until it expires.
### MCR Registry Authentication
`mcp build` auto-authenticates to MCR before pushing images. It reads
the CLI's stored MCIAS token and uses it as the password for `podman
login`. MCR's token endpoint accepts MCIAS JWTs as passwords (the
personal-access-token pattern), so both human and system account tokens
work. This eliminates the need for a separate interactive `podman login`
step.
--- ---
## Services and Components ## Services and Components
@@ -224,6 +274,9 @@ mcp pull <service> <path> [local-file] Copy a file from /srv/<service>/<path> to
mcp node list List registered nodes mcp node list List registered nodes
mcp node add <name> <address> Register a node mcp node add <name> <address> Register a node
mcp node remove <name> Deregister a node mcp node remove <name> Deregister a node
mcp agent upgrade [node] Build, push, and restart agent on all (or one) node(s)
mcp agent status Show agent version on each node
``` ```
### Service Definition Files ### Service Definition Files
@@ -1144,20 +1197,84 @@ The agent's data directory follows the platform convention:
### Agent Deployment (on nodes) ### Agent Deployment (on nodes)
The agent is deployed like any other Metacircular service: #### Provisioning (one-time per node)
1. Provision the `mcp` system user via NixOS config (with podman access Each node needs a one-time setup before the agent can run. The steps are
and subuid/subgid ranges for rootless containers). the same regardless of OS, but the mechanism differs:
1. Create `mcp` system user with podman access and subuid/subgid ranges.
2. Set `/srv/` ownership to the `mcp` user (the agent creates and manages 2. Set `/srv/` ownership to the `mcp` user (the agent creates and manages
`/srv/<service>/` directories for all services). `/srv/<service>/` directories for all services).
3. Create `/srv/mcp/` directory and config file. 3. Create `/srv/mcp/` directory and config file.
4. Provision TLS certificate from Metacrypt. 4. Provision TLS certificate from Metacrypt.
5. Create an MCIAS system account for the agent (`mcp-agent`). 5. Create an MCIAS system account for the agent (`mcp-agent`).
6. Install the `mcp-agent` binary. 6. Install the initial `mcp-agent` binary to `/srv/mcp/mcp-agent`.
7. Start via systemd unit. 7. Install and start the systemd unit.
The agent runs as a systemd service. Container-first deployment is a v2 On **NixOS** (rift), provisioning is declarative via the NixOS config.
concern -- MCP needs to be running before it can manage its own agent. The NixOS config owns the infrastructure (user, systemd unit, podman,
directories, permissions) but **not** the binary. `ExecStart` points to
`/srv/mcp/mcp-agent`, a mutable path that MCP manages. NixOS may
bootstrap the initial binary there, but subsequent updates come from MCP.
On **Debian** (hyperborea, svc), provisioning is done via a setup script
or ansible playbook that creates the same layout.
#### Binary Location
The agent binary lives at `/srv/mcp/mcp-agent` on **all** nodes,
regardless of OS. This unifies the update mechanism across the fleet.
#### Agent Upgrades
After initial provisioning, the agent binary is updated via
`mcp agent upgrade`. The CLI:
1. Cross-compiles the agent for each target architecture
(`GOARCH=amd64` for rift/svc, `GOARCH=arm64` for hyperborea).
2. SSHs to each node, pushes the binary to `/srv/mcp/mcp-agent.new`.
3. Atomically swaps the binary (`mv mcp-agent.new mcp-agent`).
4. Restarts the systemd service (`systemctl restart mcp-agent`).
SSH is used instead of gRPC because:
- It works even when the agent is broken or has an incompatible version.
- The binary is ~17MB, which exceeds gRPC default message limits.
- No self-restart coordination needed.
The CLI uses `golang.org/x/crypto/ssh` for native SSH, keeping the
entire workflow in a single binary with no external tool dependencies.
#### Node Configuration
Node config includes SSH and architecture info for agent management:
```toml
[[nodes]]
name = "rift"
address = "100.95.252.120:9444"
ssh = "rift" # SSH host (from ~/.ssh/config or hostname)
arch = "amd64" # GOARCH for cross-compilation
[[nodes]]
name = "hyperborea"
address = "100.x.x.x:9444"
ssh = "hyperborea"
arch = "arm64"
```
#### Coordinated Upgrades
New MCP releases often add new RPCs. A CLI at v0.6.0 calling an agent
at v0.5.0 fails with `Unimplemented`. Therefore agent upgrades must be
coordinated: `mcp agent upgrade` (with no node argument) upgrades all
nodes before the CLI is used for other operations.
If a node fails to upgrade, it is reported but the others still proceed.
The operator can retry or investigate via SSH.
#### Systemd Unit
The systemd unit is the same on all nodes:
```ini ```ini
[Unit] [Unit]
@@ -1167,7 +1284,7 @@ Wants=network-online.target
[Service] [Service]
Type=simple Type=simple
ExecStart=/usr/local/bin/mcp-agent server --config /srv/mcp/mcp-agent.toml ExecStart=/srv/mcp/mcp-agent server --config /srv/mcp/mcp-agent.toml
Restart=on-failure Restart=on-failure
RestartSec=5 RestartSec=5
@@ -1175,17 +1292,14 @@ User=mcp
Group=mcp Group=mcp
NoNewPrivileges=true NoNewPrivileges=true
ProtectSystem=strict ProtectSystem=full
ProtectHome=true ProtectHome=false
PrivateTmp=true PrivateTmp=true
PrivateDevices=true PrivateDevices=true
ProtectKernelTunables=true ProtectKernelTunables=true
ProtectKernelModules=true ProtectKernelModules=true
ProtectControlGroups=true
RestrictSUIDSGID=true RestrictSUIDSGID=true
RestrictNamespaces=true
LockPersonality=true LockPersonality=true
MemoryDenyWriteExecute=true
RestrictRealtime=true RestrictRealtime=true
ReadWritePaths=/srv ReadWritePaths=/srv
@@ -1195,6 +1309,7 @@ WantedBy=multi-user.target
Note: `ReadWritePaths=/srv` (not `/srv/mcp`) because the agent writes Note: `ReadWritePaths=/srv` (not `/srv/mcp`) because the agent writes
files to any service's `/srv/<service>/` directory on behalf of the CLI. files to any service's `/srv/<service>/` directory on behalf of the CLI.
`ProtectHome=false` because the `mcp` user's home is `/srv/mcp`.
### CLI Installation (on operator workstation) ### CLI Installation (on operator workstation)

View File

@@ -38,7 +38,7 @@ func main() {
if err != nil { if err != nil {
return fmt.Errorf("load config: %w", err) return fmt.Errorf("load config: %w", err)
} }
return agent.Run(cfg) return agent.Run(cfg, version)
}, },
}) })

View File

@@ -43,6 +43,7 @@ func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClie
address, address,
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
grpc.WithUnaryInterceptor(tokenInterceptor(token)), grpc.WithUnaryInterceptor(tokenInterceptor(token)),
grpc.WithStreamInterceptor(streamTokenInterceptor(token)),
) )
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("dial %q: %w", address, err) return nil, nil, fmt.Errorf("dial %q: %w", address, err)
@@ -60,6 +61,15 @@ func tokenInterceptor(token string) grpc.UnaryClientInterceptor {
} }
} }
// streamTokenInterceptor returns a gRPC client stream interceptor that
// attaches the bearer token to outgoing stream metadata.
func streamTokenInterceptor(token string) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
return streamer(ctx, desc, cc, method, opts...)
}
}
// loadBearerToken reads the token from file or env var. // loadBearerToken reads the token from file or env var.
func loadBearerToken(cfg *config.CLIConfig) (string, error) { func loadBearerToken(cfg *config.CLIConfig) (string, error) {
if token := os.Getenv("MCP_TOKEN"); token != "" { if token := os.Getenv("MCP_TOKEN"); token != "" {

87
cmd/mcp/dns.go Normal file
View File

@@ -0,0 +1,87 @@
package main
import (
"context"
"fmt"
"os"
"time"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/config"
"github.com/spf13/cobra"
)
func dnsCmd() *cobra.Command {
return &cobra.Command{
Use: "dns",
Short: "List all DNS zones and records from MCNS",
RunE: runDNS,
}
}
func runDNS(_ *cobra.Command, _ []string) error {
cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
// DNS is centralized — query the first reachable agent.
resp, nodeName, err := queryDNS(cfg)
if err != nil {
return err
}
if len(resp.GetZones()) == 0 {
fmt.Println("no DNS zones configured")
return nil
}
_ = nodeName
for i, zone := range resp.GetZones() {
if i > 0 {
fmt.Println()
}
fmt.Printf("ZONE: %s\n", zone.GetName())
if len(zone.GetRecords()) == 0 {
fmt.Println(" (no records)")
continue
}
w := newTable()
_, _ = fmt.Fprintln(w, " NAME\tTYPE\tVALUE\tTTL")
for _, r := range zone.GetRecords() {
_, _ = fmt.Fprintf(w, " %s\t%s\t%s\t%d\n",
r.GetName(), r.GetType(), r.GetValue(), r.GetTtl())
}
_ = w.Flush()
}
return nil
}
// queryDNS tries each configured agent and returns the first successful
// DNS listing. DNS is centralized so any agent with MCNS configured works.
func queryDNS(cfg *config.CLIConfig) (*mcpv1.ListDNSRecordsResponse, string, error) {
for _, node := range cfg.Nodes {
client, conn, err := dialAgent(node.Address, cfg)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := client.ListDNSRecords(ctx, &mcpv1.ListDNSRecordsRequest{})
cancel()
_ = conn.Close()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: list DNS: %v\n", node.Name, err)
continue
}
return resp, node.Name, nil
}
return nil, "", fmt.Errorf("no reachable agent with DNS configured")
}

12
cmd/mcp/edit.go Normal file
View File

@@ -0,0 +1,12 @@
package main
import "github.com/spf13/cobra"
func editCmd() *cobra.Command {
return &cobra.Command{
Use: "edit <service>",
Short: "Open service definition in $EDITOR",
Args: cobra.ExactArgs(1),
RunE: runServiceEdit,
}
}

View File

@@ -14,8 +14,8 @@ import (
func stopCmd() *cobra.Command { func stopCmd() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "stop <service>", Use: "stop <service>[/<component>]",
Short: "Stop all components, set active=false", Short: "Stop components (or all), set active=false",
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.LoadCLIConfig(cfgPath) cfg, err := config.LoadCLIConfig(cfgPath)
@@ -23,7 +23,7 @@ func stopCmd() *cobra.Command {
return fmt.Errorf("load config: %w", err) return fmt.Errorf("load config: %w", err)
} }
serviceName := args[0] serviceName, component := parseServiceArg(args[0])
defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml")
def, err := servicedef.Load(defPath) def, err := servicedef.Load(defPath)
@@ -31,10 +31,13 @@ func stopCmd() *cobra.Command {
return fmt.Errorf("load service def: %w", err) return fmt.Errorf("load service def: %w", err)
} }
active := false // Only flip active=false when stopping the whole service.
def.Active = &active if component == "" {
if err := servicedef.Write(defPath, def); err != nil { active := false
return fmt.Errorf("write service def: %w", err) def.Active = &active
if err := servicedef.Write(defPath, def); err != nil {
return fmt.Errorf("write service def: %w", err)
}
} }
address, err := findNodeAddress(cfg, def.Node) address, err := findNodeAddress(cfg, def.Node)
@@ -49,7 +52,8 @@ func stopCmd() *cobra.Command {
defer func() { _ = conn.Close() }() defer func() { _ = conn.Close() }()
resp, err := client.StopService(context.Background(), &mcpv1.StopServiceRequest{ resp, err := client.StopService(context.Background(), &mcpv1.StopServiceRequest{
Name: serviceName, Name: serviceName,
Component: component,
}) })
if err != nil { if err != nil {
return fmt.Errorf("stop service: %w", err) return fmt.Errorf("stop service: %w", err)
@@ -63,8 +67,8 @@ func stopCmd() *cobra.Command {
func startCmd() *cobra.Command { func startCmd() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "start <service>", Use: "start <service>[/<component>]",
Short: "Start all components, set active=true", Short: "Start components (or all), set active=true",
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.LoadCLIConfig(cfgPath) cfg, err := config.LoadCLIConfig(cfgPath)
@@ -72,7 +76,7 @@ func startCmd() *cobra.Command {
return fmt.Errorf("load config: %w", err) return fmt.Errorf("load config: %w", err)
} }
serviceName := args[0] serviceName, component := parseServiceArg(args[0])
defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml")
def, err := servicedef.Load(defPath) def, err := servicedef.Load(defPath)
@@ -80,10 +84,13 @@ func startCmd() *cobra.Command {
return fmt.Errorf("load service def: %w", err) return fmt.Errorf("load service def: %w", err)
} }
active := true // Only flip active=true when starting the whole service.
def.Active = &active if component == "" {
if err := servicedef.Write(defPath, def); err != nil { active := true
return fmt.Errorf("write service def: %w", err) def.Active = &active
if err := servicedef.Write(defPath, def); err != nil {
return fmt.Errorf("write service def: %w", err)
}
} }
address, err := findNodeAddress(cfg, def.Node) address, err := findNodeAddress(cfg, def.Node)
@@ -98,7 +105,8 @@ func startCmd() *cobra.Command {
defer func() { _ = conn.Close() }() defer func() { _ = conn.Close() }()
resp, err := client.StartService(context.Background(), &mcpv1.StartServiceRequest{ resp, err := client.StartService(context.Background(), &mcpv1.StartServiceRequest{
Name: serviceName, Name: serviceName,
Component: component,
}) })
if err != nil { if err != nil {
return fmt.Errorf("start service: %w", err) return fmt.Errorf("start service: %w", err)
@@ -112,8 +120,8 @@ func startCmd() *cobra.Command {
func restartCmd() *cobra.Command { func restartCmd() *cobra.Command {
return &cobra.Command{ return &cobra.Command{
Use: "restart <service>", Use: "restart <service>[/<component>]",
Short: "Restart all components", Short: "Restart components (or all)",
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.LoadCLIConfig(cfgPath) cfg, err := config.LoadCLIConfig(cfgPath)
@@ -121,7 +129,7 @@ func restartCmd() *cobra.Command {
return fmt.Errorf("load config: %w", err) return fmt.Errorf("load config: %w", err)
} }
serviceName := args[0] serviceName, component := parseServiceArg(args[0])
defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml") defPath := filepath.Join(cfg.Services.Dir, serviceName+".toml")
def, err := servicedef.Load(defPath) def, err := servicedef.Load(defPath)
@@ -141,7 +149,8 @@ func restartCmd() *cobra.Command {
defer func() { _ = conn.Close() }() defer func() { _ = conn.Close() }()
resp, err := client.RestartService(context.Background(), &mcpv1.RestartServiceRequest{ resp, err := client.RestartService(context.Background(), &mcpv1.RestartServiceRequest{
Name: serviceName, Name: serviceName,
Component: component,
}) })
if err != nil { if err != nil {
return fmt.Errorf("restart service: %w", err) return fmt.Errorf("restart service: %w", err)

81
cmd/mcp/logs.go Normal file
View File

@@ -0,0 +1,81 @@
package main
import (
"fmt"
"io"
"os"
"github.com/spf13/cobra"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/config"
)
func logsCmd() *cobra.Command {
var (
tail int
follow bool
timestamps bool
since string
)
cmd := &cobra.Command{
Use: "logs <service>[/<component>]",
Short: "Show container logs",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
serviceName, component := parseServiceArg(args[0])
def, err := loadServiceDef(cmd, cfg, serviceName)
if err != nil {
return err
}
address, err := findNodeAddress(cfg, def.Node)
if err != nil {
return err
}
client, conn, err := dialAgent(address, cfg)
if err != nil {
return fmt.Errorf("dial agent: %w", err)
}
defer func() { _ = conn.Close() }()
stream, err := client.Logs(cmd.Context(), &mcpv1.LogsRequest{
Service: serviceName,
Component: component,
Tail: int32(tail),
Follow: follow,
Timestamps: timestamps,
Since: since,
})
if err != nil {
return fmt.Errorf("logs: %w", err)
}
for {
resp, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return fmt.Errorf("recv: %w", err)
}
_, _ = os.Stdout.Write(resp.Data)
}
},
}
cmd.Flags().IntVarP(&tail, "tail", "n", 0, "number of lines from end (0 = all)")
cmd.Flags().BoolVarP(&follow, "follow", "f", false, "follow log output")
cmd.Flags().BoolVarP(&timestamps, "timestamps", "t", false, "show timestamps")
cmd.Flags().StringVar(&since, "since", "", "show logs since (e.g., 2h, 2026-03-28T00:00:00Z)")
return cmd
}

View File

@@ -50,6 +50,10 @@ func main() {
root.AddCommand(pullCmd()) root.AddCommand(pullCmd())
root.AddCommand(nodeCmd()) root.AddCommand(nodeCmd())
root.AddCommand(purgeCmd()) root.AddCommand(purgeCmd())
root.AddCommand(logsCmd())
root.AddCommand(editCmd())
root.AddCommand(dnsCmd())
root.AddCommand(routeCmd())
if err := root.Execute(); err != nil { if err := root.Execute(); err != nil {
log.Fatal(err) log.Fatal(err)

View File

@@ -1,12 +1,15 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"text/tabwriter" "text/tabwriter"
"time"
toml "github.com/pelletier/go-toml/v2" toml "github.com/pelletier/go-toml/v2"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/config" "git.wntrmute.dev/mc/mcp/internal/config"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@@ -48,13 +51,35 @@ func runNodeList(_ *cobra.Command, _ []string) error {
} }
w := tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0) w := tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0)
_, _ = fmt.Fprintln(w, "NAME\tADDRESS") _, _ = fmt.Fprintln(w, "NAME\tADDRESS\tVERSION")
for _, n := range cfg.Nodes { for _, n := range cfg.Nodes {
_, _ = fmt.Fprintf(w, "%s\t%s\n", n.Name, n.Address) ver := queryAgentVersion(cfg, n.Address)
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", n.Name, n.Address, ver)
} }
return w.Flush() return w.Flush()
} }
// queryAgentVersion dials the agent and returns its version, or an error indicator.
func queryAgentVersion(cfg *config.CLIConfig, address string) string {
client, conn, err := dialAgent(address, cfg)
if err != nil {
return "error"
}
defer func() { _ = conn.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.NodeStatus(ctx, &mcpv1.NodeStatusRequest{})
if err != nil {
return "error"
}
if resp.AgentVersion == "" {
return "unknown"
}
return resp.AgentVersion
}
func runNodeAdd(_ *cobra.Command, args []string) error { func runNodeAdd(_ *cobra.Command, args []string) error {
cfg, err := config.LoadCLIConfig(cfgPath) cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil { if err != nil {

225
cmd/mcp/route.go Normal file
View File

@@ -0,0 +1,225 @@
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/config"
)
func routeCmd() *cobra.Command {
var nodeName string
cmd := &cobra.Command{
Use: "route",
Short: "Manage mc-proxy routes",
}
list := &cobra.Command{
Use: "list",
Short: "List mc-proxy routes",
RunE: func(_ *cobra.Command, _ []string) error {
return runRouteList(nodeName)
},
}
var (
routeMode string
backendTLS bool
tlsCert string
tlsKey string
)
add := &cobra.Command{
Use: "add <listener> <hostname> <backend>",
Short: "Add a route to mc-proxy",
Long: "Add a route. Example: mcp route add -n rift :443 mcq.svc.mcp.metacircular.net 127.0.0.1:48080 --mode l7 --tls-cert /srv/mc-proxy/certs/mcq.pem --tls-key /srv/mc-proxy/certs/mcq.key",
Args: cobra.ExactArgs(3),
RunE: func(_ *cobra.Command, args []string) error {
return runRouteAdd(nodeName, args, routeMode, backendTLS, tlsCert, tlsKey)
},
}
add.Flags().StringVar(&routeMode, "mode", "l4", "route mode (l4 or l7)")
add.Flags().BoolVar(&backendTLS, "backend-tls", false, "re-encrypt traffic to backend")
add.Flags().StringVar(&tlsCert, "tls-cert", "", "path to TLS cert on the node (required for l7)")
add.Flags().StringVar(&tlsKey, "tls-key", "", "path to TLS key on the node (required for l7)")
remove := &cobra.Command{
Use: "remove <listener> <hostname>",
Short: "Remove a route from mc-proxy",
Long: "Remove a route. Example: mcp route remove -n rift :443 mcq.metacircular.net",
Args: cobra.ExactArgs(2),
RunE: func(_ *cobra.Command, args []string) error {
return runRouteRemove(nodeName, args)
},
}
cmd.PersistentFlags().StringVarP(&nodeName, "node", "n", "", "target node (required)")
cmd.AddCommand(list, add, remove)
return cmd
}
func runRouteList(nodeName string) error {
if nodeName == "" {
return runRouteListAll()
}
cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
address, err := findNodeAddress(cfg, nodeName)
if err != nil {
return err
}
client, conn, err := dialAgent(address, cfg)
if err != nil {
return fmt.Errorf("dial agent: %w", err)
}
defer func() { _ = conn.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.ListProxyRoutes(ctx, &mcpv1.ListProxyRoutesRequest{})
if err != nil {
return fmt.Errorf("list routes: %w", err)
}
printRoutes(nodeName, resp)
return nil
}
func runRouteListAll() error {
first := true
return forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.ListProxyRoutes(ctx, &mcpv1.ListProxyRoutesRequest{})
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: list routes: %v\n", node.Name, err)
return nil
}
if !first {
fmt.Println()
}
first = false
printRoutes(node.Name, resp)
return nil
})
}
func printRoutes(nodeName string, resp *mcpv1.ListProxyRoutesResponse) {
fmt.Printf("NODE: %s\n", nodeName)
fmt.Printf("mc-proxy %s\n", resp.GetVersion())
if resp.GetStartedAt() != nil {
uptime := time.Since(resp.GetStartedAt().AsTime()).Truncate(time.Second)
fmt.Printf("uptime: %s\n", uptime)
}
fmt.Printf("connections: %d\n", resp.GetTotalConnections())
fmt.Println()
for _, ls := range resp.GetListeners() {
fmt.Printf(" %s routes=%d active=%d\n",
ls.GetAddr(), ls.GetRouteCount(), ls.GetActiveConnections())
for _, r := range ls.GetRoutes() {
mode := r.GetMode()
if mode == "" {
mode = "l4"
}
extra := ""
if r.GetBackendTls() {
extra = " (re-encrypt)"
}
fmt.Printf(" %s %s → %s%s\n", mode, r.GetHostname(), r.GetBackend(), extra)
}
}
}
func runRouteAdd(nodeName string, args []string, mode string, backendTLS bool, tlsCert, tlsKey string) error {
if nodeName == "" {
return fmt.Errorf("--node is required")
}
cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
address, err := findNodeAddress(cfg, nodeName)
if err != nil {
return err
}
client, conn, err := dialAgent(address, cfg)
if err != nil {
return fmt.Errorf("dial agent: %w", err)
}
defer func() { _ = conn.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.AddProxyRoute(ctx, &mcpv1.AddProxyRouteRequest{
ListenerAddr: args[0],
Hostname: args[1],
Backend: args[2],
Mode: mode,
BackendTls: backendTLS,
TlsCert: tlsCert,
TlsKey: tlsKey,
})
if err != nil {
return fmt.Errorf("add route: %w", err)
}
fmt.Printf("Added route: %s %s → %s on %s (%s)\n", mode, args[1], args[2], args[0], nodeName)
return nil
}
func runRouteRemove(nodeName string, args []string) error {
if nodeName == "" {
return fmt.Errorf("--node is required")
}
cfg, err := config.LoadCLIConfig(cfgPath)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
address, err := findNodeAddress(cfg, nodeName)
if err != nil {
return err
}
client, conn, err := dialAgent(address, cfg)
if err != nil {
return fmt.Errorf("dial agent: %w", err)
}
defer func() { _ = conn.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.RemoveProxyRoute(ctx, &mcpv1.RemoveProxyRouteRequest{
ListenerAddr: args[0],
Hostname: args[1],
})
if err != nil {
return fmt.Errorf("remove route: %w", err)
}
fmt.Printf("Removed route: %s from %s (%s)\n", args[1], args[0], nodeName)
return nil
}

View File

@@ -10,7 +10,7 @@
let let
system = "x86_64-linux"; system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system}; pkgs = nixpkgs.legacyPackages.${system};
version = "0.6.0"; version = "0.8.3";
in in
{ {
packages.${system} = { packages.${system} = {

File diff suppressed because it is too large Load Diff

View File

@@ -33,6 +33,15 @@ const (
McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile" McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile"
McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile" McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile"
McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus" McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus"
McpAgentService_ListDNSRecords_FullMethodName = "/mcp.v1.McpAgentService/ListDNSRecords"
McpAgentService_ListProxyRoutes_FullMethodName = "/mcp.v1.McpAgentService/ListProxyRoutes"
McpAgentService_AddProxyRoute_FullMethodName = "/mcp.v1.McpAgentService/AddProxyRoute"
McpAgentService_RemoveProxyRoute_FullMethodName = "/mcp.v1.McpAgentService/RemoveProxyRoute"
McpAgentService_SetupEdgeRoute_FullMethodName = "/mcp.v1.McpAgentService/SetupEdgeRoute"
McpAgentService_RemoveEdgeRoute_FullMethodName = "/mcp.v1.McpAgentService/RemoveEdgeRoute"
McpAgentService_ListEdgeRoutes_FullMethodName = "/mcp.v1.McpAgentService/ListEdgeRoutes"
McpAgentService_HealthCheck_FullMethodName = "/mcp.v1.McpAgentService/HealthCheck"
McpAgentService_Logs_FullMethodName = "/mcp.v1.McpAgentService/Logs"
) )
// McpAgentServiceClient is the client API for McpAgentService service. // McpAgentServiceClient is the client API for McpAgentService service.
@@ -60,6 +69,20 @@ type McpAgentServiceClient interface {
PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error) PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error)
// Node // Node
NodeStatus(ctx context.Context, in *NodeStatusRequest, opts ...grpc.CallOption) (*NodeStatusResponse, error) NodeStatus(ctx context.Context, in *NodeStatusRequest, opts ...grpc.CallOption) (*NodeStatusResponse, error)
// DNS (query MCNS)
ListDNSRecords(ctx context.Context, in *ListDNSRecordsRequest, opts ...grpc.CallOption) (*ListDNSRecordsResponse, error)
// Proxy routes (query mc-proxy)
ListProxyRoutes(ctx context.Context, in *ListProxyRoutesRequest, opts ...grpc.CallOption) (*ListProxyRoutesResponse, error)
AddProxyRoute(ctx context.Context, in *AddProxyRouteRequest, opts ...grpc.CallOption) (*AddProxyRouteResponse, error)
RemoveProxyRoute(ctx context.Context, in *RemoveProxyRouteRequest, opts ...grpc.CallOption) (*RemoveProxyRouteResponse, error)
// Edge routing (called by master on edge nodes)
SetupEdgeRoute(ctx context.Context, in *SetupEdgeRouteRequest, opts ...grpc.CallOption) (*SetupEdgeRouteResponse, error)
RemoveEdgeRoute(ctx context.Context, in *RemoveEdgeRouteRequest, opts ...grpc.CallOption) (*RemoveEdgeRouteResponse, error)
ListEdgeRoutes(ctx context.Context, in *ListEdgeRoutesRequest, opts ...grpc.CallOption) (*ListEdgeRoutesResponse, error)
// Health (called by master on missed heartbeats)
HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
// Logs
Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error)
} }
type mcpAgentServiceClient struct { type mcpAgentServiceClient struct {
@@ -210,6 +233,105 @@ func (c *mcpAgentServiceClient) NodeStatus(ctx context.Context, in *NodeStatusRe
return out, nil return out, nil
} }
func (c *mcpAgentServiceClient) ListDNSRecords(ctx context.Context, in *ListDNSRecordsRequest, opts ...grpc.CallOption) (*ListDNSRecordsResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListDNSRecordsResponse)
err := c.cc.Invoke(ctx, McpAgentService_ListDNSRecords_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) ListProxyRoutes(ctx context.Context, in *ListProxyRoutesRequest, opts ...grpc.CallOption) (*ListProxyRoutesResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListProxyRoutesResponse)
err := c.cc.Invoke(ctx, McpAgentService_ListProxyRoutes_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) AddProxyRoute(ctx context.Context, in *AddProxyRouteRequest, opts ...grpc.CallOption) (*AddProxyRouteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(AddProxyRouteResponse)
err := c.cc.Invoke(ctx, McpAgentService_AddProxyRoute_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) RemoveProxyRoute(ctx context.Context, in *RemoveProxyRouteRequest, opts ...grpc.CallOption) (*RemoveProxyRouteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RemoveProxyRouteResponse)
err := c.cc.Invoke(ctx, McpAgentService_RemoveProxyRoute_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) SetupEdgeRoute(ctx context.Context, in *SetupEdgeRouteRequest, opts ...grpc.CallOption) (*SetupEdgeRouteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(SetupEdgeRouteResponse)
err := c.cc.Invoke(ctx, McpAgentService_SetupEdgeRoute_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) RemoveEdgeRoute(ctx context.Context, in *RemoveEdgeRouteRequest, opts ...grpc.CallOption) (*RemoveEdgeRouteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RemoveEdgeRouteResponse)
err := c.cc.Invoke(ctx, McpAgentService_RemoveEdgeRoute_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) ListEdgeRoutes(ctx context.Context, in *ListEdgeRoutesRequest, opts ...grpc.CallOption) (*ListEdgeRoutesResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListEdgeRoutesResponse)
err := c.cc.Invoke(ctx, McpAgentService_ListEdgeRoutes_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, McpAgentService_HealthCheck_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *mcpAgentServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &McpAgentService_ServiceDesc.Streams[0], McpAgentService_Logs_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[LogsRequest, LogsResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type McpAgentService_LogsClient = grpc.ServerStreamingClient[LogsResponse]
// McpAgentServiceServer is the server API for McpAgentService service. // McpAgentServiceServer is the server API for McpAgentService service.
// All implementations must embed UnimplementedMcpAgentServiceServer // All implementations must embed UnimplementedMcpAgentServiceServer
// for forward compatibility. // for forward compatibility.
@@ -235,6 +357,20 @@ type McpAgentServiceServer interface {
PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error) PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error)
// Node // Node
NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error)
// DNS (query MCNS)
ListDNSRecords(context.Context, *ListDNSRecordsRequest) (*ListDNSRecordsResponse, error)
// Proxy routes (query mc-proxy)
ListProxyRoutes(context.Context, *ListProxyRoutesRequest) (*ListProxyRoutesResponse, error)
AddProxyRoute(context.Context, *AddProxyRouteRequest) (*AddProxyRouteResponse, error)
RemoveProxyRoute(context.Context, *RemoveProxyRouteRequest) (*RemoveProxyRouteResponse, error)
// Edge routing (called by master on edge nodes)
SetupEdgeRoute(context.Context, *SetupEdgeRouteRequest) (*SetupEdgeRouteResponse, error)
RemoveEdgeRoute(context.Context, *RemoveEdgeRouteRequest) (*RemoveEdgeRouteResponse, error)
ListEdgeRoutes(context.Context, *ListEdgeRoutesRequest) (*ListEdgeRoutesResponse, error)
// Health (called by master on missed heartbeats)
HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
// Logs
Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error
mustEmbedUnimplementedMcpAgentServiceServer() mustEmbedUnimplementedMcpAgentServiceServer()
} }
@@ -287,6 +423,33 @@ func (UnimplementedMcpAgentServiceServer) PullFile(context.Context, *PullFileReq
func (UnimplementedMcpAgentServiceServer) NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) { func (UnimplementedMcpAgentServiceServer) NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) {
return nil, status.Error(codes.Unimplemented, "method NodeStatus not implemented") return nil, status.Error(codes.Unimplemented, "method NodeStatus not implemented")
} }
func (UnimplementedMcpAgentServiceServer) ListDNSRecords(context.Context, *ListDNSRecordsRequest) (*ListDNSRecordsResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ListDNSRecords not implemented")
}
func (UnimplementedMcpAgentServiceServer) ListProxyRoutes(context.Context, *ListProxyRoutesRequest) (*ListProxyRoutesResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ListProxyRoutes not implemented")
}
func (UnimplementedMcpAgentServiceServer) AddProxyRoute(context.Context, *AddProxyRouteRequest) (*AddProxyRouteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method AddProxyRoute not implemented")
}
func (UnimplementedMcpAgentServiceServer) RemoveProxyRoute(context.Context, *RemoveProxyRouteRequest) (*RemoveProxyRouteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method RemoveProxyRoute not implemented")
}
func (UnimplementedMcpAgentServiceServer) SetupEdgeRoute(context.Context, *SetupEdgeRouteRequest) (*SetupEdgeRouteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method SetupEdgeRoute not implemented")
}
func (UnimplementedMcpAgentServiceServer) RemoveEdgeRoute(context.Context, *RemoveEdgeRouteRequest) (*RemoveEdgeRouteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method RemoveEdgeRoute not implemented")
}
func (UnimplementedMcpAgentServiceServer) ListEdgeRoutes(context.Context, *ListEdgeRoutesRequest) (*ListEdgeRoutesResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ListEdgeRoutes not implemented")
}
func (UnimplementedMcpAgentServiceServer) HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Error(codes.Unimplemented, "method HealthCheck not implemented")
}
func (UnimplementedMcpAgentServiceServer) Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error {
return status.Error(codes.Unimplemented, "method Logs not implemented")
}
func (UnimplementedMcpAgentServiceServer) mustEmbedUnimplementedMcpAgentServiceServer() {} func (UnimplementedMcpAgentServiceServer) mustEmbedUnimplementedMcpAgentServiceServer() {}
func (UnimplementedMcpAgentServiceServer) testEmbeddedByValue() {} func (UnimplementedMcpAgentServiceServer) testEmbeddedByValue() {}
@@ -560,6 +723,161 @@ func _McpAgentService_NodeStatus_Handler(srv interface{}, ctx context.Context, d
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _McpAgentService_ListDNSRecords_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListDNSRecordsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).ListDNSRecords(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_ListDNSRecords_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).ListDNSRecords(ctx, req.(*ListDNSRecordsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_ListProxyRoutes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListProxyRoutesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).ListProxyRoutes(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_ListProxyRoutes_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).ListProxyRoutes(ctx, req.(*ListProxyRoutesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_AddProxyRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddProxyRouteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).AddProxyRoute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_AddProxyRoute_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).AddProxyRoute(ctx, req.(*AddProxyRouteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_RemoveProxyRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RemoveProxyRouteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).RemoveProxyRoute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_RemoveProxyRoute_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).RemoveProxyRoute(ctx, req.(*RemoveProxyRouteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_SetupEdgeRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetupEdgeRouteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).SetupEdgeRoute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_SetupEdgeRoute_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).SetupEdgeRoute(ctx, req.(*SetupEdgeRouteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_RemoveEdgeRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RemoveEdgeRouteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).RemoveEdgeRoute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_RemoveEdgeRoute_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).RemoveEdgeRoute(ctx, req.(*RemoveEdgeRouteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_ListEdgeRoutes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListEdgeRoutesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).ListEdgeRoutes(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_ListEdgeRoutes_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).ListEdgeRoutes(ctx, req.(*ListEdgeRoutesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthCheckRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(McpAgentServiceServer).HealthCheck(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: McpAgentService_HealthCheck_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(McpAgentServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest))
}
return interceptor(ctx, in, info, handler)
}
func _McpAgentService_Logs_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(LogsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(McpAgentServiceServer).Logs(m, &grpc.GenericServerStream[LogsRequest, LogsResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type McpAgentService_LogsServer = grpc.ServerStreamingServer[LogsResponse]
// McpAgentService_ServiceDesc is the grpc.ServiceDesc for McpAgentService service. // McpAgentService_ServiceDesc is the grpc.ServiceDesc for McpAgentService service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@@ -623,7 +941,45 @@ var McpAgentService_ServiceDesc = grpc.ServiceDesc{
MethodName: "NodeStatus", MethodName: "NodeStatus",
Handler: _McpAgentService_NodeStatus_Handler, Handler: _McpAgentService_NodeStatus_Handler,
}, },
{
MethodName: "ListDNSRecords",
Handler: _McpAgentService_ListDNSRecords_Handler,
},
{
MethodName: "ListProxyRoutes",
Handler: _McpAgentService_ListProxyRoutes_Handler,
},
{
MethodName: "AddProxyRoute",
Handler: _McpAgentService_AddProxyRoute_Handler,
},
{
MethodName: "RemoveProxyRoute",
Handler: _McpAgentService_RemoveProxyRoute_Handler,
},
{
MethodName: "SetupEdgeRoute",
Handler: _McpAgentService_SetupEdgeRoute_Handler,
},
{
MethodName: "RemoveEdgeRoute",
Handler: _McpAgentService_RemoveEdgeRoute_Handler,
},
{
MethodName: "ListEdgeRoutes",
Handler: _McpAgentService_ListEdgeRoutes_Handler,
},
{
MethodName: "HealthCheck",
Handler: _McpAgentService_HealthCheck_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Logs",
Handler: _McpAgentService_Logs_Handler,
ServerStreams: true,
},
}, },
Streams: []grpc.StreamDesc{},
Metadata: "proto/mcp/v1/mcp.proto", Metadata: "proto/mcp/v1/mcp.proto",
} }

View File

@@ -35,11 +35,12 @@ type Agent struct {
Proxy *ProxyRouter Proxy *ProxyRouter
Certs *CertProvisioner Certs *CertProvisioner
DNS *DNSRegistrar DNS *DNSRegistrar
Version string
} }
// Run starts the agent: opens the database, sets up the gRPC server with // Run starts the agent: opens the database, sets up the gRPC server with
// TLS and auth, and blocks until SIGINT/SIGTERM. // TLS and auth, and blocks until SIGINT/SIGTERM.
func Run(cfg *config.AgentConfig) error { func Run(cfg *config.AgentConfig, version string) error {
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: parseLogLevel(cfg.Log.Level), Level: parseLogLevel(cfg.Log.Level),
})) }))
@@ -79,6 +80,7 @@ func Run(cfg *config.AgentConfig) error {
Proxy: proxy, Proxy: proxy,
Certs: certs, Certs: certs,
DNS: dns, DNS: dns,
Version: version,
} }
tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey) tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
@@ -100,6 +102,9 @@ func Run(cfg *config.AgentConfig) error {
grpc.ChainUnaryInterceptor( grpc.ChainUnaryInterceptor(
auth.AuthInterceptor(validator), auth.AuthInterceptor(validator),
), ),
grpc.ChainStreamInterceptor(
auth.StreamAuthInterceptor(validator),
),
) )
mcpv1.RegisterMcpAgentServiceServer(server, a) mcpv1.RegisterMcpAgentServiceServer(server, a)

View File

@@ -34,6 +34,9 @@ func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.De
filtered = append(filtered, cs) filtered = append(filtered, cs)
} }
} }
if len(filtered) == 0 {
return nil, fmt.Errorf("component %q not found in service %q", target, serviceName)
}
components = filtered components = filtered
} }
@@ -131,7 +134,8 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
Error: fmt.Sprintf("allocate route ports: %v", err), Error: fmt.Sprintf("allocate route ports: %v", err),
} }
} }
runSpec.Ports = ports // Merge explicit ports from the spec with route-allocated ports.
runSpec.Ports = append(cs.GetPorts(), ports...)
runSpec.Env = append(runSpec.Env, env...) runSpec.Env = append(runSpec.Env, env...)
} else { } else {
// Legacy: use ports directly from the spec. // Legacy: use ports directly from the spec.

View File

@@ -26,8 +26,8 @@ type DNSRegistrar struct {
logger *slog.Logger logger *slog.Logger
} }
// dnsRecord is the JSON representation of an MCNS record. // DNSRecord is the JSON representation of an MCNS record.
type dnsRecord struct { type DNSRecord struct {
ID int `json:"ID"` ID int `json:"ID"`
Name string `json:"Name"` Name string `json:"Name"`
Type string `json:"Type"` Type string `json:"Type"`
@@ -136,8 +136,87 @@ func (d *DNSRegistrar) RemoveRecord(ctx context.Context, serviceName string) err
return nil return nil
} }
// DNSZone is the JSON representation of an MCNS zone.
type DNSZone struct {
Name string `json:"Name"`
}
// ListZones returns all zones from MCNS.
func (d *DNSRegistrar) ListZones(ctx context.Context) ([]DNSZone, error) {
if d == nil {
return nil, fmt.Errorf("DNS registrar not configured")
}
url := fmt.Sprintf("%s/v1/zones", d.serverURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("create list zones request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+d.token)
resp, err := d.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("list zones: %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read list zones response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("list zones: mcns returned %d: %s", resp.StatusCode, string(body))
}
var envelope struct {
Zones []DNSZone `json:"zones"`
}
if err := json.Unmarshal(body, &envelope); err != nil {
return nil, fmt.Errorf("parse list zones response: %w", err)
}
return envelope.Zones, nil
}
// ListZoneRecords returns all records in the given zone (no filters).
func (d *DNSRegistrar) ListZoneRecords(ctx context.Context, zone string) ([]DNSRecord, error) {
if d == nil {
return nil, fmt.Errorf("DNS registrar not configured")
}
url := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, zone)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("create list zone records request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+d.token)
resp, err := d.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("list zone records: %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read list zone records response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("list zone records: mcns returned %d: %s", resp.StatusCode, string(body))
}
var envelope struct {
Records []DNSRecord `json:"records"`
}
if err := json.Unmarshal(body, &envelope); err != nil {
return nil, fmt.Errorf("parse list zone records response: %w", err)
}
return envelope.Records, nil
}
// listRecords returns A records matching the service name in the zone. // listRecords returns A records matching the service name in the zone.
func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]dnsRecord, error) { func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]DNSRecord, error) {
url := fmt.Sprintf("%s/v1/zones/%s/records?name=%s&type=A", d.serverURL, d.zone, serviceName) url := fmt.Sprintf("%s/v1/zones/%s/records?name=%s&type=A", d.serverURL, d.zone, serviceName)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
@@ -161,7 +240,7 @@ func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]d
} }
var envelope struct { var envelope struct {
Records []dnsRecord `json:"records"` Records []DNSRecord `json:"records"`
} }
if err := json.Unmarshal(body, &envelope); err != nil { if err := json.Unmarshal(body, &envelope); err != nil {
return nil, fmt.Errorf("parse list response: %w", err) return nil, fmt.Errorf("parse list response: %w", err)

40
internal/agent/dns_rpc.go Normal file
View File

@@ -0,0 +1,40 @@
package agent
import (
"context"
"fmt"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
)
// ListDNSRecords queries MCNS for all zones and their records.
func (a *Agent) ListDNSRecords(ctx context.Context, _ *mcpv1.ListDNSRecordsRequest) (*mcpv1.ListDNSRecordsResponse, error) {
a.Logger.Debug("ListDNSRecords called")
zones, err := a.DNS.ListZones(ctx)
if err != nil {
return nil, fmt.Errorf("list zones: %w", err)
}
resp := &mcpv1.ListDNSRecordsResponse{}
for _, z := range zones {
records, err := a.DNS.ListZoneRecords(ctx, z.Name)
if err != nil {
return nil, fmt.Errorf("list records for zone %q: %w", z.Name, err)
}
zone := &mcpv1.DNSZone{Name: z.Name}
for _, r := range records {
zone.Records = append(zone.Records, &mcpv1.DNSRecord{
Id: int64(r.ID),
Name: r.Name,
Type: r.Type,
Value: r.Value,
Ttl: int32(r.TTL), //nolint:gosec // TTL is bounded
})
}
resp.Zones = append(resp.Zones, zone)
}
return resp, nil
}

View File

@@ -90,7 +90,7 @@ func TestEnsureRecordSkipsWhenExists(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet { if r.Method == http.MethodGet {
// Return an existing record with the correct value. // Return an existing record with the correct value.
resp := map[string][]dnsRecord{"records": {{ID: 1, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}} resp := map[string][]DNSRecord{"records": {{ID: 1, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp) _ = json.NewEncoder(w).Encode(resp)
return return
@@ -124,7 +124,7 @@ func TestEnsureRecordUpdatesWrongValue(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet { if r.Method == http.MethodGet {
// Return a record with a stale value. // Return a record with a stale value.
resp := map[string][]dnsRecord{"records": {{ID: 42, Name: "myservice", Type: "A", Value: "10.0.0.1", TTL: 300}}} resp := map[string][]DNSRecord{"records": {{ID: 42, Name: "myservice", Type: "A", Value: "10.0.0.1", TTL: 300}}}
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp) _ = json.NewEncoder(w).Encode(resp)
return return
@@ -160,7 +160,7 @@ func TestRemoveRecordDeletes(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet { if r.Method == http.MethodGet {
resp := map[string][]dnsRecord{"records": {{ID: 7, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}} resp := map[string][]DNSRecord{"records": {{ID: 7, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp) _ = json.NewEncoder(w).Encode(resp)
return return

196
internal/agent/edge_rpc.go Normal file
View File

@@ -0,0 +1,196 @@
package agent
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"net"
"os"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
mcproxy "git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
"git.wntrmute.dev/mc/mcp/internal/registry"
)
// SetupEdgeRoute provisions a TLS cert and registers an mc-proxy route for a
// public hostname. Called by the master on edge nodes.
func (a *Agent) SetupEdgeRoute(ctx context.Context, req *mcpv1.SetupEdgeRouteRequest) (*mcpv1.SetupEdgeRouteResponse, error) {
a.Logger.Info("SetupEdgeRoute", "hostname", req.GetHostname(),
"backend_hostname", req.GetBackendHostname(), "backend_port", req.GetBackendPort())
// Validate required fields.
if req.GetHostname() == "" {
return nil, status.Error(codes.InvalidArgument, "hostname is required")
}
if req.GetBackendHostname() == "" {
return nil, status.Error(codes.InvalidArgument, "backend_hostname is required")
}
if req.GetBackendPort() == 0 {
return nil, status.Error(codes.InvalidArgument, "backend_port is required")
}
if !req.GetBackendTls() {
return nil, status.Error(codes.InvalidArgument, "backend_tls must be true")
}
if a.Proxy == nil {
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
}
// Resolve the backend hostname to a Tailnet IP.
ips, err := net.LookupHost(req.GetBackendHostname())
if err != nil || len(ips) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "cannot resolve backend_hostname %q: %v", req.GetBackendHostname(), err)
}
backendIP := ips[0]
// Validate the resolved IP is a Tailnet address (100.64.0.0/10).
ip := net.ParseIP(backendIP)
if ip == nil {
return nil, status.Errorf(codes.InvalidArgument, "resolved IP %q is not valid", backendIP)
}
_, tailnet, _ := net.ParseCIDR("100.64.0.0/10")
if !tailnet.Contains(ip) {
return nil, status.Errorf(codes.InvalidArgument, "resolved IP %s is not a Tailnet address", backendIP)
}
backend := fmt.Sprintf("%s:%d", backendIP, req.GetBackendPort())
// Provision TLS cert for the public hostname if cert provisioner is available.
certPath := ""
keyPath := ""
if a.Certs != nil {
if err := a.Certs.EnsureCert(ctx, req.GetHostname(), []string{req.GetHostname()}); err != nil {
return nil, status.Errorf(codes.Internal, "provision cert for %s: %v", req.GetHostname(), err)
}
certPath = a.Proxy.CertPath(req.GetHostname())
keyPath = a.Proxy.KeyPath(req.GetHostname())
} else {
// No cert provisioner — check if certs already exist on disk.
certPath = a.Proxy.CertPath(req.GetHostname())
keyPath = a.Proxy.KeyPath(req.GetHostname())
if _, err := os.Stat(certPath); err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "no cert provisioner and cert not found at %s", certPath)
}
}
// Register the L7 route in mc-proxy.
route := mcproxy.Route{
Hostname: req.GetHostname(),
Backend: backend,
Mode: "l7",
TLSCert: certPath,
TLSKey: keyPath,
BackendTLS: true,
}
if err := a.Proxy.AddRoute(ctx, ":443", route); err != nil {
return nil, status.Errorf(codes.Internal, "add mc-proxy route: %v", err)
}
// Persist the edge route in the registry.
if err := registry.CreateEdgeRoute(a.DB, req.GetHostname(), req.GetBackendHostname(), int(req.GetBackendPort()), certPath, keyPath); err != nil {
a.Logger.Warn("failed to persist edge route", "hostname", req.GetHostname(), "err", err)
}
a.Logger.Info("edge route established",
"hostname", req.GetHostname(), "backend", backend, "cert", certPath)
return &mcpv1.SetupEdgeRouteResponse{}, nil
}
// RemoveEdgeRoute removes an mc-proxy route and cleans up the TLS cert for a
// public hostname. Called by the master on edge nodes.
func (a *Agent) RemoveEdgeRoute(ctx context.Context, req *mcpv1.RemoveEdgeRouteRequest) (*mcpv1.RemoveEdgeRouteResponse, error) {
a.Logger.Info("RemoveEdgeRoute", "hostname", req.GetHostname())
if req.GetHostname() == "" {
return nil, status.Error(codes.InvalidArgument, "hostname is required")
}
if a.Proxy == nil {
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
}
// Remove the mc-proxy route.
if err := a.Proxy.RemoveRoute(ctx, ":443", req.GetHostname()); err != nil {
a.Logger.Warn("remove mc-proxy route", "hostname", req.GetHostname(), "err", err)
// Continue — clean up cert and registry even if route removal fails.
}
// Remove the TLS cert.
if a.Certs != nil {
if err := a.Certs.RemoveCert(req.GetHostname()); err != nil {
a.Logger.Warn("remove cert", "hostname", req.GetHostname(), "err", err)
}
}
// Remove from registry.
if err := registry.DeleteEdgeRoute(a.DB, req.GetHostname()); err != nil {
a.Logger.Warn("delete edge route from registry", "hostname", req.GetHostname(), "err", err)
}
a.Logger.Info("edge route removed", "hostname", req.GetHostname())
return &mcpv1.RemoveEdgeRouteResponse{}, nil
}
// ListEdgeRoutes returns all edge routes managed by this agent.
func (a *Agent) ListEdgeRoutes(_ context.Context, _ *mcpv1.ListEdgeRoutesRequest) (*mcpv1.ListEdgeRoutesResponse, error) {
a.Logger.Debug("ListEdgeRoutes called")
routes, err := registry.ListEdgeRoutes(a.DB)
if err != nil {
return nil, status.Errorf(codes.Internal, "list edge routes: %v", err)
}
resp := &mcpv1.ListEdgeRoutesResponse{}
for _, r := range routes {
er := &mcpv1.EdgeRoute{
Hostname: r.Hostname,
BackendHostname: r.BackendHostname,
BackendPort: int32(r.BackendPort), //nolint:gosec // port is a small positive integer
}
// Read cert metadata if available.
if r.TLSCert != "" {
if certData, readErr := os.ReadFile(r.TLSCert); readErr == nil { //nolint:gosec // path from registry, not user input
if block, _ := pem.Decode(certData); block != nil {
if cert, parseErr := x509.ParseCertificate(block.Bytes); parseErr == nil {
er.CertSerial = cert.SerialNumber.String()
er.CertExpires = cert.NotAfter.UTC().Format(time.RFC3339)
}
}
}
}
resp.Routes = append(resp.Routes, er)
}
return resp, nil
}
// HealthCheck returns the agent's health status. Called by the master when
// heartbeats are missed.
func (a *Agent) HealthCheck(_ context.Context, _ *mcpv1.HealthCheckRequest) (*mcpv1.HealthCheckResponse, error) {
a.Logger.Debug("HealthCheck called")
st := "healthy"
containers := int32(0)
// Count running containers if the runtime is available.
if a.Runtime != nil {
if list, err := a.Runtime.List(context.Background()); err == nil {
containers = int32(len(list)) //nolint:gosec // container count is small
} else {
st = "degraded"
}
}
return &mcpv1.HealthCheckResponse{
Status: st,
Containers: containers,
}, nil
}

View File

@@ -12,9 +12,9 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// StopService stops all components of a service. // StopService stops all components of a service, or a single component if specified.
func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest) (*mcpv1.StopServiceResponse, error) { func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest) (*mcpv1.StopServiceResponse, error) {
a.Logger.Info("StopService", "service", req.GetName()) a.Logger.Info("StopService", "service", req.GetName(), "component", req.GetComponent())
if req.GetName() == "" { if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required") return nil, status.Error(codes.InvalidArgument, "service name is required")
@@ -25,6 +25,13 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest)
return nil, status.Errorf(codes.Internal, "list components: %v", err) return nil, status.Errorf(codes.Internal, "list components: %v", err)
} }
if target := req.GetComponent(); target != "" {
components, err = filterComponents(components, req.GetName(), target)
if err != nil {
return nil, err
}
}
var results []*mcpv1.ComponentResult var results []*mcpv1.ComponentResult
for _, c := range components { for _, c := range components {
containerName := ContainerNameFor(req.GetName(), c.Name) containerName := ContainerNameFor(req.GetName(), c.Name)
@@ -59,10 +66,10 @@ func (a *Agent) StopService(ctx context.Context, req *mcpv1.StopServiceRequest)
return &mcpv1.StopServiceResponse{Results: results}, nil return &mcpv1.StopServiceResponse{Results: results}, nil
} }
// StartService starts all components of a service. If a container already // StartService starts all components of a service, or a single component if specified.
// exists but is stopped, it is removed first so a fresh one can be created. // If a container already exists but is stopped, it is removed first so a fresh one can be created.
func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest) (*mcpv1.StartServiceResponse, error) { func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest) (*mcpv1.StartServiceResponse, error) {
a.Logger.Info("StartService", "service", req.GetName()) a.Logger.Info("StartService", "service", req.GetName(), "component", req.GetComponent())
if req.GetName() == "" { if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required") return nil, status.Error(codes.InvalidArgument, "service name is required")
@@ -73,6 +80,13 @@ func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest
return nil, status.Errorf(codes.Internal, "list components: %v", err) return nil, status.Errorf(codes.Internal, "list components: %v", err)
} }
if target := req.GetComponent(); target != "" {
components, err = filterComponents(components, req.GetName(), target)
if err != nil {
return nil, err
}
}
var results []*mcpv1.ComponentResult var results []*mcpv1.ComponentResult
for _, c := range components { for _, c := range components {
r := startComponent(ctx, a, req.GetName(), &c) r := startComponent(ctx, a, req.GetName(), &c)
@@ -82,10 +96,10 @@ func (a *Agent) StartService(ctx context.Context, req *mcpv1.StartServiceRequest
return &mcpv1.StartServiceResponse{Results: results}, nil return &mcpv1.StartServiceResponse{Results: results}, nil
} }
// RestartService restarts all components of a service by stopping, removing, // RestartService restarts all components of a service, or a single component if specified,
// and re-creating each container. The desired_state is not changed. // by stopping, removing, and re-creating each container. The desired_state is not changed.
func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceRequest) (*mcpv1.RestartServiceResponse, error) { func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceRequest) (*mcpv1.RestartServiceResponse, error) {
a.Logger.Info("RestartService", "service", req.GetName()) a.Logger.Info("RestartService", "service", req.GetName(), "component", req.GetComponent())
if req.GetName() == "" { if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "service name is required") return nil, status.Error(codes.InvalidArgument, "service name is required")
@@ -96,6 +110,13 @@ func (a *Agent) RestartService(ctx context.Context, req *mcpv1.RestartServiceReq
return nil, status.Errorf(codes.Internal, "list components: %v", err) return nil, status.Errorf(codes.Internal, "list components: %v", err)
} }
if target := req.GetComponent(); target != "" {
components, err = filterComponents(components, req.GetName(), target)
if err != nil {
return nil, err
}
}
var results []*mcpv1.ComponentResult var results []*mcpv1.ComponentResult
for _, c := range components { for _, c := range components {
r := restartComponent(ctx, a, req.GetName(), &c) r := restartComponent(ctx, a, req.GetName(), &c)
@@ -167,6 +188,16 @@ func componentToSpec(service string, c *registry.Component) runtime.ContainerSpe
} }
} }
// filterComponents returns only the component matching target, or an error if not found.
func filterComponents(components []registry.Component, service, target string) ([]registry.Component, error) {
for _, c := range components {
if c.Name == target {
return []registry.Component{c}, nil
}
}
return nil, status.Errorf(codes.NotFound, "component %q not found in service %q", target, service)
}
// componentExists checks whether a component already exists in the registry. // componentExists checks whether a component already exists in the registry.
func componentExists(db *sql.DB, service, name string) bool { func componentExists(db *sql.DB, service, name string) bool {
_, err := registry.GetComponent(db, service, name) _, err := registry.GetComponent(db, service, name)

79
internal/agent/logs.go Normal file
View File

@@ -0,0 +1,79 @@
package agent
import (
"bufio"
"io"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"git.wntrmute.dev/mc/mcp/internal/registry"
"git.wntrmute.dev/mc/mcp/internal/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Logs streams container logs for a service component.
func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsServer) error {
if req.GetService() == "" {
return status.Error(codes.InvalidArgument, "service name is required")
}
// Resolve component name.
component := req.GetComponent()
if component == "" {
components, err := registry.ListComponents(a.DB, req.GetService())
if err != nil {
return status.Errorf(codes.Internal, "list components: %v", err)
}
if len(components) == 0 {
return status.Error(codes.NotFound, "no components found for service")
}
component = components[0].Name
}
containerName := ContainerNameFor(req.GetService(), component)
podman, ok := a.Runtime.(*runtime.Podman)
if !ok {
return status.Error(codes.Internal, "logs requires podman runtime")
}
cmd := podman.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince())
a.Logger.Info("running podman logs", "container", containerName, "args", cmd.Args)
// Podman writes container stdout to its stdout and container stderr
// to its stderr. Merge both into a single pipe.
pr, pw := io.Pipe()
cmd.Stdout = pw
cmd.Stderr = pw
if err := cmd.Start(); err != nil {
pw.Close()
return status.Errorf(codes.Internal, "start podman logs: %v", err)
}
// Close the write end when the command exits so the scanner finishes.
go func() {
err := cmd.Wait()
if err != nil {
a.Logger.Warn("podman logs exited", "container", containerName, "error", err)
}
pw.Close()
}()
scanner := bufio.NewScanner(pr)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
if err := stream.Send(&mcpv1.LogsResponse{
Data: append(line, '\n'),
}); err != nil {
_ = cmd.Process.Kill()
return err
}
}
return nil
}

View File

@@ -31,6 +31,7 @@ func (a *Agent) NodeStatus(ctx context.Context, _ *mcpv1.NodeStatusRequest) (*mc
Runtime: a.Config.Agent.ContainerRuntime, Runtime: a.Config.Agent.ContainerRuntime,
ServiceCount: uint32(len(services)), //nolint:gosec // bounded ServiceCount: uint32(len(services)), //nolint:gosec // bounded
ComponentCount: componentCount, ComponentCount: componentCount,
AgentVersion: a.Version,
} }
// Runtime version. // Runtime version.

View File

@@ -48,6 +48,40 @@ func (p *ProxyRouter) Close() error {
return p.client.Close() return p.client.Close()
} }
// CertPath returns the expected TLS certificate path for a given name.
func (p *ProxyRouter) CertPath(name string) string {
return filepath.Join(p.certDir, name+".pem")
}
// KeyPath returns the expected TLS key path for a given name.
func (p *ProxyRouter) KeyPath(name string) string {
return filepath.Join(p.certDir, name+".key")
}
// GetStatus returns the mc-proxy server status.
func (p *ProxyRouter) GetStatus(ctx context.Context) (*mcproxy.Status, error) {
if p == nil {
return nil, fmt.Errorf("mc-proxy not configured")
}
return p.client.GetStatus(ctx)
}
// AddRoute adds a single route to mc-proxy.
func (p *ProxyRouter) AddRoute(ctx context.Context, listenerAddr string, route mcproxy.Route) error {
if p == nil {
return fmt.Errorf("mc-proxy not configured")
}
return p.client.AddRoute(ctx, listenerAddr, route)
}
// RemoveRoute removes a single route from mc-proxy.
func (p *ProxyRouter) RemoveRoute(ctx context.Context, listenerAddr, hostname string) error {
if p == nil {
return fmt.Errorf("mc-proxy not configured")
}
return p.client.RemoveRoute(ctx, listenerAddr, hostname)
}
// RegisterRoutes registers all routes for a service component with mc-proxy. // RegisterRoutes registers all routes for a service component with mc-proxy.
// It uses the assigned host ports from the registry. // It uses the assigned host ports from the registry.
func (p *ProxyRouter) RegisterRoutes(ctx context.Context, serviceName string, routes []registry.Route, hostPorts map[string]int) error { func (p *ProxyRouter) RegisterRoutes(ctx context.Context, serviceName string, routes []registry.Route, hostPorts map[string]int) error {

113
internal/agent/proxy_rpc.go Normal file
View File

@@ -0,0 +1,113 @@
package agent
import (
"context"
"fmt"
"git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
// ListProxyRoutes queries mc-proxy for its current status and routes.
func (a *Agent) ListProxyRoutes(ctx context.Context, _ *mcpv1.ListProxyRoutesRequest) (*mcpv1.ListProxyRoutesResponse, error) {
a.Logger.Debug("ListProxyRoutes called")
status, err := a.Proxy.GetStatus(ctx)
if err != nil {
return nil, fmt.Errorf("get mc-proxy status: %w", err)
}
resp := &mcpv1.ListProxyRoutesResponse{
Version: status.Version,
TotalConnections: status.TotalConnections,
}
if !status.StartedAt.IsZero() {
resp.StartedAt = timestamppb.New(status.StartedAt)
}
for _, ls := range status.Listeners {
listener := &mcpv1.ProxyListenerInfo{
Addr: ls.Addr,
RouteCount: int32(ls.RouteCount), //nolint:gosec // bounded
ActiveConnections: ls.ActiveConnections,
}
for _, r := range ls.Routes {
listener.Routes = append(listener.Routes, &mcpv1.ProxyRouteInfo{
Hostname: r.Hostname,
Backend: r.Backend,
Mode: r.Mode,
BackendTls: r.BackendTLS,
})
}
resp.Listeners = append(resp.Listeners, listener)
}
return resp, nil
}
// AddProxyRoute adds a route to mc-proxy.
func (a *Agent) AddProxyRoute(ctx context.Context, req *mcpv1.AddProxyRouteRequest) (*mcpv1.AddProxyRouteResponse, error) {
if req.GetListenerAddr() == "" {
return nil, status.Error(codes.InvalidArgument, "listener_addr is required")
}
if req.GetHostname() == "" {
return nil, status.Error(codes.InvalidArgument, "hostname is required")
}
if req.GetBackend() == "" {
return nil, status.Error(codes.InvalidArgument, "backend is required")
}
if a.Proxy == nil {
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
}
route := mcproxy.Route{
Hostname: req.GetHostname(),
Backend: req.GetBackend(),
Mode: req.GetMode(),
BackendTLS: req.GetBackendTls(),
TLSCert: req.GetTlsCert(),
TLSKey: req.GetTlsKey(),
}
if err := a.Proxy.AddRoute(ctx, req.GetListenerAddr(), route); err != nil {
return nil, fmt.Errorf("add route: %w", err)
}
a.Logger.Info("route added",
"listener", req.GetListenerAddr(),
"hostname", req.GetHostname(),
"backend", req.GetBackend(),
"mode", req.GetMode(),
)
return &mcpv1.AddProxyRouteResponse{}, nil
}
// RemoveProxyRoute removes a route from mc-proxy.
func (a *Agent) RemoveProxyRoute(ctx context.Context, req *mcpv1.RemoveProxyRouteRequest) (*mcpv1.RemoveProxyRouteResponse, error) {
if req.GetListenerAddr() == "" {
return nil, status.Error(codes.InvalidArgument, "listener_addr is required")
}
if req.GetHostname() == "" {
return nil, status.Error(codes.InvalidArgument, "hostname is required")
}
if a.Proxy == nil {
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
}
if err := a.Proxy.RemoveRoute(ctx, req.GetListenerAddr(), req.GetHostname()); err != nil {
return nil, fmt.Errorf("remove route: %w", err)
}
a.Logger.Info("route removed",
"listener", req.GetListenerAddr(),
"hostname", req.GetHostname(),
)
return &mcpv1.RemoveProxyRouteResponse{}, nil
}

View File

@@ -99,6 +99,12 @@ func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, er
if rc, ok := runtimeByName[containerName]; ok { if rc, ok := runtimeByName[containerName]; ok {
ci.ObservedState = rc.State ci.ObservedState = rc.State
if rc.Version != "" {
ci.Version = rc.Version
}
if rc.Image != "" {
ci.Image = rc.Image
}
if !rc.Started.IsZero() { if !rc.Started.IsZero() {
ci.Started = timestamppb.New(rc.Started) ci.Started = timestamppb.New(rc.Started)
} }

View File

@@ -255,6 +255,52 @@ func AuthInterceptor(validator TokenValidator) grpc.UnaryServerInterceptor {
} }
} }
// StreamAuthInterceptor returns a gRPC stream server interceptor with
// the same authentication rules as AuthInterceptor.
func StreamAuthInterceptor(validator TokenValidator) grpc.StreamServerInterceptor {
return func(
srv any,
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
md, ok := metadata.FromIncomingContext(ss.Context())
if !ok {
return status.Error(codes.Unauthenticated, "missing metadata")
}
authValues := md.Get("authorization")
if len(authValues) == 0 {
return status.Error(codes.Unauthenticated, "missing authorization header")
}
authHeader := authValues[0]
if !strings.HasPrefix(authHeader, "Bearer ") {
return status.Error(codes.Unauthenticated, "malformed authorization header")
}
token := strings.TrimPrefix(authHeader, "Bearer ")
tokenInfo, err := validator.ValidateToken(ss.Context(), token)
if err != nil {
slog.Error("token validation failed", "method", info.FullMethod, "error", err)
return status.Error(codes.Unauthenticated, "token validation failed")
}
if !tokenInfo.Valid {
return status.Error(codes.Unauthenticated, "invalid token")
}
if tokenInfo.HasRole("guest") {
slog.Warn("guest access denied", "method", info.FullMethod, "user", tokenInfo.Username)
return status.Error(codes.PermissionDenied, "guest access not permitted")
}
slog.Info("rpc", "method", info.FullMethod, "user", tokenInfo.Username, "account_type", tokenInfo.AccountType)
return handler(srv, ss)
}
}
// Login authenticates with MCIAS and returns a bearer token. // Login authenticates with MCIAS and returns a bearer token.
func Login(serverURL, caCertPath, username, password string) (string, error) { func Login(serverURL, caCertPath, username, password string) (string, error) {
client, err := newHTTPClient(caCertPath) client, err := newHTTPClient(caCertPath)

View File

@@ -142,4 +142,18 @@ var migrations = []string{
FOREIGN KEY (service, component) REFERENCES components(service, name) ON DELETE CASCADE FOREIGN KEY (service, component) REFERENCES components(service, name) ON DELETE CASCADE
); );
`, `,
// Migration 3: service comment
`ALTER TABLE services ADD COLUMN comment TEXT NOT NULL DEFAULT '';`,
// Migration 4: edge routes (v2 — public routes managed by the master)
`CREATE TABLE IF NOT EXISTS edge_routes (
hostname TEXT NOT NULL PRIMARY KEY,
backend_hostname TEXT NOT NULL,
backend_port INTEGER NOT NULL,
tls_cert TEXT NOT NULL DEFAULT '',
tls_key TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);`,
} }

View File

@@ -0,0 +1,93 @@
package registry
import (
"database/sql"
"fmt"
"time"
)
// EdgeRoute represents a public edge route managed by the master.
type EdgeRoute struct {
Hostname string
BackendHostname string
BackendPort int
TLSCert string
TLSKey string
CreatedAt time.Time
UpdatedAt time.Time
}
// CreateEdgeRoute inserts or replaces an edge route.
func CreateEdgeRoute(db *sql.DB, hostname, backendHostname string, backendPort int, tlsCert, tlsKey string) error {
_, err := db.Exec(`
INSERT INTO edge_routes (hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(hostname) DO UPDATE SET
backend_hostname = excluded.backend_hostname,
backend_port = excluded.backend_port,
tls_cert = excluded.tls_cert,
tls_key = excluded.tls_key,
updated_at = datetime('now')
`, hostname, backendHostname, backendPort, tlsCert, tlsKey)
if err != nil {
return fmt.Errorf("create edge route %s: %w", hostname, err)
}
return nil
}
// GetEdgeRoute returns a single edge route by hostname.
func GetEdgeRoute(db *sql.DB, hostname string) (*EdgeRoute, error) {
var r EdgeRoute
var createdAt, updatedAt string
err := db.QueryRow(`
SELECT hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at
FROM edge_routes WHERE hostname = ?
`, hostname).Scan(&r.Hostname, &r.BackendHostname, &r.BackendPort, &r.TLSCert, &r.TLSKey, &createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("get edge route %s: %w", hostname, err)
}
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
r.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
return &r, nil
}
// ListEdgeRoutes returns all edge routes.
func ListEdgeRoutes(db *sql.DB) ([]*EdgeRoute, error) {
rows, err := db.Query(`
SELECT hostname, backend_hostname, backend_port, tls_cert, tls_key, created_at, updated_at
FROM edge_routes ORDER BY hostname
`)
if err != nil {
return nil, fmt.Errorf("list edge routes: %w", err)
}
defer func() { _ = rows.Close() }()
var routes []*EdgeRoute
for rows.Next() {
var r EdgeRoute
var createdAt, updatedAt string
if err := rows.Scan(&r.Hostname, &r.BackendHostname, &r.BackendPort, &r.TLSCert, &r.TLSKey, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("scan edge route: %w", err)
}
r.CreatedAt, _ = time.Parse("2006-01-02 15:04:05", createdAt)
r.UpdatedAt, _ = time.Parse("2006-01-02 15:04:05", updatedAt)
routes = append(routes, &r)
}
return routes, rows.Err()
}
// DeleteEdgeRoute removes an edge route by hostname.
func DeleteEdgeRoute(db *sql.DB, hostname string) error {
result, err := db.Exec(`DELETE FROM edge_routes WHERE hostname = ?`, hostname)
if err != nil {
return fmt.Errorf("delete edge route %s: %w", hostname, err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("edge route %s not found", hostname)
}
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"os"
"os/exec" "os/exec"
"strings" "strings"
"time" "time"
@@ -178,6 +179,71 @@ func (p *Podman) Inspect(ctx context.Context, name string) (ContainerInfo, error
return info, nil return info, nil
} }
// Logs returns an exec.Cmd that streams container logs. For containers
// using the journald log driver, it tries journalctl first (podman logs
// can't read journald outside the originating user session). If journalctl
// can't access the journal, it falls back to podman logs.
func (p *Podman) Logs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
// Check if this container uses the journald log driver.
inspectCmd := exec.CommandContext(ctx, p.command(), "inspect", "--format", "{{.HostConfig.LogConfig.Type}}", containerName) //nolint:gosec
if out, err := inspectCmd.Output(); err == nil && strings.TrimSpace(string(out)) == "journald" {
if p.journalAccessible(ctx, containerName) {
return p.journalLogs(ctx, containerName, tail, follow, since)
}
}
return p.podmanLogs(ctx, containerName, tail, follow, timestamps, since)
}
// journalAccessible probes whether journalctl can read logs for the container.
func (p *Podman) journalAccessible(ctx context.Context, containerName string) bool {
args := []string{"--no-pager", "-n", "0"}
if os.Getuid() != 0 {
args = append(args, "--user")
}
args = append(args, "CONTAINER_NAME="+containerName)
cmd := exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec
return cmd.Run() == nil
}
// journalLogs returns a journalctl command filtered by container name.
func (p *Podman) journalLogs(ctx context.Context, containerName string, tail int, follow bool, since string) *exec.Cmd {
args := []string{"--no-pager", "--output", "cat"}
if os.Getuid() != 0 {
args = append(args, "--user")
}
args = append(args, "CONTAINER_NAME="+containerName)
if tail > 0 {
args = append(args, "--lines", fmt.Sprintf("%d", tail))
}
if follow {
args = append(args, "--follow")
}
if since != "" {
args = append(args, "--since", since)
}
return exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec // args built programmatically
}
// podmanLogs returns a podman logs command.
func (p *Podman) podmanLogs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
args := []string{"logs"}
if tail > 0 {
args = append(args, "--tail", fmt.Sprintf("%d", tail))
}
if follow {
args = append(args, "--follow")
}
if timestamps {
args = append(args, "--timestamps")
}
if since != "" {
args = append(args, "--since", since)
}
args = append(args, containerName)
return exec.CommandContext(ctx, p.command(), args...) //nolint:gosec // args built programmatically
}
// Login authenticates to a container registry using the given token as // Login authenticates to a container registry using the given token as
// the password. This enables non-interactive push with service account // the password. This enables non-interactive push with service account
// tokens (MCR accepts MCIAS JWTs as passwords). // tokens (MCR accepts MCIAS JWTs as passwords).

View File

@@ -33,6 +33,25 @@ service McpAgentService {
// Node // Node
rpc NodeStatus(NodeStatusRequest) returns (NodeStatusResponse); rpc NodeStatus(NodeStatusRequest) returns (NodeStatusResponse);
// DNS (query MCNS)
rpc ListDNSRecords(ListDNSRecordsRequest) returns (ListDNSRecordsResponse);
// Proxy routes (query mc-proxy)
rpc ListProxyRoutes(ListProxyRoutesRequest) returns (ListProxyRoutesResponse);
rpc AddProxyRoute(AddProxyRouteRequest) returns (AddProxyRouteResponse);
rpc RemoveProxyRoute(RemoveProxyRouteRequest) returns (RemoveProxyRouteResponse);
// Edge routing (called by master on edge nodes)
rpc SetupEdgeRoute(SetupEdgeRouteRequest) returns (SetupEdgeRouteResponse);
rpc RemoveEdgeRoute(RemoveEdgeRouteRequest) returns (RemoveEdgeRouteResponse);
rpc ListEdgeRoutes(ListEdgeRoutesRequest) returns (ListEdgeRoutesResponse);
// Health (called by master on missed heartbeats)
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
// Logs
rpc Logs(LogsRequest) returns (stream LogsResponse);
} }
// --- Service lifecycle --- // --- Service lifecycle ---
@@ -61,6 +80,7 @@ message ServiceSpec {
string name = 1; string name = 1;
bool active = 2; bool active = 2;
repeated ComponentSpec components = 3; repeated ComponentSpec components = 3;
string comment = 4;
} }
message DeployRequest { message DeployRequest {
@@ -81,6 +101,7 @@ message ComponentResult {
message StopServiceRequest { message StopServiceRequest {
string name = 1; string name = 1;
string component = 2;
} }
message StopServiceResponse { message StopServiceResponse {
@@ -89,6 +110,7 @@ message StopServiceResponse {
message StartServiceRequest { message StartServiceRequest {
string name = 1; string name = 1;
string component = 2;
} }
message StartServiceResponse { message StartServiceResponse {
@@ -97,6 +119,7 @@ message StartServiceResponse {
message RestartServiceRequest { message RestartServiceRequest {
string name = 1; string name = 1;
string component = 2;
} }
message RestartServiceResponse { message RestartServiceResponse {
@@ -137,6 +160,7 @@ message ServiceInfo {
string name = 1; string name = 1;
bool active = 2; bool active = 2;
repeated ComponentInfo components = 3; repeated ComponentInfo components = 3;
string comment = 4;
} }
message ComponentInfo { message ComponentInfo {
@@ -254,6 +278,7 @@ message NodeStatusResponse {
uint64 memory_free_bytes = 9; uint64 memory_free_bytes = 9;
double cpu_usage_percent = 10; double cpu_usage_percent = 10;
google.protobuf.Timestamp uptime_since = 11; google.protobuf.Timestamp uptime_since = 11;
string agent_version = 12;
} }
// --- Purge --- // --- Purge ---
@@ -282,3 +307,123 @@ message PurgeResult {
// Why eligible, or why refused. // Why eligible, or why refused.
string reason = 4; string reason = 4;
} }
// --- Logs ---
message LogsRequest {
string service = 1;
string component = 2; // optional; defaults to first/only component
int32 tail = 3; // number of lines from the end (0 = all)
bool follow = 4; // stream new output
bool timestamps = 5; // prepend timestamps
string since = 6; // show logs since (e.g., "2h", "2026-03-28T00:00:00Z")
}
message LogsResponse {
bytes data = 1;
}
// --- DNS ---
message ListDNSRecordsRequest {}
message DNSZone {
string name = 1;
repeated DNSRecord records = 2;
}
message DNSRecord {
int64 id = 1;
string name = 2;
string type = 3;
string value = 4;
int32 ttl = 5;
}
message ListDNSRecordsResponse {
repeated DNSZone zones = 1;
}
// --- Proxy routes ---
message ListProxyRoutesRequest {}
message ProxyRouteInfo {
string hostname = 1;
string backend = 2;
string mode = 3;
bool backend_tls = 4;
}
message ProxyListenerInfo {
string addr = 1;
int32 route_count = 2;
int64 active_connections = 3;
repeated ProxyRouteInfo routes = 4;
}
message ListProxyRoutesResponse {
string version = 1;
int64 total_connections = 2;
google.protobuf.Timestamp started_at = 3;
repeated ProxyListenerInfo listeners = 4;
}
message AddProxyRouteRequest {
string listener_addr = 1; // e.g. ":443"
string hostname = 2;
string backend = 3;
string mode = 4; // "l4" or "l7"
bool backend_tls = 5;
string tls_cert = 6; // path to TLS cert (required for l7)
string tls_key = 7; // path to TLS key (required for l7)
}
message AddProxyRouteResponse {}
message RemoveProxyRouteRequest {
string listener_addr = 1; // e.g. ":443"
string hostname = 2;
}
message RemoveProxyRouteResponse {}
// --- Edge routes (v2) ---
message SetupEdgeRouteRequest {
string hostname = 1; // public hostname (e.g. "mcq.metacircular.net")
string backend_hostname = 2; // internal .svc.mcp hostname
int32 backend_port = 3; // port on worker's mc-proxy
bool backend_tls = 4; // MUST be true; agent rejects false
}
message SetupEdgeRouteResponse {}
message RemoveEdgeRouteRequest {
string hostname = 1;
}
message RemoveEdgeRouteResponse {}
message ListEdgeRoutesRequest {}
message ListEdgeRoutesResponse {
repeated EdgeRoute routes = 1;
}
message EdgeRoute {
string hostname = 1;
string backend_hostname = 2;
int32 backend_port = 3;
string cert_serial = 4;
string cert_expires = 5; // RFC3339
}
// --- Health check (v2) ---
message HealthCheckRequest {}
message HealthCheckResponse {
string status = 1; // "healthy" or "degraded"
int32 containers = 2;
}