diff --git a/server/core/src/config.rs b/server/core/src/config.rs index f5322f8c0..c302996f2 100644 --- a/server/core/src/config.rs +++ b/server/core/src/config.rs @@ -135,6 +135,10 @@ pub struct ServerConfig { /// The path to the "admin" socket, used for local communication when performing certain server control tasks. Default is set on build, based on the system target. pub adminbindpath: Option, + /// The maximum amount of threads the server will use for the async worker pool. Defaults + /// to std::threads::available_parallelism. + pub thread_count: Option, + /// Don't touch this unless you know what you're doing! #[allow(dead_code)] db_arc_size: Option, @@ -723,4 +727,10 @@ impl Configuration { } } } + + // Update the thread count of this server, only up to the maximum set by self threads + // which is configured with available parallelism. + pub fn update_threads_count(&mut self, threads: usize) { + self.threads = std::cmp::min(self.threads, threads); + } } diff --git a/server/daemon/src/main.rs b/server/daemon/src/main.rs index a74bf9a9d..e28c174a5 100644 --- a/server/daemon/src/main.rs +++ b/server/daemon/src/main.rs @@ -289,31 +289,10 @@ fn main() -> ExitCode { return ExitCode::FAILURE; } + // We need enough backtrace depth to find leak sources if they exist. #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::builder().trim_backtraces(Some(40)).build(); - let maybe_rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("kanidmd-thread-pool") - // .thread_stack_size(8 * 1024 * 1024) - // If we want a hook for thread start. - // .on_thread_start() - // In future, we can stop the whole process if a panic occurs. - // .unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime) - .build(); - - let rt = match maybe_rt { - Ok(rt) => rt, - Err(err) => { - eprintln!("CRITICAL: Unable to start runtime! {:?}", err); - return ExitCode::FAILURE; - } - }; - - rt.block_on(kanidm_main()) -} - -async fn kanidm_main() -> ExitCode { // Read CLI args, determine what the user has asked us to do. let opt = KanidmdParser::parse(); @@ -401,6 +380,17 @@ async fn kanidm_main() -> ExitCode { return ExitCode::FAILURE; } + let sconfig = match sconfig { + Some(val) => val, + None => { + error!("Somehow you got an empty ServerConfig after error checking?"); + return ExitCode::FAILURE; + } + }; + + // =========================================================================== + // Config ready, start to setup pre-run checks. + // Get info about who we are. #[cfg(target_family = "unix")] let (cuid, ceuid) = { @@ -423,14 +413,6 @@ async fn kanidm_main() -> ExitCode { (cuid, ceuid) }; - let sconfig = match sconfig { - Some(val) => val, - None => { - error!("Somehow you got an empty ServerConfig after error checking?"); - return ExitCode::FAILURE; - } - }; - if let Some(cfg_path) = opt.config_path() { #[cfg(target_family = "unix")] { @@ -534,9 +516,18 @@ async fn kanidm_main() -> ExitCode { config.update_output_mode(opt.commands.commonopt().output_mode.to_owned().into()); config.update_trust_x_forward_for(sconfig.trust_x_forward_for); config.update_admin_bind_path(&sconfig.adminbindpath); - config.update_replication_config(sconfig.repl_config.clone()); + // We always set threads to 1 unless it's the main server. + if matches!(&opt.commands, KanidmdOpt::Server(_)) { + // If not updated, will default to maximum + if let Some(threads) = sconfig.thread_count { + config.update_threads_count(threads); + } + } else { + config.update_threads_count(1); + }; + match &opt.commands { // we aren't going to touch the DB so we can carry on KanidmdOpt::ShowReplicationCertificate { .. } @@ -575,6 +566,35 @@ async fn kanidm_main() -> ExitCode { } } + let maybe_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(config.threads) + .enable_all() + .thread_name("kanidmd-thread-pool") + // .thread_stack_size(8 * 1024 * 1024) + // If we want a hook for thread start. + // .on_thread_start() + // In future, we can stop the whole process if a panic occurs. + // .unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime) + .build(); + + let rt = match maybe_rt { + Ok(rt) => rt, + Err(err) => { + eprintln!("CRITICAL: Unable to start runtime! {:?}", err); + return ExitCode::FAILURE; + } + }; + + rt.block_on(kanidm_main(sconfig, config, opt)) +} + +/// Build and execute the main server. The ServerConfig are the configuration options +/// that we are processing into the config for the main server. +async fn kanidm_main( + sconfig: ServerConfig, + mut config: Configuration, + opt: KanidmdParser, +) -> ExitCode { match &opt.commands { KanidmdOpt::Server(_sopt) | KanidmdOpt::ConfigTest(_sopt) => { let config_test = matches!(&opt.commands, KanidmdOpt::ConfigTest(_)); diff --git a/tools/orca/src/generate.rs b/tools/orca/src/generate.rs index f1e6e0c4e..104257d78 100644 --- a/tools/orca/src/generate.rs +++ b/tools/orca/src/generate.rs @@ -50,6 +50,8 @@ pub async fn populate(_client: &KanidmOrcaClient, profile: Profile) -> Result Result Result ExitCode { +fn main() -> ExitCode { let opt = OrcaOpt::parse(); if opt.debug() { @@ -66,7 +65,7 @@ async fn main() -> ExitCode { match opt { OrcaOpt::Version { .. } => { println!("orca {}", env!("KANIDM_PKG_VERSION")); - return ExitCode::SUCCESS; + ExitCode::SUCCESS } // Build the profile and the test dimensions. @@ -77,6 +76,7 @@ async fn main() -> ExitCode { control_uri, seed, profile_path, + threads, } => { // For now I hardcoded some dimensions, but we should prompt // the user for these later. @@ -89,8 +89,16 @@ async fn main() -> ExitCode { } }); - let builder = - ProfileBuilder::new(control_uri, admin_password, idm_admin_password).seed(seed); + let extra_uris = Vec::with_capacity(0); + + let builder = ProfileBuilder::new( + control_uri, + extra_uris, + admin_password, + idm_admin_password, + threads, + ) + .seed(seed); let profile = match builder.build() { Ok(p) => p, @@ -100,12 +108,8 @@ async fn main() -> ExitCode { }; match profile.write_to_path(&profile_path) { - Ok(_) => { - return ExitCode::SUCCESS; - } - Err(_err) => { - return ExitCode::FAILURE; - } + Ok(_) => ExitCode::SUCCESS, + Err(_err) => ExitCode::FAILURE, } } @@ -123,15 +127,17 @@ async fn main() -> ExitCode { info!("Performing conntest of {}", profile.control_uri()); - match kani::KanidmOrcaClient::new(&profile).await { - Ok(_) => { - info!("success"); - return ExitCode::SUCCESS; + // we're okay with just one thread here + let runtime = build_tokio_runtime(Some(1)); + runtime.block_on(async { + match kani::KanidmOrcaClient::new(&profile).await { + Ok(_) => { + info!("success"); + ExitCode::SUCCESS + } + Err(_err) => ExitCode::FAILURE, } - Err(_err) => { - return ExitCode::FAILURE; - } - } + }) } // From the profile and test dimensions, generate the data into a state file. @@ -147,29 +153,30 @@ async fn main() -> ExitCode { } }; - let client = match kani::KanidmOrcaClient::new(&profile).await { - Ok(client) => client, - Err(_err) => { - return ExitCode::FAILURE; - } - }; + // This is single threaded. + let runtime = build_tokio_runtime(Some(1)); - // do-it. - let state = match generate::populate(&client, profile).await { - Ok(s) => s, - Err(_err) => { - return ExitCode::FAILURE; - } - }; + runtime.block_on(async { + let client = match kani::KanidmOrcaClient::new(&profile).await { + Ok(client) => client, + Err(_err) => { + return ExitCode::FAILURE; + } + }; - match state.write_to_path(&state_path) { - Ok(_) => { - return ExitCode::SUCCESS; + // do-it. + let state = match generate::populate(&client, profile).await { + Ok(s) => s, + Err(_err) => { + return ExitCode::FAILURE; + } + }; + + match state.write_to_path(&state_path) { + Ok(_) => ExitCode::SUCCESS, + Err(_err) => ExitCode::FAILURE, } - Err(_err) => { - return ExitCode::FAILURE; - } - } + }) } // @@ -184,14 +191,15 @@ async fn main() -> ExitCode { } }; - match populate::preflight(state).await { - Ok(_) => { - return ExitCode::SUCCESS; + // here we want all threads available to speed up the process. + let runtime = build_tokio_runtime(state.thread_count); + + runtime.block_on(async { + match populate::preflight(state).await { + Ok(_) => ExitCode::SUCCESS, + Err(_err) => ExitCode::FAILURE, } - Err(_err) => { - return ExitCode::FAILURE; - } - }; + }) } // Run the test based on the state file. @@ -205,74 +213,90 @@ async fn main() -> ExitCode { return ExitCode::FAILURE; } }; - + // here we need to create one less worker compared to the desired amount since we later call `spawn_blocking`, which consumes + // an extra thread all on its own + let runtime = build_tokio_runtime(state.thread_count); // We have a broadcast channel setup for controlling the state of // various actors and parts. // // We want a small amount of backlog because there are a few possible // commands that could be sent. + runtime.block_on(async { + let (control_tx, control_rx) = broadcast::channel(8); - let (control_tx, control_rx) = broadcast::channel(8); + let mut run_execute = tokio::task::spawn(run::execute(state, control_rx)); - let mut run_execute = tokio::task::spawn(run::execute(state, control_rx)); - - loop { - tokio::select! { - // Note that we pass a &mut handle here because we want the future to join - // but not be consumed each loop iteration. - result = &mut run_execute => { - match result { - Ok(_) => { - return ExitCode::SUCCESS; - } - Err(_err) => { - return ExitCode::FAILURE; - } - }; - } - // Signal handling. - Ok(()) = tokio::signal::ctrl_c() => { - info!("Stopping Task ..."); - let _ = control_tx.send(run::Signal::Stop); - } - Some(()) = async move { - let sigterm = tokio::signal::unix::SignalKind::terminate(); - #[allow(clippy::unwrap_used)] - tokio::signal::unix::signal(sigterm).unwrap().recv().await - } => { - // Kill it with fire I guess. - return ExitCode::FAILURE; - } - Some(()) = async move { - let sigterm = tokio::signal::unix::SignalKind::alarm(); - #[allow(clippy::unwrap_used)] - tokio::signal::unix::signal(sigterm).unwrap().recv().await - } => { - // Ignore - } - Some(()) = async move { - let sigterm = tokio::signal::unix::SignalKind::hangup(); - #[allow(clippy::unwrap_used)] - tokio::signal::unix::signal(sigterm).unwrap().recv().await - } => { - // Ignore - } - Some(()) = async move { - let sigterm = tokio::signal::unix::SignalKind::user_defined1(); - #[allow(clippy::unwrap_used)] - tokio::signal::unix::signal(sigterm).unwrap().recv().await - } => { - // Ignore - } - Some(()) = async move { - let sigterm = tokio::signal::unix::SignalKind::user_defined2(); - #[allow(clippy::unwrap_used)] - tokio::signal::unix::signal(sigterm).unwrap().recv().await - } => { - // Ignore + loop { + tokio::select! { + // Note that we pass a &mut handle here because we want the future to join + // but not be consumed each loop iteration. + result = &mut run_execute => { + match result { + Ok(_) => { + return ExitCode::SUCCESS; + } + Err(_err) => { + return ExitCode::FAILURE; + } + }; + } + // Signal handling. + Ok(()) = tokio::signal::ctrl_c() => { + info!("Stopping Task ..."); + let _ = control_tx.send(run::Signal::Stop); + } + Some(()) = async move { + let sigterm = tokio::signal::unix::SignalKind::terminate(); + #[allow(clippy::unwrap_used)] + tokio::signal::unix::signal(sigterm).unwrap().recv().await + } => { + // Kill it with fire I guess. + return ExitCode::FAILURE; + } + Some(()) = async move { + let sigterm = tokio::signal::unix::SignalKind::alarm(); + #[allow(clippy::unwrap_used)] + tokio::signal::unix::signal(sigterm).unwrap().recv().await + } => { + // Ignore + } + Some(()) = async move { + let sigterm = tokio::signal::unix::SignalKind::hangup(); + #[allow(clippy::unwrap_used)] + tokio::signal::unix::signal(sigterm).unwrap().recv().await + } => { + // Ignore + } + Some(()) = async move { + let sigterm = tokio::signal::unix::SignalKind::user_defined1(); + #[allow(clippy::unwrap_used)] + tokio::signal::unix::signal(sigterm).unwrap().recv().await + } => { + // Ignore + } + Some(()) = async move { + let sigterm = tokio::signal::unix::SignalKind::user_defined2(); + #[allow(clippy::unwrap_used)] + tokio::signal::unix::signal(sigterm).unwrap().recv().await + } => { + // Ignore + } } } - } + }) } - }; + } +} + +/// Build the tokio runtime with the configured number of threads. If set to None, then the maximum +/// of the system is used. +fn build_tokio_runtime(threads: Option) -> Runtime { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + match threads { + Some(threads) => builder.worker_threads(threads), + None => &mut builder, + } + .enable_all() + .build() + .expect("Failed to build tokio runtime") } diff --git a/tools/orca/src/model.rs b/tools/orca/src/model.rs index c70136881..a42263126 100644 --- a/tools/orca/src/model.rs +++ b/tools/orca/src/model.rs @@ -15,7 +15,7 @@ pub enum TransitionAction { WriteAttributePersonMail, ReadSelfAccount, ReadSelfMemberOf, - SetSelfPassword, + WriteSelfPassword, } // Is this the right way? Should transitions/delay be part of the actor model? Should @@ -137,7 +137,7 @@ pub async fn person_set_self_password( let duration = Instant::now().duration_since(start); let parsed_result = parse_call_result_into_transition_result_and_event_record( result, - EventDetail::PersonSelfSetPassword, + EventDetail::PersonSetSelfPassword, start, duration, ); diff --git a/tools/orca/src/models/basic.rs b/tools/orca/src/models/basic.rs index a2f9aa7d1..61fc56e60 100644 --- a/tools/orca/src/models/basic.rs +++ b/tools/orca/src/models/basic.rs @@ -57,7 +57,7 @@ impl ActorModel for ActorBasic { TransitionAction::ReadSelfMemberOf => { model::person_get_self_memberof(client, person).await } - TransitionAction::SetSelfPassword => { + TransitionAction::WriteSelfPassword => { // I know it's dumb but here we just re-set the same password because it's the simplest thing to do let Credential::Password { plain } = &person.credential; model::person_set_self_password(client, person, plain).await @@ -78,11 +78,11 @@ impl ActorBasic { }; match self.state { State::Unauthenticated => Transition { - delay: None, + delay: Some(Duration::from_secs(15)), action: TransitionAction::Login, }, State::Authenticated => Transition { - delay: Some(Duration::from_millis(1000)), + delay: Some(Duration::from_secs(10)), action: TransitionAction::PrivilegeReauth, }, // Since this is the basic model we don't want to get too fancy and do too many things, but since the struct Person @@ -91,20 +91,20 @@ impl ActorBasic { State::AuthenticatedWithReauth => match roles.first() { Some(role) => match role { ActorRole::PeopleSelfMailWrite => Transition { - delay: Some(Duration::from_millis(200)), + delay: Some(Duration::from_secs(5)), action: TransitionAction::WriteAttributePersonMail, }, ActorRole::PeopleSelfReadProfile => Transition { - delay: Some(Duration::from_millis(450)), + delay: Some(Duration::from_secs(2)), action: TransitionAction::ReadSelfAccount, }, ActorRole::PeopleSelfReadMemberOf => Transition { - delay: Some(Duration::from_millis(500)), + delay: Some(Duration::from_secs(1)), action: TransitionAction::ReadSelfMemberOf, }, ActorRole::PeopleSelfSetPassword => Transition { - delay: Some(Duration::from_secs(2)), - action: TransitionAction::SetSelfPassword, + delay: Some(Duration::from_secs(3)), + action: TransitionAction::WriteSelfPassword, }, ActorRole::PeoplePiiReader | ActorRole::None => logout_transition, }, @@ -128,7 +128,7 @@ impl ActorBasic { TransitionAction::WriteAttributePersonMail | TransitionAction::ReadSelfAccount | TransitionAction::ReadSelfMemberOf - | TransitionAction::SetSelfPassword, + | TransitionAction::WriteSelfPassword, TransitionResult::Ok, ) => { self.state = State::AuthenticatedWithReauth; diff --git a/tools/orca/src/opt.rs b/tools/orca/src/opt.rs index 55a6743f3..d00fbcda1 100644 --- a/tools/orca/src/opt.rs +++ b/tools/orca/src/opt.rs @@ -54,6 +54,10 @@ enum OrcaOpt { #[clap(long = "profile")] /// The configuration file path to update (or create) profile_path: PathBuf, + + #[clap(long)] + /// Optional thread count, defaults to maximum available on the system + threads: Option, }, #[clap(name = "conntest")] diff --git a/tools/orca/src/profile.rs b/tools/orca/src/profile.rs index c274302f1..6073a04a7 100644 --- a/tools/orca/src/profile.rs +++ b/tools/orca/src/profile.rs @@ -25,6 +25,7 @@ pub struct Profile { test_time: Option, group_count: u64, person_count: u64, + thread_count: Option, } impl Profile { @@ -53,6 +54,10 @@ impl Profile { self.person_count } + pub fn thread_count(&self) -> Option { + self.thread_count + } + pub fn seed(&self) -> u64 { if self.seed < 0 { self.seed.wrapping_mul(-1) as u64 @@ -81,6 +86,7 @@ pub struct ProfileBuilder { pub test_time: Option>, pub group_count: Option, pub person_count: Option, + pub thread_count: Option, } fn validate_u64_bound(value: Option, default: u64) -> Result { @@ -97,17 +103,24 @@ fn validate_u64_bound(value: Option, default: u64) -> Result { } impl ProfileBuilder { - pub fn new(control_uri: String, admin_password: String, idm_admin_password: String) -> Self { + pub fn new( + control_uri: String, + extra_uris: Vec, + admin_password: String, + idm_admin_password: String, + thread_count: Option, + ) -> Self { ProfileBuilder { control_uri, + extra_uris, admin_password, idm_admin_password, seed: None, - extra_uris: Vec::new(), warmup_time: None, test_time: None, group_count: None, person_count: None, + thread_count, } } @@ -146,11 +159,12 @@ impl ProfileBuilder { admin_password, idm_admin_password, seed, - extra_uris: _, + extra_uris, warmup_time, test_time, group_count, person_count, + thread_count, } = self; let seed: u64 = seed.unwrap_or_else(|| { @@ -158,8 +172,6 @@ impl ProfileBuilder { rng.gen() }); - let extra_uris = Vec::new(); - let group_count = validate_u64_bound(group_count, DEFAULT_GROUP_COUNT)?; let person_count = validate_u64_bound(person_count, DEFAULT_PERSON_COUNT)?; @@ -184,6 +196,7 @@ impl ProfileBuilder { test_time, group_count, person_count, + thread_count, }) } } diff --git a/tools/orca/src/run.rs b/tools/orca/src/run.rs index 1fc20af20..5d05cb537 100644 --- a/tools/orca/src/run.rs +++ b/tools/orca/src/run.rs @@ -40,13 +40,14 @@ pub struct EventRecord { pub details: EventDetail, } +#[derive(Debug)] pub enum EventDetail { Login, Logout, PersonSetSelfMail, PersonGetSelfAccount, PersonGetSelfMemberOf, - PersonSelfSetPassword, + PersonSetSelfPassword, PersonReauth, Error, } diff --git a/tools/orca/src/state.rs b/tools/orca/src/state.rs index 5817e80d3..d3d489cbc 100644 --- a/tools/orca/src/state.rs +++ b/tools/orca/src/state.rs @@ -17,7 +17,7 @@ pub struct State { pub preflight_flags: Vec, pub persons: Vec, pub groups: Vec, - // oauth_clients: Vec, + pub thread_count: Option, // oauth_clients: Vec, } impl State { diff --git a/tools/orca/src/stats.rs b/tools/orca/src/stats.rs index ffeda41a3..e75693060 100644 --- a/tools/orca/src/stats.rs +++ b/tools/orca/src/stats.rs @@ -37,7 +37,7 @@ impl From for OpKind { EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => { OpKind::ReadOp } - EventDetail::PersonSetSelfMail | EventDetail::PersonSelfSetPassword => OpKind::WriteOp, + EventDetail::PersonSetSelfMail | EventDetail::PersonSetSelfPassword => OpKind::WriteOp, EventDetail::Error | EventDetail::Login | EventDetail::Logout