""" kanidm RADIUS module """ import asyncio from functools import reduce import json import logging import os from pathlib import Path import sys from typing import Any, Dict, Optional, Union import aiohttp from kanidm import KanidmClient from kanidm.types import AuthStepPasswordResponse from kanidm.utils import load_config from kanidm.exceptions import NoMatchingEntries from . import radiusd logging.basicConfig( level=logging.DEBUG, stream=sys.stderr, ) # the list of places to try config_paths = [ os.getenv("KANIDM_RLM_CONFIG", "/data/kanidm"), # container goodness "~/.config/kanidm", # for a user "/etc/kanidm/kanidm", # system-wide ] CONFIG_PATH = None for config_file_path in config_paths: CONFIG_PATH = Path(config_file_path).expanduser().resolve() if CONFIG_PATH.exists(): break if (CONFIG_PATH is None) or (not CONFIG_PATH.exists()): logging.error("Failed to find configuration file, checked (%s), quitting!", config_paths) sys.exit(1) config = load_config(str(CONFIG_PATH)) COOKIE_JAR = aiohttp.CookieJar() KANIDM_CLIENT = KanidmClient(config_file=CONFIG_PATH) def authenticate( acct: str, password: str, kanidm_client: KanidmClient = KANIDM_CLIENT, ) -> Union[int, AuthStepPasswordResponse]: """ authenticate the RADIUS service account to Kanidm """ logging.error("authenticate - %s:%s", acct, password) try: loop = asyncio.get_event_loop() with aiohttp.client.ClientSession(cookie_jar=COOKIE_JAR) as session: kanidm_client.session = session return loop.run_until_complete(kanidm_client.authenticate_password( username=acct, password=password )) except Exception as error_message: #pylint: disable=broad-except logging.error("Failed to run kanidm.authenticate_password: %s", error_message) return radiusd.RLM_MODULE_FAIL async def _get_radius_token( username: Optional[str]=None, kanidm_client: KanidmClient=KANIDM_CLIENT, ) -> Optional[Dict[str, Any]]: if username is None: raise ValueError("Didn't get a username for _get_radius_token") # authenticate as the radius service account logging.debug("Authenticating kanidm radius service account") radius_auth_response = await kanidm_client.authenticate_password() logging.debug("Getting RADIUS token for %s", username) response = await kanidm_client.get_radius_token( username=username, radius_session_id = radius_auth_response.sessionid, ) logging.debug("Got radius token for %s", username) if response.status_code != 200: logging.error("got response status code: %s", response.status_code) logging.error("Response content: %s", response.json()) raise Exception("Failed to get RadiusAuthToken") logging.debug("Success getting RADIUS token: %s", response.json()) print(response.data) return response.data def check_vlan( acc: int, group: Dict[str, str], kanidm_client: Optional[KanidmClient] = None, ) -> int: """ checks if a vlan is in the config, acc is the default vlan """ logging.debug("acc=%s", acc) if kanidm_client is None: kanidm_client = KANIDM_CLIENT # raise ValueError("Need to pass this a kanidm_client") for radius_group in kanidm_client.config.radius_groups: logging.debug("Checking '%s' radius_group against group %s", radius_group, group['name']) if radius_group.name == group['name']: return radius_group.vlan #if CONFIG.has_section(f"group.{group['name']}"): # if CONFIG.has_option(f"group.{group['name']}", "vlan"): # vlan = CONFIG.getint(f"group.{group['name']}", "vlan") # logging.debug("assigning vlan %s from group %s", vlan, group) # return vlan logging.debug("returning default vlan: %s", acc) return acc def instantiate(_: Any) -> Any: """ start up radiusd """ logging.info("Starting up!") return radiusd.RLM_MODULE_OK def authorize( args: Any=Dict[Any,Any], kanidm_client: KanidmClient=KANIDM_CLIENT, ) -> Any: """ does the kanidm authorize step """ logging.info('kanidm python module called') # args comes in like this # ( # ('User-Name', ''), # ('User-Password', ''), # ('NAS-IP-Address', ''), # ('NAS-Port', '