pykanidm: Add retrieving credential reset token for a person. (#3279)

This commit is contained in:
George Wu 2024-12-08 18:52:51 -08:00 committed by GitHub
parent 9b3350f753
commit 97a1c39d62
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 36 additions and 11 deletions

View file

@ -17,7 +17,13 @@ import yarl
from kanidm.models.group import Group, GroupList, IGroup, RawGroup from kanidm.models.group import Group, GroupList, IGroup, RawGroup
from kanidm.models.oauth2_rs import IOauth2Rs, OAuth2Rs, Oauth2RsList, RawOAuth2Rs from kanidm.models.oauth2_rs import IOauth2Rs, OAuth2Rs, Oauth2RsList, RawOAuth2Rs
from kanidm.models.person import IPerson, Person, PersonList, RawPerson from kanidm.models.person import (
IPerson,
Person,
PersonList,
RawPerson,
PersonCredentialResetToken,
)
from kanidm.models.service_account import ( from kanidm.models.service_account import (
IServiceAccount, IServiceAccount,
ServiceAccount, ServiceAccount,
@ -93,7 +99,7 @@ class KanidmClient:
"""Constructor for KanidmClient""" """Constructor for KanidmClient"""
self.logger = logger or getLogger(__name__) self.logger = logger or getLogger(__name__)
self.instance_name = instance_name # TODO: use this in loaders etc self.instance_name = instance_name # TODO: use this in loaders etc
if config is not None: if config is not None:
self.config = config self.config = config
else: else:
@ -123,7 +129,7 @@ class KanidmClient:
def _configure_ssl(self) -> None: def _configure_ssl(self) -> None:
"""Sets up SSL configuration for the client""" """Sets up SSL configuration for the client"""
if False in [self.config.verify_certificate, self.config.verify_hostnames ]: if False in [self.config.verify_certificate, self.config.verify_hostnames]:
logging.debug("Setting up SSL context with no verification") logging.debug("Setting up SSL context with no verification")
self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH) self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
self._ssl_context.hostname_checks_common_name = False self._ssl_context.hostname_checks_common_name = False
@ -135,7 +141,7 @@ class KanidmClient:
raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}") raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}")
else: else:
self.logger.debug("Setting up SSL context with CA path=%s", self.config.ca_path) self.logger.debug("Setting up SSL context with CA path=%s", self.config.ca_path)
self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,cafile=self.config.ca_path) self._ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=self.config.ca_path)
else: else:
logging.debug("Setting up default SSL context") logging.debug("Setting up default SSL context")
@ -765,6 +771,21 @@ class KanidmClient:
endpoint = f"{Endpoints.PERSON}/{id}" endpoint = f"{Endpoints.PERSON}/{id}"
return await self.call_delete(endpoint) return await self.call_delete(endpoint)
async def person_account_credential_update_token(self, id: str, ttl: Optional[int] = None) -> PersonCredentialResetToken:
"""Create a password reset token for person with an optional time to live in seconds"""
endpoint = f"{Endpoints.PERSON}/{id}/_credential/_update_intent"
if ttl:
endpoint = f"{endpoint}/{ttl}"
response: ClientResponse[Any] = await self.call_get(endpoint)
if response.content is None:
raise ValueError(f"Failed to get token: {response.content}")
if response.status_code != 200:
raise ValueError(f"Failed to get token: {response.content}")
token = PersonCredentialResetToken.model_validate(json_lib.loads(response.content))
return token
async def person_account_post_ssh_key(self, id: str, tag: str, pubkey: str) -> ClientResponse[None]: async def person_account_post_ssh_key(self, id: str, tag: str, pubkey: str) -> ClientResponse[None]:
"""Create an SSH key for a user""" """Create an SSH key for a user"""
endpoint = f"{Endpoints.PERSON}/{id}/_ssh_pubkeys" endpoint = f"{Endpoints.PERSON}/{id}/_ssh_pubkeys"

View file

@ -38,8 +38,15 @@ class RawPerson(BaseModel):
uuid=UUID(self.attrs["uuid"][0]), uuid=UUID(self.attrs["uuid"][0]),
) )
PersonList = RootModel[List[RawPerson]] PersonList = RootModel[List[RawPerson]]
class IPerson(TypedDict): class IPerson(TypedDict):
attrs: Dict[str, List[str]] attrs: Dict[str, List[str]]
class PersonCredentialResetToken(BaseModel):
token: str
expiry_time: int
model_config = ConfigDict(arbitrary_types_allowed=True)

View file

@ -1,4 +1,5 @@
""" type objects """ """ type objects """
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
# ^ disabling this because pydantic models don't have public methods # ^ disabling this because pydantic models don't have public methods
@ -31,7 +32,7 @@ class ClientResponse(BaseModel, Generic[T]):
class AuthInitResponse(BaseModel): class AuthInitResponse(BaseModel):
"""Aelps parse the response from the Auth 'init' stage""" """Helps parse the response from the Auth 'init' stage"""
class _AuthInitState(BaseModel): class _AuthInitState(BaseModel):
"""sub-class for the AuthInitResponse model""" """sub-class for the AuthInitResponse model"""
@ -146,9 +147,7 @@ class RadiusClient(BaseModel):
socket.gethostbyname(value) socket.gethostbyname(value)
return value return value
except socket.gaierror as error: except socket.gaierror as error:
raise ValueError( raise ValueError(f"ipaddr value ({value}) wasn't an IP Address, Network or valid hostname: {error}")
f"ipaddr value ({value}) wasn't an IP Address, Network or valid hostname: {error}"
)
class KanidmClientConfig(BaseModel): class KanidmClientConfig(BaseModel):
@ -196,9 +195,7 @@ class KanidmClientConfig(BaseModel):
uri = urlparse(value) uri = urlparse(value)
valid_schemes = ["http", "https"] valid_schemes = ["http", "https"]
if uri.scheme not in valid_schemes: if uri.scheme not in valid_schemes:
raise ValueError( raise ValueError(f"Invalid URL Scheme for uri='{value}': '{uri.scheme}' - expected one of {valid_schemes}")
f"Invalid URL Scheme for uri='{value}': '{uri.scheme}' - expected one of {valid_schemes}"
)
# make sure the URI ends with a / # make sure the URI ends with a /
if not value.endswith("/"): if not value.endswith("/"):