Switch gRPC admin API to Unix socket only, add client package
- Remove TCP listener support from gRPC server; Unix socket is now the only transport for the admin API (access controlled via filesystem permissions) - Add standard gRPC health check service (grpc.health.v1.Health) - Implement MCPROXY_* environment variable overrides for config - Create client/mcproxy package with full API coverage and tests - Update ARCHITECTURE.md and dev config (srv/mc-proxy.toml) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -23,11 +23,7 @@ type Database struct {
|
||||
}
|
||||
|
||||
type GRPC struct {
|
||||
Addr string `toml:"addr"`
|
||||
TLSCert string `toml:"tls_cert"`
|
||||
TLSKey string `toml:"tls_key"`
|
||||
CACert string `toml:"ca_cert"` // CA cert for verifying the server (client-side)
|
||||
ClientCA string `toml:"client_ca"` // CA cert for verifying clients (server-side mTLS)
|
||||
Addr string `toml:"addr"` // Unix socket path (e.g., "/var/run/mc-proxy.sock")
|
||||
}
|
||||
|
||||
type Listener struct {
|
||||
@@ -64,13 +60,7 @@ type Duration struct {
|
||||
time.Duration
|
||||
}
|
||||
|
||||
// IsUnixSocket returns true if the gRPC address refers to a Unix domain socket.
|
||||
func (g GRPC) IsUnixSocket() bool {
|
||||
path := strings.TrimPrefix(g.Addr, "unix:")
|
||||
return strings.Contains(path, "/")
|
||||
}
|
||||
|
||||
// SocketPath returns the filesystem path for a Unix socket address,
|
||||
// SocketPath returns the filesystem path for the Unix socket,
|
||||
// stripping any "unix:" prefix.
|
||||
func (g GRPC) SocketPath() string {
|
||||
return strings.TrimPrefix(g.Addr, "unix:")
|
||||
@@ -93,6 +83,10 @@ func Load(path string) (*Config, error) {
|
||||
return nil, fmt.Errorf("parsing config: %w", err)
|
||||
}
|
||||
|
||||
if err := cfg.applyEnvOverrides(); err != nil {
|
||||
return nil, fmt.Errorf("applying env overrides: %w", err)
|
||||
}
|
||||
|
||||
if err := cfg.validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid config: %w", err)
|
||||
}
|
||||
@@ -100,6 +94,69 @@ func Load(path string) (*Config, error) {
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// applyEnvOverrides applies environment variable overrides to the config.
|
||||
// Variables use the MCPROXY_ prefix with underscore-separated paths.
|
||||
func (c *Config) applyEnvOverrides() error {
|
||||
// Database
|
||||
if v := os.Getenv("MCPROXY_DATABASE_PATH"); v != "" {
|
||||
c.Database.Path = v
|
||||
}
|
||||
|
||||
// gRPC
|
||||
if v := os.Getenv("MCPROXY_GRPC_ADDR"); v != "" {
|
||||
c.GRPC.Addr = v
|
||||
}
|
||||
|
||||
// Firewall
|
||||
if v := os.Getenv("MCPROXY_FIREWALL_GEOIP_DB"); v != "" {
|
||||
c.Firewall.GeoIPDB = v
|
||||
}
|
||||
if v := os.Getenv("MCPROXY_FIREWALL_RATE_LIMIT"); v != "" {
|
||||
var n int64
|
||||
if _, err := fmt.Sscanf(v, "%d", &n); err != nil {
|
||||
return fmt.Errorf("MCPROXY_FIREWALL_RATE_LIMIT: %w", err)
|
||||
}
|
||||
c.Firewall.RateLimit = n
|
||||
}
|
||||
if v := os.Getenv("MCPROXY_FIREWALL_RATE_WINDOW"); v != "" {
|
||||
d, err := time.ParseDuration(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("MCPROXY_FIREWALL_RATE_WINDOW: %w", err)
|
||||
}
|
||||
c.Firewall.RateWindow = Duration{d}
|
||||
}
|
||||
|
||||
// Proxy timeouts
|
||||
if v := os.Getenv("MCPROXY_PROXY_CONNECT_TIMEOUT"); v != "" {
|
||||
d, err := time.ParseDuration(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("MCPROXY_PROXY_CONNECT_TIMEOUT: %w", err)
|
||||
}
|
||||
c.Proxy.ConnectTimeout = Duration{d}
|
||||
}
|
||||
if v := os.Getenv("MCPROXY_PROXY_IDLE_TIMEOUT"); v != "" {
|
||||
d, err := time.ParseDuration(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("MCPROXY_PROXY_IDLE_TIMEOUT: %w", err)
|
||||
}
|
||||
c.Proxy.IdleTimeout = Duration{d}
|
||||
}
|
||||
if v := os.Getenv("MCPROXY_PROXY_SHUTDOWN_TIMEOUT"); v != "" {
|
||||
d, err := time.ParseDuration(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("MCPROXY_PROXY_SHUTDOWN_TIMEOUT: %w", err)
|
||||
}
|
||||
c.Proxy.ShutdownTimeout = Duration{d}
|
||||
}
|
||||
|
||||
// Log
|
||||
if v := os.Getenv("MCPROXY_LOG_LEVEL"); v != "" {
|
||||
c.Log.Level = v
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Config) validate() error {
|
||||
if c.Database.Path == "" {
|
||||
return fmt.Errorf("database.path is required")
|
||||
@@ -139,11 +196,11 @@ func (c *Config) validate() error {
|
||||
return fmt.Errorf("firewall.rate_window is required when rate_limit is set")
|
||||
}
|
||||
|
||||
// Validate gRPC config: if enabled, TLS cert and key are required
|
||||
// (unless using a Unix socket, which doesn't need TLS).
|
||||
if c.GRPC.Addr != "" && !c.GRPC.IsUnixSocket() {
|
||||
if c.GRPC.TLSCert == "" || c.GRPC.TLSKey == "" {
|
||||
return fmt.Errorf("grpc: tls_cert and tls_key are required when grpc.addr is a TCP address")
|
||||
// Validate gRPC config: if enabled, addr must be a Unix socket path.
|
||||
if c.GRPC.Addr != "" {
|
||||
path := c.GRPC.SocketPath()
|
||||
if !strings.Contains(path, "/") {
|
||||
return fmt.Errorf("grpc.addr must be a Unix socket path (e.g., /var/run/mc-proxy.sock)")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -195,26 +195,23 @@ addr = ":8443"
|
||||
}
|
||||
}
|
||||
|
||||
func TestGRPCIsUnixSocket(t *testing.T) {
|
||||
func TestGRPCSocketPath(t *testing.T) {
|
||||
tests := []struct {
|
||||
addr string
|
||||
want bool
|
||||
want string
|
||||
}{
|
||||
{"/var/run/mc-proxy.sock", true},
|
||||
{"unix:/var/run/mc-proxy.sock", true},
|
||||
{"127.0.0.1:9090", false},
|
||||
{":9090", false},
|
||||
{"", false},
|
||||
{"/var/run/mc-proxy.sock", "/var/run/mc-proxy.sock"},
|
||||
{"unix:/var/run/mc-proxy.sock", "/var/run/mc-proxy.sock"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
g := GRPC{Addr: tt.addr}
|
||||
if got := g.IsUnixSocket(); got != tt.want {
|
||||
t.Fatalf("IsUnixSocket(%q) = %v, want %v", tt.addr, got, tt.want)
|
||||
if got := g.SocketPath(); got != tt.want {
|
||||
t.Fatalf("SocketPath(%q) = %q, want %q", tt.addr, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateGRPCUnixNoTLS(t *testing.T) {
|
||||
func TestValidateGRPCUnixSocket(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test.toml")
|
||||
|
||||
@@ -231,11 +228,11 @@ addr = "/var/run/mc-proxy.sock"
|
||||
|
||||
_, err := Load(path)
|
||||
if err != nil {
|
||||
t.Fatalf("expected Unix socket without TLS to be valid, got: %v", err)
|
||||
t.Fatalf("expected Unix socket to be valid, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateGRPCTCPRequiresTLS(t *testing.T) {
|
||||
func TestValidateGRPCRejectsTCPAddr(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test.toml")
|
||||
|
||||
@@ -252,7 +249,7 @@ addr = "127.0.0.1:9090"
|
||||
|
||||
_, err := Load(path)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for TCP gRPC addr without TLS certs")
|
||||
t.Fatal("expected error for TCP gRPC addr")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,3 +308,86 @@ func TestDuration(t *testing.T) {
|
||||
t.Fatalf("got %v, want 5s", d.Duration)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnvOverrides(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test.toml")
|
||||
|
||||
data := `
|
||||
[database]
|
||||
path = "/tmp/test.db"
|
||||
|
||||
[proxy]
|
||||
idle_timeout = "60s"
|
||||
|
||||
[log]
|
||||
level = "info"
|
||||
`
|
||||
if err := os.WriteFile(path, []byte(data), 0600); err != nil {
|
||||
t.Fatalf("write config: %v", err)
|
||||
}
|
||||
|
||||
// Set env overrides.
|
||||
t.Setenv("MCPROXY_LOG_LEVEL", "debug")
|
||||
t.Setenv("MCPROXY_PROXY_IDLE_TIMEOUT", "600s")
|
||||
t.Setenv("MCPROXY_DATABASE_PATH", "/override/test.db")
|
||||
|
||||
cfg, err := Load(path)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if cfg.Log.Level != "debug" {
|
||||
t.Fatalf("got log.level %q, want %q", cfg.Log.Level, "debug")
|
||||
}
|
||||
if cfg.Proxy.IdleTimeout.Duration.Seconds() != 600 {
|
||||
t.Fatalf("got idle_timeout %v, want 600s", cfg.Proxy.IdleTimeout.Duration)
|
||||
}
|
||||
if cfg.Database.Path != "/override/test.db" {
|
||||
t.Fatalf("got database.path %q, want %q", cfg.Database.Path, "/override/test.db")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnvOverrideInvalidDuration(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test.toml")
|
||||
|
||||
data := `
|
||||
[database]
|
||||
path = "/tmp/test.db"
|
||||
`
|
||||
if err := os.WriteFile(path, []byte(data), 0600); err != nil {
|
||||
t.Fatalf("write config: %v", err)
|
||||
}
|
||||
|
||||
t.Setenv("MCPROXY_PROXY_IDLE_TIMEOUT", "not-a-duration")
|
||||
|
||||
_, err := Load(path)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for invalid duration")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnvOverrideGRPCAddr(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "test.toml")
|
||||
|
||||
data := `
|
||||
[database]
|
||||
path = "/tmp/test.db"
|
||||
`
|
||||
if err := os.WriteFile(path, []byte(data), 0600); err != nil {
|
||||
t.Fatalf("write config: %v", err)
|
||||
}
|
||||
|
||||
t.Setenv("MCPROXY_GRPC_ADDR", "/var/run/override.sock")
|
||||
|
||||
cfg, err := Load(path)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if cfg.GRPC.Addr != "/var/run/override.sock" {
|
||||
t.Fatalf("got grpc.addr %q, want %q", cfg.GRPC.Addr, "/var/run/override.sock")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,6 @@ package grpcserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
@@ -14,7 +12,8 @@ import (
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/health"
|
||||
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
@@ -34,8 +33,16 @@ type AdminServer struct {
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// New creates a gRPC server. For Unix sockets, no TLS is used. For TCP
|
||||
// addresses, TLS is required with optional mTLS.
|
||||
// NewAdminServer creates an AdminServer for use in testing or custom setups.
|
||||
func NewAdminServer(srv *server.Server, store *db.Store, logger *slog.Logger) *AdminServer {
|
||||
return &AdminServer{
|
||||
srv: srv,
|
||||
store: store,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a gRPC server listening on a Unix socket.
|
||||
func New(cfg config.GRPC, srv *server.Server, store *db.Store, logger *slog.Logger) (*grpc.Server, net.Listener, error) {
|
||||
admin := &AdminServer{
|
||||
srv: srv,
|
||||
@@ -43,13 +50,6 @@ func New(cfg config.GRPC, srv *server.Server, store *db.Store, logger *slog.Logg
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
if cfg.IsUnixSocket() {
|
||||
return newUnixServer(cfg, admin)
|
||||
}
|
||||
return newTCPServer(cfg, admin)
|
||||
}
|
||||
|
||||
func newUnixServer(cfg config.GRPC, admin *AdminServer) (*grpc.Server, net.Listener, error) {
|
||||
path := cfg.SocketPath()
|
||||
|
||||
// Remove stale socket file from a previous run.
|
||||
@@ -67,41 +67,12 @@ func newUnixServer(cfg config.GRPC, admin *AdminServer) (*grpc.Server, net.Liste
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
pb.RegisterProxyAdminServiceServer(grpcServer, admin)
|
||||
return grpcServer, ln, nil
|
||||
}
|
||||
|
||||
func newTCPServer(cfg config.GRPC, admin *AdminServer) (*grpc.Server, net.Listener, error) {
|
||||
cert, err := tls.LoadX509KeyPair(cfg.TLSCert, cfg.TLSKey)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("loading TLS keypair: %w", err)
|
||||
}
|
||||
|
||||
tlsConfig := &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
MinVersion: tls.VersionTLS13,
|
||||
}
|
||||
|
||||
if cfg.ClientCA != "" {
|
||||
caCert, err := os.ReadFile(cfg.ClientCA)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("reading client CA: %w", err)
|
||||
}
|
||||
pool := x509.NewCertPool()
|
||||
if !pool.AppendCertsFromPEM(caCert) {
|
||||
return nil, nil, fmt.Errorf("failed to parse client CA certificate")
|
||||
}
|
||||
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
tlsConfig.ClientCAs = pool
|
||||
}
|
||||
|
||||
creds := credentials.NewTLS(tlsConfig)
|
||||
grpcServer := grpc.NewServer(grpc.Creds(creds))
|
||||
pb.RegisterProxyAdminServiceServer(grpcServer, admin)
|
||||
|
||||
ln, err := net.Listen("tcp", cfg.Addr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("listening on %s: %w", cfg.Addr, err)
|
||||
}
|
||||
// Register standard gRPC health check service.
|
||||
healthServer := health.NewServer()
|
||||
healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
|
||||
healthServer.SetServingStatus("mc_proxy.v1.ProxyAdminService", healthpb.HealthCheckResponse_SERVING)
|
||||
healthpb.RegisterHealthServer(grpcServer, healthServer)
|
||||
|
||||
return grpcServer, ln, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user