PyKanidm updates and testing (#2301)

* otel can eprintln kthx

* started python integration tests, features

* more tests more things

* adding heaps more things

* updating docs

* fixing python test

* fixing errors, updating integration test

* Add models for OAuth2, Person, ServiceAccount and add missing endpoints

* Alias Group to GroupInfo to keep it retrocompatible

* Fixed issues from review

* adding oauth2rs_get_basic_secret

* adding oauth2rs_get_basic_secret

* Fixed mypy issues

* adding more error logs

* updating test scripts and configs

* fixing tests and validating things

* more errors

---------

Co-authored-by: Dogeek <simon.bordeyne@gmail.com>
This commit is contained in:
James Hodgkinson 2024-01-31 13:27:43 +10:00 committed by GitHub
parent c8a9e2c9c6
commit c8bd1739f9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1163 additions and 204 deletions

1
Cargo.lock generated
View file

@ -1152,6 +1152,7 @@ dependencies = [
"reqwest",
"sd-notify",
"serde",
"serde_json",
"sketching",
"tikv-jemallocator",
"tokio",

View file

@ -1,4 +1,5 @@
# This should be at /etc/kanidm/config or ~/.config/kanidm, and configures the kanidm command line tool
# to point at the local dev server and trust the CA security.
uri="https://localhost:8443"
verify_ca="/tmp/kanidm/ca.pem"
verify_ca=true
ca_path="/tmp/kanidm/ca.pem"

View file

@ -1,8 +1,10 @@
use crate::{ClientError, KanidmClient};
use kanidm_proto::constants::{
ATTR_DISPLAYNAME, ATTR_OAUTH2_ALLOW_INSECURE_CLIENT_DISABLE_PKCE,
ATTR_DISPLAYNAME, ATTR_ES256_PRIVATE_KEY_DER, ATTR_OAUTH2_ALLOW_INSECURE_CLIENT_DISABLE_PKCE,
ATTR_OAUTH2_ALLOW_LOCALHOST_REDIRECT, ATTR_OAUTH2_JWT_LEGACY_CRYPTO_ENABLE,
ATTR_OAUTH2_PREFER_SHORT_USERNAME, ATTR_OAUTH2_RS_NAME, ATTR_OAUTH2_RS_ORIGIN,
ATTR_OAUTH2_PREFER_SHORT_USERNAME, ATTR_OAUTH2_RS_BASIC_SECRET, ATTR_OAUTH2_RS_NAME,
ATTR_OAUTH2_RS_ORIGIN, ATTR_OAUTH2_RS_ORIGIN_LANDING, ATTR_OAUTH2_RS_TOKEN_KEY,
ATTR_RS256_PRIVATE_KEY_DER,
};
use kanidm_proto::internal::{ImageValue, Oauth2ClaimMapJoin};
use kanidm_proto::v1::Entry;
@ -105,27 +107,27 @@ impl KanidmClient {
}
if let Some(newlanding) = landing {
update_oauth2_rs.attrs.insert(
"oauth2_rs_origin_landing".to_string(),
ATTR_OAUTH2_RS_ORIGIN_LANDING.to_string(),
vec![newlanding.to_string()],
);
}
if reset_secret {
update_oauth2_rs
.attrs
.insert("oauth2_rs_basic_secret".to_string(), Vec::new());
.insert(ATTR_OAUTH2_RS_BASIC_SECRET.to_string(), Vec::new());
}
if reset_token_key {
update_oauth2_rs
.attrs
.insert("oauth2_rs_token_key".to_string(), Vec::new());
.insert(ATTR_OAUTH2_RS_TOKEN_KEY.to_string(), Vec::new());
}
if reset_sign_key {
update_oauth2_rs
.attrs
.insert("es256_private_key_der".to_string(), Vec::new());
.insert(ATTR_ES256_PRIVATE_KEY_DER.to_string(), Vec::new());
update_oauth2_rs
.attrs
.insert("rs256_private_key_der".to_string(), Vec::new());
.insert(ATTR_RS256_PRIVATE_KEY_DER.to_string(), Vec::new());
}
self.perform_patch_request(format!("/v1/oauth2/{}", id).as_str(), update_oauth2_rs)
.await

View file

@ -75,7 +75,7 @@ impl KanidmClient {
}
pub async fn idm_person_account_delete(&self, id: &str) -> Result<(), ClientError> {
self.perform_delete_request(["/v1/person/", id].concat().as_str())
self.perform_delete_request(format!("/v1/person/{}", id).as_str())
.await
}

View file

@ -122,6 +122,6 @@ impl Drop for TracingPipelineGuard {
fn drop(&mut self) {
opentelemetry::global::shutdown_tracer_provider();
opentelemetry::global::shutdown_logger_provider();
println!("Logging pipeline completed shutdown");
eprintln!("Logging pipeline completed shutdown");
}
}

View file

@ -16,7 +16,7 @@ mkdir -p "$pkgdir"
make release/kanidm release/kanidm-unixd release/kanidm-ssh
# enable the following block to include deployment specific configuration files
if [ 1 -eq 0 ]; then
if [ "${INCLUDE_CONFIG}" -eq 1 ]; then
mkdir -p deployment-config
# Customize the following heredocs according to the deployment

View file

@ -1,14 +1,28 @@
""" Kanidm python module """
from datetime import datetime
from functools import lru_cache
import json as json_lib # because we're taking a field "json" at various points
import logging
from logging import Logger, getLogger
import os
from pathlib import Path
import platform
import ssl
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union
import aiohttp
from pydantic import ValidationError
import yarl
from kanidm.models.group import Group, GroupList, IGroup, RawGroup
from kanidm.models.oauth2_rs import IOauth2Rs, OAuth2Rs, Oauth2RsList, RawOAuth2Rs
from kanidm.models.person import IPerson, Person, PersonList, RawPerson
from kanidm.models.service_account import (
IServiceAccount,
ServiceAccount,
RawServiceAccount,
ServiceAccountList,
)
from .exceptions import (
AuthBeginFailed,
@ -22,19 +36,32 @@ from .types import (
AuthInitResponse,
AuthState,
ClientResponse,
GroupInfo,
GroupList,
KanidmClientConfig,
)
from .utils import load_config
KANIDMURLS = {
"auth": "/v1/auth",
"person": "/v1/person",
"service_account": "/v1/person",
}
K_AUTH_SESSION_ID = "x-kanidm-auth-session-id"
TOKEN_PATH = Path("~/.cache/kanidm_tokens")
class Endpoints:
AUTH = "/v1/auth"
GROUP = "/v1/group"
OAUTH2 = "/v1/oauth2"
PERSON = "/v1/person"
SYSTEM = "/v1/system"
DOMAIN = "/v1/domain"
SERVICE_ACCOUNT = "/v1/service_account"
XDG_CACHE_HOME = (
Path(os.getenv("LOCALAPPDATA", "~/AppData/Local")) / "cache"
if platform.system() == "Windows"
else Path(os.getenv("XDG_CACHE_HOME", "~/.cache"))
)
TOKEN_PATH = XDG_CACHE_HOME / "kanidm_tokens"
CallJsonType = Optional[Union[Dict[str, Any], List[Any], Tuple[str, str]]]
class KanidmClient:
@ -58,22 +85,29 @@ class KanidmClient:
uri: Optional[str] = None,
verify_hostnames: bool = True,
verify_certificate: bool = True,
verify_ca: bool = True,
ca_path: Optional[str] = None,
token: Optional[str] = None,
logger: Optional[Logger] = None,
) -> None:
"""Constructor for KanidmClient"""
self.logger = logger or getLogger(__name__)
if config is not None:
self.config = config
else:
self.config = KanidmClientConfig(
uri=uri,
verify_hostnames=verify_hostnames,
verify_certificate=verify_certificate,
ca_path=ca_path,
auth_token=token,
self.config = KanidmClientConfig.model_validate(
{
"uri": uri,
"verify_hostnames": verify_hostnames,
"verify_certificate": verify_certificate,
"verify_ca": verify_ca,
"ca_path": ca_path,
"auth_token": token,
}
)
self.logger.debug(self.config)
if config_file is not None:
if not isinstance(config_file, Path):
@ -97,7 +131,7 @@ class KanidmClient:
and not Path(self.config.ca_path).expanduser().resolve().exists()
):
raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}")
logging.debug(
self.logger.debug(
"Setting up SSL context with CA path: %s", self.config.ca_path
)
self._ssl = ssl.create_default_context(cafile=self.config.ca_path)
@ -120,7 +154,7 @@ class KanidmClient:
async def check_token_valid(self, token: Optional[str] = None) -> bool:
"""checks if a given token is valid, or the local one if you don't pass it"""
url = "/v1/auth/valid"
endpoint = f"{Endpoints.AUTH}/valid"
if token is not None:
headers = {
"authorization": f"Bearer {token}",
@ -128,8 +162,8 @@ class KanidmClient:
}
else:
headers = None
result = await self.call_get(url, headers=headers)
logging.debug(result)
result = await self.call_get(endpoint, headers=headers)
self.logger.debug(result)
if result.status_code == 200:
return True
return False
@ -157,9 +191,9 @@ class KanidmClient:
path: str,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
json: Optional[Dict[str, str]] = None,
json: CallJsonType = None,
params: Optional[Dict[str, str]] = None,
) -> ClientResponse:
) -> ClientResponse[Any]:
if timeout is None:
timeout = self.config.connect_timeout
# if we have a token set, we send it.
@ -168,7 +202,7 @@ class KanidmClient:
headers = self._token_headers
elif headers.get("authorization") is None:
headers.update(self._token_headers)
logging.debug(
self.logger.debug(
"_call method=%s to %s, headers=%s",
method,
self.get_path_uri(path),
@ -187,31 +221,50 @@ class KanidmClient:
ssl=self._ssl,
) as request:
content = await request.content.read()
try:
response_json = json_lib.loads(content)
response_headers = dict(request.headers)
response_status = request.status
except json_lib.JSONDecodeError as json_error:
logging.error("Failed to JSON Decode Response: %s", json_error)
logging.error("Response data: %s", content)
response_json = None
if len(content) > 0:
try:
response_json = json_lib.loads(content)
response_headers = dict(request.headers)
response_status = request.status
except json_lib.JSONDecodeError as json_error:
self.logger.error(
"Failed to JSON Decode Response: %s", json_error
)
self.logger.error("Response data: %s", content)
response_json = None
else:
response_json = {}
response_input = {
"data": response_json,
"content": content.decode("utf-8"),
"headers": response_headers,
"status_code": response_status,
}
logging.debug(json_lib.dumps(response_input, default=str, indent=4))
response = ClientResponse.model_validate(response_input)
self.logger.debug(json_lib.dumps(response_input, default=str, indent=4))
response: ClientResponse[Any] = ClientResponse.model_validate(
response_input
)
return response
async def call_delete(
self,
path: str,
headers: Optional[Dict[str, str]] = None,
json: CallJsonType = None,
timeout: Optional[int] = None,
) -> ClientResponse[Any]:
"""does a DELETE call to the server"""
return await self._call(
method="DELETE", path=path, headers=headers, json=json, timeout=timeout
)
async def call_get(
self,
path: str,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
) -> ClientResponse:
) -> ClientResponse[Any]:
"""does a get call to the server"""
return await self._call("GET", path, headers, timeout, params=params)
@ -219,50 +272,76 @@ class KanidmClient:
self,
path: str,
headers: Optional[Dict[str, str]] = None,
json: Optional[Dict[str, Any]] = None,
json: CallJsonType = None,
timeout: Optional[int] = None,
) -> ClientResponse:
"""does a get call to the server"""
) -> ClientResponse[Any]:
"""does a POST call to the server"""
return await self._call(
method="POST", path=path, headers=headers, json=json, timeout=timeout
)
async def call_patch(
self,
path: str,
headers: Optional[Dict[str, str]] = None,
json: CallJsonType = None,
timeout: Optional[int] = None,
) -> ClientResponse[Any]:
"""does a PATCH call to the server"""
return await self._call(
method="PATCH", path=path, headers=headers, json=json, timeout=timeout
)
async def call_put(
self,
path: str,
headers: Optional[Dict[str, str]] = None,
json: CallJsonType = None,
timeout: Optional[int] = None,
) -> ClientResponse[Any]:
"""does a PUT call to the server"""
return await self._call(
method="PUT", path=path, headers=headers, json=json, timeout=timeout
)
async def auth_init(
self, username: str, update_internal_auth_token: bool = False
) -> AuthInitResponse:
"""init step, starts the auth session, sets the class-local session ID"""
init_auth = {"step": {"init": username}}
self.logger.debug("auth_init called")
response = await self.call_post(
path=KANIDMURLS["auth"],
path=Endpoints.AUTH,
json=init_auth,
)
if response.status_code != 200:
logging.debug(
self.logger.debug(
"Failed to authenticate, response from server: %s",
response.content,
)
# TODO: mock test auth_init raises AuthInitFailed
raise AuthInitFailed(response.content)
if "x-kanidm-auth-session-id" not in response.headers:
logging.debug("response.content: %s", response.content)
logging.debug("response.headers: %s", response.headers)
if K_AUTH_SESSION_ID not in response.headers:
self.logger.debug("response.content: %s", response.content)
self.logger.debug("response.headers: %s", response.headers)
raise ValueError(
f"Missing x-kanidm-auth-session-id header in init auth response: {response.headers}"
f"Missing {K_AUTH_SESSION_ID} header in init auth response: {response.headers}"
)
else:
self.config.auth_token = response.headers["x-kanidm-auth-session-id"]
self.config.auth_token = response.headers[K_AUTH_SESSION_ID]
data = getattr(response, "data", {})
data["response"] = response
data["response"] = response.model_dump()
retval = AuthInitResponse.model_validate(data)
if update_internal_auth_token:
self.config.auth_token = response.headers.get(
"x-kanidm-auth-session-id", ""
)
self.config.auth_token = response.headers.get(K_AUTH_SESSION_ID, "")
return retval
async def auth_begin(
@ -270,7 +349,7 @@ class KanidmClient:
method: str,
sessionid: Optional[str] = None,
update_internal_auth_token: bool = False,
) -> ClientResponse:
) -> ClientResponse[Any]:
"""the 'begin' step"""
begin_auth = {
@ -280,13 +359,12 @@ class KanidmClient:
}
if sessionid is not None:
headers = {"x-kanidm-auth-session-id": sessionid}
else:
if self.config.auth_token is not None:
headers = {"x-kanidm-auth-session-id": self.config.auth_token}
headers = {K_AUTH_SESSION_ID: sessionid}
elif self.config.auth_token is not None:
headers = {K_AUTH_SESSION_ID: self.config.auth_token}
response = await self.call_post(
KANIDMURLS["auth"],
Endpoints.AUTH,
json=begin_auth,
headers=headers,
)
@ -294,21 +372,17 @@ class KanidmClient:
# TODO: mock test for auth_begin raises AuthBeginFailed
raise AuthBeginFailed(response.content)
if response.data is not None:
response.data["sessionid"] = response.headers.get(
"x-kanidm-auth-session-id", ""
)
response.data["sessionid"] = response.headers.get(K_AUTH_SESSION_ID, "")
if update_internal_auth_token:
self.config.auth_token = response.headers.get(
"x-kanidm-auth-session-id", ""
)
self.config.auth_token = response.headers.get(K_AUTH_SESSION_ID, "")
logging.debug(json_lib.dumps(response.data, indent=4))
self.logger.debug(json_lib.dumps(response.data, indent=4))
try:
retobject = AuthBeginResponse.model_validate(response.data)
except ValidationError as exc:
logging.debug(repr(exc.errors()[0]))
self.logger.debug(repr(exc.errors()[0]))
raise exc
retobject.response = response
@ -339,7 +413,7 @@ class KanidmClient:
if auth_init.response is None:
raise NotImplementedError("This should throw a really cool response")
sessionid = auth_init.response.headers["x-kanidm-auth-session-id"]
sessionid = auth_init.response.headers[K_AUTH_SESSION_ID]
if len(auth_init.state.choose) == 0:
# there's no mechanisms at all - bail
@ -362,7 +436,7 @@ class KanidmClient:
update_internal_auth_token: bool = False,
) -> AuthState:
"""does the password auth step"""
self.logger.debug("auth_step_password called")
if password is None:
password = self.config.password
if password is None:
@ -371,23 +445,25 @@ class KanidmClient:
)
if sessionid is not None:
headers = {"x-kanidm-auth-session-id": sessionid}
headers = {K_AUTH_SESSION_ID: sessionid}
elif self.config.auth_token is not None:
headers = {"x-kanidm-auth-session-id": self.config.auth_token}
headers = {K_AUTH_SESSION_ID: self.config.auth_token}
cred_auth = {"step": {"cred": {"password": password}}}
response = await self.call_post(
path="/v1/auth", json=cred_auth, headers=headers
path=Endpoints.AUTH, json=cred_auth, headers=headers
)
if response.status_code != 200:
# TODO: write test coverage auth_step_password raises AuthCredFailed
logging.debug("Failed to authenticate, response: %s", response.content)
self.logger.debug("Failed to authenticate, response: %s", response.content)
raise AuthCredFailed("Failed password authentication!")
result = AuthState.model_validate(response.data)
result.response = response
if result.state is None:
raise AuthCredFailed
if update_internal_auth_token:
self.config.auth_token = result.state.success
@ -399,39 +475,51 @@ class KanidmClient:
return result
async def auth_as_anonymous(self) -> None:
"""authenticate as the anonymous user"""
"""Authenticate as the anonymous user"""
auth_init = await self.auth_init("anonymous", update_internal_auth_token=True)
init = await self.auth_init("anonymous", update_internal_auth_token=True)
logging.debug("auth_init completed, moving onto begin step")
await self.auth_begin(
method=init.state.choose[0], update_internal_auth_token=True
method=auth_init.state.choose[0],
update_internal_auth_token=True,
)
logging.debug("auth_begin completed, moving onto cred step")
cred_auth = {"step": {"cred": "anonymous"}}
cred_auth = {
"step": {"cred": "anonymous"},
}
if self.config.auth_token is None:
raise AuthBeginFailed
headers = {
K_AUTH_SESSION_ID: self.config.auth_token,
}
if self.config.auth_token is None:
raise ValueError("Auth token is not set, auth failure!")
response = await self.call_post(
path=KANIDMURLS["auth"],
path=Endpoints.AUTH,
json=cred_auth,
headers=headers,
)
state = AuthState.model_validate(response.data)
logging.debug("anonymous auth completed, setting token")
self.logger.debug("anonymous auth completed, setting token")
if state.state is None:
raise AuthCredFailed
self.config.auth_token = state.state.success
def session_header(
self,
sessionid: str,
) -> Dict[str, str]:
"""create a headers dict from a session id"""
"""Create a headers dict from a session id"""
# TODO: perhaps allow session_header to take a dict and update it, too?
return {
"authorization": f"bearer {sessionid}",
}
# TODO: write tests for get_groups
async def get_radius_token(self, username: str) -> ClientResponse:
async def get_radius_token(self, username: str) -> ClientResponse[Any]:
"""does the call to the radius token endpoint"""
path = f"/v1/account/{username}/_radius/_token"
response = await self.call_get(path)
@ -441,20 +529,485 @@ class KanidmClient:
)
return response
async def oauth2_rs_list(self) -> List[OAuth2Rs]:
"""gets the list of oauth2 resource servers"""
response = await self.call_get(Endpoints.OAUTH2)
if response.data is None:
return []
if response.status_code != 200:
raise ValueError(
f"Failed to get oauth2 resource servers: {response.content}"
)
oauth2_rs_list = Oauth2RsList.model_validate(response.data)
return [oauth2_rs.as_oauth2_rs for oauth2_rs in oauth2_rs_list.root]
async def oauth2_rs_get(self, rs_name: str) -> OAuth2Rs:
"""get an OAuth2 client"""
endpoint = f"{Endpoints.OAUTH2}/{rs_name}"
response: ClientResponse[IOauth2Rs] = await self.call_get(endpoint)
if response.status_code != 200:
raise ValueError(
f"Failed to get oauth2 resource server: {response.content}"
)
if response.data is None:
raise ValueError(
f"Failed to get oauth2 resource server: {response.content}"
)
return RawOAuth2Rs(**response.data).as_oauth2_rs
async def oauth2_rs_secret_get(self, rs_name: str) -> str:
"""get an OAuth2 client secret"""
endpoint = f"{Endpoints.OAUTH2}/{rs_name}/_basic_secret"
response: ClientResponse[str] = await self.call_get(endpoint)
if response.status_code != 200:
raise ValueError(
f"Failed to get oauth2 resource server secret: {response.content}"
)
return response.data or ""
async def oauth2_rs_delete(self, rs_name: str) -> ClientResponse[None]:
"""delete an oauth2 resource server"""
endpoint = f"{Endpoints.OAUTH2}/{rs_name}"
return await self.call_delete(endpoint)
async def oauth2_rs_basic_create(
self, rs_name: str, displayname: str, origin: str
) -> ClientResponse[None]:
"""Create a basic OAuth2 RS"""
self._validate_is_valid_origin_url(origin)
endpoint = f"{Endpoints.OAUTH2}/_basic"
payload = {
"attrs": {
"oauth2_rs_name": [rs_name],
"oauth2_rs_origin": [origin],
"displayname": [displayname],
}
}
return await self.call_post(endpoint, json=payload)
@classmethod
def _validate_is_valid_origin_url(cls, url: str) -> None:
"""Check if it's HTTPS and a valid URL as far as we can tell"""
parsed_url = yarl.URL(url)
if parsed_url.scheme not in ["http", "https"]:
raise ValueError(
f"Invalid scheme: {parsed_url.scheme} for origin URL: {url}"
)
if parsed_url.host is None:
raise ValueError(f"Empty/invalid host for origin URL: {url}")
if parsed_url.user is not None:
raise ValueError(f"Can't have username in origin URL: {url}")
if parsed_url.password is not None:
raise ValueError(f"Can't have password in origin URL: {url}")
async def service_account_list(self) -> List[ServiceAccount]:
"""List service accounts"""
response = await self.call_get(Endpoints.SERVICE_ACCOUNT)
if response.content is None:
return []
if response.status_code != 200:
raise ValueError(f"Failed to get service accounts: {response.content}")
service_account_list = ServiceAccountList.model_validate(
json_lib.loads(response.content)
)
return [
service_account.as_service_account
for service_account in service_account_list.root
]
async def service_account_get(self, name: str) -> ServiceAccount:
"""Get a service account"""
endpoint = f"{Endpoints.SERVICE_ACCOUNT}/{name}"
response: ClientResponse[IServiceAccount] = await self.call_get(endpoint)
if response.status_code != 200:
raise ValueError(f"Failed to get service account: {response.content}")
if response.data is None:
raise ValueError(f"Failed to get service account: {response.content}")
return RawServiceAccount(**response.data).as_service_account
async def service_account_create(
self, name: str, displayname: str
) -> ClientResponse[None]:
"""Create a service account"""
endpoint = f"{Endpoints.SERVICE_ACCOUNT}"
payload = {
"attrs": {
"name": [name],
"displayname": [
displayname,
],
}
}
return await self.call_post(endpoint, json=payload)
async def service_account_delete(self, name: str) -> ClientResponse[None]:
"""Create a service account"""
endpoint = f"{Endpoints.SERVICE_ACCOUNT}/{name}"
return await self.call_delete(endpoint)
async def service_account_post_ssh_pubkey(
self,
id: str,
tag: str,
pubkey: str,
) -> ClientResponse[None]:
payload = (tag, pubkey)
return await self.call_post(
f"{Endpoints.SERVICE_ACCOUNT}/{id}/_ssh_pubkeys",
json=payload,
)
async def service_account_delete_ssh_pubkey(
self, id: str, tag: str
) -> ClientResponse[None]:
return await self.call_delete(
f"{Endpoints.SERVICE_ACCOUNT}/{id}/_ssh_pubkeys/{tag}"
)
async def service_account_generate_api_token(
self, account_id: str, label: str, expiry: str, read_write: bool = False
) -> ClientResponse[None]:
"""Create a service account API token, expiry needs to be in RFC3339 format."""
# parse the expiry as rfc3339
try:
datetime.strptime(expiry, "%Y-%m-%dT%H:%M:%SZ")
except Exception as error:
raise ValueError(
f"Failed to parse expiry from {expiry} (needs to be RFC3339 format): {error}"
)
payload = {
"label": label,
"expiry": expiry,
"read_write": read_write,
}
endpoint = f"{Endpoints.SERVICE_ACCOUNT}/{account_id}/_token"
return await self.call_post(endpoint, json=payload)
async def service_account_destroy_api_token(
self,
id: str,
token_id: str,
) -> ClientResponse[None]:
endpoint = f"{Endpoints.SERVICE_ACCOUNT}/{id}/_api_token/{token_id}"
return await self.call_delete(endpoint)
async def get_groups(self) -> List[Group]:
"""Lists all groups"""
# For compatibility reasons
# TODO: delete this method
return await self.group_list()
# TODO: write tests for get_groups
async def get_groups(self) -> List[GroupInfo]:
# Renamed to keep it consistent with the rest of the Client
async def group_list(self) -> List[Group]:
"""does the call to the group endpoint"""
response = await self.call_get("/v1/group")
response = await self.call_get(Endpoints.GROUP)
if response.content is None:
return []
if response.status_code != 200:
raise ValueError(f"Failed to get groups: {response.content}")
grouplist = GroupList.model_validate(json_lib.loads(response.content))
return [group.as_groupinfo() for group in grouplist.root]
return [group.as_group for group in grouplist.root]
async def idm_oauth2_rs_list(self) -> ClientResponse:
"""gets the list of oauth2 resource servers"""
endpoint = "/v1/oauth2"
async def group_get(self, name: str) -> Group:
"""Get a group"""
endpoint = f"{Endpoints.GROUP}/{name}"
response: ClientResponse[IGroup] = await self.call_get(endpoint)
if response.status_code != 200:
raise ValueError(f"Failed to get group: {response.content}")
if response.data is None:
raise ValueError(f"Failed to get group: {response.content}")
return RawGroup(**response.data).as_group
resp = await self.call_get(endpoint)
return resp
async def group_create(self, name: str) -> ClientResponse[None]:
"""Create a group"""
payload = {"attrs": {"name": [name]}}
return await self.call_post(Endpoints.GROUP, json=payload)
async def group_delete(self, name: str) -> ClientResponse[None]:
"""Delete a group"""
endpoint = f"{Endpoints.GROUP}/{name}"
return await self.call_delete(endpoint)
async def group_set_members(
self, id: str, members: List[str]
) -> ClientResponse[None]:
"""Set group member list"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/member"
return await self.call_put(endpoint, json=members)
async def group_add_members(
self, id: str, members: List[str]
) -> ClientResponse[None]:
"""Add members to a group"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/member"
return await self.call_post(endpoint, json=members)
async def group_delete_members(
self, id: str, members: List[str]
) -> ClientResponse[None]:
"""Remove members from a group"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/member"
return await self.call_delete(endpoint, json=members)
async def person_account_list(self) -> List[Person]:
"""List all people"""
response = await self.call_get(Endpoints.PERSON)
if response.content is None:
return []
if response.status_code != 200:
raise ValueError(f"Failed to get people: {response.content}")
personlist = PersonList.model_validate(json_lib.loads(response.content))
return [person.as_person for person in personlist.root]
async def person_account_get(self, name: str) -> Person:
"""Get a person by name"""
endpoint = f"{Endpoints.PERSON}/{name}"
response: ClientResponse[IPerson] = await self.call_get(endpoint)
if response.status_code != 200:
raise ValueError(f"Failed to get person: {response.content}")
if response.data is None:
raise ValueError(f"Failed to get person: {response.content}")
return RawPerson(**response.data).as_person
async def person_account_create(
self, name: str, displayname: str
) -> ClientResponse[None]:
"""Create a person account"""
payload = {
"attrs": {
"name": [name],
"displayname": [displayname],
}
}
return await self.call_post(Endpoints.PERSON, json=payload)
async def person_account_update(
self,
id: str,
newname: Optional[str] = None,
displayname: Optional[str] = None,
legalname: Optional[str] = None,
mail: Optional[List[str]] = None,
) -> ClientResponse[None]:
"""Update details of a person"""
endpoint = f"{Endpoints.PERSON}/{id}"
attrs = {}
if newname is not None:
attrs["name"] = [newname]
if displayname is not None:
attrs["displayname"] = [displayname]
if legalname is not None:
attrs["legalname"] = [legalname]
if mail is not None:
attrs["mail"] = mail
if not attrs:
raise ValueError("You need to specify something to update!")
return await self.call_patch(endpoint, json={"attrs": attrs})
async def person_account_delete(self, id: str) -> ClientResponse[None]:
"""Delete a person"""
endpoint = f"{Endpoints.PERSON}/{id}"
return await self.call_delete(endpoint)
async def person_account_post_ssh_key(
self, id: str, tag: str, pubkey: str
) -> ClientResponse[None]:
"""Create an SSH key for a user"""
endpoint = f"{Endpoints.PERSON}/{id}/_ssh_pubkeys"
payload = (tag, pubkey)
return await self.call_post(endpoint, json=payload)
async def person_account_delete_ssh_key(
self, id: str, tag: str
) -> ClientResponse[None]:
"""Delete an SSH key for a user"""
endpoint = f"{Endpoints.PERSON}/{id}/_ssh_pubkeys/{tag}"
return await self.call_delete(endpoint)
async def group_account_policy_enable(self, id: str) -> ClientResponse[None]:
"""Enable account policy for a group"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/class"
payload = ["account_policy"]
return await self.call_post(endpoint, json=payload)
async def group_account_policy_authsession_expiry_set(
self,
id: str,
expiry: int,
) -> ClientResponse[None]:
"""set the account policy authenticated session expiry length (seconds) for a group"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/authsession_expiry"
payload = [expiry]
return await self.call_put(endpoint, json=payload)
async def group_account_policy_password_minimum_length_set(
self, id: str, minimum_length: int
) -> ClientResponse[None]:
"""set the account policy password minimum length for a group"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/auth_password_minimum_length"
payload = [minimum_length]
return await self.call_put(endpoint, json=payload)
async def group_account_policy_privilege_expiry_set(
self, id: str, expiry: int
) -> ClientResponse[None]:
"""set the account policy privilege expiry for a group"""
endpoint = f"{Endpoints.GROUP}/{id}/_attr/privilege_expiry"
payload = [expiry]
return await self.call_put(endpoint, json=payload)
async def system_password_badlist_get(self) -> List[str]:
"""Get the password badlist"""
response = await self.call_get("/v1/system/_attr/badlist_password")
badlist: Optional[List[str]] = response.data
if badlist is None:
return []
return badlist
async def system_password_badlist_append(
self, new_passwords: List[str]
) -> ClientResponse[None]:
"""Add new items to the password badlist"""
return await self.call_post(
"/v1/system/_attr/badlist_password", json=new_passwords
)
async def system_password_badlist_remove(
self, items: List[str]
) -> ClientResponse[None]:
"""Remove items from the password badlist"""
return await self.call_delete("/v1/system/_attr/badlist_password", json=items)
async def system_denied_names_get(self) -> List[str]:
"""Get the denied names list"""
response: Optional[List[str]] = (
await self.call_get("/v1/system/_attr/denied_name")
).data
if response is None:
return []
return response
async def system_denied_names_append(
self, names: List[str]
) -> ClientResponse[None]:
"""Add items to the denied names list"""
return await self.call_post("/v1/system/_attr/denied_name", json=names)
async def system_denied_names_remove(
self, names: List[str]
) -> ClientResponse[None]:
"""Remove items from the denied names list"""
return await self.call_delete("/v1/system/_attr/denied_name", json=names)
async def domain_set_display_name(
self,
new_display_name: str,
) -> ClientResponse[None]:
"""Set the Domain Display Name - this requires admin privs"""
return await self.call_put(
f"{Endpoints.DOMAIN}/_attr/domain_display_name",
json=[new_display_name],
)
async def domain_set_ldap_basedn(self, new_basedn: str) -> ClientResponse[None]:
"""Set the domain LDAP base DN."""
return await self.call_put(
f"{Endpoints.DOMAIN}/_attr/domain_ldap_basedn",
json=[new_basedn],
)
async def oauth2_rs_get_basic_secret(self, rs_name: str) -> ClientResponse[Any]:
"""get the basic secret for an OAuth2 resource server"""
endpoint = f"/v1/oauth2/{rs_name}/_basic_secret"
return await self.call_get(endpoint)
async def oauth2_rs_update(
self,
id: str,
name: Optional[str] = None,
displayname: Optional[str] = None,
origin: Optional[str] = None,
landing: Optional[str] = None,
reset_secret: bool = False,
reset_token_key: bool = False,
reset_sign_key: bool = False,
) -> ClientResponse[None]:
"""Update an OAuth2 Resource Server"""
attrs = {}
if name is not None:
attrs["name"] = [name]
if displayname is not None:
attrs["displayname"] = [displayname]
if origin is not None:
attrs["oauth2_rs_origin"] = [origin]
if landing is not None:
attrs["oauth2_rs_landing"] = [landing]
if reset_secret:
attrs["oauth2_rs_basic_secret"] = []
if reset_token_key:
attrs["oauth2_rs_token_key"] = []
if reset_sign_key:
attrs["es256_private_key_der"] = []
attrs["rs256_private_key_der"] = []
if not attrs:
raise ValueError("You need to set something to change!")
endpoint = f"{Endpoints.OAUTH2}/{id}"
payload = {"attrs": attrs}
return await self.call_patch(endpoint, json=payload)
async def oauth2_rs_update_scope_map(
self, id: str, group: str, scopes: List[str]
) -> ClientResponse[None]:
"""Update an OAuth2 scope map"""
endpoint = f"{Endpoints.OAUTH2}/{id}/_scopemap/{group}"
return await self.call_post(endpoint, json=scopes)
async def oauth2_rs_delete_scope_map(
self,
id: str,
group: str,
) -> ClientResponse[None]:
"""Delete an OAuth2 scope map"""
return await self.call_delete(f"{Endpoints.OAUTH2}/{id}/_scopemap/{group}")
async def oauth2_rs_update_sup_scope_map(
self, id: str, group: str, scopes: List[str]
) -> ClientResponse[None]:
"""Update an OAuth2 supplemental scope map"""
endpoint = f"{Endpoints.OAUTH2}/{id}/_sup_scopemap/{group}"
return await self.call_post(endpoint, json=scopes)
async def oauth2_rs_delete_sup_scope_map(
self,
id: str,
group: str,
) -> ClientResponse[None]:
"""Delete an OAuth2 supplemental scope map"""
return await self.call_delete(f"{Endpoints.OAUTH2}/{id}/_sup_scopemap/{group}")

View file

View file

@ -0,0 +1,61 @@
# pylint: disable=too-few-public-methods
# ^ disabling this because pydantic models don't have public methods
from typing import Dict, List, Optional, TypedDict
from pydantic import ConfigDict, BaseModel, RootModel
class Group(BaseModel):
"""nicer"""
name: str
dynmember: List[str]
member: List[str]
spn: str
uuid: str
# posix-enabled group
gidnumber: Optional[int]
def has_member(self, member: str) -> bool:
"""check if a member is in the group"""
return member in self.member or member in self.dynmember
class RawGroup(BaseModel):
"""group information as it comes back from the API"""
attrs: Dict[str, List[str]]
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def as_group(self) -> Group:
"""return it as the GroupInfo object which has nicer fields"""
required_fields = ("name", "uuid", "spn")
for field in required_fields:
if field not in self.attrs:
raise ValueError(f"Missing field {field} in {self.attrs}")
if len(self.attrs[field]) == 0:
raise ValueError(f"Empty field {field} in {self.attrs}")
# we want either the first element of gidnumber_field, or None
gidnumber_field = self.attrs.get("gidnumber", [])
gidnumber: Optional[int] = None
if len(gidnumber_field) > 0:
gidnumber = int(gidnumber_field[0])
return Group(
name=self.attrs["name"][0],
uuid=self.attrs["uuid"][0],
spn=self.attrs["spn"][0],
member=self.attrs.get("member", []),
dynmember=self.attrs.get("dynmember", []),
gidnumber=gidnumber,
)
GroupList = RootModel[List[RawGroup]]
class IGroup(TypedDict):
attrs: Dict[str, List[str]]

View file

@ -0,0 +1,56 @@
# pylint: disable=too-few-public-methods
# ^ disabling this because pydantic models don't have public methods
from typing import Dict, List, TypedDict
from pydantic import BaseModel, ConfigDict, RootModel
class OAuth2Rs(BaseModel):
classes: List[str]
displayname: str
es256_private_key_der: str
oauth2_rs_basic_secret: str
oauth2_rs_name: str
oauth2_rs_origin: str
oauth2_rs_token_key: str
oauth2_rs_sup_scope_map: List[str]
class RawOAuth2Rs(BaseModel):
attrs: Dict[str, List[str]]
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def as_oauth2_rs(self) -> OAuth2Rs:
"""return it as the Person object which has nicer fields"""
required_fields = (
"displayname",
"es256_private_key_der",
"oauth2_rs_basic_secret",
"oauth2_rs_name",
"oauth2_rs_origin",
"oauth2_rs_token_key",
)
for field in required_fields:
if field not in self.attrs:
raise ValueError(f"Missing field {field} in {self.attrs}")
if len(self.attrs[field]) == 0:
raise ValueError(f"Empty field {field} in {self.attrs}")
return OAuth2Rs(
classes=self.attrs["class"],
displayname=self.attrs["displayname"][0],
es256_private_key_der=self.attrs["es256_private_key_der"][0],
oauth2_rs_basic_secret=self.attrs["oauth2_rs_basic_secret"][0],
oauth2_rs_name=self.attrs["oauth2_rs_name"][0],
oauth2_rs_origin=self.attrs["oauth2_rs_origin"][0],
oauth2_rs_token_key=self.attrs["oauth2_rs_token_key"][0],
oauth2_rs_sup_scope_map=self.attrs.get("oauth2_rs_sup_scope_map", []),
)
Oauth2RsList = RootModel[List[RawOAuth2Rs]]
class IOauth2Rs(TypedDict):
attrs: Dict[str, List[str]]

View file

@ -0,0 +1,45 @@
# pylint: disable=too-few-public-methods
# ^ disabling this because pydantic models don't have public methods
from typing import Dict, List, TypedDict
from uuid import UUID
from pydantic import BaseModel, ConfigDict, RootModel
class Person(BaseModel):
classes: List[str]
displayname: str
memberof: List[str]
name: str
spn: str
uuid: UUID
class RawPerson(BaseModel):
attrs: Dict[str, List[str]]
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def as_person(self) -> Person:
"""return it as the Person object which has nicer fields"""
required_fields = ("name", "uuid", "spn", "displayname")
for field in required_fields:
if field not in self.attrs:
raise ValueError(f"Missing field {field} in {self.attrs}")
if len(self.attrs[field]) == 0:
raise ValueError(f"Empty field {field} in {self.attrs}")
return Person(
classes=self.attrs["class"],
displayname=self.attrs["displayname"][0],
memberof=self.attrs.get("memberof", []),
name=self.attrs["name"][0],
spn=self.attrs["spn"][0],
uuid=UUID(self.attrs["uuid"][0]),
)
PersonList = RootModel[List[RawPerson]]
class IPerson(TypedDict):
attrs: Dict[str, List[str]]

View file

@ -0,0 +1,47 @@
# pylint: disable=too-few-public-methods
# ^ disabling this because pydantic models don't have public methods
from typing import Dict, List, TypedDict
from uuid import UUID
from pydantic import ConfigDict, BaseModel, RootModel
class ServiceAccount(BaseModel):
"""nicer"""
classes: List[str]
displayname: str
memberof: List[str]
name: str
spn: str
uuid: UUID
class RawServiceAccount(BaseModel):
"""service account information as it comes back from the API"""
attrs: Dict[str, List[str]]
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def as_service_account(self) -> ServiceAccount:
"""return it as the Person object which has nicer fields"""
required_fields = ("displayname", "uuid", "spn", "name")
for field in required_fields:
if field not in self.attrs:
raise ValueError(f"Missing field {field} in {self.attrs}")
return ServiceAccount(
classes=self.attrs["class"],
displayname=self.attrs["displayname"][0],
memberof=self.attrs.get("memberof", []),
name=self.attrs["name"][0],
spn=self.attrs["spn"][0],
uuid=UUID(self.attrs["uuid"][0]),
)
ServiceAccountList = RootModel[List[RawServiceAccount]]
class IServiceAccount(TypedDict):
attrs: Dict[str, List[str]]

View file

@ -4,14 +4,17 @@
from ipaddress import IPv4Address, IPv6Address, IPv6Network, IPv4Network
import socket
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Generic, TypeVar
from urllib.parse import urlparse
from pydantic import field_validator, ConfigDict, BaseModel, Field, RootModel
from pydantic import field_validator, ConfigDict, BaseModel, Field
import toml
class ClientResponse(BaseModel):
T = TypeVar("T")
class ClientResponse(BaseModel, Generic[T]):
"""response from an API call, includes the following fields:
content: Optional[str]
data: Optional[Dict[str, Any]]
@ -21,7 +24,7 @@ class ClientResponse(BaseModel):
content: Optional[str] = None
# the data field is used for the json-parsed response
data: Optional[Any] = None
data: Optional[T] = None
headers: Dict[str, Any]
status_code: int
model_config = ConfigDict(arbitrary_types_allowed=True)
@ -38,7 +41,7 @@ class AuthInitResponse(BaseModel):
sessionid: str
state: _AuthInitState
response: Optional[ClientResponse] = None
response: Optional[ClientResponse[Any]] = None
# model_config = ConfigDict(arbitrary_types_allowed=True)
@ -58,7 +61,7 @@ class AuthBeginResponse(BaseModel):
# this should be pulled from the response headers as x-kanidm-auth-session-id
sessionid: Optional[str]
state: _AuthBeginState
response: Optional[ClientResponse] = None
response: Optional[ClientResponse[Any]] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
@ -70,9 +73,9 @@ class AuthState(BaseModel):
success: Optional[str] = None
state: _InternalState
state: Optional[_InternalState] = Field(_InternalState(success=None))
sessionid: Optional[str] = None
response: Optional[ClientResponse] = None
response: Optional[ClientResponse[Any]] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
@ -161,7 +164,8 @@ class KanidmClientConfig(BaseModel):
verify_hostnames: bool = True
verify_certificate: bool = True
ca_path: Optional[str] = Field(default=None, alias='verify_ca')
ca_path: Optional[str] = Field(default=None)
verify_ca: bool = True
username: Optional[str] = None
password: Optional[str] = None
@ -201,50 +205,3 @@ class KanidmClientConfig(BaseModel):
value = f"{value}/"
return value
class GroupInfo(BaseModel):
"""nicer"""
name: str
dynmember: List[str]
member: List[str]
spn: str
uuid: str
# posix-enabled group
gidnumber: Optional[int]
def has_member(self, member: str) -> bool:
"""check if a member is in the group"""
return member in self.member or member in self.dynmember
class RawGroupInfo(BaseModel):
"""group information as it comes back from the API"""
attrs: Dict[str, List[str]]
model_config = ConfigDict(arbitrary_types_allowed=True)
def as_groupinfo(self) -> GroupInfo:
"""return it as the GroupInfo object which has nicer fields"""
for field in "name", "uuid", "spn":
if field not in self.attrs:
raise ValueError(f"Missing field {field} in {self.attrs}")
# we want either the first element of gidnumber_field, or None
gidnumber_field = self.attrs.get("gidnumber", [])
gidnumber: Optional[int] = None
if len(gidnumber_field) > 0:
gidnumber = int(gidnumber_field[0])
return GroupInfo(
name=self.attrs["name"][0],
uuid=self.attrs["uuid"][0],
spn=self.attrs["spn"][0],
member=self.attrs.get("member", []),
dynmember=self.attrs.get("dynmember", []),
gidnumber=gidnumber,
)
GroupList = RootModel[List[RawGroupInfo]]

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "kanidm"
version = "0.0.3"
version = "1.0.0"
description = "Kanidm client library"
license = "MPL-2.0"

View file

@ -1,46 +1,59 @@
#!/bin/bash
# This sets up a Kanidm environment for doing RADIUS testing.
set -e
# This sets up a Kanidm environment for doing RADIUS testing.
read -r -n 1 -p "This script rather destructively resets the idm_admin and admin passwords and YOLO's its way through setting up a RADIUS user (test) and service account (radius_server) make sure you're not running this on an environment you care deeply about!"
PWD="$(pwd)"
cd ../kanidmd/daemon || exit 1
cd ../server/daemon || exit 1
KEEP_GOING=1
while [ $KEEP_GOING -eq 1 ]; do
curl -f -s -q -k https://localhost:8443/status > /dev/null && KEEP_GOING=0
echo -n "Start the server in another terminal"
sleep 1
done
echo ""
echo "Resetting IDM_ADMIN"
# set up idm admin account
IDM_ADMIN=$(./run_insecure_dev_server.sh recover_account idm_admin -o json 2>&1 | grep -v Running | grep recover_account | jq .result)
IDM_ADMIN=$(./run_insecure_dev_server.sh recover-account idm_admin -o json 2>&1 | grep '\"password' | jq -r .password)
if [ -z "${IDM_ADMIN}" ]; then
echo "Failed to reset idm_admin password"
exit 1
fi
echo "IDM_ADMIN_PASSWORD: ${IDM_ADMIN}"
read -r -n 1 -p "Copy the idm_admin password somewhere and hit enter to continue"
# set up idm admin account
ADMIN=$(./run_insecure_dev_server.sh recover_account admin -o json 2>&1 | grep -v Running | grep recover_account | jq .result)
ADMIN=$(./run_insecure_dev_server.sh recover-account admin -o json 2>&1 | grep '\"password' | jq -r .password )
if [ -z "${ADMIN}" ]; then
echo "Failed to reset admin password"
exit 1
fi
echo "ADMIN_PASSWORD: ${ADMIN}"
read -r -n 1 -p "Copy the admin password somewhere and hit enter to continue"
echo -n "Start the server in another terminal"
KEEP_GOING=1
while [ $KEEP_GOING -eq 1 ]; do
echo -n "."
curl -f -s -k https://localhost:8443/status && KEEP_GOING=0
sleep 1
done
export KANIDM_URL="https://localhost:8443"
export KANIDM_CA_PATH="/tmp/kanidm/ca.pem"
cd ../../ || exit 1
echo "Logging in as admin"
cargo run --bin kanidm -- login --name admin
cargo run --bin kanidm -- login --name admin --password "${ADMIN}"
echo "Logging in as idm_admin"
cargo run --bin kanidm -- login --name idm_admin
cargo run --bin kanidm -- login --name idm_admin --password "${IDM_ADMIN}"
echo "Creating person 'test'"
cargo run --bin kanidm -- person create test test --name idm_admin
@ -48,12 +61,12 @@ cargo run --bin kanidm -- person create test test --name idm_admin
echo "Creating group 'radius_access_allowed'"
cargo run --bin kanidm -- group create radius_access_allowed --name idm_admin
echo "Adding 'test' to group 'radius_access_allowed'"
cargo run --bin kanidm -- group add_members radius_access_allowed test --name idm_admin
cargo run --bin kanidm -- group add-members radius_access_allowed test --name idm_admin
echo "Creating radius secret for 'test'"
cargo run --bin kanidm -- person radius generate_secret test --name idm_admin
cargo run --bin kanidm -- person radius generate-secret test --name idm_admin
echo "Showing radius secret for 'test'"
cargo run --bin kanidm -- person radius show_secret test --name idm_admin
cargo run --bin kanidm -- person radius show-secret test --name idm_admin
read -r -n 1 -p "Copy the RADIUS secret above then press enter to continue"
@ -63,11 +76,10 @@ echo "Creating SA 'radius_server'"
cargo run --bin kanidm -- service-account create radius_server radius_server --name idm_admin
echo "Setting radius_server to be allowed to be a RADIUS server"
cargo run --bin kanidm group add_members --name admin idm_radius_servers radius_server
cargo run --bin kanidm group add-members --name admin idm_radius_servers radius_server
echo "Creating API Token for 'radius_server' account"
cargo run --bin kanidm -- service-account api-token generate radius_server radius --name admin
echo "Copy the API Token above to the config file"
echo "Copy the API Token above to the config file as auth_token"
echo "blep?"

View file

@ -29,7 +29,7 @@ async def test_auth_init(client_configfile: KanidmClient) -> None:
pytest.skip("Can't run auth test without a username/password")
result = await client_configfile.auth_init(client_configfile.config.username)
print(f"{result=}")
print(result.dict())
print(result.model_dump_json())
assert result.sessionid
@ -44,7 +44,7 @@ async def test_auth_begin(client_configfile: KanidmClient) -> None:
result = await client_configfile.auth_init(client_configfile.config.username)
print(f"{result=}")
print("Result dict:")
print(result.dict())
print(result.model_dump_json())
assert result.sessionid
print(f"Doing auth_begin for {client_configfile.config.username}")
@ -62,7 +62,7 @@ async def test_auth_begin(client_configfile: KanidmClient) -> None:
if retval is None:
raise pytest.fail("Failed to do begin_result")
retval["response"] = begin_result
retval["response"] = begin_result.model_dump()
assert AuthBeginResponse.model_validate(retval)
@ -91,7 +91,7 @@ async def test_authenticate_anonymous(client_configfile: KanidmClient) -> None:
"""tests the authenticate() flow"""
client_configfile.config.auth_token = None
print(f"Doing client.authenticate for {client_configfile.config.username}")
print("Doing anonymous auth")
await client_configfile.auth_as_anonymous()
assert client_configfile.config.auth_token is not None

View file

@ -13,7 +13,7 @@ from kanidm.utils import load_config
logging.basicConfig(level=logging.DEBUG)
EXAMPLE_CONFIG_FILE = "../examples/config_localhost"
EXAMPLE_CONFIG_FILE = str(Path(__file__).parent.parent.parent / "examples/config_localhost")
@pytest.fixture(scope="function")

View file

@ -1,42 +1,53 @@
import json
import logging
from pathlib import Path
from kanidm import KanidmClient
import pytest
@pytest.fixture(scope="function")
async def client() -> KanidmClient:
"""sets up a client with a basic thing"""
return KanidmClient(config_file="../examples/config_localhost")
return KanidmClient(
config_file=Path(__file__).parent.parent.parent / "examples/config_localhost",
)
@pytest.mark.network
@pytest.mark.asyncio
async def test_idm_oauth2_rs_list(client: KanidmClient) -> None:
async def test_oauth2_rs_list(client: KanidmClient) -> None:
"""tests getting the list of oauth2 resource servers"""
logging.basicConfig(level=logging.DEBUG)
print(f"config: {client.config}")
username = "admin"
# change this to be your admin password.
password = "Ek7A0fShLsCTXgK2xDqC9TNUgPYQdVFB6RMGKXLyNtGL5cER"
password = "pdf1Xz8q2QFsMTsvbv2jXNBaSEsDpW9h83ZRsH7dDfsJeJdM"
auth_resp = await client.authenticate_password(username, password, update_internal_auth_token=True)
assert auth_resp.state.success is not None
auth_resp = await client.authenticate_password(
username, password, update_internal_auth_token=True
)
if auth_resp.state is None:
raise ValueError(
"Failed to authenticate, check the admin password is set right"
)
if auth_resp.state.success is None:
raise ValueError(
"Failed to authenticate, check the admin password is set right"
)
resp = await client.idm_oauth2_rs_list()
resource_servers = await client.oauth2_rs_list()
print("content:")
print(json.dumps(resp.data, indent=4))
assert resp.status_code == 200
print(json.dumps(resource_servers, indent=4))
if resp.data is not None:
for oauth_rs in resp.data:
oauth2_rs_sup_scope_map = oauth_rs.get("attrs", {}).get("oauth2_rs_sup_scope_map", {})
for mapping in oauth2_rs_sup_scope_map:
if resource_servers:
for oauth_rs in resource_servers:
for mapping in oauth_rs.oauth2_rs_sup_scope_map:
print(f"oauth2_rs_sup_scope_map: {mapping}")
user, scopes = mapping.split(":")
scopes = scopes.replace("{", "[").replace("}", "]")
scopes = json.loads(scopes)
print(f"{user=} {scopes=}")

View file

@ -0,0 +1,14 @@
""" test validation of urls """
import pytest
from kanidm import KanidmClient
def test_bad_origin() -> None:
"""testing with a bad origin"""
client = KanidmClient(uri="http://localhost:8000")
with pytest.raises(ValueError):
client._validate_is_valid_origin_url("ftp://example.com")

View file

@ -10,7 +10,7 @@ from kanidm.types import KanidmClientConfig
from kanidm.utils import load_config
EXAMPLE_CONFIG_FILE = "../examples/config"
EXAMPLE_CONFIG_FILE = Path(__file__).parent.parent.parent / "examples/config"
def test_radius_groups() -> None:
"""testing loading a config file with radius groups defined"""

View file

@ -28,4 +28,4 @@ async def test_radius_call(client_configfile: KanidmClient) -> None:
result = await client_configfile.get_radius_token(RADIUS_TEST_USER)
print(f"{result=}")
print(json.dumps(result.dict(), indent=4, default=str))
print(json.dumps(result.model_dump_json(), indent=4, default=str))

View file

@ -1,5 +1,6 @@
""" reusable widgets for testing """
from logging import DEBUG, basicConfig, getLogger
from pathlib import Path
from typing import Any
@ -11,6 +12,8 @@ from kanidm import KanidmClient
async def client() -> KanidmClient:
"""sets up a client with a basic thing"""
try:
basicConfig(level=DEBUG)
return KanidmClient(uri="https://idm.example.com")
except FileNotFoundError:
raise pytest.skip("Couldn't find config file...")

View file

@ -0,0 +1,3 @@
# Kanidm Python test things
Only run this on a test instance, beware.

View file

@ -0,0 +1,166 @@
import asyncio
import json
import logging
import os
import pathlib
import subprocess
import sys
# so we can load kanidm without building virtualenvs
sys.path.append("./pykanidm")
from kanidm import KanidmClient
def recover_account(username: str) -> str:
"""runs the kanidmd binary to recover creds"""
recover_cmd = [
"cargo",
"run",
"--bin",
"kanidmd",
"--",
"recover-account",
username,
"--config",
"../../examples/insecure_server.toml",
"--output",
"json",
]
# Define the new working directory
daemon_dir = os.path.abspath("./server/daemon/")
# Run the command in the specified working directory
result = subprocess.run(
" ".join(recover_cmd), cwd=daemon_dir, shell=True, capture_output=True
)
stdout = result.stdout.decode("utf-8").strip().split("\n")[-1]
try:
password_response = json.loads(stdout)
except json.decoder.JSONDecodeError:
print(f"Failed to decode this as json: {stdout}")
sys.exit(1)
return password_response["password"]
async def main() -> None:
"""main loop"""
# first reset the admin creds
logger = logging.getLogger(__name__)
admin_password = recover_account("admin")
idm_admin_password = recover_account("idm_admin")
host = "https://localhost:8443"
# login time!
admin_client = KanidmClient(uri=host, ca_path="/tmp/kanidm/ca.pem")
logger.info("Attempting to login as admin with password")
await admin_client.authenticate_password(
"admin", admin_password, update_internal_auth_token=True
)
idm_admin_client = KanidmClient(uri=host, ca_path="/tmp/kanidm/ca.pem")
logger.info("Attempting to login as idm_admin with password")
await idm_admin_client.authenticate_password(
"idm_admin", idm_admin_password, update_internal_auth_token=True
)
# create an oauth2 rs
logger.info("Creating OAuth2 RS")
res = await admin_client.oauth2_rs_basic_create(
"basic_rs", "Basic AF RS", "https://basic.example.com"
)
logger.debug(f"Result: {res}")
assert res.status_code == 200
logger.info("Done!")
logger.info("Getting basic secret for OAuth2 RS")
res = await admin_client.oauth2_rs_get_basic_secret("basic_rs")
assert res.status_code == 200
assert res.data is not None
# delete the oauth2 rs
logger.info("Deleting OAuth2 RS")
res = await admin_client.oauth2_rs_delete("basic_rs")
logger.debug(f"Result: {res}")
assert res.status_code == 200
logger.info("Done!")
print("Woooooooo")
logger.info("Adding password 'cheese' to badlist")
res = await admin_client.system_password_badlist_append(["cheese"])
assert res.status_code == 200
logger.info("Checking password 'cheese' is in badlist")
res = await admin_client.system_password_badlist_get()
assert res.status_code == 200
assert "cheese" in res.data
logger.info("Removing password 'cheese' from badlist")
res = await admin_client.system_password_badlist_remove(["cheese"])
assert res.status_code == 200
test_user = "testuser"
test_group = "testusers"
logger.info("Adding user '%s' 'test_user'", test_user)
res = await idm_admin_client.person_account_create(test_user, test_user.upper())
assert res.status_code == 200
logger.info("Adding group '%s'", test_group)
res = await idm_admin_client.group_create(test_group)
assert res.status_code == 200
logger.info("Adding testuser to group '%s'", test_group)
res = await idm_admin_client.group_add_members(test_group, ["testuser"])
assert res.status_code == 200
logger.info("Getting group %s", test_group)
res = await idm_admin_client.group_get(test_group)
assert res.status_code == 200
logger.info("Got group %s", res.data)
assert res.data.get("attrs", {}).get("member") == ["testuser@localhost"]
logger.info("Deleting user '%s'", test_user)
res = await idm_admin_client.person_account_delete(test_user)
assert res.status_code == 200
logger.info("Getting group %s", test_group)
res = await idm_admin_client.group_get(test_group)
assert res.status_code == 200
logger.info("Got group %s", res.data)
assert res.data.get("attrs", {}).get("member") is None
logger.info("Deleting group '%s'", test_group)
res = await idm_admin_client.group_delete(test_group)
assert res.status_code == 200
logger.info("Adding service account %s", test_user)
res = await admin_client.service_account_create(test_user, test_user.upper())
assert res.status_code == 200
logger.info("Deleting service account %s", test_user)
res = await admin_client.service_account_delete(test_user)
assert res.status_code == 200
if __name__ == "__main__":
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
if not pathlib.Path("scripts/pykanidm/integration_test.py").exists():
logging.error("Please ensure this is running from the root of the repo!")
sys.exit(1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("##########################################")
print("If you got this far, all the tests passed!")
print("##########################################")

21
scripts/pykanidm/run.sh Executable file
View file

@ -0,0 +1,21 @@
#!/bin/bash
# sets up the venv and runs the integration test
MYDIR="$(dirname "$0")"
if [ ! -d ".venv" ]; then
echo "Setting up virtualenv"
python -m venv .venv
# shellcheck disable=SC1091
source .venv/bin/activate
pip install --upgrade pip
pip install poetry pytest ruff mypy black
echo "Installing in virtualenv"
pip install -e pykanidm
fi
# shellcheck disable=SC1091
source .venv/bin/activate
python "${MYDIR}/integration_test.py"

View file

@ -74,12 +74,14 @@ pub struct ServerState {
impl ServerState {
fn reinflate_uuid_from_bytes(&self, input: &str) -> Option<Uuid> {
match JwsCompact::from_str(input) {
Ok(val) => self
.jws_signer
.verify(&val)
.ok()
.and_then(|jws| jws.from_json::<SessionId>().ok())
.map(|inner| inner.sessionid),
Ok(val) => match self.jws_signer.verify(&val) {
Ok(val) => val.from_json::<SessionId>().ok(),
Err(err) => {
error!("Failed to unmarshal JWT from headers: {:?}", err);
None
}
}
.map(|inner| inner.sessionid),
Err(_) => None,
}
}

View file

@ -39,6 +39,7 @@ tracing = { workspace = true, features = [
"max_level_trace",
"release_max_level_debug",
] }
serde_json.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify.workspace = true

View file

@ -144,7 +144,10 @@ async fn submit_admin_req(path: &str, req: AdminTaskRequest, output_mode: Consol
match reqs.next().await {
Some(Ok(AdminTaskResponse::RecoverAccount { password })) => match output_mode {
ConsoleOutputMode::JSON => {
eprintln!("{{\"password\":\"{}\"}}", password)
let json_output = serde_json::json!({
"password": password
});
println!("{}", json_output);
}
ConsoleOutputMode::Text => {
info!(new_password = ?password)