pykanidm test code (#2202)

* Testing #1998 - validated response is JSON-parseable
* disable network tests in pytest
* fixing a type-handling thing in a test
This commit is contained in:
James Hodgkinson 2023-10-24 13:26:10 +10:00 committed by GitHub
parent d531f602c6
commit 6f3e932f7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 106 additions and 53 deletions

View file

@ -38,4 +38,4 @@ jobs:
run: |
cd pykanidm
poetry install
poetry run pytest -v
poetry run pytest -v -m 'not network'

View file

@ -1051,10 +1051,6 @@ pub enum AuthState {
// Everything is good, your bearer token has been issued and is within
// the result.
Success(String),
// Everything is good, your cookie has been issued.
// Cookies no longer supported. Left as a comment as an example of alternate
// issue types.
// SuccessCookie,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]

View file

@ -97,6 +97,9 @@ class KanidmClient:
and not Path(self.config.ca_path).expanduser().resolve().exists()
):
raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}")
logging.debug(
"Setting up SSL context with CA path: %s", self.config.ca_path
)
self._ssl = ssl.create_default_context(cafile=self.config.ca_path)
if self._ssl is not False:
# ignoring this for typing because mypy is being weird
@ -186,14 +189,12 @@ class KanidmClient:
content = await request.content.read()
try:
response_json = json_lib.loads(content)
if not isinstance(response_json, dict):
response_json = None
response_headers = dict(request.headers)
response_status = request.status
except json_lib.JSONDecodeError as json_error:
logging.error("Failed to JSON Decode Response: %s", json_error)
logging.error("Response data: %s", content)
response_json = {}
response_json = None
response_input = {
"data": response_json,
"content": content.decode("utf-8"),
@ -257,6 +258,11 @@ class KanidmClient:
data = getattr(response, "data", {})
data["response"] = response
retval = AuthInitResponse.model_validate(data)
if update_internal_auth_token:
self.config.auth_token = response.headers.get(
"x-kanidm-auth-session-id", ""
)
return retval
async def auth_begin(
@ -312,6 +318,7 @@ class KanidmClient:
self,
username: Optional[str] = None,
password: Optional[str] = None,
update_internal_auth_token: bool = False,
) -> AuthState:
"""authenticates with a username and password, returns the auth token"""
if username is None and password is None:
@ -325,7 +332,9 @@ class KanidmClient:
if username is None or password is None:
raise ValueError("Username and Password need to be set somewhere!")
auth_init: AuthInitResponse = await self.auth_init(username)
auth_init: AuthInitResponse = await self.auth_init(
username, update_internal_auth_token=update_internal_auth_token
)
if auth_init.response is None:
raise NotImplementedError("This should throw a really cool response")
@ -340,12 +349,17 @@ class KanidmClient:
# does a little bit of validation
auth_begin_object = AuthBeginResponse.model_validate(auth_begin.data)
auth_begin_object.response = auth_begin
return await self.auth_step_password(password=password, sessionid=sessionid)
return await self.auth_step_password(
password=password,
sessionid=sessionid,
update_internal_auth_token=update_internal_auth_token,
)
async def auth_step_password(
self,
sessionid: str,
sessionid: Optional[str] = None,
password: Optional[str] = None,
update_internal_auth_token: bool = False,
) -> AuthState:
"""does the password auth step"""
@ -356,8 +370,15 @@ class KanidmClient:
"Password has to be passed to auth_step_password or in self.password!"
)
if sessionid is not None:
headers = {"x-kanidm-auth-session-id": sessionid}
elif self.config.auth_token is not None:
headers = {"x-kanidm-auth-session-id": self.config.auth_token}
cred_auth = {"step": {"cred": {"password": password}}}
response = await self.call_post(path="/v1/auth", json=cred_auth)
response = await self.call_post(
path="/v1/auth", json=cred_auth, headers=headers
)
if response.status_code != 200:
# TODO: write test coverage auth_step_password raises AuthCredFailed
@ -367,6 +388,9 @@ class KanidmClient:
result = AuthState.model_validate(response.data)
result.response = response
if update_internal_auth_token:
self.config.auth_token = result.state.success
# pull the token out and set it
if result.state.success is None:
# TODO: write test coverage for AuthCredFailed
@ -374,13 +398,34 @@ class KanidmClient:
result.sessionid = result.state.success
return result
async def auth_as_anonymous(self) -> None:
"""authenticate as the anonymous user"""
init = await self.auth_init("anonymous", update_internal_auth_token=True)
logging.debug("auth_init completed, moving onto begin step")
await self.auth_begin(
method=init.state.choose[0], update_internal_auth_token=True
)
logging.debug("auth_begin completed, moving onto cred step")
cred_auth = {"step": {"cred": "anonymous"}}
if self.config.auth_token is None:
raise ValueError("Auth token is not set, auth failure!")
response = await self.call_post(
path=KANIDMURLS["auth"],
json=cred_auth,
)
state = AuthState.model_validate(response.data)
logging.debug("anonymous auth completed, setting token")
self.config.auth_token = state.state.success
def session_header(
self,
sessionid: str,
) -> Dict[str, str]:
"""create a headers dict from a session id"""
# TODO: perhaps allow session_header to take a dict and update it, too?
return {
"authorization": f"bearer {sessionid}",
}
@ -407,27 +452,9 @@ class KanidmClient:
grouplist = GroupList.model_validate(json_lib.loads(response.content))
return [group.as_groupinfo() for group in grouplist.root]
async def auth_as_anonymous(self) -> None:
"""authenticate as the anonymous user"""
async def idm_oauth2_rs_list(self) -> ClientResponse:
"""gets the list of oauth2 resource servers"""
endpoint = "/v1/oauth2"
init = await self.auth_init("anonymous", update_internal_auth_token=True)
logging.debug("auth_init completed, moving onto cred step")
await self.auth_begin(
method=init.state.choose[0], update_internal_auth_token=True
)
logging.debug("auth_begin completed, moving onto cred step")
cred_auth = {"step": {"cred": "anonymous"}}
headers = {}
if self.config.auth_token is None:
raise ValueError("Auth token is not set, auth failure!")
else:
headers["x-kanidm-auth-session-id"] = self.config.auth_token
response = await self.call_post(
path=KANIDMURLS["auth"],
json=cred_auth,
headers=headers,
)
state = AuthState.model_validate(response.data)
logging.debug("anonymous auth completed, setting token")
self.config.auth_token = state.state.success
resp = await self.call_get(endpoint)
return resp

View file

@ -20,7 +20,8 @@ class ClientResponse(BaseModel):
"""
content: Optional[str] = None
data: Optional[Dict[str, Any]] = None
# the data field is used for the json-parsed response
data: Optional[Any] = None
headers: Dict[str, Any]
status_code: int
model_config = ConfigDict(arbitrary_types_allowed=True)
@ -160,7 +161,7 @@ class KanidmClientConfig(BaseModel):
verify_hostnames: bool = True
verify_certificate: bool = True
ca_path: Optional[str] = None
ca_path: Optional[str] = Field(default=None, alias='verify_ca')
username: Optional[str] = None
password: Optional[str] = None

View file

@ -13,7 +13,7 @@ from kanidm.utils import load_config
logging.basicConfig(level=logging.DEBUG)
EXAMPLE_CONFIG_FILE = "../examples/config"
EXAMPLE_CONFIG_FILE = "../examples/config_localhost"
@pytest.fixture(scope="function")
@ -31,7 +31,7 @@ def test_load_config_file() -> None:
pytest.skip()
print("Loading config file")
config = load_config(EXAMPLE_CONFIG_FILE)
assert config.get("uri") == "https://idm.example.com"
assert config.get("uri") == "https://localhost:8443"
print(f"{config.get('uri')=}")
print(config)

View file

@ -0,0 +1,42 @@
import json
import logging
from kanidm import KanidmClient
import pytest
@pytest.fixture(scope="function")
async def client() -> KanidmClient:
"""sets up a client with a basic thing"""
return KanidmClient(config_file="../examples/config_localhost")
@pytest.mark.network
@pytest.mark.asyncio
async def test_idm_oauth2_rs_list(client: KanidmClient) -> None:
"""tests getting the list of oauth2 resource servers"""
logging.basicConfig(level=logging.DEBUG)
print(f"config: {client.config}")
username = "admin"
# change this to be your admin password.
password = "Ek7A0fShLsCTXgK2xDqC9TNUgPYQdVFB6RMGKXLyNtGL5cER"
auth_resp = await client.authenticate_password(username, password, update_internal_auth_token=True)
assert auth_resp.state.success is not None
resp = await client.idm_oauth2_rs_list()
print("content:")
print(json.dumps(resp.data, indent=4))
assert resp.status_code == 200
if resp.data is not None:
for oauth_rs in resp.data:
oauth2_rs_sup_scope_map = oauth_rs.get("attrs", {}).get("oauth2_rs_sup_scope_map", {})
for mapping in oauth2_rs_sup_scope_map:
print(f"oauth2_rs_sup_scope_map: {mapping}")
user, scopes = mapping.split(":")
scopes = scopes.replace("{", "[").replace("}", "]")
scopes = json.loads(scopes)
print(f"{user=} {scopes=}")

View file

@ -12,19 +12,6 @@ from kanidm.utils import load_config
EXAMPLE_CONFIG_FILE = "../examples/config"
def test_load_config_file() -> None:
"""tests that the file loads"""
if not Path(EXAMPLE_CONFIG_FILE).expanduser().resolve().exists():
print("Can't find client config file", file=sys.stderr)
pytest.skip()
config = load_config(EXAMPLE_CONFIG_FILE)
kanidm_config = KanidmClientConfig.model_validate(config)
assert kanidm_config.uri == "https://idm.example.com/"
print(f"{kanidm_config.uri=}")
print(kanidm_config)
def test_radius_groups() -> None:
"""testing loading a config file with radius groups defined"""