mirror of
https://github.com/kanidm/kanidm.git
synced 2025-05-22 08:53:57 +02:00
Stats collection improvements and a bunch of other stuff (#2820)
This commit is contained in:
parent
f39dd7d7a2
commit
073ed403ed
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -4330,6 +4330,7 @@ name = "orca"
|
|||
version = "1.3.0-dev"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crossbeam",
|
||||
"csv",
|
||||
|
|
|
@ -19,6 +19,7 @@ doctest = false
|
|||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
crossbeam = { workspace = true }
|
||||
csv = { workspace = true }
|
||||
|
|
|
@ -58,6 +58,11 @@ pub async fn populate(_client: &KanidmOrcaClient, profile: Profile) -> Result<St
|
|||
// PHASE 2 - generate groups for integration access, assign roles to groups.
|
||||
// These decide what each person is supposed to do with their life.
|
||||
let mut groups = vec![
|
||||
Group {
|
||||
name: "role_people_self_set_password".to_string(),
|
||||
role: ActorRole::PeopleSelfSetPassword,
|
||||
..Default::default()
|
||||
},
|
||||
Group {
|
||||
name: "role_people_pii_reader".to_string(),
|
||||
role: ActorRole::PeoplePiiReader,
|
||||
|
@ -65,7 +70,7 @@ pub async fn populate(_client: &KanidmOrcaClient, profile: Profile) -> Result<St
|
|||
},
|
||||
Group {
|
||||
name: "role_people_self_write_mail".to_string(),
|
||||
role: ActorRole::PeopleSelfWriteMail,
|
||||
role: ActorRole::PeopleSelfMailWrite,
|
||||
..Default::default()
|
||||
},
|
||||
Group {
|
||||
|
|
|
@ -15,6 +15,7 @@ pub enum TransitionAction {
|
|||
WriteAttributePersonMail,
|
||||
ReadSelfAccount,
|
||||
ReadSelfMemberOf,
|
||||
SetSelfPassword,
|
||||
}
|
||||
|
||||
// Is this the right way? Should transitions/delay be part of the actor model? Should
|
||||
|
@ -45,9 +46,10 @@ pub enum ActorRole {
|
|||
#[default]
|
||||
None,
|
||||
PeoplePiiReader,
|
||||
PeopleSelfWriteMail,
|
||||
PeopleSelfMailWrite,
|
||||
PeopleSelfReadProfile,
|
||||
PeopleSelfReadMemberOf,
|
||||
PeopleSelfSetPassword,
|
||||
}
|
||||
|
||||
impl ActorRole {
|
||||
|
@ -55,9 +57,10 @@ impl ActorRole {
|
|||
match self {
|
||||
ActorRole::None
|
||||
| ActorRole::PeopleSelfReadProfile
|
||||
| ActorRole::PeopleSelfReadMemberOf => None,
|
||||
| ActorRole::PeopleSelfReadMemberOf
|
||||
| ActorRole::PeopleSelfSetPassword => None,
|
||||
ActorRole::PeoplePiiReader => Some(&["idm_people_pii_read"]),
|
||||
ActorRole::PeopleSelfWriteMail => Some(&["idm_people_self_write_mail"]),
|
||||
ActorRole::PeopleSelfMailWrite => Some(&["idm_people_self_mail_write"]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -118,6 +121,30 @@ pub async fn person_set_self_mail(
|
|||
Ok(parsed_result)
|
||||
}
|
||||
|
||||
pub async fn person_set_self_password(
|
||||
client: &KanidmClient,
|
||||
person: &Person,
|
||||
pw: &str,
|
||||
) -> Result<(TransitionResult, EventRecord), Error> {
|
||||
// Should we measure the time of each call rather than the time with multiple calls?
|
||||
let person_username = person.username.as_str();
|
||||
|
||||
let start = Instant::now();
|
||||
let result = client
|
||||
.idm_person_account_primary_credential_set_password(person_username, pw)
|
||||
.await;
|
||||
|
||||
let duration = Instant::now().duration_since(start);
|
||||
let parsed_result = parse_call_result_into_transition_result_and_event_record(
|
||||
result,
|
||||
EventDetail::PersonSelfSetPassword,
|
||||
start,
|
||||
duration,
|
||||
);
|
||||
|
||||
Ok(parsed_result)
|
||||
}
|
||||
|
||||
pub async fn privilege_reauth(
|
||||
client: &KanidmClient,
|
||||
person: &Person,
|
||||
|
|
|
@ -57,6 +57,11 @@ impl ActorModel for ActorBasic {
|
|||
TransitionAction::ReadSelfMemberOf => {
|
||||
model::person_get_self_memberof(client, person).await
|
||||
}
|
||||
TransitionAction::SetSelfPassword => {
|
||||
// I know it's dumb but here we just re-set the same password because it's the simplest thing to do
|
||||
let Credential::Password { plain } = &person.credential;
|
||||
model::person_set_self_password(client, person, plain).await
|
||||
}
|
||||
}?;
|
||||
|
||||
self.next_state(transition.action, result);
|
||||
|
@ -76,9 +81,6 @@ impl ActorBasic {
|
|||
delay: None,
|
||||
action: TransitionAction::Login,
|
||||
},
|
||||
// Doing some tests with more people I noticed that if the delay is too low somehow??! the server could start processing the reauth request before
|
||||
// the auth one, yielding an error,
|
||||
// TODO!!: understand why that happens
|
||||
State::Authenticated => Transition {
|
||||
delay: Some(Duration::from_millis(1000)),
|
||||
action: TransitionAction::PrivilegeReauth,
|
||||
|
@ -88,18 +90,22 @@ impl ActorBasic {
|
|||
// (which is always deterministic thanks to the rng seed used to choose the roles)
|
||||
State::AuthenticatedWithReauth => match roles.first() {
|
||||
Some(role) => match role {
|
||||
ActorRole::PeopleSelfWriteMail => Transition {
|
||||
ActorRole::PeopleSelfMailWrite => Transition {
|
||||
delay: Some(Duration::from_millis(200)),
|
||||
action: TransitionAction::WriteAttributePersonMail,
|
||||
},
|
||||
ActorRole::PeopleSelfReadProfile => Transition {
|
||||
delay: Some(Duration::from_millis(150)),
|
||||
delay: Some(Duration::from_millis(450)),
|
||||
action: TransitionAction::ReadSelfAccount,
|
||||
},
|
||||
ActorRole::PeopleSelfReadMemberOf => Transition {
|
||||
delay: Some(Duration::from_millis(330)),
|
||||
delay: Some(Duration::from_millis(500)),
|
||||
action: TransitionAction::ReadSelfMemberOf,
|
||||
},
|
||||
ActorRole::PeopleSelfSetPassword => Transition {
|
||||
delay: Some(Duration::from_secs(2)),
|
||||
action: TransitionAction::SetSelfPassword,
|
||||
},
|
||||
ActorRole::PeoplePiiReader | ActorRole::None => logout_transition,
|
||||
},
|
||||
None => logout_transition,
|
||||
|
@ -121,7 +127,8 @@ impl ActorBasic {
|
|||
State::AuthenticatedWithReauth,
|
||||
TransitionAction::WriteAttributePersonMail
|
||||
| TransitionAction::ReadSelfAccount
|
||||
| TransitionAction::ReadSelfMemberOf,
|
||||
| TransitionAction::ReadSelfMemberOf
|
||||
| TransitionAction::SetSelfPassword,
|
||||
TransitionResult::Ok,
|
||||
) => {
|
||||
self.state = State::AuthenticatedWithReauth;
|
||||
|
|
|
@ -26,7 +26,7 @@ async fn actor_person(
|
|||
|
||||
while let Err(broadcast::error::TryRecvError::Empty) = actor_rx.try_recv() {
|
||||
let event = model.transition(&client, &person).await?;
|
||||
|
||||
debug!("Pushed event to queue!");
|
||||
stats_queue.push(event);
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,7 @@ pub enum EventDetail {
|
|||
PersonSetSelfMail,
|
||||
PersonGetSelfAccount,
|
||||
PersonGetSelfMemberOf,
|
||||
PersonSelfSetPassword,
|
||||
PersonReauth,
|
||||
Error,
|
||||
}
|
||||
|
@ -128,7 +129,9 @@ pub async fn execute(state: State, control_rx: broadcast::Receiver<Signal>) -> R
|
|||
let c_stats_queue = stats_queue.clone();
|
||||
let c_stats_ctrl = stats_ctrl.clone();
|
||||
|
||||
let mut dyn_data_collector = BasicStatistics::new();
|
||||
let node_count = 1 + state.profile.extra_uris().len();
|
||||
let mut dyn_data_collector =
|
||||
BasicStatistics::new(state.persons.len(), state.groups.len(), node_count);
|
||||
|
||||
let stats_task =
|
||||
tokio::task::spawn_blocking(move || dyn_data_collector.run(c_stats_queue, c_stats_ctrl));
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use crate::error::Error;
|
||||
use crate::run::EventRecord;
|
||||
use crate::run::{EventDetail, EventRecord};
|
||||
use chrono::Local;
|
||||
use crossbeam::queue::{ArrayQueue, SegQueue};
|
||||
use csv::Writer;
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
@ -22,12 +25,44 @@ pub trait DataCollector {
|
|||
) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
pub struct BasicStatistics {}
|
||||
enum OpKind {
|
||||
WriteOp,
|
||||
ReadOp,
|
||||
Auth, //TODO! does this make sense?
|
||||
}
|
||||
|
||||
impl From<EventDetail> for OpKind {
|
||||
fn from(value: EventDetail) -> Self {
|
||||
match value {
|
||||
EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => {
|
||||
OpKind::ReadOp
|
||||
}
|
||||
EventDetail::PersonSetSelfMail | EventDetail::PersonSelfSetPassword => OpKind::WriteOp,
|
||||
EventDetail::Error
|
||||
| EventDetail::Login
|
||||
| EventDetail::Logout
|
||||
| EventDetail::PersonReauth => OpKind::Auth,
|
||||
}
|
||||
}
|
||||
}
|
||||
pub struct BasicStatistics {
|
||||
person_count: usize,
|
||||
group_count: usize,
|
||||
node_count: usize,
|
||||
}
|
||||
|
||||
impl BasicStatistics {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new() -> Box<dyn DataCollector + Send> {
|
||||
Box::new(BasicStatistics {})
|
||||
pub fn new(
|
||||
person_count: usize,
|
||||
group_count: usize,
|
||||
node_count: usize,
|
||||
) -> Box<dyn DataCollector + Send> {
|
||||
Box::new(BasicStatistics {
|
||||
person_count,
|
||||
group_count,
|
||||
node_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,8 +115,8 @@ impl DataCollector for BasicStatistics {
|
|||
|
||||
info!("start statistics processing ...");
|
||||
|
||||
let mut count: usize = 0;
|
||||
let mut optimes = Vec::new();
|
||||
let mut readop_times = Vec::new();
|
||||
let mut writeop_times = Vec::new();
|
||||
|
||||
// We will drain this now.
|
||||
while let Some(event_record) = stats_queue.pop() {
|
||||
|
@ -90,23 +125,117 @@ impl DataCollector for BasicStatistics {
|
|||
continue;
|
||||
}
|
||||
|
||||
count += 1;
|
||||
|
||||
optimes.push(event_record.duration.as_secs_f64());
|
||||
match OpKind::from(event_record.details) {
|
||||
OpKind::ReadOp => {
|
||||
readop_times.push(event_record.duration.as_secs_f64());
|
||||
}
|
||||
OpKind::WriteOp => {
|
||||
writeop_times.push(event_record.duration.as_secs_f64());
|
||||
}
|
||||
OpKind::Auth => {}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Received {} events", count);
|
||||
if readop_times.is_empty() && writeop_times.is_empty() {
|
||||
error!("For some weird reason no read and write operations were recorded, exiting...");
|
||||
return Err(Error::InvalidState);
|
||||
}
|
||||
|
||||
let distrib: Normal<f64> = Normal::from_data(&optimes);
|
||||
let sd = distrib.variance().sqrt();
|
||||
if writeop_times.is_empty() {
|
||||
error!("For some weird reason no write operations were recorded, exiting...");
|
||||
return Err(Error::InvalidState);
|
||||
}
|
||||
|
||||
info!("mean: {} seconds", distrib.mean());
|
||||
info!("variance: {}", distrib.variance());
|
||||
info!("SD: {} seconds", sd);
|
||||
info!("95%: {}", distrib.mean() + (2.0 * sd));
|
||||
if readop_times.is_empty() {
|
||||
error!("For some weird reason no read operations were recorded, exiting...");
|
||||
return Err(Error::InvalidState);
|
||||
}
|
||||
|
||||
let stats = StatsContainer::new(
|
||||
&readop_times,
|
||||
&writeop_times,
|
||||
self.node_count,
|
||||
self.person_count,
|
||||
self.group_count,
|
||||
);
|
||||
|
||||
info!(
|
||||
"Server configuration was: {} nodes, {} users and {} groups",
|
||||
self.node_count, self.person_count, self.group_count
|
||||
);
|
||||
|
||||
info!("Received {} read events", stats.read_events);
|
||||
|
||||
info!("mean: {} seconds", stats.read_mean);
|
||||
info!("variance: {}", stats.read_variance);
|
||||
info!("SD: {} seconds", stats.read_sd);
|
||||
info!("95%: {}", stats.read_95);
|
||||
|
||||
info!("Received {} write events", stats.write_events);
|
||||
|
||||
info!("mean: {} seconds", stats.write_mean);
|
||||
info!("variance: {}", stats.write_variance);
|
||||
info!("SD: {} seconds", stats.write_sd);
|
||||
info!("95%: {}", stats.write_95);
|
||||
|
||||
let now = Local::now();
|
||||
let filepath = format!("orca-run-{}.csv", now.to_rfc3339());
|
||||
|
||||
info!("Now saving stats as '{filepath}'");
|
||||
|
||||
let mut wrt = Writer::from_path(filepath).map_err(|_| Error::Io)?;
|
||||
wrt.serialize(stats).map_err(|_| Error::Io)?;
|
||||
|
||||
debug!("Ended statistics collector");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StatsContainer {
|
||||
node_count: usize,
|
||||
person_count: usize,
|
||||
group_count: usize,
|
||||
read_events: usize,
|
||||
read_sd: f64,
|
||||
read_mean: f64,
|
||||
read_variance: f64,
|
||||
read_95: f64,
|
||||
write_events: usize,
|
||||
write_sd: f64,
|
||||
write_mean: f64,
|
||||
write_variance: f64,
|
||||
write_95: f64,
|
||||
}
|
||||
|
||||
impl StatsContainer {
|
||||
fn new(
|
||||
readop_times: &Vec<f64>,
|
||||
writeop_times: &Vec<f64>,
|
||||
node_count: usize,
|
||||
person_count: usize,
|
||||
group_count: usize,
|
||||
) -> Self {
|
||||
let readop_distrib: Normal<f64> = Normal::from_data(readop_times);
|
||||
let read_sd = readop_distrib.variance().sqrt();
|
||||
let writeop_distrib: Normal<f64> = Normal::from_data(writeop_times);
|
||||
let write_sd = writeop_distrib.variance().sqrt();
|
||||
|
||||
StatsContainer {
|
||||
person_count,
|
||||
group_count,
|
||||
node_count,
|
||||
read_events: readop_times.len(),
|
||||
read_sd: readop_distrib.variance().sqrt(),
|
||||
read_mean: readop_distrib.mean(),
|
||||
read_variance: readop_distrib.variance(),
|
||||
read_95: readop_distrib.mean() + (2.0 * read_sd),
|
||||
write_events: writeop_times.len(),
|
||||
write_sd: writeop_distrib.variance().sqrt(),
|
||||
write_mean: writeop_distrib.mean(),
|
||||
write_variance: writeop_distrib.variance(),
|
||||
write_95: writeop_distrib.mean() + (2.0 * write_sd),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue