Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d8f45ca520 | |||
| 95f86157b4 | |||
| 93e26d3789 | |||
| 3d2edb7c26 | |||
| bf02935716 | |||
| c4f0d7be8e | |||
| 4d900eafd1 | |||
| 38f9070c24 | |||
| 67d0ab1d9d | |||
| 7383b370f0 | |||
| 4c847e6de9 | |||
| 14b978861f | |||
| 18365cc0a8 |
151
ARCHITECTURE.md
151
ARCHITECTURE.md
@@ -121,9 +121,26 @@ option for future security hardening.
|
||||
## Authentication and Authorization
|
||||
|
||||
MCP follows the platform authentication model: all auth is delegated to
|
||||
MCIAS.
|
||||
MCIAS. The auth model separates three concerns: operator intent (CLI to
|
||||
agent), infrastructure automation (agent to platform services), and
|
||||
access control (who can do what).
|
||||
|
||||
### Agent Authentication
|
||||
### Identity Model
|
||||
|
||||
| Identity | Type | Purpose |
|
||||
|----------|------|---------|
|
||||
| Human operator (e.g., `kyle`) | human | CLI operations: deploy, stop, start, build |
|
||||
| `mcp-agent` | system | Agent-to-service automation: certs, DNS, routes, image pull |
|
||||
| Per-service accounts (e.g., `mcq`) | system | Scoped self-management (own DNS records only) |
|
||||
| `admin` role | role | MCIAS account management, policy changes, zone creation |
|
||||
| `guest` role | role | Explicitly rejected by the agent |
|
||||
|
||||
The `admin` role is reserved for MCIAS-level administrative operations
|
||||
(account creation, policy management, zone mutations). Routine MCP
|
||||
operations (deploy, stop, start, build) do not require admin — any
|
||||
authenticated non-guest user or system account is accepted.
|
||||
|
||||
### Agent Authentication (CLI → Agent)
|
||||
|
||||
The agent is a gRPC server with a unary interceptor that enforces
|
||||
authentication on every RPC:
|
||||
@@ -132,10 +149,34 @@ authentication on every RPC:
|
||||
(`authorization: Bearer <token>`).
|
||||
2. Agent extracts the token and validates it against MCIAS (cached 30s by
|
||||
SHA-256 of the token, per platform convention).
|
||||
3. Agent checks that the caller has the `admin` role. All MCP operations
|
||||
require admin -- there is no unprivileged MCP access.
|
||||
3. Agent rejects guests (`guest` role → `PERMISSION_DENIED`). All other
|
||||
authenticated users and system accounts are accepted.
|
||||
4. If validation fails, the RPC returns `UNAUTHENTICATED` (invalid/expired
|
||||
token) or `PERMISSION_DENIED` (valid token, not admin).
|
||||
token) or `PERMISSION_DENIED` (guest).
|
||||
|
||||
### Agent Service Authentication (Agent → Platform Services)
|
||||
|
||||
The agent authenticates to platform services using a long-lived system
|
||||
account token (`mcp-agent`). Each service has its own token file:
|
||||
|
||||
| Service | Token Path | Operations |
|
||||
|---------|------------|------------|
|
||||
| Metacrypt | `/srv/mcp/metacrypt-token` | TLS cert provisioning (PKI issue) |
|
||||
| MCNS | `/srv/mcp/mcns-token` | DNS record create/delete (any name) |
|
||||
| mc-proxy | Unix socket (no auth) | Route registration/removal |
|
||||
| MCR | podman auth store | Image pull (JWT-as-password) |
|
||||
|
||||
These tokens are issued by MCIAS for the `mcp-agent` system account.
|
||||
They carry no roles — authorization is handled by each service's policy
|
||||
engine:
|
||||
|
||||
- **Metacrypt:** Policy rule grants `mcp-agent` write access to
|
||||
`engine/pki/issue`.
|
||||
- **MCNS:** Code-level authorization: system account `mcp-agent` can
|
||||
manage any record; other system accounts can only manage records
|
||||
matching their username.
|
||||
- **MCR:** Default policy allows all authenticated users to push/pull.
|
||||
MCR accepts MCIAS JWTs as passwords at the `/v2/token` endpoint.
|
||||
|
||||
### CLI Authentication
|
||||
|
||||
@@ -148,6 +189,15 @@ obtained by:
|
||||
|
||||
The stored token is used for all subsequent agent RPCs until it expires.
|
||||
|
||||
### MCR Registry Authentication
|
||||
|
||||
`mcp build` auto-authenticates to MCR before pushing images. It reads
|
||||
the CLI's stored MCIAS token and uses it as the password for `podman
|
||||
login`. MCR's token endpoint accepts MCIAS JWTs as passwords (the
|
||||
personal-access-token pattern), so both human and system account tokens
|
||||
work. This eliminates the need for a separate interactive `podman login`
|
||||
step.
|
||||
|
||||
---
|
||||
|
||||
## Services and Components
|
||||
@@ -224,6 +274,9 @@ mcp pull <service> <path> [local-file] Copy a file from /srv/<service>/<path> to
|
||||
mcp node list List registered nodes
|
||||
mcp node add <name> <address> Register a node
|
||||
mcp node remove <name> Deregister a node
|
||||
|
||||
mcp agent upgrade [node] Build, push, and restart agent on all (or one) node(s)
|
||||
mcp agent status Show agent version on each node
|
||||
```
|
||||
|
||||
### Service Definition Files
|
||||
@@ -1144,20 +1197,84 @@ The agent's data directory follows the platform convention:
|
||||
|
||||
### Agent Deployment (on nodes)
|
||||
|
||||
The agent is deployed like any other Metacircular service:
|
||||
#### Provisioning (one-time per node)
|
||||
|
||||
1. Provision the `mcp` system user via NixOS config (with podman access
|
||||
and subuid/subgid ranges for rootless containers).
|
||||
Each node needs a one-time setup before the agent can run. The steps are
|
||||
the same regardless of OS, but the mechanism differs:
|
||||
|
||||
1. Create `mcp` system user with podman access and subuid/subgid ranges.
|
||||
2. Set `/srv/` ownership to the `mcp` user (the agent creates and manages
|
||||
`/srv/<service>/` directories for all services).
|
||||
3. Create `/srv/mcp/` directory and config file.
|
||||
4. Provision TLS certificate from Metacrypt.
|
||||
5. Create an MCIAS system account for the agent (`mcp-agent`).
|
||||
6. Install the `mcp-agent` binary.
|
||||
7. Start via systemd unit.
|
||||
6. Install the initial `mcp-agent` binary to `/srv/mcp/mcp-agent`.
|
||||
7. Install and start the systemd unit.
|
||||
|
||||
The agent runs as a systemd service. Container-first deployment is a v2
|
||||
concern -- MCP needs to be running before it can manage its own agent.
|
||||
On **NixOS** (rift), provisioning is declarative via the NixOS config.
|
||||
The NixOS config owns the infrastructure (user, systemd unit, podman,
|
||||
directories, permissions) but **not** the binary. `ExecStart` points to
|
||||
`/srv/mcp/mcp-agent`, a mutable path that MCP manages. NixOS may
|
||||
bootstrap the initial binary there, but subsequent updates come from MCP.
|
||||
|
||||
On **Debian** (hyperborea, svc), provisioning is done via a setup script
|
||||
or ansible playbook that creates the same layout.
|
||||
|
||||
#### Binary Location
|
||||
|
||||
The agent binary lives at `/srv/mcp/mcp-agent` on **all** nodes,
|
||||
regardless of OS. This unifies the update mechanism across the fleet.
|
||||
|
||||
#### Agent Upgrades
|
||||
|
||||
After initial provisioning, the agent binary is updated via
|
||||
`mcp agent upgrade`. The CLI:
|
||||
|
||||
1. Cross-compiles the agent for each target architecture
|
||||
(`GOARCH=amd64` for rift/svc, `GOARCH=arm64` for hyperborea).
|
||||
2. SSHs to each node, pushes the binary to `/srv/mcp/mcp-agent.new`.
|
||||
3. Atomically swaps the binary (`mv mcp-agent.new mcp-agent`).
|
||||
4. Restarts the systemd service (`systemctl restart mcp-agent`).
|
||||
|
||||
SSH is used instead of gRPC because:
|
||||
- It works even when the agent is broken or has an incompatible version.
|
||||
- The binary is ~17MB, which exceeds gRPC default message limits.
|
||||
- No self-restart coordination needed.
|
||||
|
||||
The CLI uses `golang.org/x/crypto/ssh` for native SSH, keeping the
|
||||
entire workflow in a single binary with no external tool dependencies.
|
||||
|
||||
#### Node Configuration
|
||||
|
||||
Node config includes SSH and architecture info for agent management:
|
||||
|
||||
```toml
|
||||
[[nodes]]
|
||||
name = "rift"
|
||||
address = "100.95.252.120:9444"
|
||||
ssh = "rift" # SSH host (from ~/.ssh/config or hostname)
|
||||
arch = "amd64" # GOARCH for cross-compilation
|
||||
|
||||
[[nodes]]
|
||||
name = "hyperborea"
|
||||
address = "100.x.x.x:9444"
|
||||
ssh = "hyperborea"
|
||||
arch = "arm64"
|
||||
```
|
||||
|
||||
#### Coordinated Upgrades
|
||||
|
||||
New MCP releases often add new RPCs. A CLI at v0.6.0 calling an agent
|
||||
at v0.5.0 fails with `Unimplemented`. Therefore agent upgrades must be
|
||||
coordinated: `mcp agent upgrade` (with no node argument) upgrades all
|
||||
nodes before the CLI is used for other operations.
|
||||
|
||||
If a node fails to upgrade, it is reported but the others still proceed.
|
||||
The operator can retry or investigate via SSH.
|
||||
|
||||
#### Systemd Unit
|
||||
|
||||
The systemd unit is the same on all nodes:
|
||||
|
||||
```ini
|
||||
[Unit]
|
||||
@@ -1167,7 +1284,7 @@ Wants=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=/usr/local/bin/mcp-agent server --config /srv/mcp/mcp-agent.toml
|
||||
ExecStart=/srv/mcp/mcp-agent server --config /srv/mcp/mcp-agent.toml
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
@@ -1175,17 +1292,14 @@ User=mcp
|
||||
Group=mcp
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
ProtectHome=true
|
||||
ProtectSystem=full
|
||||
ProtectHome=false
|
||||
PrivateTmp=true
|
||||
PrivateDevices=true
|
||||
ProtectKernelTunables=true
|
||||
ProtectKernelModules=true
|
||||
ProtectControlGroups=true
|
||||
RestrictSUIDSGID=true
|
||||
RestrictNamespaces=true
|
||||
LockPersonality=true
|
||||
MemoryDenyWriteExecute=true
|
||||
RestrictRealtime=true
|
||||
ReadWritePaths=/srv
|
||||
|
||||
@@ -1195,6 +1309,7 @@ WantedBy=multi-user.target
|
||||
|
||||
Note: `ReadWritePaths=/srv` (not `/srv/mcp`) because the agent writes
|
||||
files to any service's `/srv/<service>/` directory on behalf of the CLI.
|
||||
`ProtectHome=false` because the `mcp` user's home is `/srv/mcp`.
|
||||
|
||||
### CLI Installation (on operator workstation)
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ func main() {
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
return agent.Run(cfg)
|
||||
return agent.Run(cfg, version)
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ func dialAgent(address string, cfg *config.CLIConfig) (mcpv1.McpAgentServiceClie
|
||||
address,
|
||||
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
|
||||
grpc.WithUnaryInterceptor(tokenInterceptor(token)),
|
||||
grpc.WithStreamInterceptor(streamTokenInterceptor(token)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("dial %q: %w", address, err)
|
||||
@@ -60,6 +61,15 @@ func tokenInterceptor(token string) grpc.UnaryClientInterceptor {
|
||||
}
|
||||
}
|
||||
|
||||
// streamTokenInterceptor returns a gRPC client stream interceptor that
|
||||
// attaches the bearer token to outgoing stream metadata.
|
||||
func streamTokenInterceptor(token string) grpc.StreamClientInterceptor {
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// loadBearerToken reads the token from file or env var.
|
||||
func loadBearerToken(cfg *config.CLIConfig) (string, error) {
|
||||
if token := os.Getenv("MCP_TOKEN"); token != "" {
|
||||
|
||||
87
cmd/mcp/dns.go
Normal file
87
cmd/mcp/dns.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func dnsCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "dns",
|
||||
Short: "List all DNS zones and records from MCNS",
|
||||
RunE: runDNS,
|
||||
}
|
||||
}
|
||||
|
||||
func runDNS(_ *cobra.Command, _ []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
// DNS is centralized — query the first reachable agent.
|
||||
resp, nodeName, err := queryDNS(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(resp.GetZones()) == 0 {
|
||||
fmt.Println("no DNS zones configured")
|
||||
return nil
|
||||
}
|
||||
|
||||
_ = nodeName
|
||||
for i, zone := range resp.GetZones() {
|
||||
if i > 0 {
|
||||
fmt.Println()
|
||||
}
|
||||
fmt.Printf("ZONE: %s\n", zone.GetName())
|
||||
|
||||
if len(zone.GetRecords()) == 0 {
|
||||
fmt.Println(" (no records)")
|
||||
continue
|
||||
}
|
||||
|
||||
w := newTable()
|
||||
_, _ = fmt.Fprintln(w, " NAME\tTYPE\tVALUE\tTTL")
|
||||
for _, r := range zone.GetRecords() {
|
||||
_, _ = fmt.Fprintf(w, " %s\t%s\t%s\t%d\n",
|
||||
r.GetName(), r.GetType(), r.GetValue(), r.GetTtl())
|
||||
}
|
||||
_ = w.Flush()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// queryDNS tries each configured agent and returns the first successful
|
||||
// DNS listing. DNS is centralized so any agent with MCNS configured works.
|
||||
func queryDNS(cfg *config.CLIConfig) (*mcpv1.ListDNSRecordsResponse, string, error) {
|
||||
for _, node := range cfg.Nodes {
|
||||
client, conn, err := dialAgent(node.Address, cfg)
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: %v\n", node.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
resp, err := client.ListDNSRecords(ctx, &mcpv1.ListDNSRecordsRequest{})
|
||||
cancel()
|
||||
_ = conn.Close()
|
||||
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: list DNS: %v\n", node.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
return resp, node.Name, nil
|
||||
}
|
||||
|
||||
return nil, "", fmt.Errorf("no reachable agent with DNS configured")
|
||||
}
|
||||
12
cmd/mcp/edit.go
Normal file
12
cmd/mcp/edit.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package main
|
||||
|
||||
import "github.com/spf13/cobra"
|
||||
|
||||
func editCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "edit <service>",
|
||||
Short: "Open service definition in $EDITOR",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: runServiceEdit,
|
||||
}
|
||||
}
|
||||
81
cmd/mcp/logs.go
Normal file
81
cmd/mcp/logs.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||
)
|
||||
|
||||
func logsCmd() *cobra.Command {
|
||||
var (
|
||||
tail int
|
||||
follow bool
|
||||
timestamps bool
|
||||
since string
|
||||
)
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "logs <service>[/<component>]",
|
||||
Short: "Show container logs",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
serviceName, component := parseServiceArg(args[0])
|
||||
|
||||
def, err := loadServiceDef(cmd, cfg, serviceName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, def.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
stream, err := client.Logs(cmd.Context(), &mcpv1.LogsRequest{
|
||||
Service: serviceName,
|
||||
Component: component,
|
||||
Tail: int32(tail),
|
||||
Follow: follow,
|
||||
Timestamps: timestamps,
|
||||
Since: since,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("logs: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("recv: %w", err)
|
||||
}
|
||||
_, _ = os.Stdout.Write(resp.Data)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().IntVarP(&tail, "tail", "n", 0, "number of lines from end (0 = all)")
|
||||
cmd.Flags().BoolVarP(&follow, "follow", "f", false, "follow log output")
|
||||
cmd.Flags().BoolVarP(×tamps, "timestamps", "t", false, "show timestamps")
|
||||
cmd.Flags().StringVar(&since, "since", "", "show logs since (e.g., 2h, 2026-03-28T00:00:00Z)")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -50,6 +50,10 @@ func main() {
|
||||
root.AddCommand(pullCmd())
|
||||
root.AddCommand(nodeCmd())
|
||||
root.AddCommand(purgeCmd())
|
||||
root.AddCommand(logsCmd())
|
||||
root.AddCommand(editCmd())
|
||||
root.AddCommand(dnsCmd())
|
||||
root.AddCommand(routeCmd())
|
||||
|
||||
if err := root.Execute(); err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
toml "github.com/pelletier/go-toml/v2"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
@@ -48,13 +51,35 @@ func runNodeList(_ *cobra.Command, _ []string) error {
|
||||
}
|
||||
|
||||
w := tabwriter.NewWriter(os.Stdout, 0, 4, 2, ' ', 0)
|
||||
_, _ = fmt.Fprintln(w, "NAME\tADDRESS")
|
||||
_, _ = fmt.Fprintln(w, "NAME\tADDRESS\tVERSION")
|
||||
for _, n := range cfg.Nodes {
|
||||
_, _ = fmt.Fprintf(w, "%s\t%s\n", n.Name, n.Address)
|
||||
ver := queryAgentVersion(cfg, n.Address)
|
||||
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", n.Name, n.Address, ver)
|
||||
}
|
||||
return w.Flush()
|
||||
}
|
||||
|
||||
// queryAgentVersion dials the agent and returns its version, or an error indicator.
|
||||
func queryAgentVersion(cfg *config.CLIConfig, address string) string {
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return "error"
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.NodeStatus(ctx, &mcpv1.NodeStatusRequest{})
|
||||
if err != nil {
|
||||
return "error"
|
||||
}
|
||||
if resp.AgentVersion == "" {
|
||||
return "unknown"
|
||||
}
|
||||
return resp.AgentVersion
|
||||
}
|
||||
|
||||
func runNodeAdd(_ *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
|
||||
212
cmd/mcp/route.go
Normal file
212
cmd/mcp/route.go
Normal file
@@ -0,0 +1,212 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/config"
|
||||
)
|
||||
|
||||
func routeCmd() *cobra.Command {
|
||||
var nodeName string
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "route",
|
||||
Short: "Manage mc-proxy routes",
|
||||
}
|
||||
|
||||
list := &cobra.Command{
|
||||
Use: "list",
|
||||
Short: "List mc-proxy routes",
|
||||
RunE: func(_ *cobra.Command, _ []string) error {
|
||||
return runRouteList(nodeName)
|
||||
},
|
||||
}
|
||||
|
||||
add := &cobra.Command{
|
||||
Use: "add <listener> <hostname> <backend>",
|
||||
Short: "Add a route to mc-proxy",
|
||||
Long: "Add a route. Example: mcp route add -n rift :443 mcq.metacircular.net 100.95.252.120:443",
|
||||
Args: cobra.ExactArgs(3),
|
||||
RunE: func(_ *cobra.Command, args []string) error {
|
||||
return runRouteAdd(nodeName, args)
|
||||
},
|
||||
}
|
||||
add.Flags().String("mode", "l4", "route mode (l4 or l7)")
|
||||
add.Flags().Bool("backend-tls", false, "re-encrypt traffic to backend")
|
||||
|
||||
remove := &cobra.Command{
|
||||
Use: "remove <listener> <hostname>",
|
||||
Short: "Remove a route from mc-proxy",
|
||||
Long: "Remove a route. Example: mcp route remove -n rift :443 mcq.metacircular.net",
|
||||
Args: cobra.ExactArgs(2),
|
||||
RunE: func(_ *cobra.Command, args []string) error {
|
||||
return runRouteRemove(nodeName, args)
|
||||
},
|
||||
}
|
||||
|
||||
cmd.PersistentFlags().StringVarP(&nodeName, "node", "n", "", "target node (required)")
|
||||
|
||||
cmd.AddCommand(list, add, remove)
|
||||
return cmd
|
||||
}
|
||||
|
||||
func runRouteList(nodeName string) error {
|
||||
if nodeName == "" {
|
||||
return runRouteListAll()
|
||||
}
|
||||
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, nodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.ListProxyRoutes(ctx, &mcpv1.ListProxyRoutesRequest{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("list routes: %w", err)
|
||||
}
|
||||
|
||||
printRoutes(nodeName, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRouteListAll() error {
|
||||
first := true
|
||||
return forEachNode(func(node config.NodeConfig, client mcpv1.McpAgentServiceClient) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.ListProxyRoutes(ctx, &mcpv1.ListProxyRoutesRequest{})
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "warning: %s: list routes: %v\n", node.Name, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !first {
|
||||
fmt.Println()
|
||||
}
|
||||
first = false
|
||||
|
||||
printRoutes(node.Name, resp)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func printRoutes(nodeName string, resp *mcpv1.ListProxyRoutesResponse) {
|
||||
fmt.Printf("NODE: %s\n", nodeName)
|
||||
fmt.Printf("mc-proxy %s\n", resp.GetVersion())
|
||||
if resp.GetStartedAt() != nil {
|
||||
uptime := time.Since(resp.GetStartedAt().AsTime()).Truncate(time.Second)
|
||||
fmt.Printf("uptime: %s\n", uptime)
|
||||
}
|
||||
fmt.Printf("connections: %d\n", resp.GetTotalConnections())
|
||||
fmt.Println()
|
||||
|
||||
for _, ls := range resp.GetListeners() {
|
||||
fmt.Printf(" %s routes=%d active=%d\n",
|
||||
ls.GetAddr(), ls.GetRouteCount(), ls.GetActiveConnections())
|
||||
for _, r := range ls.GetRoutes() {
|
||||
mode := r.GetMode()
|
||||
if mode == "" {
|
||||
mode = "l4"
|
||||
}
|
||||
extra := ""
|
||||
if r.GetBackendTls() {
|
||||
extra = " (re-encrypt)"
|
||||
}
|
||||
fmt.Printf(" %s %s → %s%s\n", mode, r.GetHostname(), r.GetBackend(), extra)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runRouteAdd(nodeName string, args []string) error {
|
||||
if nodeName == "" {
|
||||
return fmt.Errorf("--node is required")
|
||||
}
|
||||
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, nodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = client.AddProxyRoute(ctx, &mcpv1.AddProxyRouteRequest{
|
||||
ListenerAddr: args[0],
|
||||
Hostname: args[1],
|
||||
Backend: args[2],
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("add route: %w", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Added route: %s → %s on %s (%s)\n", args[1], args[2], args[0], nodeName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRouteRemove(nodeName string, args []string) error {
|
||||
if nodeName == "" {
|
||||
return fmt.Errorf("--node is required")
|
||||
}
|
||||
|
||||
cfg, err := config.LoadCLIConfig(cfgPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
address, err := findNodeAddress(cfg, nodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, conn, err := dialAgent(address, cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial agent: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = client.RemoveProxyRoute(ctx, &mcpv1.RemoveProxyRouteRequest{
|
||||
ListenerAddr: args[0],
|
||||
Hostname: args[1],
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("remove route: %w", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Removed route: %s from %s (%s)\n", args[1], args[0], nodeName)
|
||||
return nil
|
||||
}
|
||||
@@ -10,7 +10,7 @@
|
||||
let
|
||||
system = "x86_64-linux";
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
version = "0.6.0";
|
||||
version = pkgs.lib.removePrefix "v" (self.gitDescribe or self.shortRev or self.dirtyShortRev or "unknown");
|
||||
in
|
||||
{
|
||||
packages.${system} = {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -33,6 +33,11 @@ const (
|
||||
McpAgentService_PushFile_FullMethodName = "/mcp.v1.McpAgentService/PushFile"
|
||||
McpAgentService_PullFile_FullMethodName = "/mcp.v1.McpAgentService/PullFile"
|
||||
McpAgentService_NodeStatus_FullMethodName = "/mcp.v1.McpAgentService/NodeStatus"
|
||||
McpAgentService_ListDNSRecords_FullMethodName = "/mcp.v1.McpAgentService/ListDNSRecords"
|
||||
McpAgentService_ListProxyRoutes_FullMethodName = "/mcp.v1.McpAgentService/ListProxyRoutes"
|
||||
McpAgentService_AddProxyRoute_FullMethodName = "/mcp.v1.McpAgentService/AddProxyRoute"
|
||||
McpAgentService_RemoveProxyRoute_FullMethodName = "/mcp.v1.McpAgentService/RemoveProxyRoute"
|
||||
McpAgentService_Logs_FullMethodName = "/mcp.v1.McpAgentService/Logs"
|
||||
)
|
||||
|
||||
// McpAgentServiceClient is the client API for McpAgentService service.
|
||||
@@ -60,6 +65,14 @@ type McpAgentServiceClient interface {
|
||||
PullFile(ctx context.Context, in *PullFileRequest, opts ...grpc.CallOption) (*PullFileResponse, error)
|
||||
// Node
|
||||
NodeStatus(ctx context.Context, in *NodeStatusRequest, opts ...grpc.CallOption) (*NodeStatusResponse, error)
|
||||
// DNS (query MCNS)
|
||||
ListDNSRecords(ctx context.Context, in *ListDNSRecordsRequest, opts ...grpc.CallOption) (*ListDNSRecordsResponse, error)
|
||||
// Proxy routes (query mc-proxy)
|
||||
ListProxyRoutes(ctx context.Context, in *ListProxyRoutesRequest, opts ...grpc.CallOption) (*ListProxyRoutesResponse, error)
|
||||
AddProxyRoute(ctx context.Context, in *AddProxyRouteRequest, opts ...grpc.CallOption) (*AddProxyRouteResponse, error)
|
||||
RemoveProxyRoute(ctx context.Context, in *RemoveProxyRouteRequest, opts ...grpc.CallOption) (*RemoveProxyRouteResponse, error)
|
||||
// Logs
|
||||
Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error)
|
||||
}
|
||||
|
||||
type mcpAgentServiceClient struct {
|
||||
@@ -210,6 +223,65 @@ func (c *mcpAgentServiceClient) NodeStatus(ctx context.Context, in *NodeStatusRe
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) ListDNSRecords(ctx context.Context, in *ListDNSRecordsRequest, opts ...grpc.CallOption) (*ListDNSRecordsResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ListDNSRecordsResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_ListDNSRecords_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) ListProxyRoutes(ctx context.Context, in *ListProxyRoutesRequest, opts ...grpc.CallOption) (*ListProxyRoutesResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ListProxyRoutesResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_ListProxyRoutes_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) AddProxyRoute(ctx context.Context, in *AddProxyRouteRequest, opts ...grpc.CallOption) (*AddProxyRouteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(AddProxyRouteResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_AddProxyRoute_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) RemoveProxyRoute(ctx context.Context, in *RemoveProxyRouteRequest, opts ...grpc.CallOption) (*RemoveProxyRouteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(RemoveProxyRouteResponse)
|
||||
err := c.cc.Invoke(ctx, McpAgentService_RemoveProxyRoute_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mcpAgentServiceClient) Logs(ctx context.Context, in *LogsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[LogsResponse], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &McpAgentService_ServiceDesc.Streams[0], McpAgentService_Logs_FullMethodName, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &grpc.GenericClientStream[LogsRequest, LogsResponse]{ClientStream: stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type McpAgentService_LogsClient = grpc.ServerStreamingClient[LogsResponse]
|
||||
|
||||
// McpAgentServiceServer is the server API for McpAgentService service.
|
||||
// All implementations must embed UnimplementedMcpAgentServiceServer
|
||||
// for forward compatibility.
|
||||
@@ -235,6 +307,14 @@ type McpAgentServiceServer interface {
|
||||
PullFile(context.Context, *PullFileRequest) (*PullFileResponse, error)
|
||||
// Node
|
||||
NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error)
|
||||
// DNS (query MCNS)
|
||||
ListDNSRecords(context.Context, *ListDNSRecordsRequest) (*ListDNSRecordsResponse, error)
|
||||
// Proxy routes (query mc-proxy)
|
||||
ListProxyRoutes(context.Context, *ListProxyRoutesRequest) (*ListProxyRoutesResponse, error)
|
||||
AddProxyRoute(context.Context, *AddProxyRouteRequest) (*AddProxyRouteResponse, error)
|
||||
RemoveProxyRoute(context.Context, *RemoveProxyRouteRequest) (*RemoveProxyRouteResponse, error)
|
||||
// Logs
|
||||
Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error
|
||||
mustEmbedUnimplementedMcpAgentServiceServer()
|
||||
}
|
||||
|
||||
@@ -287,6 +367,21 @@ func (UnimplementedMcpAgentServiceServer) PullFile(context.Context, *PullFileReq
|
||||
func (UnimplementedMcpAgentServiceServer) NodeStatus(context.Context, *NodeStatusRequest) (*NodeStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method NodeStatus not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) ListDNSRecords(context.Context, *ListDNSRecordsRequest) (*ListDNSRecordsResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ListDNSRecords not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) ListProxyRoutes(context.Context, *ListProxyRoutesRequest) (*ListProxyRoutesResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ListProxyRoutes not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) AddProxyRoute(context.Context, *AddProxyRouteRequest) (*AddProxyRouteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method AddProxyRoute not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) RemoveProxyRoute(context.Context, *RemoveProxyRouteRequest) (*RemoveProxyRouteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method RemoveProxyRoute not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) Logs(*LogsRequest, grpc.ServerStreamingServer[LogsResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method Logs not implemented")
|
||||
}
|
||||
func (UnimplementedMcpAgentServiceServer) mustEmbedUnimplementedMcpAgentServiceServer() {}
|
||||
func (UnimplementedMcpAgentServiceServer) testEmbeddedByValue() {}
|
||||
|
||||
@@ -560,6 +655,89 @@ func _McpAgentService_NodeStatus_Handler(srv interface{}, ctx context.Context, d
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_ListDNSRecords_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListDNSRecordsRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).ListDNSRecords(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_ListDNSRecords_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).ListDNSRecords(ctx, req.(*ListDNSRecordsRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_ListProxyRoutes_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListProxyRoutesRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).ListProxyRoutes(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_ListProxyRoutes_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).ListProxyRoutes(ctx, req.(*ListProxyRoutesRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_AddProxyRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(AddProxyRouteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).AddProxyRoute(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_AddProxyRoute_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).AddProxyRoute(ctx, req.(*AddProxyRouteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_RemoveProxyRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(RemoveProxyRouteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(McpAgentServiceServer).RemoveProxyRoute(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: McpAgentService_RemoveProxyRoute_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(McpAgentServiceServer).RemoveProxyRoute(ctx, req.(*RemoveProxyRouteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _McpAgentService_Logs_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(LogsRequest)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return srv.(McpAgentServiceServer).Logs(m, &grpc.GenericServerStream[LogsRequest, LogsResponse]{ServerStream: stream})
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type McpAgentService_LogsServer = grpc.ServerStreamingServer[LogsResponse]
|
||||
|
||||
// McpAgentService_ServiceDesc is the grpc.ServiceDesc for McpAgentService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
@@ -623,7 +801,29 @@ var McpAgentService_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "NodeStatus",
|
||||
Handler: _McpAgentService_NodeStatus_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ListDNSRecords",
|
||||
Handler: _McpAgentService_ListDNSRecords_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "ListProxyRoutes",
|
||||
Handler: _McpAgentService_ListProxyRoutes_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "AddProxyRoute",
|
||||
Handler: _McpAgentService_AddProxyRoute_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "RemoveProxyRoute",
|
||||
Handler: _McpAgentService_RemoveProxyRoute_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "Logs",
|
||||
Handler: _McpAgentService_Logs_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "proto/mcp/v1/mcp.proto",
|
||||
}
|
||||
|
||||
@@ -35,11 +35,12 @@ type Agent struct {
|
||||
Proxy *ProxyRouter
|
||||
Certs *CertProvisioner
|
||||
DNS *DNSRegistrar
|
||||
Version string
|
||||
}
|
||||
|
||||
// Run starts the agent: opens the database, sets up the gRPC server with
|
||||
// TLS and auth, and blocks until SIGINT/SIGTERM.
|
||||
func Run(cfg *config.AgentConfig) error {
|
||||
func Run(cfg *config.AgentConfig, version string) error {
|
||||
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
|
||||
Level: parseLogLevel(cfg.Log.Level),
|
||||
}))
|
||||
@@ -79,6 +80,7 @@ func Run(cfg *config.AgentConfig) error {
|
||||
Proxy: proxy,
|
||||
Certs: certs,
|
||||
DNS: dns,
|
||||
Version: version,
|
||||
}
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(cfg.Server.TLSCert, cfg.Server.TLSKey)
|
||||
@@ -100,6 +102,9 @@ func Run(cfg *config.AgentConfig) error {
|
||||
grpc.ChainUnaryInterceptor(
|
||||
auth.AuthInterceptor(validator),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
auth.StreamAuthInterceptor(validator),
|
||||
),
|
||||
)
|
||||
mcpv1.RegisterMcpAgentServiceServer(server, a)
|
||||
|
||||
|
||||
@@ -34,6 +34,9 @@ func (a *Agent) Deploy(ctx context.Context, req *mcpv1.DeployRequest) (*mcpv1.De
|
||||
filtered = append(filtered, cs)
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
return nil, fmt.Errorf("component %q not found in service %q", target, serviceName)
|
||||
}
|
||||
components = filtered
|
||||
}
|
||||
|
||||
@@ -131,7 +134,8 @@ func (a *Agent) deployComponent(ctx context.Context, serviceName string, cs *mcp
|
||||
Error: fmt.Sprintf("allocate route ports: %v", err),
|
||||
}
|
||||
}
|
||||
runSpec.Ports = ports
|
||||
// Merge explicit ports from the spec with route-allocated ports.
|
||||
runSpec.Ports = append(cs.GetPorts(), ports...)
|
||||
runSpec.Env = append(runSpec.Env, env...)
|
||||
} else {
|
||||
// Legacy: use ports directly from the spec.
|
||||
|
||||
@@ -26,8 +26,8 @@ type DNSRegistrar struct {
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// dnsRecord is the JSON representation of an MCNS record.
|
||||
type dnsRecord struct {
|
||||
// DNSRecord is the JSON representation of an MCNS record.
|
||||
type DNSRecord struct {
|
||||
ID int `json:"ID"`
|
||||
Name string `json:"Name"`
|
||||
Type string `json:"Type"`
|
||||
@@ -136,8 +136,87 @@ func (d *DNSRegistrar) RemoveRecord(ctx context.Context, serviceName string) err
|
||||
return nil
|
||||
}
|
||||
|
||||
// DNSZone is the JSON representation of an MCNS zone.
|
||||
type DNSZone struct {
|
||||
Name string `json:"Name"`
|
||||
}
|
||||
|
||||
// ListZones returns all zones from MCNS.
|
||||
func (d *DNSRegistrar) ListZones(ctx context.Context) ([]DNSZone, error) {
|
||||
if d == nil {
|
||||
return nil, fmt.Errorf("DNS registrar not configured")
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/v1/zones", d.serverURL)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create list zones request: %w", err)
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+d.token)
|
||||
|
||||
resp, err := d.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list zones: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read list zones response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("list zones: mcns returned %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var envelope struct {
|
||||
Zones []DNSZone `json:"zones"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return nil, fmt.Errorf("parse list zones response: %w", err)
|
||||
}
|
||||
return envelope.Zones, nil
|
||||
}
|
||||
|
||||
// ListZoneRecords returns all records in the given zone (no filters).
|
||||
func (d *DNSRegistrar) ListZoneRecords(ctx context.Context, zone string) ([]DNSRecord, error) {
|
||||
if d == nil {
|
||||
return nil, fmt.Errorf("DNS registrar not configured")
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/v1/zones/%s/records", d.serverURL, zone)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create list zone records request: %w", err)
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+d.token)
|
||||
|
||||
resp, err := d.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list zone records: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read list zone records response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("list zone records: mcns returned %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var envelope struct {
|
||||
Records []DNSRecord `json:"records"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return nil, fmt.Errorf("parse list zone records response: %w", err)
|
||||
}
|
||||
return envelope.Records, nil
|
||||
}
|
||||
|
||||
// listRecords returns A records matching the service name in the zone.
|
||||
func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]dnsRecord, error) {
|
||||
func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]DNSRecord, error) {
|
||||
url := fmt.Sprintf("%s/v1/zones/%s/records?name=%s&type=A", d.serverURL, d.zone, serviceName)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
@@ -161,7 +240,7 @@ func (d *DNSRegistrar) listRecords(ctx context.Context, serviceName string) ([]d
|
||||
}
|
||||
|
||||
var envelope struct {
|
||||
Records []dnsRecord `json:"records"`
|
||||
Records []DNSRecord `json:"records"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return nil, fmt.Errorf("parse list response: %w", err)
|
||||
|
||||
40
internal/agent/dns_rpc.go
Normal file
40
internal/agent/dns_rpc.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
)
|
||||
|
||||
// ListDNSRecords queries MCNS for all zones and their records.
|
||||
func (a *Agent) ListDNSRecords(ctx context.Context, _ *mcpv1.ListDNSRecordsRequest) (*mcpv1.ListDNSRecordsResponse, error) {
|
||||
a.Logger.Debug("ListDNSRecords called")
|
||||
|
||||
zones, err := a.DNS.ListZones(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list zones: %w", err)
|
||||
}
|
||||
|
||||
resp := &mcpv1.ListDNSRecordsResponse{}
|
||||
for _, z := range zones {
|
||||
records, err := a.DNS.ListZoneRecords(ctx, z.Name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list records for zone %q: %w", z.Name, err)
|
||||
}
|
||||
|
||||
zone := &mcpv1.DNSZone{Name: z.Name}
|
||||
for _, r := range records {
|
||||
zone.Records = append(zone.Records, &mcpv1.DNSRecord{
|
||||
Id: int64(r.ID),
|
||||
Name: r.Name,
|
||||
Type: r.Type,
|
||||
Value: r.Value,
|
||||
Ttl: int32(r.TTL), //nolint:gosec // TTL is bounded
|
||||
})
|
||||
}
|
||||
resp.Zones = append(resp.Zones, zone)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func TestEnsureRecordSkipsWhenExists(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
// Return an existing record with the correct value.
|
||||
resp := map[string][]dnsRecord{"records": {{ID: 1, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
resp := map[string][]DNSRecord{"records": {{ID: 1, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
return
|
||||
@@ -124,7 +124,7 @@ func TestEnsureRecordUpdatesWrongValue(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
// Return a record with a stale value.
|
||||
resp := map[string][]dnsRecord{"records": {{ID: 42, Name: "myservice", Type: "A", Value: "10.0.0.1", TTL: 300}}}
|
||||
resp := map[string][]DNSRecord{"records": {{ID: 42, Name: "myservice", Type: "A", Value: "10.0.0.1", TTL: 300}}}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
return
|
||||
@@ -160,7 +160,7 @@ func TestRemoveRecordDeletes(t *testing.T) {
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodGet {
|
||||
resp := map[string][]dnsRecord{"records": {{ID: 7, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
resp := map[string][]DNSRecord{"records": {{ID: 7, Name: "myservice", Type: "A", Value: "192.168.88.181", TTL: 300}}}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(resp)
|
||||
return
|
||||
|
||||
79
internal/agent/logs.go
Normal file
79
internal/agent/logs.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"git.wntrmute.dev/mc/mcp/internal/registry"
|
||||
"git.wntrmute.dev/mc/mcp/internal/runtime"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// Logs streams container logs for a service component.
|
||||
func (a *Agent) Logs(req *mcpv1.LogsRequest, stream mcpv1.McpAgentService_LogsServer) error {
|
||||
if req.GetService() == "" {
|
||||
return status.Error(codes.InvalidArgument, "service name is required")
|
||||
}
|
||||
|
||||
// Resolve component name.
|
||||
component := req.GetComponent()
|
||||
if component == "" {
|
||||
components, err := registry.ListComponents(a.DB, req.GetService())
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "list components: %v", err)
|
||||
}
|
||||
if len(components) == 0 {
|
||||
return status.Error(codes.NotFound, "no components found for service")
|
||||
}
|
||||
component = components[0].Name
|
||||
}
|
||||
|
||||
containerName := ContainerNameFor(req.GetService(), component)
|
||||
|
||||
podman, ok := a.Runtime.(*runtime.Podman)
|
||||
if !ok {
|
||||
return status.Error(codes.Internal, "logs requires podman runtime")
|
||||
}
|
||||
|
||||
cmd := podman.Logs(stream.Context(), containerName, int(req.GetTail()), req.GetFollow(), req.GetTimestamps(), req.GetSince())
|
||||
|
||||
a.Logger.Info("running podman logs", "container", containerName, "args", cmd.Args)
|
||||
|
||||
// Podman writes container stdout to its stdout and container stderr
|
||||
// to its stderr. Merge both into a single pipe.
|
||||
pr, pw := io.Pipe()
|
||||
cmd.Stdout = pw
|
||||
cmd.Stderr = pw
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
pw.Close()
|
||||
return status.Errorf(codes.Internal, "start podman logs: %v", err)
|
||||
}
|
||||
|
||||
// Close the write end when the command exits so the scanner finishes.
|
||||
go func() {
|
||||
err := cmd.Wait()
|
||||
if err != nil {
|
||||
a.Logger.Warn("podman logs exited", "container", containerName, "error", err)
|
||||
}
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(pr)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
if err := stream.Send(&mcpv1.LogsResponse{
|
||||
Data: append(line, '\n'),
|
||||
}); err != nil {
|
||||
_ = cmd.Process.Kill()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -31,6 +31,7 @@ func (a *Agent) NodeStatus(ctx context.Context, _ *mcpv1.NodeStatusRequest) (*mc
|
||||
Runtime: a.Config.Agent.ContainerRuntime,
|
||||
ServiceCount: uint32(len(services)), //nolint:gosec // bounded
|
||||
ComponentCount: componentCount,
|
||||
AgentVersion: a.Version,
|
||||
}
|
||||
|
||||
// Runtime version.
|
||||
|
||||
@@ -48,6 +48,30 @@ func (p *ProxyRouter) Close() error {
|
||||
return p.client.Close()
|
||||
}
|
||||
|
||||
// GetStatus returns the mc-proxy server status.
|
||||
func (p *ProxyRouter) GetStatus(ctx context.Context) (*mcproxy.Status, error) {
|
||||
if p == nil {
|
||||
return nil, fmt.Errorf("mc-proxy not configured")
|
||||
}
|
||||
return p.client.GetStatus(ctx)
|
||||
}
|
||||
|
||||
// AddRoute adds a single route to mc-proxy.
|
||||
func (p *ProxyRouter) AddRoute(ctx context.Context, listenerAddr string, route mcproxy.Route) error {
|
||||
if p == nil {
|
||||
return fmt.Errorf("mc-proxy not configured")
|
||||
}
|
||||
return p.client.AddRoute(ctx, listenerAddr, route)
|
||||
}
|
||||
|
||||
// RemoveRoute removes a single route from mc-proxy.
|
||||
func (p *ProxyRouter) RemoveRoute(ctx context.Context, listenerAddr, hostname string) error {
|
||||
if p == nil {
|
||||
return fmt.Errorf("mc-proxy not configured")
|
||||
}
|
||||
return p.client.RemoveRoute(ctx, listenerAddr, hostname)
|
||||
}
|
||||
|
||||
// RegisterRoutes registers all routes for a service component with mc-proxy.
|
||||
// It uses the assigned host ports from the registry.
|
||||
func (p *ProxyRouter) RegisterRoutes(ctx context.Context, serviceName string, routes []registry.Route, hostPorts map[string]int) error {
|
||||
|
||||
111
internal/agent/proxy_rpc.go
Normal file
111
internal/agent/proxy_rpc.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.wntrmute.dev/mc/mc-proxy/client/mcproxy"
|
||||
mcpv1 "git.wntrmute.dev/mc/mcp/gen/mcp/v1"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
// ListProxyRoutes queries mc-proxy for its current status and routes.
|
||||
func (a *Agent) ListProxyRoutes(ctx context.Context, _ *mcpv1.ListProxyRoutesRequest) (*mcpv1.ListProxyRoutesResponse, error) {
|
||||
a.Logger.Debug("ListProxyRoutes called")
|
||||
|
||||
status, err := a.Proxy.GetStatus(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get mc-proxy status: %w", err)
|
||||
}
|
||||
|
||||
resp := &mcpv1.ListProxyRoutesResponse{
|
||||
Version: status.Version,
|
||||
TotalConnections: status.TotalConnections,
|
||||
}
|
||||
if !status.StartedAt.IsZero() {
|
||||
resp.StartedAt = timestamppb.New(status.StartedAt)
|
||||
}
|
||||
|
||||
for _, ls := range status.Listeners {
|
||||
listener := &mcpv1.ProxyListenerInfo{
|
||||
Addr: ls.Addr,
|
||||
RouteCount: int32(ls.RouteCount), //nolint:gosec // bounded
|
||||
ActiveConnections: ls.ActiveConnections,
|
||||
}
|
||||
for _, r := range ls.Routes {
|
||||
listener.Routes = append(listener.Routes, &mcpv1.ProxyRouteInfo{
|
||||
Hostname: r.Hostname,
|
||||
Backend: r.Backend,
|
||||
Mode: r.Mode,
|
||||
BackendTls: r.BackendTLS,
|
||||
})
|
||||
}
|
||||
resp.Listeners = append(resp.Listeners, listener)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// AddProxyRoute adds a route to mc-proxy.
|
||||
func (a *Agent) AddProxyRoute(ctx context.Context, req *mcpv1.AddProxyRouteRequest) (*mcpv1.AddProxyRouteResponse, error) {
|
||||
if req.GetListenerAddr() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "listener_addr is required")
|
||||
}
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
if req.GetBackend() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "backend is required")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
route := mcproxy.Route{
|
||||
Hostname: req.GetHostname(),
|
||||
Backend: req.GetBackend(),
|
||||
Mode: req.GetMode(),
|
||||
BackendTLS: req.GetBackendTls(),
|
||||
}
|
||||
|
||||
if err := a.Proxy.AddRoute(ctx, req.GetListenerAddr(), route); err != nil {
|
||||
return nil, fmt.Errorf("add route: %w", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("route added",
|
||||
"listener", req.GetListenerAddr(),
|
||||
"hostname", req.GetHostname(),
|
||||
"backend", req.GetBackend(),
|
||||
"mode", req.GetMode(),
|
||||
)
|
||||
|
||||
return &mcpv1.AddProxyRouteResponse{}, nil
|
||||
}
|
||||
|
||||
// RemoveProxyRoute removes a route from mc-proxy.
|
||||
func (a *Agent) RemoveProxyRoute(ctx context.Context, req *mcpv1.RemoveProxyRouteRequest) (*mcpv1.RemoveProxyRouteResponse, error) {
|
||||
if req.GetListenerAddr() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "listener_addr is required")
|
||||
}
|
||||
if req.GetHostname() == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "hostname is required")
|
||||
}
|
||||
|
||||
if a.Proxy == nil {
|
||||
return nil, status.Error(codes.FailedPrecondition, "mc-proxy not configured")
|
||||
}
|
||||
|
||||
if err := a.Proxy.RemoveRoute(ctx, req.GetListenerAddr(), req.GetHostname()); err != nil {
|
||||
return nil, fmt.Errorf("remove route: %w", err)
|
||||
}
|
||||
|
||||
a.Logger.Info("route removed",
|
||||
"listener", req.GetListenerAddr(),
|
||||
"hostname", req.GetHostname(),
|
||||
)
|
||||
|
||||
return &mcpv1.RemoveProxyRouteResponse{}, nil
|
||||
}
|
||||
@@ -99,6 +99,12 @@ func (a *Agent) liveCheckServices(ctx context.Context) ([]*mcpv1.ServiceInfo, er
|
||||
|
||||
if rc, ok := runtimeByName[containerName]; ok {
|
||||
ci.ObservedState = rc.State
|
||||
if rc.Version != "" {
|
||||
ci.Version = rc.Version
|
||||
}
|
||||
if rc.Image != "" {
|
||||
ci.Image = rc.Image
|
||||
}
|
||||
if !rc.Started.IsZero() {
|
||||
ci.Started = timestamppb.New(rc.Started)
|
||||
}
|
||||
|
||||
@@ -255,6 +255,52 @@ func AuthInterceptor(validator TokenValidator) grpc.UnaryServerInterceptor {
|
||||
}
|
||||
}
|
||||
|
||||
// StreamAuthInterceptor returns a gRPC stream server interceptor with
|
||||
// the same authentication rules as AuthInterceptor.
|
||||
func StreamAuthInterceptor(validator TokenValidator) grpc.StreamServerInterceptor {
|
||||
return func(
|
||||
srv any,
|
||||
ss grpc.ServerStream,
|
||||
info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler,
|
||||
) error {
|
||||
md, ok := metadata.FromIncomingContext(ss.Context())
|
||||
if !ok {
|
||||
return status.Error(codes.Unauthenticated, "missing metadata")
|
||||
}
|
||||
|
||||
authValues := md.Get("authorization")
|
||||
if len(authValues) == 0 {
|
||||
return status.Error(codes.Unauthenticated, "missing authorization header")
|
||||
}
|
||||
|
||||
authHeader := authValues[0]
|
||||
if !strings.HasPrefix(authHeader, "Bearer ") {
|
||||
return status.Error(codes.Unauthenticated, "malformed authorization header")
|
||||
}
|
||||
token := strings.TrimPrefix(authHeader, "Bearer ")
|
||||
|
||||
tokenInfo, err := validator.ValidateToken(ss.Context(), token)
|
||||
if err != nil {
|
||||
slog.Error("token validation failed", "method", info.FullMethod, "error", err)
|
||||
return status.Error(codes.Unauthenticated, "token validation failed")
|
||||
}
|
||||
|
||||
if !tokenInfo.Valid {
|
||||
return status.Error(codes.Unauthenticated, "invalid token")
|
||||
}
|
||||
|
||||
if tokenInfo.HasRole("guest") {
|
||||
slog.Warn("guest access denied", "method", info.FullMethod, "user", tokenInfo.Username)
|
||||
return status.Error(codes.PermissionDenied, "guest access not permitted")
|
||||
}
|
||||
|
||||
slog.Info("rpc", "method", info.FullMethod, "user", tokenInfo.Username, "account_type", tokenInfo.AccountType)
|
||||
|
||||
return handler(srv, ss)
|
||||
}
|
||||
}
|
||||
|
||||
// Login authenticates with MCIAS and returns a bearer token.
|
||||
func Login(serverURL, caCertPath, username, password string) (string, error) {
|
||||
client, err := newHTTPClient(caCertPath)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -178,6 +179,71 @@ func (p *Podman) Inspect(ctx context.Context, name string) (ContainerInfo, error
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// Logs returns an exec.Cmd that streams container logs. For containers
|
||||
// using the journald log driver, it tries journalctl first (podman logs
|
||||
// can't read journald outside the originating user session). If journalctl
|
||||
// can't access the journal, it falls back to podman logs.
|
||||
func (p *Podman) Logs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
|
||||
// Check if this container uses the journald log driver.
|
||||
inspectCmd := exec.CommandContext(ctx, p.command(), "inspect", "--format", "{{.HostConfig.LogConfig.Type}}", containerName) //nolint:gosec
|
||||
if out, err := inspectCmd.Output(); err == nil && strings.TrimSpace(string(out)) == "journald" {
|
||||
if p.journalAccessible(ctx, containerName) {
|
||||
return p.journalLogs(ctx, containerName, tail, follow, since)
|
||||
}
|
||||
}
|
||||
|
||||
return p.podmanLogs(ctx, containerName, tail, follow, timestamps, since)
|
||||
}
|
||||
|
||||
// journalAccessible probes whether journalctl can read logs for the container.
|
||||
func (p *Podman) journalAccessible(ctx context.Context, containerName string) bool {
|
||||
args := []string{"--no-pager", "-n", "0"}
|
||||
if os.Getuid() != 0 {
|
||||
args = append(args, "--user")
|
||||
}
|
||||
args = append(args, "CONTAINER_NAME="+containerName)
|
||||
cmd := exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec
|
||||
return cmd.Run() == nil
|
||||
}
|
||||
|
||||
// journalLogs returns a journalctl command filtered by container name.
|
||||
func (p *Podman) journalLogs(ctx context.Context, containerName string, tail int, follow bool, since string) *exec.Cmd {
|
||||
args := []string{"--no-pager", "--output", "cat"}
|
||||
if os.Getuid() != 0 {
|
||||
args = append(args, "--user")
|
||||
}
|
||||
args = append(args, "CONTAINER_NAME="+containerName)
|
||||
if tail > 0 {
|
||||
args = append(args, "--lines", fmt.Sprintf("%d", tail))
|
||||
}
|
||||
if follow {
|
||||
args = append(args, "--follow")
|
||||
}
|
||||
if since != "" {
|
||||
args = append(args, "--since", since)
|
||||
}
|
||||
return exec.CommandContext(ctx, "journalctl", args...) //nolint:gosec // args built programmatically
|
||||
}
|
||||
|
||||
// podmanLogs returns a podman logs command.
|
||||
func (p *Podman) podmanLogs(ctx context.Context, containerName string, tail int, follow, timestamps bool, since string) *exec.Cmd {
|
||||
args := []string{"logs"}
|
||||
if tail > 0 {
|
||||
args = append(args, "--tail", fmt.Sprintf("%d", tail))
|
||||
}
|
||||
if follow {
|
||||
args = append(args, "--follow")
|
||||
}
|
||||
if timestamps {
|
||||
args = append(args, "--timestamps")
|
||||
}
|
||||
if since != "" {
|
||||
args = append(args, "--since", since)
|
||||
}
|
||||
args = append(args, containerName)
|
||||
return exec.CommandContext(ctx, p.command(), args...) //nolint:gosec // args built programmatically
|
||||
}
|
||||
|
||||
// Login authenticates to a container registry using the given token as
|
||||
// the password. This enables non-interactive push with service account
|
||||
// tokens (MCR accepts MCIAS JWTs as passwords).
|
||||
|
||||
@@ -33,6 +33,17 @@ service McpAgentService {
|
||||
|
||||
// Node
|
||||
rpc NodeStatus(NodeStatusRequest) returns (NodeStatusResponse);
|
||||
|
||||
// DNS (query MCNS)
|
||||
rpc ListDNSRecords(ListDNSRecordsRequest) returns (ListDNSRecordsResponse);
|
||||
|
||||
// Proxy routes (query mc-proxy)
|
||||
rpc ListProxyRoutes(ListProxyRoutesRequest) returns (ListProxyRoutesResponse);
|
||||
rpc AddProxyRoute(AddProxyRouteRequest) returns (AddProxyRouteResponse);
|
||||
rpc RemoveProxyRoute(RemoveProxyRouteRequest) returns (RemoveProxyRouteResponse);
|
||||
|
||||
// Logs
|
||||
rpc Logs(LogsRequest) returns (stream LogsResponse);
|
||||
}
|
||||
|
||||
// --- Service lifecycle ---
|
||||
@@ -254,6 +265,7 @@ message NodeStatusResponse {
|
||||
uint64 memory_free_bytes = 9;
|
||||
double cpu_usage_percent = 10;
|
||||
google.protobuf.Timestamp uptime_since = 11;
|
||||
string agent_version = 12;
|
||||
}
|
||||
|
||||
// --- Purge ---
|
||||
@@ -282,3 +294,81 @@ message PurgeResult {
|
||||
// Why eligible, or why refused.
|
||||
string reason = 4;
|
||||
}
|
||||
|
||||
// --- Logs ---
|
||||
|
||||
message LogsRequest {
|
||||
string service = 1;
|
||||
string component = 2; // optional; defaults to first/only component
|
||||
int32 tail = 3; // number of lines from the end (0 = all)
|
||||
bool follow = 4; // stream new output
|
||||
bool timestamps = 5; // prepend timestamps
|
||||
string since = 6; // show logs since (e.g., "2h", "2026-03-28T00:00:00Z")
|
||||
}
|
||||
|
||||
message LogsResponse {
|
||||
bytes data = 1;
|
||||
}
|
||||
|
||||
// --- DNS ---
|
||||
|
||||
message ListDNSRecordsRequest {}
|
||||
|
||||
message DNSZone {
|
||||
string name = 1;
|
||||
repeated DNSRecord records = 2;
|
||||
}
|
||||
|
||||
message DNSRecord {
|
||||
int64 id = 1;
|
||||
string name = 2;
|
||||
string type = 3;
|
||||
string value = 4;
|
||||
int32 ttl = 5;
|
||||
}
|
||||
|
||||
message ListDNSRecordsResponse {
|
||||
repeated DNSZone zones = 1;
|
||||
}
|
||||
|
||||
// --- Proxy routes ---
|
||||
|
||||
message ListProxyRoutesRequest {}
|
||||
|
||||
message ProxyRouteInfo {
|
||||
string hostname = 1;
|
||||
string backend = 2;
|
||||
string mode = 3;
|
||||
bool backend_tls = 4;
|
||||
}
|
||||
|
||||
message ProxyListenerInfo {
|
||||
string addr = 1;
|
||||
int32 route_count = 2;
|
||||
int64 active_connections = 3;
|
||||
repeated ProxyRouteInfo routes = 4;
|
||||
}
|
||||
|
||||
message ListProxyRoutesResponse {
|
||||
string version = 1;
|
||||
int64 total_connections = 2;
|
||||
google.protobuf.Timestamp started_at = 3;
|
||||
repeated ProxyListenerInfo listeners = 4;
|
||||
}
|
||||
|
||||
message AddProxyRouteRequest {
|
||||
string listener_addr = 1; // e.g. ":443"
|
||||
string hostname = 2;
|
||||
string backend = 3;
|
||||
string mode = 4; // "l4" or "l7"
|
||||
bool backend_tls = 5;
|
||||
}
|
||||
|
||||
message AddProxyRouteResponse {}
|
||||
|
||||
message RemoveProxyRouteRequest {
|
||||
string listener_addr = 1; // e.g. ":443"
|
||||
string hostname = 2;
|
||||
}
|
||||
|
||||
message RemoveProxyRouteResponse {}
|
||||
|
||||
Reference in New Issue
Block a user