diff --git a/.github/workflows/pykanidm.yml b/.github/workflows/pykanidm.yml index 9b1bb1ce2..35b8eba33 100644 --- a/.github/workflows/pykanidm.yml +++ b/.github/workflows/pykanidm.yml @@ -38,4 +38,4 @@ jobs: run: | cd pykanidm poetry install - poetry run pytest -v + poetry run pytest -v -m 'not network' diff --git a/proto/src/v1.rs b/proto/src/v1.rs index b47a7864e..96f2c0e78 100644 --- a/proto/src/v1.rs +++ b/proto/src/v1.rs @@ -1051,10 +1051,6 @@ pub enum AuthState { // Everything is good, your bearer token has been issued and is within // the result. Success(String), - // Everything is good, your cookie has been issued. - // Cookies no longer supported. Left as a comment as an example of alternate - // issue types. - // SuccessCookie, } #[derive(Debug, Serialize, Deserialize, ToSchema)] diff --git a/pykanidm/kanidm/__init__.py b/pykanidm/kanidm/__init__.py index 9813a574f..de1f6aa96 100644 --- a/pykanidm/kanidm/__init__.py +++ b/pykanidm/kanidm/__init__.py @@ -97,6 +97,9 @@ class KanidmClient: and not Path(self.config.ca_path).expanduser().resolve().exists() ): raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}") + logging.debug( + "Setting up SSL context with CA path: %s", self.config.ca_path + ) self._ssl = ssl.create_default_context(cafile=self.config.ca_path) if self._ssl is not False: # ignoring this for typing because mypy is being weird @@ -186,14 +189,12 @@ class KanidmClient: content = await request.content.read() try: response_json = json_lib.loads(content) - if not isinstance(response_json, dict): - response_json = None response_headers = dict(request.headers) response_status = request.status except json_lib.JSONDecodeError as json_error: logging.error("Failed to JSON Decode Response: %s", json_error) logging.error("Response data: %s", content) - response_json = {} + response_json = None response_input = { "data": response_json, "content": content.decode("utf-8"), @@ -257,6 +258,11 @@ class KanidmClient: data = getattr(response, "data", {}) data["response"] = response retval = AuthInitResponse.model_validate(data) + + if update_internal_auth_token: + self.config.auth_token = response.headers.get( + "x-kanidm-auth-session-id", "" + ) return retval async def auth_begin( @@ -312,6 +318,7 @@ class KanidmClient: self, username: Optional[str] = None, password: Optional[str] = None, + update_internal_auth_token: bool = False, ) -> AuthState: """authenticates with a username and password, returns the auth token""" if username is None and password is None: @@ -325,7 +332,9 @@ class KanidmClient: if username is None or password is None: raise ValueError("Username and Password need to be set somewhere!") - auth_init: AuthInitResponse = await self.auth_init(username) + auth_init: AuthInitResponse = await self.auth_init( + username, update_internal_auth_token=update_internal_auth_token + ) if auth_init.response is None: raise NotImplementedError("This should throw a really cool response") @@ -340,12 +349,17 @@ class KanidmClient: # does a little bit of validation auth_begin_object = AuthBeginResponse.model_validate(auth_begin.data) auth_begin_object.response = auth_begin - return await self.auth_step_password(password=password, sessionid=sessionid) + return await self.auth_step_password( + password=password, + sessionid=sessionid, + update_internal_auth_token=update_internal_auth_token, + ) async def auth_step_password( self, - sessionid: str, + sessionid: Optional[str] = None, password: Optional[str] = None, + update_internal_auth_token: bool = False, ) -> AuthState: """does the password auth step""" @@ -356,8 +370,15 @@ class KanidmClient: "Password has to be passed to auth_step_password or in self.password!" ) + if sessionid is not None: + headers = {"x-kanidm-auth-session-id": sessionid} + elif self.config.auth_token is not None: + headers = {"x-kanidm-auth-session-id": self.config.auth_token} + cred_auth = {"step": {"cred": {"password": password}}} - response = await self.call_post(path="/v1/auth", json=cred_auth) + response = await self.call_post( + path="/v1/auth", json=cred_auth, headers=headers + ) if response.status_code != 200: # TODO: write test coverage auth_step_password raises AuthCredFailed @@ -367,6 +388,9 @@ class KanidmClient: result = AuthState.model_validate(response.data) result.response = response + if update_internal_auth_token: + self.config.auth_token = result.state.success + # pull the token out and set it if result.state.success is None: # TODO: write test coverage for AuthCredFailed @@ -374,13 +398,34 @@ class KanidmClient: result.sessionid = result.state.success return result + async def auth_as_anonymous(self) -> None: + """authenticate as the anonymous user""" + + init = await self.auth_init("anonymous", update_internal_auth_token=True) + logging.debug("auth_init completed, moving onto begin step") + await self.auth_begin( + method=init.state.choose[0], update_internal_auth_token=True + ) + logging.debug("auth_begin completed, moving onto cred step") + cred_auth = {"step": {"cred": "anonymous"}} + + if self.config.auth_token is None: + raise ValueError("Auth token is not set, auth failure!") + + response = await self.call_post( + path=KANIDMURLS["auth"], + json=cred_auth, + ) + state = AuthState.model_validate(response.data) + logging.debug("anonymous auth completed, setting token") + self.config.auth_token = state.state.success + def session_header( self, sessionid: str, ) -> Dict[str, str]: """create a headers dict from a session id""" # TODO: perhaps allow session_header to take a dict and update it, too? - return { "authorization": f"bearer {sessionid}", } @@ -407,27 +452,9 @@ class KanidmClient: grouplist = GroupList.model_validate(json_lib.loads(response.content)) return [group.as_groupinfo() for group in grouplist.root] - async def auth_as_anonymous(self) -> None: - """authenticate as the anonymous user""" + async def idm_oauth2_rs_list(self) -> ClientResponse: + """gets the list of oauth2 resource servers""" + endpoint = "/v1/oauth2" - init = await self.auth_init("anonymous", update_internal_auth_token=True) - logging.debug("auth_init completed, moving onto cred step") - await self.auth_begin( - method=init.state.choose[0], update_internal_auth_token=True - ) - logging.debug("auth_begin completed, moving onto cred step") - cred_auth = {"step": {"cred": "anonymous"}} - headers = {} - if self.config.auth_token is None: - raise ValueError("Auth token is not set, auth failure!") - else: - headers["x-kanidm-auth-session-id"] = self.config.auth_token - - response = await self.call_post( - path=KANIDMURLS["auth"], - json=cred_auth, - headers=headers, - ) - state = AuthState.model_validate(response.data) - logging.debug("anonymous auth completed, setting token") - self.config.auth_token = state.state.success + resp = await self.call_get(endpoint) + return resp diff --git a/pykanidm/kanidm/types.py b/pykanidm/kanidm/types.py index 9961bae01..5b8f7fd90 100644 --- a/pykanidm/kanidm/types.py +++ b/pykanidm/kanidm/types.py @@ -20,7 +20,8 @@ class ClientResponse(BaseModel): """ content: Optional[str] = None - data: Optional[Dict[str, Any]] = None + # the data field is used for the json-parsed response + data: Optional[Any] = None headers: Dict[str, Any] status_code: int model_config = ConfigDict(arbitrary_types_allowed=True) @@ -160,7 +161,7 @@ class KanidmClientConfig(BaseModel): verify_hostnames: bool = True verify_certificate: bool = True - ca_path: Optional[str] = None + ca_path: Optional[str] = Field(default=None, alias='verify_ca') username: Optional[str] = None password: Optional[str] = None diff --git a/pykanidm/tests/test_config_loader.py b/pykanidm/tests/test_config_loader.py index 40560ee55..c87be8ec6 100644 --- a/pykanidm/tests/test_config_loader.py +++ b/pykanidm/tests/test_config_loader.py @@ -13,7 +13,7 @@ from kanidm.utils import load_config logging.basicConfig(level=logging.DEBUG) -EXAMPLE_CONFIG_FILE = "../examples/config" +EXAMPLE_CONFIG_FILE = "../examples/config_localhost" @pytest.fixture(scope="function") @@ -31,7 +31,7 @@ def test_load_config_file() -> None: pytest.skip() print("Loading config file") config = load_config(EXAMPLE_CONFIG_FILE) - assert config.get("uri") == "https://idm.example.com" + assert config.get("uri") == "https://localhost:8443" print(f"{config.get('uri')=}") print(config) diff --git a/pykanidm/tests/test_oauth2.py b/pykanidm/tests/test_oauth2.py new file mode 100644 index 000000000..45ae30516 --- /dev/null +++ b/pykanidm/tests/test_oauth2.py @@ -0,0 +1,42 @@ +import json +import logging + +from kanidm import KanidmClient + +import pytest + +@pytest.fixture(scope="function") +async def client() -> KanidmClient: + """sets up a client with a basic thing""" + return KanidmClient(config_file="../examples/config_localhost") + +@pytest.mark.network +@pytest.mark.asyncio +async def test_idm_oauth2_rs_list(client: KanidmClient) -> None: + """tests getting the list of oauth2 resource servers""" + logging.basicConfig(level=logging.DEBUG) + print(f"config: {client.config}") + + username = "admin" + # change this to be your admin password. + password = "Ek7A0fShLsCTXgK2xDqC9TNUgPYQdVFB6RMGKXLyNtGL5cER" + + auth_resp = await client.authenticate_password(username, password, update_internal_auth_token=True) + assert auth_resp.state.success is not None + + resp = await client.idm_oauth2_rs_list() + print("content:") + print(json.dumps(resp.data, indent=4)) + assert resp.status_code == 200 + + if resp.data is not None: + for oauth_rs in resp.data: + oauth2_rs_sup_scope_map = oauth_rs.get("attrs", {}).get("oauth2_rs_sup_scope_map", {}) + for mapping in oauth2_rs_sup_scope_map: + print(f"oauth2_rs_sup_scope_map: {mapping}") + user, scopes = mapping.split(":") + scopes = scopes.replace("{", "[").replace("}", "]") + scopes = json.loads(scopes) + print(f"{user=} {scopes=}") + + diff --git a/pykanidm/tests/test_radius_config.py b/pykanidm/tests/test_radius_config.py index 4ef260c69..cce830cf7 100644 --- a/pykanidm/tests/test_radius_config.py +++ b/pykanidm/tests/test_radius_config.py @@ -12,19 +12,6 @@ from kanidm.utils import load_config EXAMPLE_CONFIG_FILE = "../examples/config" - -def test_load_config_file() -> None: - """tests that the file loads""" - if not Path(EXAMPLE_CONFIG_FILE).expanduser().resolve().exists(): - print("Can't find client config file", file=sys.stderr) - pytest.skip() - config = load_config(EXAMPLE_CONFIG_FILE) - kanidm_config = KanidmClientConfig.model_validate(config) - assert kanidm_config.uri == "https://idm.example.com/" - print(f"{kanidm_config.uri=}") - print(kanidm_config) - - def test_radius_groups() -> None: """testing loading a config file with radius groups defined"""