- clients/README.md: canonical API surface and error type reference - clients/testdata/: shared JSON response fixtures - clients/go/: mciasgoclient package; net/http + TLS 1.2+; sync.RWMutex token state; DisallowUnknownFields on all decoders; 25 tests pass - clients/rust/: async mcias-client crate; reqwest+rustls (no OpenSSL); thiserror MciasError enum; Arc<RwLock> token state; 22+1 tests pass; cargo clippy -D warnings clean - clients/lisp/: ASDF mcias-client; dexador HTTP, yason JSON; mcias-error condition hierarchy; Hunchentoot mock-dispatcher; 37 fiveam checks pass on SBCL 2.6.1; yason boolean normalisation in validate-token - clients/python/: mcias_client package (Python 3.11+); httpx sync; py.typed; dataclasses; 32 pytest tests; mypy --strict + ruff clean - test/mock/mockserver.go: in-memory mock server for Go client tests - ARCHITECTURE.md §19: updated per-language notes to match implementation - PROGRESS.md: Phase 9 marked complete - .gitignore: exclude clients/rust/target/, python .venv, .pytest_cache, .fasl files Security: token never logged or exposed in error messages in any library; TLS enforced in all four languages; token stored under lock/mutex/RwLock
217 lines
8.0 KiB
Python
217 lines
8.0 KiB
Python
"""Synchronous HTTP client for the MCIAS API."""
|
|
from __future__ import annotations
|
|
|
|
import ssl
|
|
from types import TracebackType
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from ._errors import raise_for_status
|
|
from ._models import Account, PGCreds, PublicKey, TokenClaims
|
|
|
|
|
|
class Client:
|
|
"""Synchronous MCIAS API client backed by httpx."""
|
|
def __init__(
|
|
self,
|
|
server_url: str,
|
|
*,
|
|
ca_cert_path: str | None = None,
|
|
token: str | None = None,
|
|
timeout: float = 30.0,
|
|
) -> None:
|
|
self._base_url = server_url.rstrip("/")
|
|
self.token = token
|
|
ssl_context: ssl.SSLContext | bool
|
|
if ca_cert_path is not None:
|
|
ssl_context = ssl.create_default_context(cafile=ca_cert_path)
|
|
else:
|
|
ssl_context = True # use default SSL verification
|
|
self._http = httpx.Client(
|
|
verify=ssl_context,
|
|
timeout=timeout,
|
|
)
|
|
def __enter__(self) -> Client:
|
|
return self
|
|
def __exit__(
|
|
self,
|
|
exc_type: type[BaseException] | None,
|
|
exc_val: BaseException | None,
|
|
exc_tb: TracebackType | None,
|
|
) -> None:
|
|
self.close()
|
|
def close(self) -> None:
|
|
"""Close the underlying HTTP client."""
|
|
self._http.close()
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
*,
|
|
json: dict[str, Any] | None = None,
|
|
expected_status: int | None = None,
|
|
) -> dict[str, Any] | None:
|
|
"""Send an HTTP request and return the parsed JSON body.
|
|
Returns None for 204 No Content responses.
|
|
Raises the appropriate MciasError subclass on 4xx/5xx.
|
|
"""
|
|
url = f"{self._base_url}{path}"
|
|
headers: dict[str, str] = {}
|
|
if self.token is not None:
|
|
headers["Authorization"] = f"Bearer {self.token}"
|
|
response = self._http.request(method, url, json=json, headers=headers)
|
|
status = response.status_code
|
|
if expected_status is not None:
|
|
success_codes = {expected_status}
|
|
else:
|
|
success_codes = {200, 201, 204}
|
|
if status not in success_codes and status >= 400:
|
|
try:
|
|
body = response.json()
|
|
message = str(body.get("error", response.text))
|
|
except Exception:
|
|
message = response.text
|
|
raise_for_status(status, message)
|
|
if status == 204 or not response.content:
|
|
return None
|
|
return response.json() # type: ignore[no-any-return]
|
|
def health(self) -> None:
|
|
"""GET /v1/health — liveness check."""
|
|
self._request("GET", "/v1/health")
|
|
def get_public_key(self) -> PublicKey:
|
|
"""GET /v1/keys/public — retrieve the server's Ed25519 public key."""
|
|
data = self._request("GET", "/v1/keys/public")
|
|
assert data is not None
|
|
return PublicKey.from_dict(data)
|
|
def login(
|
|
self,
|
|
username: str,
|
|
password: str,
|
|
totp_code: str | None = None,
|
|
) -> tuple[str, str]:
|
|
"""POST /v1/auth/login — authenticate and obtain a JWT.
|
|
Returns (token, expires_at). Stores the token on self.token.
|
|
"""
|
|
payload: dict[str, Any] = {
|
|
"username": username,
|
|
"password": password,
|
|
}
|
|
if totp_code is not None:
|
|
payload["totp_code"] = totp_code
|
|
data = self._request("POST", "/v1/auth/login", json=payload)
|
|
assert data is not None
|
|
token = str(data["token"])
|
|
expires_at = str(data["expires_at"])
|
|
self.token = token
|
|
return token, expires_at
|
|
def logout(self) -> None:
|
|
"""POST /v1/auth/logout — invalidate the current token."""
|
|
self._request("POST", "/v1/auth/logout")
|
|
self.token = None
|
|
def renew_token(self) -> tuple[str, str]:
|
|
"""POST /v1/auth/renew — exchange current token for a fresh one.
|
|
Returns (token, expires_at). Updates self.token.
|
|
"""
|
|
data = self._request("POST", "/v1/auth/renew")
|
|
assert data is not None
|
|
token = str(data["token"])
|
|
expires_at = str(data["expires_at"])
|
|
self.token = token
|
|
return token, expires_at
|
|
def validate_token(self, token: str) -> TokenClaims:
|
|
"""POST /v1/token/validate — check whether a token is valid."""
|
|
data = self._request("POST", "/v1/token/validate", json={"token": token})
|
|
assert data is not None
|
|
return TokenClaims.from_dict(data)
|
|
def create_account(
|
|
self,
|
|
username: str,
|
|
account_type: str,
|
|
*,
|
|
password: str | None = None,
|
|
) -> Account:
|
|
"""POST /v1/accounts — create a new account."""
|
|
payload: dict[str, Any] = {
|
|
"username": username,
|
|
"account_type": account_type,
|
|
}
|
|
if password is not None:
|
|
payload["password"] = password
|
|
data = self._request("POST", "/v1/accounts", json=payload)
|
|
assert data is not None
|
|
return Account.from_dict(data)
|
|
def list_accounts(self) -> list[Account]:
|
|
"""GET /v1/accounts — list all accounts."""
|
|
data = self._request("GET", "/v1/accounts")
|
|
assert data is not None
|
|
accounts_raw = data.get("accounts") or []
|
|
return [Account.from_dict(a) for a in accounts_raw]
|
|
def get_account(self, account_id: str) -> Account:
|
|
"""GET /v1/accounts/{id} — retrieve a single account."""
|
|
data = self._request("GET", f"/v1/accounts/{account_id}")
|
|
assert data is not None
|
|
return Account.from_dict(data)
|
|
def update_account(
|
|
self,
|
|
account_id: str,
|
|
*,
|
|
status: str | None = None,
|
|
) -> Account:
|
|
"""PATCH /v1/accounts/{id} — update account fields."""
|
|
payload: dict[str, Any] = {}
|
|
if status is not None:
|
|
payload["status"] = status
|
|
data = self._request("PATCH", f"/v1/accounts/{account_id}", json=payload)
|
|
assert data is not None
|
|
return Account.from_dict(data)
|
|
def delete_account(self, account_id: str) -> None:
|
|
"""DELETE /v1/accounts/{id} — permanently remove an account."""
|
|
self._request("DELETE", f"/v1/accounts/{account_id}")
|
|
def get_roles(self, account_id: str) -> list[str]:
|
|
"""GET /v1/accounts/{id}/roles — list roles for an account."""
|
|
data = self._request("GET", f"/v1/accounts/{account_id}/roles")
|
|
assert data is not None
|
|
roles_raw = data.get("roles") or []
|
|
return [str(r) for r in roles_raw]
|
|
def set_roles(self, account_id: str, roles: list[str]) -> None:
|
|
"""PUT /v1/accounts/{id}/roles — replace the full role set."""
|
|
self._request(
|
|
"PUT",
|
|
f"/v1/accounts/{account_id}/roles",
|
|
json={"roles": roles},
|
|
)
|
|
def issue_service_token(self, account_id: str) -> tuple[str, str]:
|
|
"""POST /v1/accounts/{id}/token — issue a long-lived service token.
|
|
Returns (token, expires_at).
|
|
"""
|
|
data = self._request("POST", f"/v1/accounts/{account_id}/token")
|
|
assert data is not None
|
|
return str(data["token"]), str(data["expires_at"])
|
|
def revoke_token(self, jti: str) -> None:
|
|
"""DELETE /v1/token/{jti} — revoke a token by JTI."""
|
|
self._request("DELETE", f"/v1/token/{jti}")
|
|
def get_pg_creds(self, account_id: str) -> PGCreds:
|
|
"""GET /v1/accounts/{id}/pgcreds — retrieve Postgres credentials."""
|
|
data = self._request("GET", f"/v1/accounts/{account_id}/pgcreds")
|
|
assert data is not None
|
|
return PGCreds.from_dict(data)
|
|
def set_pg_creds(
|
|
self,
|
|
account_id: str,
|
|
host: str,
|
|
port: int,
|
|
database: str,
|
|
username: str,
|
|
password: str,
|
|
) -> None:
|
|
"""PUT /v1/accounts/{id}/pgcreds — store or replace Postgres credentials."""
|
|
payload: dict[str, Any] = {
|
|
"host": host,
|
|
"port": port,
|
|
"database": database,
|
|
"username": username,
|
|
"password": password,
|
|
}
|
|
self._request("PUT", f"/v1/accounts/{account_id}/pgcreds", json=payload)
|