diff --git a/pykanidm/kanidm/__init__.py b/pykanidm/kanidm/__init__.py index cf0623069..ece21d27e 100644 --- a/pykanidm/kanidm/__init__.py +++ b/pykanidm/kanidm/__init__.py @@ -17,7 +17,13 @@ import yarl from kanidm.models.group import Group, GroupList, IGroup, RawGroup from kanidm.models.oauth2_rs import IOauth2Rs, OAuth2Rs, Oauth2RsList, RawOAuth2Rs -from kanidm.models.person import IPerson, Person, PersonList, RawPerson +from kanidm.models.person import ( + IPerson, + Person, + PersonList, + RawPerson, + PersonCredentialResetToken, +) from kanidm.models.service_account import ( IServiceAccount, ServiceAccount, @@ -93,7 +99,7 @@ class KanidmClient: """Constructor for KanidmClient""" self.logger = logger or getLogger(__name__) - self.instance_name = instance_name # TODO: use this in loaders etc + self.instance_name = instance_name # TODO: use this in loaders etc if config is not None: self.config = config else: @@ -123,7 +129,7 @@ class KanidmClient: def _configure_ssl(self) -> None: """Sets up SSL configuration for the client""" - if False in [self.config.verify_certificate, self.config.verify_hostnames ]: + if False in [self.config.verify_certificate, self.config.verify_hostnames]: logging.debug("Setting up SSL context with no verification") self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) self._ssl_context.hostname_checks_common_name = False @@ -135,7 +141,7 @@ class KanidmClient: raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}") else: self.logger.debug("Setting up SSL context with CA path=%s", self.config.ca_path) - self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,cafile=self.config.ca_path) + self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.config.ca_path) else: logging.debug("Setting up default SSL context") @@ -765,6 +771,21 @@ class KanidmClient: endpoint = f"{Endpoints.PERSON}/{id}" return await self.call_delete(endpoint) + async def person_account_credential_update_token(self, id: str, ttl: Optional[int] = None) -> PersonCredentialResetToken: + """Create a password reset token for person with an optional time to live in seconds""" + endpoint = f"{Endpoints.PERSON}/{id}/_credential/_update_intent" + if ttl: + endpoint = f"{endpoint}/{ttl}" + + response: ClientResponse[Any] = await self.call_get(endpoint) + if response.content is None: + raise ValueError(f"Failed to get token: {response.content}") + if response.status_code != 200: + raise ValueError(f"Failed to get token: {response.content}") + token = PersonCredentialResetToken.model_validate(json_lib.loads(response.content)) + + return token + async def person_account_post_ssh_key(self, id: str, tag: str, pubkey: str) -> ClientResponse[None]: """Create an SSH key for a user""" endpoint = f"{Endpoints.PERSON}/{id}/_ssh_pubkeys" diff --git a/pykanidm/kanidm/models/person.py b/pykanidm/kanidm/models/person.py index 0de64ace8..db1303d91 100644 --- a/pykanidm/kanidm/models/person.py +++ b/pykanidm/kanidm/models/person.py @@ -38,8 +38,15 @@ class RawPerson(BaseModel): uuid=UUID(self.attrs["uuid"][0]), ) + PersonList = RootModel[List[RawPerson]] class IPerson(TypedDict): attrs: Dict[str, List[str]] + + +class PersonCredentialResetToken(BaseModel): + token: str + expiry_time: int + model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/pykanidm/kanidm/types.py b/pykanidm/kanidm/types.py index 4988144de..383678967 100644 --- a/pykanidm/kanidm/types.py +++ b/pykanidm/kanidm/types.py @@ -1,4 +1,5 @@ """ type objects """ + # pylint: disable=too-few-public-methods # ^ disabling this because pydantic models don't have public methods @@ -31,7 +32,7 @@ class ClientResponse(BaseModel, Generic[T]): class AuthInitResponse(BaseModel): - """Aelps parse the response from the Auth 'init' stage""" + """Helps parse the response from the Auth 'init' stage""" class _AuthInitState(BaseModel): """sub-class for the AuthInitResponse model""" @@ -146,9 +147,7 @@ class RadiusClient(BaseModel): socket.gethostbyname(value) return value except socket.gaierror as error: - raise ValueError( - f"ipaddr value ({value}) wasn't an IP Address, Network or valid hostname: {error}" - ) + raise ValueError(f"ipaddr value ({value}) wasn't an IP Address, Network or valid hostname: {error}") class KanidmClientConfig(BaseModel): @@ -196,9 +195,7 @@ class KanidmClientConfig(BaseModel): uri = urlparse(value) valid_schemes = ["http", "https"] if uri.scheme not in valid_schemes: - raise ValueError( - f"Invalid URL Scheme for uri='{value}': '{uri.scheme}' - expected one of {valid_schemes}" - ) + raise ValueError(f"Invalid URL Scheme for uri='{value}': '{uri.scheme}' - expected one of {valid_schemes}") # make sure the URI ends with a / if not value.endswith("/"):