"""Synchronous HTTP client for the MCIAS API.""" from __future__ import annotations import ssl from types import TracebackType from typing import Any import httpx from ._errors import raise_for_status from ._models import Account, PGCreds, PublicKey, TokenClaims class Client: """Synchronous MCIAS API client backed by httpx.""" def __init__( self, server_url: str, *, ca_cert_path: str | None = None, token: str | None = None, timeout: float = 30.0, ) -> None: self._base_url = server_url.rstrip("/") self.token = token ssl_context: ssl.SSLContext | bool if ca_cert_path is not None: ssl_context = ssl.create_default_context(cafile=ca_cert_path) else: ssl_context = True # use default SSL verification self._http = httpx.Client( verify=ssl_context, timeout=timeout, ) def __enter__(self) -> Client: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: self.close() def close(self) -> None: """Close the underlying HTTP client.""" self._http.close() def _request( self, method: str, path: str, *, json: dict[str, Any] | None = None, expected_status: int | None = None, ) -> dict[str, Any] | None: """Send an HTTP request and return the parsed JSON body. Returns None for 204 No Content responses. Raises the appropriate MciasError subclass on 4xx/5xx. """ url = f"{self._base_url}{path}" headers: dict[str, str] = {} if self.token is not None: headers["Authorization"] = f"Bearer {self.token}" response = self._http.request(method, url, json=json, headers=headers) status = response.status_code if expected_status is not None: success_codes = {expected_status} else: success_codes = {200, 201, 204} if status not in success_codes and status >= 400: try: body = response.json() message = str(body.get("error", response.text)) except Exception: message = response.text raise_for_status(status, message) if status == 204 or not response.content: return None return response.json() # type: ignore[no-any-return] def health(self) -> None: """GET /v1/health — liveness check.""" self._request("GET", "/v1/health") def get_public_key(self) -> PublicKey: """GET /v1/keys/public — retrieve the server's Ed25519 public key.""" data = self._request("GET", "/v1/keys/public") assert data is not None return PublicKey.from_dict(data) def login( self, username: str, password: str, totp_code: str | None = None, ) -> tuple[str, str]: """POST /v1/auth/login — authenticate and obtain a JWT. Returns (token, expires_at). Stores the token on self.token. """ payload: dict[str, Any] = { "username": username, "password": password, } if totp_code is not None: payload["totp_code"] = totp_code data = self._request("POST", "/v1/auth/login", json=payload) assert data is not None token = str(data["token"]) expires_at = str(data["expires_at"]) self.token = token return token, expires_at def logout(self) -> None: """POST /v1/auth/logout — invalidate the current token.""" self._request("POST", "/v1/auth/logout") self.token = None def renew_token(self) -> tuple[str, str]: """POST /v1/auth/renew — exchange current token for a fresh one. Returns (token, expires_at). Updates self.token. """ data = self._request("POST", "/v1/auth/renew") assert data is not None token = str(data["token"]) expires_at = str(data["expires_at"]) self.token = token return token, expires_at def validate_token(self, token: str) -> TokenClaims: """POST /v1/token/validate — check whether a token is valid.""" data = self._request("POST", "/v1/token/validate", json={"token": token}) assert data is not None return TokenClaims.from_dict(data) def create_account( self, username: str, account_type: str, *, password: str | None = None, ) -> Account: """POST /v1/accounts — create a new account.""" payload: dict[str, Any] = { "username": username, "account_type": account_type, } if password is not None: payload["password"] = password data = self._request("POST", "/v1/accounts", json=payload) assert data is not None return Account.from_dict(data) def list_accounts(self) -> list[Account]: """GET /v1/accounts — list all accounts.""" data = self._request("GET", "/v1/accounts") assert data is not None accounts_raw = data.get("accounts") or [] return [Account.from_dict(a) for a in accounts_raw] def get_account(self, account_id: str) -> Account: """GET /v1/accounts/{id} — retrieve a single account.""" data = self._request("GET", f"/v1/accounts/{account_id}") assert data is not None return Account.from_dict(data) def update_account( self, account_id: str, *, status: str | None = None, ) -> Account: """PATCH /v1/accounts/{id} — update account fields.""" payload: dict[str, Any] = {} if status is not None: payload["status"] = status data = self._request("PATCH", f"/v1/accounts/{account_id}", json=payload) assert data is not None return Account.from_dict(data) def delete_account(self, account_id: str) -> None: """DELETE /v1/accounts/{id} — permanently remove an account.""" self._request("DELETE", f"/v1/accounts/{account_id}") def get_roles(self, account_id: str) -> list[str]: """GET /v1/accounts/{id}/roles — list roles for an account.""" data = self._request("GET", f"/v1/accounts/{account_id}/roles") assert data is not None roles_raw = data.get("roles") or [] return [str(r) for r in roles_raw] def set_roles(self, account_id: str, roles: list[str]) -> None: """PUT /v1/accounts/{id}/roles — replace the full role set.""" self._request( "PUT", f"/v1/accounts/{account_id}/roles", json={"roles": roles}, ) def issue_service_token(self, account_id: str) -> tuple[str, str]: """POST /v1/accounts/{id}/token — issue a long-lived service token. Returns (token, expires_at). """ data = self._request("POST", f"/v1/accounts/{account_id}/token") assert data is not None return str(data["token"]), str(data["expires_at"]) def revoke_token(self, jti: str) -> None: """DELETE /v1/token/{jti} — revoke a token by JTI.""" self._request("DELETE", f"/v1/token/{jti}") def get_pg_creds(self, account_id: str) -> PGCreds: """GET /v1/accounts/{id}/pgcreds — retrieve Postgres credentials.""" data = self._request("GET", f"/v1/accounts/{account_id}/pgcreds") assert data is not None return PGCreds.from_dict(data) def set_pg_creds( self, account_id: str, host: str, port: int, database: str, username: str, password: str, ) -> None: """PUT /v1/accounts/{id}/pgcreds — store or replace Postgres credentials.""" payload: dict[str, Any] = { "host": host, "port": port, "database": database, "username": username, "password": password, } self._request("PUT", f"/v1/accounts/{account_id}/pgcreds", json=payload)