Configurable thread count (#2847)

* added `thread_count` configuration for the server
* added `thread_count` to orca

---------

Co-authored-by: Sebastiano Tocci <sebastiano.tocci@proton.me>
Authored by Firstyear on 2024-06-21 11:47:36 +10:00, committed by GitHub
parent 10e15fd6b3
commit b58370adc8
11 changed files with 234 additions and 159 deletions

@@ -135,6 +135,10 @@ pub struct ServerConfig {
/// The path to the "admin" socket, used for local communication when performing certain server control tasks. Default is set on build, based on the system target.
pub adminbindpath: Option<String>,
/// The maximum number of threads the server will use for the async worker pool. Defaults
/// to std::thread::available_parallelism.
pub thread_count: Option<usize>,
/// Don't touch this unless you know what you're doing!
#[allow(dead_code)]
db_arc_size: Option<usize>,
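
As an aside, the default mentioned in the doc comment comes from `std::thread::available_parallelism`. A minimal sketch of how an optional setting like this can be resolved to an effective count (illustrative only, not the code in this commit):

```rust
use std::num::NonZeroUsize;
use std::thread::available_parallelism;

/// Resolve the effective worker count: the configured value if present,
/// otherwise the system's available parallelism, falling back to 1.
fn effective_threads(configured: Option<usize>) -> usize {
    configured.unwrap_or_else(|| {
        available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(1)
    })
}
```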
@@ -723,4 +727,10 @@ impl Configuration {
}
}
}
// Update the thread count of this server, capped at the maximum already set in self.threads,
// which is initialised from the available parallelism.
pub fn update_threads_count(&mut self, threads: usize) {
self.threads = std::cmp::min(self.threads, threads);
}
}
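
Since `update_threads_count` takes the minimum of the current and requested values, a caller can only lower the thread count, never raise it above the initial default. A brief usage sketch (the starting value of 8 is assumed for illustration):

```rust
// Suppose config.threads was initialised to 8 from available parallelism.
config.update_threads_count(4); // threads is now 4
config.update_threads_count(16); // still 4: min(4, 16) == 4
```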

@@ -289,31 +289,10 @@ fn main() -> ExitCode {
return ExitCode::FAILURE;
}
// We need enough backtrace depth to find leak sources if they exist.
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::builder().trim_backtraces(Some(40)).build();
let maybe_rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("kanidmd-thread-pool")
// .thread_stack_size(8 * 1024 * 1024)
// If we want a hook for thread start.
// .on_thread_start()
// In future, we can stop the whole process if a panic occurs.
// .unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime)
.build();
let rt = match maybe_rt {
Ok(rt) => rt,
Err(err) => {
eprintln!("CRITICAL: Unable to start runtime! {:?}", err);
return ExitCode::FAILURE;
}
};
rt.block_on(kanidm_main())
}
async fn kanidm_main() -> ExitCode {
// Read CLI args, determine what the user has asked us to do.
let opt = KanidmdParser::parse();
@@ -401,6 +380,17 @@ async fn kanidm_main() -> ExitCode {
return ExitCode::FAILURE;
}
let sconfig = match sconfig {
Some(val) => val,
None => {
error!("Somehow you got an empty ServerConfig after error checking?");
return ExitCode::FAILURE;
}
};
// ===========================================================================
// Config ready, start to setup pre-run checks.
// Get info about who we are.
#[cfg(target_family = "unix")]
let (cuid, ceuid) = {
@@ -423,14 +413,6 @@ async fn kanidm_main() -> ExitCode {
(cuid, ceuid)
};
let sconfig = match sconfig {
Some(val) => val,
None => {
error!("Somehow you got an empty ServerConfig after error checking?");
return ExitCode::FAILURE;
}
};
if let Some(cfg_path) = opt.config_path() {
#[cfg(target_family = "unix")]
{
@@ -534,9 +516,18 @@ async fn kanidm_main() -> ExitCode {
config.update_output_mode(opt.commands.commonopt().output_mode.to_owned().into());
config.update_trust_x_forward_for(sconfig.trust_x_forward_for);
config.update_admin_bind_path(&sconfig.adminbindpath);
config.update_replication_config(sconfig.repl_config.clone());
// We always set threads to 1 unless it's the main server.
if matches!(&opt.commands, KanidmdOpt::Server(_)) {
// If not updated, this will default to the maximum
if let Some(threads) = sconfig.thread_count {
config.update_threads_count(threads);
}
} else {
config.update_threads_count(1);
};
match &opt.commands {
// we aren't going to touch the DB so we can carry on
KanidmdOpt::ShowReplicationCertificate { .. }
@@ -575,6 +566,35 @@
}
}
let maybe_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.threads)
.enable_all()
.thread_name("kanidmd-thread-pool")
// .thread_stack_size(8 * 1024 * 1024)
// If we want a hook for thread start.
// .on_thread_start()
// In future, we can stop the whole process if a panic occurs.
// .unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime)
.build();
let rt = match maybe_rt {
Ok(rt) => rt,
Err(err) => {
eprintln!("CRITICAL: Unable to start runtime! {:?}", err);
return ExitCode::FAILURE;
}
};
rt.block_on(kanidm_main(sconfig, config, opt))
}
/// Build and execute the main server. The ServerConfig holds the configuration options
/// that we process into the Configuration for the main server.
async fn kanidm_main(
sconfig: ServerConfig,
mut config: Configuration,
opt: KanidmdParser,
) -> ExitCode {
match &opt.commands {
KanidmdOpt::Server(_sopt) | KanidmdOpt::ConfigTest(_sopt) => {
let config_test = matches!(&opt.commands, KanidmdOpt::ConfigTest(_));

@@ -50,6 +50,8 @@ pub async fn populate(_client: &KanidmOrcaClient, profile: Profile) -> Result<St
surnames.len()
);
let thread_count = profile.thread_count();
// PHASE 0 - For now, set require MFA off.
let preflight_flags = vec![Flag::DisableAllPersonsMFAPolicy];
@@ -148,7 +150,7 @@ pub async fn populate(_client: &KanidmOrcaClient, profile: Profile) -> Result<St
for group in groups.iter_mut() {
// For now, our baseline is a third of all persons. We can adjust this in future per
// role for example.
let baseline = persons.len() / 5;
let baseline = persons.len() / 3;
let inverse = persons.len() - baseline;
// Randomly add extra from the inverse
let extra = Uniform::new(0, inverse);
@@ -192,6 +194,7 @@ pub async fn populate(_client: &KanidmOrcaClient, profile: Profile) -> Result<St
groups,
preflight_flags,
persons,
thread_count,
};
Ok(state)
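
For clarity, the membership sizing above combines a fixed baseline (a third of all persons) with a uniformly random extra drawn from the remainder. A self-contained sketch of that sampling, assuming the `rand` 0.8 API used elsewhere in orca (the function name is illustrative):

```rust
use rand::distributions::Uniform;
use rand::Rng;

/// Pick a member count for a group: a fixed baseline plus a random extra
/// drawn uniformly from the remaining pool (assumes at least one person).
fn group_member_count(person_count: usize, rng: &mut impl Rng) -> usize {
    let baseline = person_count / 3;
    let inverse = person_count - baseline;
    baseline + rng.sample(Uniform::new(0, inverse))
}
```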

@@ -1,4 +1,4 @@
// #![deny(warnings)]
#![deny(warnings)]
#![warn(unused_extern_crates)]
#![allow(clippy::panic)]
#![deny(clippy::unreachable)]
@@ -20,7 +20,7 @@ use clap::Parser;
use crate::profile::{Profile, ProfileBuilder};
use tokio::sync::broadcast;
use tokio::{runtime::Runtime, sync::broadcast};
mod error;
mod generate;
@@ -48,8 +48,7 @@ impl OrcaOpt {
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> ExitCode {
fn main() -> ExitCode {
let opt = OrcaOpt::parse();
if opt.debug() {
@@ -66,7 +65,7 @@ async fn main() -> ExitCode {
match opt {
OrcaOpt::Version { .. } => {
println!("orca {}", env!("KANIDM_PKG_VERSION"));
return ExitCode::SUCCESS;
ExitCode::SUCCESS
}
// Build the profile and the test dimensions.
@@ -77,6 +76,7 @@ async fn main() -> ExitCode {
control_uri,
seed,
profile_path,
threads,
} => {
// For now I hardcoded some dimensions, but we should prompt
// the user for these later.
@@ -89,8 +89,16 @@ async fn main() -> ExitCode {
}
});
let builder =
ProfileBuilder::new(control_uri, admin_password, idm_admin_password).seed(seed);
let extra_uris = Vec::with_capacity(0);
let builder = ProfileBuilder::new(
control_uri,
extra_uris,
admin_password,
idm_admin_password,
threads,
)
.seed(seed);
let profile = match builder.build() {
Ok(p) => p,
@@ -100,12 +108,8 @@ async fn main() -> ExitCode {
};
match profile.write_to_path(&profile_path) {
Ok(_) => {
return ExitCode::SUCCESS;
}
Err(_err) => {
return ExitCode::FAILURE;
}
Ok(_) => ExitCode::SUCCESS,
Err(_err) => ExitCode::FAILURE,
}
}
@@ -123,15 +127,17 @@ async fn main() -> ExitCode {
info!("Performing conntest of {}", profile.control_uri());
// we're okay with just one thread here
let runtime = build_tokio_runtime(Some(1));
runtime.block_on(async {
match kani::KanidmOrcaClient::new(&profile).await {
Ok(_) => {
info!("success");
return ExitCode::SUCCESS;
}
Err(_err) => {
return ExitCode::FAILURE;
ExitCode::SUCCESS
}
Err(_err) => ExitCode::FAILURE,
}
})
}
// From the profile and test dimensions, generate the data into a state file.
@@ -147,6 +153,10 @@ async fn main() -> ExitCode {
}
};
// This is single threaded.
let runtime = build_tokio_runtime(Some(1));
runtime.block_on(async {
let client = match kani::KanidmOrcaClient::new(&profile).await {
Ok(client) => client,
Err(_err) => {
@@ -163,13 +173,10 @@ async fn main() -> ExitCode {
};
match state.write_to_path(&state_path) {
Ok(_) => {
return ExitCode::SUCCESS;
}
Err(_err) => {
return ExitCode::FAILURE;
}
Ok(_) => ExitCode::SUCCESS,
Err(_err) => ExitCode::FAILURE,
}
})
}
//
@@ -184,14 +191,15 @@ async fn main() -> ExitCode {
}
};
// here we want all threads available to speed up the process.
let runtime = build_tokio_runtime(state.thread_count);
runtime.block_on(async {
match populate::preflight(state).await {
Ok(_) => {
return ExitCode::SUCCESS;
Ok(_) => ExitCode::SUCCESS,
Err(_err) => ExitCode::FAILURE,
}
Err(_err) => {
return ExitCode::FAILURE;
}
};
})
}
// Run the test based on the state file.
@@ -205,13 +213,15 @@ async fn main() -> ExitCode {
return ExitCode::FAILURE;
}
};
// here we need to create one fewer worker than the desired amount, since we later call `spawn_blocking`,
// which consumes an extra thread all on its own
let runtime = build_tokio_runtime(state.thread_count);
// We have a broadcast channel setup for controlling the state of
// various actors and parts.
//
// We want a small amount of backlog because there are a few possible
// commands that could be sent.
runtime.block_on(async {
let (control_tx, control_rx) = broadcast::channel(8);
let mut run_execute = tokio::task::spawn(run::execute(state, control_rx));
@@ -273,6 +283,20 @@ async fn main() -> ExitCode {
}
}
}
})
}
}
};
}
/// Build the tokio runtime with the configured number of threads. If None, the system's
/// maximum is used.
fn build_tokio_runtime(threads: Option<usize>) -> Runtime {
let mut builder = tokio::runtime::Builder::new_multi_thread();
match threads {
Some(threads) => builder.worker_threads(threads),
None => &mut builder,
}
.enable_all()
.build()
.expect("Failed to build tokio runtime")
}
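
Note that the `match` above evaluates to `&mut Builder` in both arms, so the trailing `enable_all()` and `build()` apply whether or not a worker count was supplied. A usage sketch mirroring the call sites above:

```rust
// One worker is enough for a lightweight task such as the conntest.
let runtime = build_tokio_runtime(Some(1));
runtime.block_on(async {
    // ... perform the async work here ...
});
```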

@@ -15,7 +15,7 @@ pub enum TransitionAction {
WriteAttributePersonMail,
ReadSelfAccount,
ReadSelfMemberOf,
SetSelfPassword,
WriteSelfPassword,
}
// Is this the right way? Should transitions/delay be part of the actor model? Should
@@ -137,7 +137,7 @@ pub async fn person_set_self_password(
let duration = Instant::now().duration_since(start);
let parsed_result = parse_call_result_into_transition_result_and_event_record(
result,
EventDetail::PersonSelfSetPassword,
EventDetail::PersonSetSelfPassword,
start,
duration,
);

@@ -57,7 +57,7 @@ impl ActorModel for ActorBasic {
TransitionAction::ReadSelfMemberOf => {
model::person_get_self_memberof(client, person).await
}
TransitionAction::SetSelfPassword => {
TransitionAction::WriteSelfPassword => {
// I know it's dumb but here we just re-set the same password because it's the simplest thing to do
let Credential::Password { plain } = &person.credential;
model::person_set_self_password(client, person, plain).await
@@ -78,11 +78,11 @@ impl ActorBasic {
};
match self.state {
State::Unauthenticated => Transition {
delay: None,
delay: Some(Duration::from_secs(15)),
action: TransitionAction::Login,
},
State::Authenticated => Transition {
delay: Some(Duration::from_millis(1000)),
delay: Some(Duration::from_secs(10)),
action: TransitionAction::PrivilegeReauth,
},
// Since this is the basic model we don't want to get too fancy and do too many things, but since the struct Person
@@ -91,20 +91,20 @@ impl ActorBasic {
State::AuthenticatedWithReauth => match roles.first() {
Some(role) => match role {
ActorRole::PeopleSelfMailWrite => Transition {
delay: Some(Duration::from_millis(200)),
delay: Some(Duration::from_secs(5)),
action: TransitionAction::WriteAttributePersonMail,
},
ActorRole::PeopleSelfReadProfile => Transition {
delay: Some(Duration::from_millis(450)),
delay: Some(Duration::from_secs(2)),
action: TransitionAction::ReadSelfAccount,
},
ActorRole::PeopleSelfReadMemberOf => Transition {
delay: Some(Duration::from_millis(500)),
delay: Some(Duration::from_secs(1)),
action: TransitionAction::ReadSelfMemberOf,
},
ActorRole::PeopleSelfSetPassword => Transition {
delay: Some(Duration::from_secs(2)),
action: TransitionAction::SetSelfPassword,
delay: Some(Duration::from_secs(3)),
action: TransitionAction::WriteSelfPassword,
},
ActorRole::PeoplePiiReader | ActorRole::None => logout_transition,
},
@@ -128,7 +128,7 @@ impl ActorBasic {
TransitionAction::WriteAttributePersonMail
| TransitionAction::ReadSelfAccount
| TransitionAction::ReadSelfMemberOf
| TransitionAction::SetSelfPassword,
| TransitionAction::WriteSelfPassword,
TransitionResult::Ok,
) => {
self.state = State::AuthenticatedWithReauth;

@@ -54,6 +54,10 @@ enum OrcaOpt {
#[clap(long = "profile")]
/// The configuration file path to update (or create)
profile_path: PathBuf,
#[clap(long)]
/// Optional thread count; defaults to the maximum available on the system
threads: Option<usize>,
},
#[clap(name = "conntest")]

@@ -25,6 +25,7 @@ pub struct Profile {
test_time: Option<u64>,
group_count: u64,
person_count: u64,
thread_count: Option<usize>,
}
impl Profile {
@@ -53,6 +54,10 @@ impl Profile {
self.person_count
}
pub fn thread_count(&self) -> Option<usize> {
self.thread_count
}
pub fn seed(&self) -> u64 {
if self.seed < 0 {
self.seed.wrapping_mul(-1) as u64
@@ -81,6 +86,7 @@ pub struct ProfileBuilder {
pub test_time: Option<Option<u64>>,
pub group_count: Option<u64>,
pub person_count: Option<u64>,
pub thread_count: Option<usize>,
}
fn validate_u64_bound(value: Option<u64>, default: u64) -> Result<u64, Error> {
@@ -97,17 +103,24 @@ fn validate_u64_bound(value: Option<u64>, default: u64) -> Result<u64, Error> {
}
impl ProfileBuilder {
pub fn new(control_uri: String, admin_password: String, idm_admin_password: String) -> Self {
pub fn new(
control_uri: String,
extra_uris: Vec<String>,
admin_password: String,
idm_admin_password: String,
thread_count: Option<usize>,
) -> Self {
ProfileBuilder {
control_uri,
extra_uris,
admin_password,
idm_admin_password,
seed: None,
extra_uris: Vec::new(),
warmup_time: None,
test_time: None,
group_count: None,
person_count: None,
thread_count,
}
}
@@ -146,11 +159,12 @@ impl ProfileBuilder {
admin_password,
idm_admin_password,
seed,
extra_uris: _,
extra_uris,
warmup_time,
test_time,
group_count,
person_count,
thread_count,
} = self;
let seed: u64 = seed.unwrap_or_else(|| {
@ -158,8 +172,6 @@ impl ProfileBuilder {
rng.gen()
});
let extra_uris = Vec::new();
let group_count = validate_u64_bound(group_count, DEFAULT_GROUP_COUNT)?;
let person_count = validate_u64_bound(person_count, DEFAULT_PERSON_COUNT)?;
@@ -184,6 +196,7 @@ impl ProfileBuilder {
test_time,
group_count,
person_count,
thread_count,
})
}
}

@@ -40,13 +40,14 @@ pub struct EventRecord {
pub details: EventDetail,
}
#[derive(Debug)]
pub enum EventDetail {
Login,
Logout,
PersonSetSelfMail,
PersonGetSelfAccount,
PersonGetSelfMemberOf,
PersonSelfSetPassword,
PersonSetSelfPassword,
PersonReauth,
Error,
}

@@ -17,7 +17,7 @@ pub struct State {
pub preflight_flags: Vec<Flag>,
pub persons: Vec<Person>,
pub groups: Vec<Group>,
// oauth_clients: Vec<Oauth2Clients>,
pub thread_count: Option<usize>, // oauth_clients: Vec<Oauth2Clients>,
}
impl State {

@@ -37,7 +37,7 @@ impl From<EventDetail> for OpKind {
EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => {
OpKind::ReadOp
}
EventDetail::PersonSetSelfMail | EventDetail::PersonSelfSetPassword => OpKind::WriteOp,
EventDetail::PersonSetSelfMail | EventDetail::PersonSetSelfPassword => OpKind::WriteOp,
EventDetail::Error
| EventDetail::Login
| EventDetail::Logout