Skip to content

kanidm.KanidmClient

Kanidm client module

config: a KanidmClientConfig object; if this is set, everything else is ignored. config_file: a pathlib.Path object pointing to a configuration file. uri: kanidm base URL. session: an aiohttp.client.ClientSession. verify_hostnames: verify that the hostname is correct. verify_certificate: verify the validity of the certificate and its CA. ca_path: set this to a trusted CA certificate (PEM format). token: a JWS from an authentication session.

Source code in kanidm/__init__.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
class KanidmClient:
    """Kanidm client module

    config: a `KanidmClientConfig` object, if this is set, everything else is ignored
    config_file: a `pathlib.Path` object (or string path) pointing to a configuration file
    uri: kanidm base URL
    verify_hostnames: verify the hostname is correct
    verify_certificate: verify the validity of the certificate and its CA
    ca_path: set this to a trusted CA certificate (PEM format)
    token: a JWS from an authentication session

    Every request opens its own short-lived `aiohttp.client.ClientSession`;
    the constructor does not accept an externally managed session.
    """

    # pylint: disable=too-many-instance-attributes,too-many-arguments
    def __init__(
        self,
        config: Optional[KanidmClientConfig] = None,
        config_file: Optional[Union[Path, str]] = None,
        uri: Optional[str] = None,
        verify_hostnames: bool = True,
        verify_certificate: bool = True,
        ca_path: Optional[str] = None,
        token: Optional[str] = None,
    ) -> None:
        """Constructor for KanidmClient

        Raises:
            ValueError: if the resulting configuration has no server URI.
            FileNotFoundError: if a CA path is configured but does not exist.
        """

        if config is not None:
            self.config = config

        else:
            self.config = KanidmClientConfig(
                uri=uri,
                verify_hostnames=verify_hostnames,
                verify_certificate=verify_certificate,
                ca_path=ca_path,
                auth_token=token,
            )

            if config_file is not None:
                if not isinstance(config_file, Path):
                    config_file = Path(config_file)
                config_data = load_config(config_file.expanduser().resolve())
                # NOTE(review): parse_obj appears to build the config from the
                # file data alone, so the keyword arguments above may be
                # discarded when config_file is given - confirm.
                self.config = self.config.parse_obj(config_data)

        if self.config.uri is None:
            raise ValueError("Please initialize this with a server URI")

        self._ssl: Optional[Union[bool, ssl.SSLContext]] = None
        self._configure_ssl()

    def _configure_ssl(self) -> None:
        """Sets up SSL configuration for the client

        Stores `False` in `self._ssl` when certificate verification is
        disabled, otherwise an `ssl.SSLContext` whose hostname checking follows
        `config.verify_hostnames`.

        Raises:
            FileNotFoundError: if `config.ca_path` is set but does not exist.
        """
        if self.config.verify_certificate is False:
            self._ssl = False
            return

        cafile: Optional[str] = None
        if self.config.ca_path is not None:
            ca_file = Path(self.config.ca_path).expanduser().resolve()
            if not ca_file.exists():
                raise FileNotFoundError(f"CA Path not found: {self.config.ca_path}")
            # Hand over the same expanded path that was just checked, so a
            # "~/..." style ca_path that passes the check also loads.
            cafile = str(ca_file)

        context = ssl.create_default_context(cafile=cafile)
        context.check_hostname = self.config.verify_hostnames
        self._ssl = context

    def parse_config_data(
        self,
        config_data: Dict[str, Any],
    ) -> None:
        """hand it a config dict and it'll configure the client

        The dict is validated first; on success the result replaces
        `self.config` and the SSL settings are rebuilt from it.

        Raises:
            ValueError: if `config_data` fails validation.
            FileNotFoundError: if the new config names a CA path that does not exist.
        """
        try:
            new_config = self.config.parse_obj(config_data)
        except ValidationError as validation_error:
            raise ValueError(
                f"Failed to validate configuration: {validation_error}"
            ) from validation_error
        # parse_obj returns the parsed config; it has to be stored, otherwise
        # this method validates and then throws the result away.
        self.config = new_config
        self._configure_ssl()

    async def check_token_valid(self, token: Optional[str] = None) -> bool:
        """checks if a given token is valid, or the local one if you don't pass it

        Returns True only when the server answers `/v1/auth/valid` with HTTP 200.
        """
        url = "/v1/auth/valid"
        if token is not None:
            headers = {
                "authorization": f"Bearer {token}",
                "content-type": "application/json",
            }
        else:
            # _call adds the configured token (if any) when no headers are given
            headers = None
        result = await self.call_get(url, headers=headers)
        logging.debug(result)
        return result.status_code == 200

    def get_path_uri(self, path: str) -> str:
        """turns a path into a full URI

        A single leading slash on `path` is dropped and a slash is added to the
        base URI when it lacks one. The result is computed on every call so it
        always reflects the current `config.uri`.
        """
        base = str(self.config.uri)
        if not base.endswith("/"):
            base = f"{base}/"
        if path.startswith("/"):
            path = path[1:]
        return f"{base}{path}"

    @property
    def _token_headers(self) -> Dict[str, str]:
        """returns an auth header with the token in it

        Raises:
            ValueError: if no token is configured.
        """
        if self.config.auth_token is None:
            raise ValueError("Token is not set")
        return {"authorization": f"Bearer {self.config.auth_token}"}

    # pylint: disable=too-many-arguments
    async def _call(
        self,
        method: str,
        path: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
        json: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, str]] = None,
    ) -> ClientResponse:
        """does a single HTTP request and wraps the result in a ClientResponse

        `timeout` falls back to `config.connect_timeout`. When a token is
        configured and the caller supplied no authorization header, the bearer
        token is added. A body that is not valid JSON yields `data == {}`;
        valid JSON that is not an object yields `data is None`.
        """

        if timeout is None:
            timeout = self.config.connect_timeout

        # Work on a copy so the caller's dict is never modified by the request.
        request_headers: Optional[Dict[str, str]] = (
            None if headers is None else dict(headers)
        )
        # if we have a token set, we send it.
        if self.config.auth_token is not None:
            logging.debug("Found a token internally")
            if request_headers is None:
                request_headers = self._token_headers
            elif not any(key.lower() == "authorization" for key in request_headers):
                # header names are case-insensitive, so "Authorization" counts
                logging.debug("Setting auth headers as Authorization not in keys")
                request_headers.update(self._token_headers)

        url = self.get_path_uri(path)
        logging.debug("_call method=%s to %s", method, url)
        async with aiohttp.client.ClientSession() as session:
            async with session.request(
                method=method,
                url=url,
                headers=request_headers,
                timeout=timeout,
                json=json,
                params=params,
                ssl=self._ssl,
            ) as request:
                content = await request.content.read()
                try:
                    response_json = json_lib.loads(content)
                    if not isinstance(response_json, dict):
                        response_json = None
                except json_lib.JSONDecodeError as json_error:
                    logging.error("Failed to JSON Decode Response: %s", json_error)
                    logging.error("Response data: %s", content)
                    response_json = {}
                response_input = {
                    "data": response_json,
                    "content": content.decode("utf-8"),
                    "headers": request.headers,
                    "status_code": request.status,
                }
                logging.debug(json_lib.dumps(response_input, default=str, indent=4))
                response = ClientResponse.parse_obj(response_input)
            return response

    async def call_get(
        self,
        path: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> ClientResponse:
        """does a get call to the server"""
        return await self._call(
            method="GET", path=path, headers=headers, timeout=timeout, params=params
        )

    async def call_post(
        self,
        path: str,
        headers: Optional[Dict[str, str]] = None,
        json: Optional[Dict[str, Any]] = None,
        timeout: Optional[int] = None,
    ) -> ClientResponse:
        """does a post call to the server, sending `json` as the request body"""

        return await self._call(
            method="POST", path=path, headers=headers, json=json, timeout=timeout
        )

    async def auth_init(self, username: str) -> AuthInitResponse:
        """init step, starts the auth session

        Returns the parsed response with the raw `ClientResponse` attached as
        `.response`; the session ID is in its `x-kanidm-auth-session-id` header.

        Raises:
            AuthInitFailed: if the server does not answer with HTTP 200.
            ValueError: if the session ID header is missing from the response.
        """
        init_auth = {"step": {"init": username}}

        response = await self.call_post(
            path=KANIDMURLS["auth"],
            json=init_auth,
        )
        if response.status_code != 200:
            logging.debug(
                "Failed to authenticate, response from server: %s",
                response.content,
            )
            # TODO: mock test auth_init raises AuthInitFailed
            raise AuthInitFailed(response.content)

        if "x-kanidm-auth-session-id" not in response.headers:
            logging.debug("response.content: %s", response.content)
            logging.debug("response.headers: %s", response.headers)
            raise ValueError(
                f"Missing x-kanidm-auth-session-id header in init auth response: {response.headers}"
            )
        retval = AuthInitResponse.parse_obj(response.data)
        retval.response = response
        return retval

    async def auth_begin(self, method: str, sessionid: str) -> ClientResponse:
        """the 'begin' step

        Selects the auth `method` for the session identified by `sessionid`
        and returns the raw response.

        Raises:
            AuthBeginFailed: if the server does not answer with HTTP 200.
        """

        begin_auth = {
            "step": {
                "begin": method,
            },
        }
        headers = self.session_header(sessionid)

        response = await self.call_post(
            KANIDMURLS["auth"],
            json=begin_auth,
            headers=headers,
        )
        if response.status_code != 200:
            # TODO: mock test for auth_begin raises AuthBeginFailed
            raise AuthBeginFailed(response.content)

        # Parsed only to validate the payload shape; the raw response is what
        # callers receive.
        AuthBeginResponse.parse_obj(response.data)
        return response

    async def authenticate_password(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> AuthStepPasswordResponse:
        """authenticates with a username and password, returns the auth token

        Runs the init, begin and credential steps in order. When neither
        argument is given, `config.username` / `config.password` are used.

        Raises:
            ValueError: if a username or password cannot be determined.
            AuthMechUnknown: if the server offers no auth mechanisms.
        """
        if username is None and password is None:
            if self.config.username is None or self.config.password is None:
                # pylint: disable=line-too-long
                raise ValueError(
                    "Need username/password to be in caller or class settings before calling authenticate_password"
                )
            username = self.config.username
            password = self.config.password
        if username is None or password is None:
            raise ValueError("Username and Password need to be set somewhere!")

        auth_init: AuthInitResponse = await self.auth_init(username)

        if auth_init.response is None:
            raise NotImplementedError("This should throw a really cool response")

        sessionid = auth_init.response.headers["x-kanidm-auth-session-id"]

        if len(auth_init.state.choose) == 0:
            # there's no mechanisms at all - bail
            # TODO: write test coverage for authenticate_password raises AuthMechUnknown
            raise AuthMechUnknown(f"No auth mechanisms for {username}")
        auth_begin = await self.auth_begin(method="password", sessionid=sessionid)
        # does a little bit of validation
        AuthBeginResponse.parse_obj(auth_begin.data)
        return await self.auth_step_password(password=password, sessionid=sessionid)

    async def auth_step_password(
        self,
        sessionid: str,
        password: Optional[str] = None,
    ) -> AuthStepPasswordResponse:
        """does the password auth step

        Falls back to `config.password` when `password` is not given. On
        success the returned object's `sessionid` holds `state.success`.

        Raises:
            ValueError: if no password is available.
            AuthCredFailed: on a non-200 answer or when no success value is returned.
        """

        if password is None:
            password = self.config.password
        if password is None:
            raise ValueError(
                "Password has to be passed to auth_step_password or in self.password!"
            )

        cred_auth = {"step": {"cred": {"password": password}}}
        response = await self.call_post(
            path="/v1/auth", json=cred_auth, headers=self.session_header(sessionid)
        )

        if response.status_code != 200:
            # TODO: write test coverage auth_step_password raises AuthCredFailed
            logging.debug("Failed to authenticate, response: %s", response.content)
            raise AuthCredFailed("Failed password authentication!")

        result = AuthStepPasswordResponse.parse_obj(response.data)
        result.response = response

        # pull the token out and set it
        if result.state.success is None:
            # TODO: write test coverage for AuthCredFailed
            raise AuthCredFailed
        result.sessionid = result.state.success
        return result

    def session_header(
        self,
        sessionid: str,
    ) -> Dict[str, str]:
        """create a headers dict from a session id"""
        # TODO: perhaps allow session_header to take a dict and update it, too?
        return {
            "X-KANIDM-AUTH-SESSION-ID": sessionid,
        }

    async def get_radius_token(self, username: str) -> ClientResponse:
        """does the call to the radius token endpoint

        Raises:
            NoMatchingEntries: if the server answers with HTTP 404.
        """
        path = f"/v1/account/{username}/_radius/_token"
        response = await self.call_get(path)
        if response.status_code == 404:
            # The operation id is only extra context; a missing header must not
            # replace NoMatchingEntries with a KeyError.
            opid = response.headers.get("x-kanidm-opid", "unknown")
            raise NoMatchingEntries(f"No user found: '{username}' {opid}")
        return response

__init__(config=None, config_file=None, uri=None, verify_hostnames=True, verify_certificate=True, ca_path=None, token=None)

Constructor for KanidmClient

Source code in kanidm/__init__.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    config: Optional[KanidmClientConfig] = None,
    config_file: Optional[Union[Path, str]] = None,
    uri: Optional[str] = None,
    verify_hostnames: bool = True,
    verify_certificate: bool = True,
    ca_path: Optional[str] = None,
    token: Optional[str] = None,
) -> None:
    """Build a client from a ready-made config, keyword settings, or a config file.

    A supplied `config` is used as-is. Otherwise a `KanidmClientConfig` is
    created from the keyword arguments and, when `config_file` is given, the
    config is then re-parsed from that file's data.

    Raises:
        ValueError: if the final configuration has no server URI.
    """

    if config is None:
        # Always build from the keyword arguments first, as before.
        self.config = KanidmClientConfig(
            uri=uri,
            verify_hostnames=verify_hostnames,
            verify_certificate=verify_certificate,
            ca_path=ca_path,
            auth_token=token,
        )
        if config_file is not None:
            file_path = (
                config_file if isinstance(config_file, Path) else Path(config_file)
            )
            file_settings = load_config(file_path.expanduser().resolve())
            self.config = self.config.parse_obj(file_settings)
    else:
        self.config = config

    if self.config.uri is None:
        raise ValueError("Please initialize this with a server URI")

    # Placeholder until _configure_ssl() fills in the real TLS setting.
    self._ssl: Optional[Union[bool, ssl.SSLContext]] = None
    self._configure_ssl()

auth_begin(method, sessionid) async

the 'begin' step

Source code in kanidm/__init__.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
async def auth_begin(self, method: str, sessionid: str) -> ClientResponse:
    """the 'begin' step

    Posts the chosen auth `method` for the session identified by `sessionid`
    and returns the raw response.

    Raises:
        AuthBeginFailed: if the server does not answer with HTTP 200.
    """

    begin_auth = {
        "step": {
            "begin": method,
        },
    }
    headers = self.session_header(sessionid)

    response = await self.call_post(
        KANIDMURLS["auth"],
        json=begin_auth,
        headers=headers,
    )
    if response.status_code != 200:
        # TODO: mock test for auth_begin raises AuthBeginFailed
        raise AuthBeginFailed(response.content)

    # Parsed only to validate the payload shape; the parsed object was never
    # returned, so it is no longer kept in a local.
    AuthBeginResponse.parse_obj(response.data)
    return response

auth_init(username) async

init step, starts the auth session and returns the parsed response (the session ID is carried in its x-kanidm-auth-session-id header)

Source code in kanidm/__init__.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
async def auth_init(self, username: str) -> AuthInitResponse:
    """Start an auth session for `username`.

    Returns the parsed init response with the raw `ClientResponse` attached
    as `.response`.

    Raises:
        AuthInitFailed: if the server does not answer with HTTP 200.
        ValueError: if the session ID header is missing from the response.
    """
    reply = await self.call_post(
        path=KANIDMURLS["auth"],
        json={"step": {"init": username}},
    )

    if reply.status_code != 200:
        logging.debug(
            "Failed to authenticate, response from server: %s",
            reply.content,
        )
        # TODO: mock test auth_init raises AuthInitFailed
        raise AuthInitFailed(reply.content)

    session_id_present = "x-kanidm-auth-session-id" in reply.headers
    if not session_id_present:
        logging.debug("response.content: %s", reply.content)
        logging.debug("response.headers: %s", reply.headers)
        raise ValueError(
            f"Missing x-kanidm-auth-session-id header in init auth response: {reply.headers}"
        )

    parsed = AuthInitResponse.parse_obj(reply.data)
    # keep the raw response so callers can read its headers
    parsed.response = reply
    return parsed

auth_step_password(sessionid, password=None) async

does the password auth step

Source code in kanidm/__init__.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
async def auth_step_password(
    self,
    sessionid: str,
    password: Optional[str] = None,
) -> AuthStepPasswordResponse:
    """Send the password credential for an auth session already in progress.

    Uses `config.password` when `password` is not given. On success the
    returned object's `sessionid` is set to `state.success`.

    Raises:
        ValueError: if no password is available.
        AuthCredFailed: on a non-200 answer or when no success value is returned.
    """

    secret = self.config.password if password is None else password
    if secret is None:
        raise ValueError(
            "Password has to be passed to auth_step_password or in self.password!"
        )

    reply = await self.call_post(
        path="/v1/auth",
        json={"step": {"cred": {"password": secret}}},
        headers=self.session_header(sessionid),
    )

    if reply.status_code != 200:
        # TODO: write test coverage auth_step_password raises AuthCredFailed
        logging.debug("Failed to authenticate, response: %s", reply.content)
        raise AuthCredFailed("Failed password authentication!")

    outcome = AuthStepPasswordResponse.parse_obj(reply.data)
    outcome.response = reply

    # the success value doubles as the session id handed back to the caller
    success_value = outcome.state.success
    if success_value is None:
        # TODO: write test coverage for AuthCredFailed
        raise AuthCredFailed
    outcome.sessionid = success_value
    return outcome

authenticate_password(username=None, password=None) async

authenticates with a username and password, returns the auth token

Source code in kanidm/__init__.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
async def authenticate_password(
    self,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> AuthStepPasswordResponse:
    """Run the whole password login flow: init, begin, then the credential step.

    When neither argument is supplied, `config.username` and
    `config.password` are used instead.

    Raises:
        ValueError: if a username or password cannot be determined.
        NotImplementedError: if the init result carries no raw response.
        AuthMechUnknown: if the server offers no auth mechanisms.
    """
    if username is None and password is None:
        # nothing passed in: fall back to the stored credentials
        username = self.config.username
        password = self.config.password
        if username is None or password is None:
            # pylint: disable=line-too-long
            raise ValueError(
                "Need username/password to be in caller or class settings before calling authenticate_password"
            )
    elif username is None or password is None:
        raise ValueError("Username and Password need to be set somewhere!")

    init_result: AuthInitResponse = await self.auth_init(username)

    init_response = init_result.response
    if init_response is None:
        raise NotImplementedError("This should throw a really cool response")

    sessionid = init_response.headers["x-kanidm-auth-session-id"]

    if len(init_result.state.choose) == 0:
        # there's no mechanisms at all - bail
        # TODO: write test coverage for authenticate_password raises AuthMechUnknown
        raise AuthMechUnknown(f"No auth mechanisms for {username}")

    begin_response = await self.auth_begin(method="password", sessionid=sessionid)
    # parse the begin payload as a sanity check before sending the password
    begin_parsed = AuthBeginResponse.parse_obj(begin_response.data)
    begin_parsed.response = begin_response

    return await self.auth_step_password(password=password, sessionid=sessionid)

call_get(path, headers=None, params=None, timeout=None) async

does a get call to the server

Source code in kanidm/__init__.py
197
198
199
200
201
202
203
204
205
async def call_get(
    self,
    path: str,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, str]] = None,
    timeout: Optional[int] = None,
) -> ClientResponse:
    """Issue a GET request for `path` through `_call` and return its response."""
    reply = await self._call(
        method="GET",
        path=path,
        headers=headers,
        timeout=timeout,
        params=params,
    )
    return reply

call_post(path, headers=None, json=None, timeout=None) async

does a POST call to the server

Source code in kanidm/__init__.py
207
208
209
210
211
212
213
214
215
216
217
218
async def call_post(
    self,
    path: str,
    headers: Optional[Dict[str, str]] = None,
    json: Optional[Dict[str, Any]] = None,
    timeout: Optional[int] = None,
) -> ClientResponse:
    """Issue a POST request for `path` through `_call`, sending `json` as the body."""

    reply = await self._call(
        path=path,
        method="POST",
        json=json,
        headers=headers,
        timeout=timeout,
    )
    return reply

check_token_valid(token=None) async

checks if a given token is valid, or the local one if you don't pass it

Source code in kanidm/__init__.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def check_token_valid(self, token: Optional[str] = None) -> bool:
    """Ask the server whether a token is still valid.

    With an explicit `token` the request carries it as a bearer token;
    without one no headers are passed to `call_get`. Returns True only for
    an HTTP 200 answer.
    """
    explicit_headers = (
        {
            "authorization": f"Bearer {token}",
            "content-type": "application/json",
        }
        if token is not None
        else None
    )
    reply = await self.call_get("/v1/auth/valid", headers=explicit_headers)
    logging.debug(reply)
    return reply.status_code == 200

get_path_uri(path) cached

turns a path into a full URI

Source code in kanidm/__init__.py
132
133
134
135
136
137
def get_path_uri(self, path: str) -> str:
    """turns a path into a full URI

    A single leading slash on `path` is dropped, and a slash is added to the
    base URI when it lacks one, so base and path always join with exactly one
    separator.

    The result is computed on every call rather than memoized: a cache keyed
    on `self` kept returning the old URI after `config.uri` changed and held
    a reference to every instance that had ever been used.
    """
    base = str(self.config.uri)
    if not base.endswith("/"):
        base = f"{base}/"
    if path.startswith("/"):
        path = path[1:]
    return f"{base}{path}"

get_radius_token(username) async

does the call to the radius token endpoint

Source code in kanidm/__init__.py
347
348
349
350
351
352
353
354
355
async def get_radius_token(self, username: str) -> ClientResponse:
    """does the call to the radius token endpoint

    Returns the raw response for `/v1/account/{username}/_radius/_token`.

    Raises:
        NoMatchingEntries: if the server answers with HTTP 404.
    """
    path = f"/v1/account/{username}/_radius/_token"
    response = await self.call_get(path)
    if response.status_code == 404:
        # The operation id is only extra context for the message; a missing
        # header must not replace NoMatchingEntries with a KeyError.
        opid = response.headers.get("x-kanidm-opid", "unknown")
        raise NoMatchingEntries(f"No user found: '{username}' {opid}")
    return response

parse_config_data(config_data)

hand it a config dict and it'll configure the client

Source code in kanidm/__init__.py
105
106
107
108
109
110
111
112
113
114
def parse_config_data(
    self,
    config_data: Dict[str, Any],
) -> None:
    """hand it a config dict and it'll configure the client

    The dict is validated first; on success the result replaces
    `self.config` and the SSL settings are rebuilt from it.

    Raises:
        ValueError: if `config_data` fails validation.
    """
    try:
        new_config = self.config.parse_obj(config_data)
    except ValidationError as validation_error:
        raise ValueError(
            f"Failed to validate configuration: {validation_error}"
        ) from validation_error
    # parse_obj returns the parsed config; it has to be stored, otherwise this
    # method validates and then throws the result away.
    self.config = new_config
    # the TLS settings are derived from the config, so refresh them as well
    self._configure_ssl()

session_header(sessionid)

create a headers dict from a session id

Source code in kanidm/__init__.py
337
338
339
340
341
342
343
344
345
def session_header(
    self,
    sessionid: str,
) -> Dict[str, str]:
    """Wrap a session id in the header dict used for auth session requests."""
    # TODO: perhaps allow session_header to take a dict and update it, too?
    header_name = "X-KANIDM-AUTH-SESSION-ID"
    return {header_name: sessionid}