Skip to content

KanidmClient

Kanidm client module

config: a KanidmClientConfig object, if this is set, everything else is ignored config_file: a pathlib.Path object pointing to a configuration file uri: kanidm base URL session: a aiohttp.client.ClientSession verify_hostnames: verify the hostname is correct verify_certificate: verify the validity of the certificate and its CA ca_path: set this to a trusted CA certificate (PEM format)

Source code in kanidm/__init__.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class KanidmClient:
    """Kanidm client module

    config: a `KanidmClientConfig` object, if this is set, everything else is ignored
    config_file: a `pathlib.Path` object pointing to a configuration file
    uri: kanidm base URL
    session: a `aiohttp.client.ClientSession`
    verify_hostnames: verify the hostname is correct
    verify_certificate: verify the validity of the certificate and its CA
    ca_path: set this to a trusted CA certificate (PEM format)
    """

    # pylint: disable=too-many-instance-attributes,too-many-arguments
    def __init__(
        self,
        config: Optional[KanidmClientConfig] = None,
        config_file: Optional[Union[Path, str]] = None,
        uri: Optional[str] = None,
        session: Optional[aiohttp.client.ClientSession] = None,
        verify_hostnames: bool = True,
        verify_certificate: bool = True,
        ca_path: Optional[str] = None,
    ) -> None:
        """Constructor for KanidmClient"""

        if config is not None:
            self.config = config

        else:
            self.config = KanidmClientConfig(
                uri=uri,
                verify_hostnames=verify_hostnames,
                verify_certificate=verify_certificate,
                ca_path=ca_path,
            )

            if config_file is not None:
                if not isinstance(config_file, Path):
                    config_file = Path(config_file)
                config_data = load_config(config_file.expanduser().resolve())
                self.config = self.config.parse_obj(config_data)

            self.session = session

        self.sessionid: Optional[str] = None
        if self.config.uri is None:
            raise ValueError("Please intitialize this with a server URI")

        self._ssl: Optional[Union[bool, ssl.SSLContext]] = None
        self._configure_ssl()

    def _configure_ssl(self) -> None:
        """Sets up SSL configuration for the client"""
        if self.config.verify_certificate is False:
            self._ssl = False
        else:
            self._ssl = ssl.create_default_context(cafile=self.config.ca_path)
        if self._ssl is not False:
            # ignoring this for typing because mypy is being weird
            # ssl.SSLContext.check_hostname is totally a thing
            # https://docs.python.org/3/library/ssl.html#ssl.SSLContext.check_hostname
            self._ssl.check_hostname = self.config.verify_hostnames  # type: ignore

    def parse_config_data(
        self,
        config_data: Dict[str, Any],
    ) -> None:
        """hand it a config dict and it'll configure the client"""
        try:
            self.config.parse_obj(config_data)
        except ValidationError as validation_error:
            raise ValueError(f"Failed to validate configuration: {validation_error}")

    def get_path_uri(self, path: str) -> str:
        """turns a path into a full URI"""
        if path.startswith("/"):
            path = path[1:]
        return f"{self.config.uri}{path}"

    # pylint: disable=too-many-arguments
    async def _call(
        self,
        method: str,
        path: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
        json: Optional[Dict[str, str]] = None,
    ) -> ClientResponse:

        if timeout is None:
            timeout = self.config.connect_timeout
        if self.session is None:
            self.session = aiohttp.client.ClientSession()
        async with self.session.request(
            method=method,
            url=self.get_path_uri(path),
            headers=headers,
            timeout=timeout,
            json=json,
            ssl=self._ssl,
        ) as request:
            content = await request.content.read()
            try:
                response_json = loads(content)
                if not isinstance(response_json, dict):
                    response_json = None
            except JSONDecodeError as json_error:
                logging.error("Failed to JSON Decode Response: %s", json_error)
                response_json = {}
            response_input = {
                "data": response_json,
                "content": content.decode("utf-8"),
                "headers": request.headers,
                "status_code": request.status,
            }
            logging.debug(dumps(response_input, default=str, indent=4))
            response = ClientResponse.parse_obj(response_input)
        return response

    async def call_get(
        self,
        path: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
    ) -> ClientResponse:
        """does a get call to the server"""
        return await self._call("GET", path, headers, timeout)

    async def call_post(
        self,
        path: str,
        headers: Optional[Dict[str, str]] = None,
        json: Optional[Dict[str, Any]] = None,
        timeout: Optional[int] = None,
    ) -> ClientResponse:
        """does a get call to the server"""

        return await self._call(
            method="POST", path=path, headers=headers, json=json, timeout=timeout
        )

    async def auth_init(self, username: str) -> AuthInitResponse:
        """init step, starts the auth session, sets the class-local session ID"""
        init_auth = {"step": {"init": username}}

        response = await self.call_post(
            path=KANIDMURLS["auth"],
            json=init_auth,
        )
        if response.status_code != 200:
            logging.debug(
                "Failed to authenticate, response from server: %s",
                response.content,
            )
            # TODO: mock test this
            raise AuthInitFailed(response.content)

        if "x-kanidm-auth-session-id" not in response.headers:
            logging.debug("response.content: %s", response.content)
            logging.debug("response.headers: %s", response.headers)
            raise ValueError(
                f"Missing x-kanidm-auth-session-id header in init auth response: {response.headers}"
            )
        # TODO: setting the class-local session id, do we want this?
        self.sessionid = response.headers["x-kanidm-auth-session-id"]
        retval = AuthInitResponse.parse_obj(response.data)
        retval.response = response
        return retval

    async def auth_begin(
        self,
        method: str = "password",  # TODO: do we want a default auth mech to be set?
    ) -> ClientResponse:
        """the 'begin' step"""

        begin_auth = {
            "step": {
                "begin": method,
            }
        }

        response = await self.call_post(
            KANIDMURLS["auth"],
            json=begin_auth,
            headers=self.session_header(),
        )
        if response.status_code != 200:
            # TODO: write mocked test for this
            raise AuthBeginFailed(response.content)

        retobject = AuthBeginResponse.parse_obj(response.data)
        retobject.response = response
        return response

    async def authenticate_password(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> AuthStepPasswordResponse:
        """authenticates with a username and password, returns the auth token"""
        if username is None and password is None:
            if self.config.username is None or self.config.password is None:
                raise ValueError(
                    "Need username/password to be in caller or class settings before calling authenticate_password"
                )
            username = self.config.username
            password = self.config.password
        if username is None or password is None:
            raise ValueError("Username and Password need to be set somewhere!")

        auth_init = await self.auth_init(username)

        if len(auth_init.state.choose) == 0:
            # there's no mechanisms at all - bail
            # TODO: write test coverage for this
            raise AuthMechUnknown(f"No auth mechanisms for {username}")
        auth_begin = await self.auth_begin(
            method="password",
        )
        # does a little bit of validation
        auth_begin_object = AuthBeginResponse.parse_obj(auth_begin.data)
        auth_begin_object.response = auth_begin
        return await self.auth_step_password(password=password)

    async def auth_step_password(
        self,
        password: Optional[str] = None,
    ) -> AuthStepPasswordResponse:
        """does the password auth step"""

        if password is None:
            password = self.config.password
        if password is None:
            raise ValueError(
                "Password has to be passed to auth_step_password or in self.password!"
            )

        cred_auth = {"step": {"cred": {"password": password}}}
        response = await self.call_post(
            path="/v1/auth",
            json=cred_auth,
        )
        if response.status_code != 200:
            # TODO: write test coverage for this
            logging.debug("Failed to authenticate, response: %s", response.content)
            raise AuthCredFailed("Failed password authentication!")

        result = AuthStepPasswordResponse.parse_obj(response.data)
        result.response = response
        print(f"auth_step_password: {result.dict()}")

        # pull the token out and set it
        if result.state.success is None:
            # TODO: write test coverage for AuthCredFailed
            raise AuthCredFailed
        result.sessionid = result.state.success
        return result

    def session_header(
        self,
        sessionid: Optional[str] = None,
    ) -> Dict[str, str]:
        """create a headers dict from a session id"""
        # TODO: perhaps allow session_header to take a dict and update it, too?

        if sessionid is not None:
            return {
                "X-KANIDM-AUTH-SESSION-ID": sessionid,
            }

        if self.sessionid is not None:
            return {
                "X-KANIDM-AUTH-SESSION-ID": self.sessionid,
            }
        raise ValueError("Class doesn't have a sessionid stored and none was provided")

    async def get_radius_token(
        self, username: str, radius_session_id: str
    ) -> ClientResponse:
        """does the call to the radius token endpoint"""
        path = f"/v1/account/{username}/_radius/_token"
        headers = {
            "Authorization": f"Bearer {radius_session_id}",
        }
        response = await self.call_get(
            path,
            headers,
        )
        if response.status_code == 404:
            raise NoMatchingEntries(
                f"No user found: '{username}' {response.headers['x-kanidm-opid']}"
            )
        return response

__init__(config=None, config_file=None, uri=None, session=None, verify_hostnames=True, verify_certificate=True, ca_path=None)

Constructor for KanidmClient

Source code in kanidm/__init__.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(
    self,
    config: Optional[KanidmClientConfig] = None,
    config_file: Optional[Union[Path, str]] = None,
    uri: Optional[str] = None,
    session: Optional[aiohttp.client.ClientSession] = None,
    verify_hostnames: bool = True,
    verify_certificate: bool = True,
    ca_path: Optional[str] = None,
) -> None:
    """Constructor for KanidmClient"""

    if config is not None:
        self.config = config

    else:
        self.config = KanidmClientConfig(
            uri=uri,
            verify_hostnames=verify_hostnames,
            verify_certificate=verify_certificate,
            ca_path=ca_path,
        )

        if config_file is not None:
            if not isinstance(config_file, Path):
                config_file = Path(config_file)
            config_data = load_config(config_file.expanduser().resolve())
            self.config = self.config.parse_obj(config_data)

        self.session = session

    self.sessionid: Optional[str] = None
    if self.config.uri is None:
        raise ValueError("Please intitialize this with a server URI")

    self._ssl: Optional[Union[bool, ssl.SSLContext]] = None
    self._configure_ssl()

auth_begin(method='password') async

the 'begin' step

Source code in kanidm/__init__.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def auth_begin(
    self,
    method: str = "password",  # TODO: do we want a default auth mech to be set?
) -> ClientResponse:
    """the 'begin' step"""

    begin_auth = {
        "step": {
            "begin": method,
        }
    }

    response = await self.call_post(
        KANIDMURLS["auth"],
        json=begin_auth,
        headers=self.session_header(),
    )
    if response.status_code != 200:
        # TODO: write mocked test for this
        raise AuthBeginFailed(response.content)

    retobject = AuthBeginResponse.parse_obj(response.data)
    retobject.response = response
    return response

auth_init(username) async

init step, starts the auth session, sets the class-local session ID

Source code in kanidm/__init__.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
async def auth_init(self, username: str) -> AuthInitResponse:
    """init step, starts the auth session, sets the class-local session ID"""
    init_auth = {"step": {"init": username}}

    response = await self.call_post(
        path=KANIDMURLS["auth"],
        json=init_auth,
    )
    if response.status_code != 200:
        logging.debug(
            "Failed to authenticate, response from server: %s",
            response.content,
        )
        # TODO: mock test this
        raise AuthInitFailed(response.content)

    if "x-kanidm-auth-session-id" not in response.headers:
        logging.debug("response.content: %s", response.content)
        logging.debug("response.headers: %s", response.headers)
        raise ValueError(
            f"Missing x-kanidm-auth-session-id header in init auth response: {response.headers}"
        )
    # TODO: setting the class-local session id, do we want this?
    self.sessionid = response.headers["x-kanidm-auth-session-id"]
    retval = AuthInitResponse.parse_obj(response.data)
    retval.response = response
    return retval

auth_step_password(password=None) async

does the password auth step

Source code in kanidm/__init__.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
async def auth_step_password(
    self,
    password: Optional[str] = None,
) -> AuthStepPasswordResponse:
    """does the password auth step"""

    if password is None:
        password = self.config.password
    if password is None:
        raise ValueError(
            "Password has to be passed to auth_step_password or in self.password!"
        )

    cred_auth = {"step": {"cred": {"password": password}}}
    response = await self.call_post(
        path="/v1/auth",
        json=cred_auth,
    )
    if response.status_code != 200:
        # TODO: write test coverage for this
        logging.debug("Failed to authenticate, response: %s", response.content)
        raise AuthCredFailed("Failed password authentication!")

    result = AuthStepPasswordResponse.parse_obj(response.data)
    result.response = response
    print(f"auth_step_password: {result.dict()}")

    # pull the token out and set it
    if result.state.success is None:
        # TODO: write test coverage for AuthCredFailed
        raise AuthCredFailed
    result.sessionid = result.state.success
    return result

authenticate_password(username=None, password=None) async

authenticates with a username and password, returns the auth token

Source code in kanidm/__init__.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
async def authenticate_password(
    self,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> AuthStepPasswordResponse:
    """authenticates with a username and password, returns the auth token"""
    if username is None and password is None:
        if self.config.username is None or self.config.password is None:
            raise ValueError(
                "Need username/password to be in caller or class settings before calling authenticate_password"
            )
        username = self.config.username
        password = self.config.password
    if username is None or password is None:
        raise ValueError("Username and Password need to be set somewhere!")

    auth_init = await self.auth_init(username)

    if len(auth_init.state.choose) == 0:
        # there's no mechanisms at all - bail
        # TODO: write test coverage for this
        raise AuthMechUnknown(f"No auth mechanisms for {username}")
    auth_begin = await self.auth_begin(
        method="password",
    )
    # does a little bit of validation
    auth_begin_object = AuthBeginResponse.parse_obj(auth_begin.data)
    auth_begin_object.response = auth_begin
    return await self.auth_step_password(password=password)

call_get(path, headers=None, timeout=None) async

does a get call to the server

Source code in kanidm/__init__.py
153
154
155
156
157
158
159
160
async def call_get(
    self,
    path: str,
    headers: Optional[Dict[str, str]] = None,
    timeout: Optional[int] = None,
) -> ClientResponse:
    """does a get call to the server"""
    return await self._call("GET", path, headers, timeout)

call_post(path, headers=None, json=None, timeout=None) async

does a get call to the server

Source code in kanidm/__init__.py
162
163
164
165
166
167
168
169
170
171
172
173
async def call_post(
    self,
    path: str,
    headers: Optional[Dict[str, str]] = None,
    json: Optional[Dict[str, Any]] = None,
    timeout: Optional[int] = None,
) -> ClientResponse:
    """does a get call to the server"""

    return await self._call(
        method="POST", path=path, headers=headers, json=json, timeout=timeout
    )

get_path_uri(path)

turns a path into a full URI

Source code in kanidm/__init__.py
107
108
109
110
111
def get_path_uri(self, path: str) -> str:
    """turns a path into a full URI"""
    if path.startswith("/"):
        path = path[1:]
    return f"{self.config.uri}{path}"

get_radius_token(username, radius_session_id) async

does the call to the radius token endpoint

Source code in kanidm/__init__.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
async def get_radius_token(
    self, username: str, radius_session_id: str
) -> ClientResponse:
    """does the call to the radius token endpoint"""
    path = f"/v1/account/{username}/_radius/_token"
    headers = {
        "Authorization": f"Bearer {radius_session_id}",
    }
    response = await self.call_get(
        path,
        headers,
    )
    if response.status_code == 404:
        raise NoMatchingEntries(
            f"No user found: '{username}' {response.headers['x-kanidm-opid']}"
        )
    return response

parse_config_data(config_data)

hand it a config dict and it'll configure the client

Source code in kanidm/__init__.py
 97
 98
 99
100
101
102
103
104
105
def parse_config_data(
    self,
    config_data: Dict[str, Any],
) -> None:
    """hand it a config dict and it'll configure the client"""
    try:
        self.config.parse_obj(config_data)
    except ValidationError as validation_error:
        raise ValueError(f"Failed to validate configuration: {validation_error}")

session_header(sessionid=None)

create a headers dict from a session id

Source code in kanidm/__init__.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def session_header(
    self,
    sessionid: Optional[str] = None,
) -> Dict[str, str]:
    """create a headers dict from a session id"""
    # TODO: perhaps allow session_header to take a dict and update it, too?

    if sessionid is not None:
        return {
            "X-KANIDM-AUTH-SESSION-ID": sessionid,
        }

    if self.sessionid is not None:
        return {
            "X-KANIDM-AUTH-SESSION-ID": self.sessionid,
        }
    raise ValueError("Class doesn't have a sessionid stored and none was provided")