Add per-listener connection limits

Configurable maximum concurrent connections per listener. When the
limit is reached, new connections are closed immediately after accept.
0 means unlimited (default, preserving existing behavior).

Config: Listener gains max_connections field, validated non-negative.

DB: Migration 3 adds listeners.max_connections column.
UpdateListenerMaxConns method for runtime changes via gRPC.
CreateListener updated to persist max_connections on seed.

Server: ListenerState/ListenerData gain MaxConnections. Limit checked
in serve() after Accept but before handleConn — if ActiveConnections
>= MaxConnections, connection is closed and the accept loop continues.
SetMaxConnections method for runtime updates.

Proto: SetListenerMaxConnections RPC added. ListenerStatus gains
max_connections field. Generated code regenerated.

gRPC server: SetListenerMaxConnections implements write-through
(DB first, then in-memory update). GetStatus includes max_connections.

Client: SetListenerMaxConnections method, MaxConnections in
ListenerStatus.

Tests: DB CRUD and UpdateListenerMaxConns, server connection limit
enforcement (accept 2, reject 3rd, close one, accept again), gRPC
SetListenerMaxConnections round-trip with DB persistence, not-found
error handling.

Also updates PROJECT_PLAN.md with phases 6-8 and PROGRESS.md with
tracking for the new features.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-25 16:42:53 -07:00
parent 5bc8f4fc8e
commit 564e0a9c67
16 changed files with 595 additions and 102 deletions

View File

@@ -41,7 +41,7 @@ func TestIsEmpty(t *testing.T) {
t.Fatal("expected empty database")
}
if _, err := store.CreateListener(":443", false); err != nil {
if _, err := store.CreateListener(":443", false, 0); err != nil {
t.Fatalf("create listener: %v", err)
}
@@ -57,7 +57,7 @@ func TestIsEmpty(t *testing.T) {
func TestListenerCRUD(t *testing.T) {
store := openTestDB(t)
id, err := store.CreateListener(":443", false)
id, err := store.CreateListener(":443", false, 0)
if err != nil {
t.Fatalf("create: %v", err)
}
@@ -103,7 +103,7 @@ func TestListenerCRUD(t *testing.T) {
func TestListenerProxyProtocol(t *testing.T) {
store := openTestDB(t)
id, err := store.CreateListener(":443", true)
id, err := store.CreateListener(":443", true, 0)
if err != nil {
t.Fatalf("create: %v", err)
}
@@ -120,13 +120,52 @@ func TestListenerProxyProtocol(t *testing.T) {
}
}
func TestListenerMaxConnections(t *testing.T) {
store := openTestDB(t)
id, err := store.CreateListener(":443", false, 5000)
if err != nil {
t.Fatalf("create: %v", err)
}
l, err := store.GetListenerByAddr(":443")
if err != nil {
t.Fatalf("get: %v", err)
}
if l.MaxConnections != 5000 {
t.Fatalf("max_connections = %d, want 5000", l.MaxConnections)
}
// Update max connections.
if err := store.UpdateListenerMaxConns(id, 10000); err != nil {
t.Fatalf("update: %v", err)
}
l, err = store.GetListenerByAddr(":443")
if err != nil {
t.Fatalf("get after update: %v", err)
}
if l.MaxConnections != 10000 {
t.Fatalf("max_connections = %d, want 10000", l.MaxConnections)
}
// Set to 0 (unlimited).
if err := store.UpdateListenerMaxConns(id, 0); err != nil {
t.Fatalf("update to 0: %v", err)
}
l, _ = store.GetListenerByAddr(":443")
if l.MaxConnections != 0 {
t.Fatalf("max_connections = %d, want 0", l.MaxConnections)
}
}
func TestListenerDuplicateAddr(t *testing.T) {
store := openTestDB(t)
if _, err := store.CreateListener(":443", false); err != nil {
if _, err := store.CreateListener(":443", false, 0); err != nil {
t.Fatalf("first create: %v", err)
}
if _, err := store.CreateListener(":443", false); err == nil {
if _, err := store.CreateListener(":443", false, 0); err == nil {
t.Fatal("expected error for duplicate addr")
}
}
@@ -134,7 +173,7 @@ func TestListenerDuplicateAddr(t *testing.T) {
func TestRouteCRUD(t *testing.T) {
store := openTestDB(t)
listenerID, err := store.CreateListener(":443", false)
listenerID, err := store.CreateListener(":443", false, 0)
if err != nil {
t.Fatalf("create listener: %v", err)
}
@@ -177,7 +216,7 @@ func TestRouteCRUD(t *testing.T) {
func TestRouteL7Fields(t *testing.T) {
store := openTestDB(t)
listenerID, _ := store.CreateListener(":443", false)
listenerID, _ := store.CreateListener(":443", false, 0)
_, err := store.CreateRoute(listenerID, "api.example.com", "127.0.0.1:8080", "l7",
"/certs/api.crt", "/certs/api.key", false, true)
@@ -214,7 +253,7 @@ func TestRouteL7Fields(t *testing.T) {
func TestRouteDuplicateHostname(t *testing.T) {
store := openTestDB(t)
listenerID, _ := store.CreateListener(":443", false)
listenerID, _ := store.CreateListener(":443", false, 0)
if _, err := store.CreateRoute(listenerID, "example.com", "127.0.0.1:8443", "l4", "", "", false, false); err != nil {
t.Fatalf("first create: %v", err)
}
@@ -226,7 +265,7 @@ func TestRouteDuplicateHostname(t *testing.T) {
func TestRouteCascadeDelete(t *testing.T) {
store := openTestDB(t)
listenerID, _ := store.CreateListener(":443", false)
listenerID, _ := store.CreateListener(":443", false, 0)
store.CreateRoute(listenerID, "a.example.com", "127.0.0.1:8443", "l4", "", "", false, false)
store.CreateRoute(listenerID, "b.example.com", "127.0.0.1:9443", "l4", "", "", false, false)
@@ -373,7 +412,7 @@ func TestSeed(t *testing.T) {
func TestSnapshot(t *testing.T) {
store := openTestDB(t)
store.CreateListener(":443", false)
store.CreateListener(":443", false, 0)
dest := filepath.Join(t.TempDir(), "backup.db")
if err := store.Snapshot(dest); err != nil {
@@ -432,7 +471,7 @@ func TestMigrationV2Upgrade(t *testing.T) {
}
// Insert a listener and route with defaults to verify new columns work.
lid, err := store.CreateListener(":443", false)
lid, err := store.CreateListener(":443", false, 0)
if err != nil {
t.Fatalf("create listener: %v", err)
}

View File

@@ -4,14 +4,15 @@ import "fmt"
// Listener is a database listener record.
type Listener struct {
ID int64
Addr string
ProxyProtocol bool
ID int64
Addr string
ProxyProtocol bool
MaxConnections int64
}
// ListListeners returns all listeners.
func (s *Store) ListListeners() ([]Listener, error) {
rows, err := s.db.Query("SELECT id, addr, proxy_protocol FROM listeners ORDER BY id")
rows, err := s.db.Query("SELECT id, addr, proxy_protocol, max_connections FROM listeners ORDER BY id")
if err != nil {
return nil, fmt.Errorf("querying listeners: %w", err)
}
@@ -20,7 +21,7 @@ func (s *Store) ListListeners() ([]Listener, error) {
var listeners []Listener
for rows.Next() {
var l Listener
if err := rows.Scan(&l.ID, &l.Addr, &l.ProxyProtocol); err != nil {
if err := rows.Scan(&l.ID, &l.Addr, &l.ProxyProtocol, &l.MaxConnections); err != nil {
return nil, fmt.Errorf("scanning listener: %w", err)
}
listeners = append(listeners, l)
@@ -29,10 +30,10 @@ func (s *Store) ListListeners() ([]Listener, error) {
}
// CreateListener inserts a listener and returns its ID.
func (s *Store) CreateListener(addr string, proxyProtocol bool) (int64, error) {
func (s *Store) CreateListener(addr string, proxyProtocol bool, maxConnections int64) (int64, error) {
result, err := s.db.Exec(
"INSERT INTO listeners (addr, proxy_protocol) VALUES (?, ?)",
addr, proxyProtocol,
"INSERT INTO listeners (addr, proxy_protocol, max_connections) VALUES (?, ?, ?)",
addr, proxyProtocol, maxConnections,
)
if err != nil {
return 0, fmt.Errorf("inserting listener: %w", err)
@@ -56,10 +57,26 @@ func (s *Store) DeleteListener(id int64) error {
// GetListenerByAddr returns a listener by its address.
func (s *Store) GetListenerByAddr(addr string) (Listener, error) {
var l Listener
err := s.db.QueryRow("SELECT id, addr, proxy_protocol FROM listeners WHERE addr = ?", addr).
Scan(&l.ID, &l.Addr, &l.ProxyProtocol)
err := s.db.QueryRow("SELECT id, addr, proxy_protocol, max_connections FROM listeners WHERE addr = ?", addr).
Scan(&l.ID, &l.Addr, &l.ProxyProtocol, &l.MaxConnections)
if err != nil {
return Listener{}, fmt.Errorf("querying listener by addr %q: %w", addr, err)
}
return l, nil
}
// UpdateListenerMaxConns updates the max_connections for a listener.
func (s *Store) UpdateListenerMaxConns(listenerID int64, maxConns int64) error {
result, err := s.db.Exec(
"UPDATE listeners SET max_connections = ? WHERE id = ?",
maxConns, listenerID,
)
if err != nil {
return fmt.Errorf("updating max_connections: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("listener %d not found", listenerID)
}
return nil
}

View File

@@ -14,6 +14,7 @@ type migration struct {
var migrations = []migration{
{1, "create_core_tables", migrate001CreateCoreTables},
{2, "add_proxy_protocol_and_l7_fields", migrate002AddL7Fields},
{3, "add_listener_max_connections", migrate003AddListenerMaxConnections},
}
// Migrate runs all unapplied migrations sequentially.
@@ -110,3 +111,8 @@ func migrate002AddL7Fields(tx *sql.Tx) error {
}
return nil
}
func migrate003AddListenerMaxConnections(tx *sql.Tx) error {
_, err := tx.Exec(`ALTER TABLE listeners ADD COLUMN max_connections INTEGER NOT NULL DEFAULT 0`)
return err
}

View File

@@ -18,8 +18,8 @@ func (s *Store) Seed(listeners []config.Listener, fw config.Firewall) error {
for _, l := range listeners {
result, err := tx.Exec(
"INSERT INTO listeners (addr, proxy_protocol) VALUES (?, ?)",
l.Addr, l.ProxyProtocol,
"INSERT INTO listeners (addr, proxy_protocol, max_connections) VALUES (?, ?, ?)",
l.Addr, l.ProxyProtocol, l.MaxConnections,
)
if err != nil {
return fmt.Errorf("seeding listener %q: %w", l.Addr, err)