- Add TOTP enrollment/confirmation/removal to all clients
- Add password change and admin set-password endpoints
- Add account listing, status update, and tag management
- Add audit log listing with filter support
- Add policy rule CRUD operations
- Expand test coverage for all new endpoints across clients
- Fix .gitignore to exclude built binaries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
147 lines
5.2 KiB
Python
147 lines
5.2 KiB
Python
"""Data models for MCIAS API responses."""
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, cast
|
|
|
|
|
|
@dataclass
class Account:
    """Account record (user or service) decoded from an API response."""

    id: str
    username: str
    account_type: str
    status: str
    created_at: str
    updated_at: str
    # Optional in the payload; stays False when the key is missing.
    totp_enabled: bool = False

    @classmethod
    def from_dict(cls, d: dict[str, object]) -> "Account":
        """Build an ``Account`` from a decoded response mapping.

        The six text fields are mandatory and are coerced with ``str``;
        a missing one raises ``KeyError``.  ``totp_enabled`` is optional
        and is coerced with ``bool``, defaulting to ``False``.
        """
        # Listed in declaration order so the first missing key reported
        # is the first one the dataclass declares.
        required = (
            "id",
            "username",
            "account_type",
            "status",
            "created_at",
            "updated_at",
        )
        text_fields = {key: str(d[key]) for key in required}
        totp = bool(d.get("totp_enabled", False))
        return cls(totp_enabled=totp, **text_fields)
|
|
@dataclass
class PublicKey:
    """Ed25519 public key in JWK format."""

    kty: str
    crv: str
    x: str
    # Optional JWK members; left empty when the payload omits them.
    use: str = ""
    alg: str = ""

    @classmethod
    def from_dict(cls, d: dict[str, object]) -> "PublicKey":
        """Build a ``PublicKey`` from a decoded JWK mapping.

        ``kty``, ``crv`` and ``x`` are mandatory; a missing one raises
        ``KeyError``.  ``use`` and ``alg`` are optional: an absent key or
        an explicit JSON ``null`` both yield the empty string.
        """
        use = d.get("use")
        alg = d.get("alg")
        return cls(
            kty=str(d["kty"]),
            crv=str(d["crv"]),
            x=str(d["x"]),
            # str(None) would produce the literal text "None"; map null
            # to the same empty default an absent key gets.
            use="" if use is None else str(use),
            alg="" if alg is None else str(alg),
        )
|
|
@dataclass
class TokenClaims:
    """Claims from a validated token."""

    valid: bool
    sub: str = ""
    roles: list[str] = field(default_factory=list)
    expires_at: str = ""

    @classmethod
    def from_dict(cls, d: dict[str, object]) -> "TokenClaims":
        """Build a ``TokenClaims`` from a decoded response mapping.

        Every key is optional.  An absent key and an explicit JSON
        ``null`` are treated alike: ``valid`` becomes ``False``, ``sub``
        and ``expires_at`` become the empty string, and ``roles`` becomes
        an empty list.  Role entries are coerced with ``str``.
        """
        sub = d.get("sub")
        expires_at = d.get("expires_at")
        # ``or []`` also covers a null "roles" value.
        roles_raw = cast(list[object], d.get("roles") or [])
        return cls(
            valid=bool(d.get("valid", False)),
            # str(None) would produce the literal text "None"; keep the
            # empty default instead, matching how "roles" handles null.
            sub="" if sub is None else str(sub),
            roles=[str(r) for r in roles_raw],
            expires_at="" if expires_at is None else str(expires_at),
        )
|
|
@dataclass
class PGCreds:
    """Host, port, database name and login for a Postgres connection."""

    host: str
    port: int
    database: str
    username: str
    password: str

    @classmethod
    def from_dict(cls, d: dict[str, object]) -> "PGCreds":
        """Build a ``PGCreds`` from a decoded response mapping.

        All five keys are mandatory; a missing one raises ``KeyError``.
        ``port`` is passed through ``int``; the other values are coerced
        with ``str``.
        """
        host = str(d["host"])
        # cast() only informs the type checker; int() does the conversion.
        port = int(cast(int, d["port"]))
        database = str(d["database"])
        username = str(d["username"])
        password = str(d["password"])
        return cls(host, port, database, username, password)
|
|
@dataclass
|
|
class RuleBody:
|
|
"""Match conditions and effect of a policy rule."""
|
|
effect: str
|
|
roles: list[str] = field(default_factory=list)
|
|
account_types: list[str] = field(default_factory=list)
|
|
subject_uuid: str | None = None
|
|
actions: list[str] = field(default_factory=list)
|
|
resource_type: str | None = None
|
|
owner_matches_subject: bool | None = None
|
|
service_names: list[str] = field(default_factory=list)
|
|
required_tags: list[str] = field(default_factory=list)
|
|
@classmethod
|
|
def from_dict(cls, d: dict[str, object]) -> "RuleBody":
|
|
return cls(
|
|
effect=str(d["effect"]),
|
|
roles=[str(r) for r in cast(list[Any], d.get("roles") or [])],
|
|
account_types=[str(t) for t in cast(list[Any], d.get("account_types") or [])],
|
|
subject_uuid=str(d["subject_uuid"]) if d.get("subject_uuid") is not None else None,
|
|
actions=[str(a) for a in cast(list[Any], d.get("actions") or [])],
|
|
resource_type=str(d["resource_type"]) if d.get("resource_type") is not None else None,
|
|
owner_matches_subject=bool(d["owner_matches_subject"]) if d.get("owner_matches_subject") is not None else None,
|
|
service_names=[str(s) for s in cast(list[Any], d.get("service_names") or [])],
|
|
required_tags=[str(t) for t in cast(list[Any], d.get("required_tags") or [])],
|
|
)
|
|
def to_dict(self) -> dict[str, Any]:
|
|
"""Serialise to a JSON-compatible dict, omitting None/empty fields."""
|
|
out: dict[str, Any] = {"effect": self.effect}
|
|
if self.roles:
|
|
out["roles"] = self.roles
|
|
if self.account_types:
|
|
out["account_types"] = self.account_types
|
|
if self.subject_uuid is not None:
|
|
out["subject_uuid"] = self.subject_uuid
|
|
if self.actions:
|
|
out["actions"] = self.actions
|
|
if self.resource_type is not None:
|
|
out["resource_type"] = self.resource_type
|
|
if self.owner_matches_subject is not None:
|
|
out["owner_matches_subject"] = self.owner_matches_subject
|
|
if self.service_names:
|
|
out["service_names"] = self.service_names
|
|
if self.required_tags:
|
|
out["required_tags"] = self.required_tags
|
|
return out
|
|
@dataclass
class PolicyRule:
    """A stored policy rule: metadata wrapped around a ``RuleBody``."""

    id: int
    priority: int
    description: str
    rule: RuleBody
    enabled: bool
    created_at: str
    updated_at: str
    not_before: str | None = None
    expires_at: str | None = None

    @classmethod
    def from_dict(cls, d: dict[str, object]) -> "PolicyRule":
        """Build a ``PolicyRule`` from a decoded response mapping.

        ``id``, ``priority``, ``description``, ``rule``, ``enabled``,
        ``created_at`` and ``updated_at`` are mandatory (``KeyError`` when
        missing); the nested ``rule`` mapping is parsed with
        ``RuleBody.from_dict``.  ``not_before`` and ``expires_at`` stay
        ``None`` when the key is absent or null.
        """
        rule_id = int(cast(int, d["id"]))
        priority = int(cast(int, d["priority"]))
        description = str(d["description"])
        body = RuleBody.from_dict(cast(dict[str, object], d["rule"]))
        enabled = bool(d["enabled"])
        created = str(d["created_at"])
        updated = str(d["updated_at"])

        not_before_raw = d.get("not_before")
        expires_raw = d.get("expires_at")
        not_before = None if not_before_raw is None else str(not_before_raw)
        expires = None if expires_raw is None else str(expires_raw)

        return cls(
            id=rule_id,
            priority=priority,
            description=description,
            rule=body,
            enabled=enabled,
            created_at=created,
            updated_at=updated,
            not_before=not_before,
            expires_at=expires,
        )
|