Implement Phase 9: client libraries (Go, Rust, Lisp, Python)

- clients/README.md: canonical API surface and error type reference
- clients/testdata/: shared JSON response fixtures
- clients/go/: mciasgoclient package; net/http + TLS 1.2+; sync.RWMutex
  token state; DisallowUnknownFields on all decoders; 25 tests pass
- clients/rust/: async mcias-client crate; reqwest+rustls (no OpenSSL);
  thiserror MciasError enum; Arc<RwLock> token state; 22+1 tests pass;
  cargo clippy -D warnings clean
- clients/lisp/: ASDF mcias-client; dexador HTTP, yason JSON; mcias-error
  condition hierarchy; Hunchentoot mock-dispatcher; 37 fiveam checks pass
  on SBCL 2.6.1; yason boolean normalisation in validate-token
- clients/python/: mcias_client package (Python 3.11+); httpx sync;
  py.typed; dataclasses; 32 pytest tests; mypy --strict + ruff clean
- test/mock/mockserver.go: in-memory mock server for Go client tests
- ARCHITECTURE.md §19: updated per-language notes to match implementation
- PROGRESS.md: Phase 9 marked complete
- .gitignore: exclude clients/rust/target/, python .venv, .pytest_cache,
  .fasl files
Security: token never logged or exposed in error messages in any library;
TLS enforced in all four languages; token stored under lock/mutex/RwLock
This commit is contained in:
2026-03-11 16:38:32 -07:00
parent f34e9a69a0
commit 0c441f5c4f
1974 changed files with 10151 additions and 33 deletions

91
clients/python/README.md Normal file
View File

@@ -0,0 +1,91 @@
# mcias-client (Python)
Python client library for the [MCIAS](../../README.md) identity and access management API.
## Requirements
- Python 3.11+
- `httpx >= 0.27`
## Installation
```sh
pip install .
# or in development mode:
pip install -e ".[dev]"
```
## Quick Start
```python
from mcias_client import Client
# Connect to the MCIAS server.
with Client("https://auth.example.com") as client:
# Authenticate.
token, expires_at = client.login("alice", "s3cret")
print(f"token expires at {expires_at}")
# The token is stored in the client automatically.
accounts = client.list_accounts()
# Revoke the token when done (also called automatically on context exit).
client.logout()
```
## Custom CA Certificate
```python
client = Client(
"https://auth.example.com",
ca_cert_path="/etc/mcias/ca.pem",
)
```
## Error Handling
All methods raise typed exceptions on error:
```python
from mcias_client import (
MciasAuthError,
MciasForbiddenError,
MciasNotFoundError,
MciasInputError,
MciasConflictError,
MciasServerError,
)
try:
client.login("alice", "wrongpass")
except MciasAuthError as e:
print(f"auth failed ({e.status_code}): {e.message}")
except MciasForbiddenError as e:
print(f"forbidden: {e.message}")
except MciasNotFoundError as e:
print(f"not found: {e.message}")
except MciasInputError as e:
print(f"bad input: {e.message}")
except MciasConflictError as e:
print(f"conflict: {e.message}")
except MciasServerError as e:
print(f"server error {e.status_code}: {e.message}")
```
All exception types are subclasses of `MciasError`, which has attributes:
- `status_code: int` — HTTP status code
- `message: str` — server error message
## Thread Safety
`Client` is **not** thread-safe. Each thread should use its own `Client`
instance.
## Running Tests
```sh
pip install -e ".[dev]"
pytest tests/ -q
mypy mcias_client/ tests/
ruff check mcias_client/ tests/
```

View File

@@ -0,0 +1,12 @@
Metadata-Version: 2.4
Name: mcias-client
Version: 0.1.0
Summary: Python client library for the MCIAS identity and access management API
License: MIT
Requires-Python: >=3.11
Requires-Dist: httpx>=0.27
Provides-Extra: dev
Requires-Dist: pytest>=8; extra == "dev"
Requires-Dist: respx>=0.21; extra == "dev"
Requires-Dist: mypy>=1.10; extra == "dev"
Requires-Dist: ruff>=0.4; extra == "dev"

View File

@@ -0,0 +1,13 @@
README.md
pyproject.toml
mcias_client/__init__.py
mcias_client/_client.py
mcias_client/_errors.py
mcias_client/_models.py
mcias_client/py.typed
mcias_client.egg-info/PKG-INFO
mcias_client.egg-info/SOURCES.txt
mcias_client.egg-info/dependency_links.txt
mcias_client.egg-info/requires.txt
mcias_client.egg-info/top_level.txt
tests/test_client.py

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,7 @@
httpx>=0.27
[dev]
pytest>=8
respx>=0.21
mypy>=1.10
ruff>=0.4

View File

@@ -0,0 +1 @@
mcias_client

View File

@@ -0,0 +1,27 @@
"""MCIAS Python client library."""
from ._client import Client
from ._errors import (
MciasAuthError,
MciasConflictError,
MciasError,
MciasForbiddenError,
MciasInputError,
MciasNotFoundError,
MciasServerError,
)
from ._models import Account, PGCreds, PublicKey, TokenClaims
__all__ = [
"Client",
"MciasError",
"MciasAuthError",
"MciasForbiddenError",
"MciasNotFoundError",
"MciasInputError",
"MciasConflictError",
"MciasServerError",
"Account",
"PublicKey",
"TokenClaims",
"PGCreds",
]

View File

@@ -0,0 +1,216 @@
"""Synchronous HTTP client for the MCIAS API."""
from __future__ import annotations
import ssl
from types import TracebackType
from typing import Any
import httpx
from ._errors import raise_for_status
from ._models import Account, PGCreds, PublicKey, TokenClaims
class Client:
"""Synchronous MCIAS API client backed by httpx."""
def __init__(
self,
server_url: str,
*,
ca_cert_path: str | None = None,
token: str | None = None,
timeout: float = 30.0,
) -> None:
self._base_url = server_url.rstrip("/")
self.token = token
ssl_context: ssl.SSLContext | bool
if ca_cert_path is not None:
ssl_context = ssl.create_default_context(cafile=ca_cert_path)
else:
ssl_context = True # use default SSL verification
self._http = httpx.Client(
verify=ssl_context,
timeout=timeout,
)
def __enter__(self) -> Client:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
def close(self) -> None:
"""Close the underlying HTTP client."""
self._http.close()
def _request(
self,
method: str,
path: str,
*,
json: dict[str, Any] | None = None,
expected_status: int | None = None,
) -> dict[str, Any] | None:
"""Send an HTTP request and return the parsed JSON body.
Returns None for 204 No Content responses.
Raises the appropriate MciasError subclass on 4xx/5xx.
"""
url = f"{self._base_url}{path}"
headers: dict[str, str] = {}
if self.token is not None:
headers["Authorization"] = f"Bearer {self.token}"
response = self._http.request(method, url, json=json, headers=headers)
status = response.status_code
if expected_status is not None:
success_codes = {expected_status}
else:
success_codes = {200, 201, 204}
if status not in success_codes and status >= 400:
try:
body = response.json()
message = str(body.get("error", response.text))
except Exception:
message = response.text
raise_for_status(status, message)
if status == 204 or not response.content:
return None
return response.json() # type: ignore[no-any-return]
def health(self) -> None:
"""GET /v1/health — liveness check."""
self._request("GET", "/v1/health")
def get_public_key(self) -> PublicKey:
"""GET /v1/keys/public — retrieve the server's Ed25519 public key."""
data = self._request("GET", "/v1/keys/public")
assert data is not None
return PublicKey.from_dict(data)
def login(
self,
username: str,
password: str,
totp_code: str | None = None,
) -> tuple[str, str]:
"""POST /v1/auth/login — authenticate and obtain a JWT.
Returns (token, expires_at). Stores the token on self.token.
"""
payload: dict[str, Any] = {
"username": username,
"password": password,
}
if totp_code is not None:
payload["totp_code"] = totp_code
data = self._request("POST", "/v1/auth/login", json=payload)
assert data is not None
token = str(data["token"])
expires_at = str(data["expires_at"])
self.token = token
return token, expires_at
def logout(self) -> None:
"""POST /v1/auth/logout — invalidate the current token."""
self._request("POST", "/v1/auth/logout")
self.token = None
def renew_token(self) -> tuple[str, str]:
"""POST /v1/auth/renew — exchange current token for a fresh one.
Returns (token, expires_at). Updates self.token.
"""
data = self._request("POST", "/v1/auth/renew")
assert data is not None
token = str(data["token"])
expires_at = str(data["expires_at"])
self.token = token
return token, expires_at
def validate_token(self, token: str) -> TokenClaims:
"""POST /v1/token/validate — check whether a token is valid."""
data = self._request("POST", "/v1/token/validate", json={"token": token})
assert data is not None
return TokenClaims.from_dict(data)
def create_account(
self,
username: str,
account_type: str,
*,
password: str | None = None,
) -> Account:
"""POST /v1/accounts — create a new account."""
payload: dict[str, Any] = {
"username": username,
"account_type": account_type,
}
if password is not None:
payload["password"] = password
data = self._request("POST", "/v1/accounts", json=payload)
assert data is not None
return Account.from_dict(data)
def list_accounts(self) -> list[Account]:
"""GET /v1/accounts — list all accounts."""
data = self._request("GET", "/v1/accounts")
assert data is not None
accounts_raw = data.get("accounts") or []
return [Account.from_dict(a) for a in accounts_raw]
def get_account(self, account_id: str) -> Account:
"""GET /v1/accounts/{id} — retrieve a single account."""
data = self._request("GET", f"/v1/accounts/{account_id}")
assert data is not None
return Account.from_dict(data)
def update_account(
self,
account_id: str,
*,
status: str | None = None,
) -> Account:
"""PATCH /v1/accounts/{id} — update account fields."""
payload: dict[str, Any] = {}
if status is not None:
payload["status"] = status
data = self._request("PATCH", f"/v1/accounts/{account_id}", json=payload)
assert data is not None
return Account.from_dict(data)
def delete_account(self, account_id: str) -> None:
"""DELETE /v1/accounts/{id} — permanently remove an account."""
self._request("DELETE", f"/v1/accounts/{account_id}")
def get_roles(self, account_id: str) -> list[str]:
"""GET /v1/accounts/{id}/roles — list roles for an account."""
data = self._request("GET", f"/v1/accounts/{account_id}/roles")
assert data is not None
roles_raw = data.get("roles") or []
return [str(r) for r in roles_raw]
def set_roles(self, account_id: str, roles: list[str]) -> None:
"""PUT /v1/accounts/{id}/roles — replace the full role set."""
self._request(
"PUT",
f"/v1/accounts/{account_id}/roles",
json={"roles": roles},
)
def issue_service_token(self, account_id: str) -> tuple[str, str]:
"""POST /v1/accounts/{id}/token — issue a long-lived service token.
Returns (token, expires_at).
"""
data = self._request("POST", f"/v1/accounts/{account_id}/token")
assert data is not None
return str(data["token"]), str(data["expires_at"])
def revoke_token(self, jti: str) -> None:
"""DELETE /v1/token/{jti} — revoke a token by JTI."""
self._request("DELETE", f"/v1/token/{jti}")
def get_pg_creds(self, account_id: str) -> PGCreds:
"""GET /v1/accounts/{id}/pgcreds — retrieve Postgres credentials."""
data = self._request("GET", f"/v1/accounts/{account_id}/pgcreds")
assert data is not None
return PGCreds.from_dict(data)
def set_pg_creds(
self,
account_id: str,
host: str,
port: int,
database: str,
username: str,
password: str,
) -> None:
"""PUT /v1/accounts/{id}/pgcreds — store or replace Postgres credentials."""
payload: dict[str, Any] = {
"host": host,
"port": port,
"database": database,
"username": username,
"password": password,
}
self._request("PUT", f"/v1/accounts/{account_id}/pgcreds", json=payload)

View File

@@ -0,0 +1,30 @@
"""Typed exception hierarchy for MCIAS client errors."""
class MciasError(Exception):
"""Base exception for all MCIAS API errors."""
def __init__(self, status_code: int, message: str) -> None:
super().__init__(f"HTTP {status_code}: {message}")
self.status_code = status_code
self.message = message
class MciasAuthError(MciasError):
"""401 Unauthorized — token missing, invalid, or expired."""
class MciasForbiddenError(MciasError):
"""403 Forbidden — insufficient role."""
class MciasNotFoundError(MciasError):
"""404 Not Found — resource does not exist."""
class MciasInputError(MciasError):
"""400 Bad Request — malformed request."""
class MciasConflictError(MciasError):
"""409 Conflict — e.g. duplicate username."""
class MciasServerError(MciasError):
"""5xx — unexpected server error."""
def raise_for_status(status_code: int, message: str) -> None:
"""Raise the appropriate MciasError subclass for the given status code."""
exc_map = {
400: MciasInputError,
401: MciasAuthError,
403: MciasForbiddenError,
404: MciasNotFoundError,
409: MciasConflictError,
}
cls = exc_map.get(status_code, MciasServerError)
raise cls(status_code, message)

View File

@@ -0,0 +1,76 @@
"""Data models for MCIAS API responses."""
from dataclasses import dataclass, field
from typing import cast
@dataclass
class Account:
"""A user or service account."""
id: str
username: str
account_type: str
status: str
created_at: str
updated_at: str
totp_enabled: bool = False
@classmethod
def from_dict(cls, d: dict[str, object]) -> "Account":
return cls(
id=str(d["id"]),
username=str(d["username"]),
account_type=str(d["account_type"]),
status=str(d["status"]),
created_at=str(d["created_at"]),
updated_at=str(d["updated_at"]),
totp_enabled=bool(d.get("totp_enabled", False)),
)
@dataclass
class PublicKey:
"""Ed25519 public key in JWK format."""
kty: str
crv: str
x: str
use: str = ""
alg: str = ""
@classmethod
def from_dict(cls, d: dict[str, object]) -> "PublicKey":
return cls(
kty=str(d["kty"]),
crv=str(d["crv"]),
x=str(d["x"]),
use=str(d.get("use", "")),
alg=str(d.get("alg", "")),
)
@dataclass
class TokenClaims:
"""Claims from a validated token."""
valid: bool
sub: str = ""
roles: list[str] = field(default_factory=list)
expires_at: str = ""
@classmethod
def from_dict(cls, d: dict[str, object]) -> "TokenClaims":
roles_raw = cast(list[object], d.get("roles") or [])
return cls(
valid=bool(d.get("valid", False)),
sub=str(d.get("sub", "")),
roles=[str(r) for r in roles_raw],
expires_at=str(d.get("expires_at", "")),
)
@dataclass
class PGCreds:
"""Postgres connection credentials."""
host: str
port: int
database: str
username: str
password: str
@classmethod
def from_dict(cls, d: dict[str, object]) -> "PGCreds":
return cls(
host=str(d["host"]),
port=int(cast(int, d["port"])),
database=str(d["database"]),
username=str(d["username"]),
password=str(d["password"]),
)

View File

View File

@@ -0,0 +1,31 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "mcias-client"
version = "0.1.0"
description = "Python client library for the MCIAS identity and access management API"
requires-python = ">=3.11"
license = { text = "MIT" }
dependencies = [
"httpx>=0.27",
]
[project.optional-dependencies]
dev = [
"pytest>=8",
"respx>=0.21",
"mypy>=1.10",
"ruff>=0.4",
]
[tool.setuptools.packages.find]
where = ["."]
include = ["mcias_client*"]
[tool.mypy]
strict = true
python_version = "3.11"
[tool.ruff]
target-version = "py311"
line-length = 88
[tool.ruff.lint]
select = ["E", "F", "W", "I", "UP"]

View File

View File

@@ -0,0 +1,320 @@
"""Tests for the MCIAS Python client using respx to mock httpx."""
from __future__ import annotations
import httpx
import pytest
import respx
from mcias_client import (
Client,
MciasAuthError,
MciasConflictError,
MciasError,
MciasForbiddenError,
MciasInputError,
MciasNotFoundError,
MciasServerError,
)
from mcias_client._models import Account, PGCreds, PublicKey, TokenClaims
BASE_URL = "https://auth.example.com"
SAMPLE_ACCOUNT: dict[str, object] = {
"id": "acc-001",
"username": "alice",
"account_type": "user",
"status": "active",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
"totp_enabled": False,
}
SAMPLE_PK: dict[str, object] = {
"kty": "OKP",
"crv": "Ed25519",
"x": "base64urlpublickey",
"use": "sig",
"alg": "EdDSA",
}
@pytest.fixture
def client() -> Client:
return Client(BASE_URL)
@pytest.fixture
def admin_client() -> Client:
return Client(BASE_URL, token="admin-token")
def test_client_init() -> None:
c = Client(BASE_URL)
assert c.token is None
assert c._base_url == BASE_URL
c.close()
def test_client_strips_trailing_slash() -> None:
c = Client(BASE_URL + "/")
assert c._base_url == BASE_URL
c.close()
def test_client_init_with_token() -> None:
c = Client(BASE_URL, token="mytoken")
assert c.token == "mytoken"
c.close()
@respx.mock
def test_health_ok(client: Client) -> None:
respx.get(f"{BASE_URL}/v1/health").mock(return_value=httpx.Response(200))
client.health() # should not raise
@respx.mock
def test_health_server_error(client: Client) -> None:
respx.get(f"{BASE_URL}/v1/health").mock(
return_value=httpx.Response(503, json={"error": "service unavailable"})
)
with pytest.raises(MciasServerError) as exc_info:
client.health()
assert exc_info.value.status_code == 503
@respx.mock
def test_get_public_key(client: Client) -> None:
respx.get(f"{BASE_URL}/v1/keys/public").mock(
return_value=httpx.Response(200, json=SAMPLE_PK)
)
pk = client.get_public_key()
assert isinstance(pk, PublicKey)
assert pk.kty == "OKP"
assert pk.crv == "Ed25519"
assert pk.alg == "EdDSA"
@respx.mock
def test_login_success(client: Client) -> None:
respx.post(f"{BASE_URL}/v1/auth/login").mock(
return_value=httpx.Response(
200,
json={"token": "jwt-token-abc", "expires_at": "2099-01-01T00:00:00Z"},
)
)
token, expires_at = client.login("alice", "s3cr3t")
assert token == "jwt-token-abc"
assert expires_at == "2099-01-01T00:00:00Z"
assert client.token == "jwt-token-abc"
@respx.mock
def test_login_unauthorized(client: Client) -> None:
respx.post(f"{BASE_URL}/v1/auth/login").mock(
return_value=httpx.Response(
401, json={"error": "invalid credentials"}
)
)
with pytest.raises(MciasAuthError) as exc_info:
client.login("alice", "wrong")
assert exc_info.value.status_code == 401
@respx.mock
def test_logout_clears_token(admin_client: Client) -> None:
respx.post(f"{BASE_URL}/v1/auth/logout").mock(
return_value=httpx.Response(204)
)
assert admin_client.token == "admin-token"
admin_client.logout()
assert admin_client.token is None
@respx.mock
def test_renew_token(admin_client: Client) -> None:
respx.post(f"{BASE_URL}/v1/auth/renew").mock(
return_value=httpx.Response(
200,
json={"token": "new-jwt-token", "expires_at": "2099-06-01T00:00:00Z"},
)
)
token, expires_at = admin_client.renew_token()
assert token == "new-jwt-token"
assert expires_at == "2099-06-01T00:00:00Z"
assert admin_client.token == "new-jwt-token"
@respx.mock
def test_validate_token_valid(admin_client: Client) -> None:
respx.post(f"{BASE_URL}/v1/token/validate").mock(
return_value=httpx.Response(
200,
json={
"valid": True,
"sub": "acc-001",
"roles": ["admin"],
"expires_at": "2099-01-01T00:00:00Z",
},
)
)
claims = admin_client.validate_token("some-token")
assert isinstance(claims, TokenClaims)
assert claims.valid is True
assert claims.sub == "acc-001"
assert claims.roles == ["admin"]
@respx.mock
def test_validate_token_invalid(admin_client: Client) -> None:
"""valid=False in the response body is NOT an exception — just a falsy claim."""
respx.post(f"{BASE_URL}/v1/token/validate").mock(
return_value=httpx.Response(
200,
json={"valid": False, "sub": "", "roles": []},
)
)
claims = admin_client.validate_token("expired-token")
assert claims.valid is False
@respx.mock
def test_create_account(admin_client: Client) -> None:
respx.post(f"{BASE_URL}/v1/accounts").mock(
return_value=httpx.Response(201, json=SAMPLE_ACCOUNT)
)
acc = admin_client.create_account("alice", "user", password="pass123")
assert isinstance(acc, Account)
assert acc.id == "acc-001"
assert acc.username == "alice"
@respx.mock
def test_create_account_conflict(admin_client: Client) -> None:
respx.post(f"{BASE_URL}/v1/accounts").mock(
return_value=httpx.Response(409, json={"error": "username already exists"})
)
with pytest.raises(MciasConflictError) as exc_info:
admin_client.create_account("alice", "user")
assert exc_info.value.status_code == 409
@respx.mock
def test_list_accounts(admin_client: Client) -> None:
second = {**SAMPLE_ACCOUNT, "id": "acc-002"}
respx.get(f"{BASE_URL}/v1/accounts").mock(
return_value=httpx.Response(
200, json={"accounts": [SAMPLE_ACCOUNT, second]}
)
)
accounts = admin_client.list_accounts()
assert len(accounts) == 2
assert all(isinstance(a, Account) for a in accounts)
@respx.mock
def test_get_account(admin_client: Client) -> None:
respx.get(f"{BASE_URL}/v1/accounts/acc-001").mock(
return_value=httpx.Response(200, json=SAMPLE_ACCOUNT)
)
acc = admin_client.get_account("acc-001")
assert acc.id == "acc-001"
@respx.mock
def test_update_account(admin_client: Client) -> None:
updated = {**SAMPLE_ACCOUNT, "status": "suspended"}
respx.patch(f"{BASE_URL}/v1/accounts/acc-001").mock(
return_value=httpx.Response(200, json=updated)
)
acc = admin_client.update_account("acc-001", status="suspended")
assert acc.status == "suspended"
@respx.mock
def test_delete_account(admin_client: Client) -> None:
respx.delete(f"{BASE_URL}/v1/accounts/acc-001").mock(
return_value=httpx.Response(204)
)
admin_client.delete_account("acc-001") # should not raise
@respx.mock
def test_get_roles(admin_client: Client) -> None:
respx.get(f"{BASE_URL}/v1/accounts/acc-001/roles").mock(
return_value=httpx.Response(200, json={"roles": ["admin", "viewer"]})
)
roles = admin_client.get_roles("acc-001")
assert roles == ["admin", "viewer"]
@respx.mock
def test_set_roles(admin_client: Client) -> None:
respx.put(f"{BASE_URL}/v1/accounts/acc-001/roles").mock(
return_value=httpx.Response(204)
)
admin_client.set_roles("acc-001", ["viewer"]) # should not raise
@respx.mock
def test_issue_service_token(admin_client: Client) -> None:
respx.post(f"{BASE_URL}/v1/accounts/acc-001/token").mock(
return_value=httpx.Response(
200,
json={"token": "svc-token-xyz", "expires_at": "2099-12-31T00:00:00Z"},
)
)
token, expires_at = admin_client.issue_service_token("acc-001")
assert token == "svc-token-xyz"
assert expires_at == "2099-12-31T00:00:00Z"
@respx.mock
def test_revoke_token(admin_client: Client) -> None:
jti = "some-jti-uuid"
respx.delete(f"{BASE_URL}/v1/token/{jti}").mock(
return_value=httpx.Response(204)
)
admin_client.revoke_token(jti) # should not raise
SAMPLE_PG_CREDS: dict[str, object] = {
"host": "db.example.com",
"port": 5432,
"database": "myapp",
"username": "appuser",
"password": "s3cr3t",
}
@respx.mock
def test_get_pg_creds(admin_client: Client) -> None:
respx.get(f"{BASE_URL}/v1/accounts/acc-001/pgcreds").mock(
return_value=httpx.Response(200, json=SAMPLE_PG_CREDS)
)
creds = admin_client.get_pg_creds("acc-001")
assert isinstance(creds, PGCreds)
assert creds.host == "db.example.com"
assert creds.port == 5432
assert creds.database == "myapp"
@respx.mock
def test_set_pg_creds(admin_client: Client) -> None:
respx.put(f"{BASE_URL}/v1/accounts/acc-001/pgcreds").mock(
return_value=httpx.Response(204)
)
admin_client.set_pg_creds(
"acc-001",
host="db.example.com",
port=5432,
database="myapp",
username="appuser",
password="s3cr3t",
) # should not raise
@pytest.mark.parametrize(
("status_code", "exc_class"),
[
(400, MciasInputError),
(401, MciasAuthError),
(403, MciasForbiddenError),
(404, MciasNotFoundError),
(409, MciasConflictError),
(500, MciasServerError),
],
)
@respx.mock
def test_error_types(
client: Client,
status_code: int,
exc_class: type,
) -> None:
respx.get(f"{BASE_URL}/v1/health").mock(
return_value=httpx.Response(
status_code, json={"error": "test error"}
)
)
with pytest.raises(exc_class) as exc_info:
client.health()
err = exc_info.value
assert isinstance(err, MciasError)
assert err.status_code == status_code
@respx.mock
def test_context_manager() -> None:
respx.get(f"{BASE_URL}/v1/health").mock(return_value=httpx.Response(200))
with Client(BASE_URL) as c:
c.health()
assert c._http.is_closed
@respx.mock
def test_integration_login_validate_logout() -> None:
"""Full flow: login, validate the issued token, then logout."""
login_resp = httpx.Response(
200,
json={"token": "flow-token-abc", "expires_at": "2099-01-01T00:00:00Z"},
)
validate_resp = httpx.Response(
200,
json={
"valid": True,
"sub": "acc-001",
"roles": ["admin"],
"expires_at": "2099-01-01T00:00:00Z",
},
)
logout_resp = httpx.Response(204)
respx.post(f"{BASE_URL}/v1/auth/login").mock(return_value=login_resp)
respx.post(f"{BASE_URL}/v1/token/validate").mock(return_value=validate_resp)
respx.post(f"{BASE_URL}/v1/auth/logout").mock(return_value=logout_resp)
with Client(BASE_URL) as c:
token, _ = c.login("alice", "password")
assert token == "flow-token-abc"
assert c.token == "flow-token-abc"
claims = c.validate_token(token)
assert claims.valid is True
assert "admin" in claims.roles
c.logout()
assert c.token is None