diff --git a/Cargo.lock b/Cargo.lock index caa20414a..9a6371809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -99,9 +99,9 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" dependencies = [ "windows-sys 0.52.0", ] @@ -202,9 +202,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c90a406b4495d129f00461241616194cb8a032c8d1c53c657f0961d5f8e0498" +checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5" dependencies = [ "flate2", "futures-core", @@ -310,7 +310,7 @@ dependencies = [ "futures-util", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.28", + "hyper 0.14.29", "itoa", "matchit", "memchr", @@ -470,9 +470,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.72" +version = "0.3.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17c6a35df3749d2e8bb1b7b21a976d82b15548788d2735b9d82f329268f71a11" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" dependencies = [ "addr2line", "cc", @@ -654,7 +654,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" dependencies = [ "memchr", - "regex-automata 0.4.6", + "regex-automata 0.4.7", "serde", ] @@ -705,9 +705,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.98" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" +checksum = 
"96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" [[package]] name = "cexpr" @@ -834,9 +834,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" [[package]] name = "clru" @@ -893,9 +893,9 @@ dependencies = [ [[package]] name = "concread" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23bef63c371d1b3da7e61e7b72e5757f070131a399f2eb60edc2d8bb8102249a" +checksum = "732590a8115498adcbf1650c4e221ca359ac62b20d1468ec1fb99ac366a859f5" dependencies = [ "ahash", "arc-swap", @@ -1493,18 +1493,18 @@ dependencies = [ [[package]] name = "enumflags2" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3278c9d5fb675e0a51dabcf4c0d355f692b064171535ba72361be1528a9d8e8d" +checksum = "d232db7f5956f3f14313dc2f87985c58bd2c695ce124c8cdd984e08e15ac133d" dependencies = [ "enumflags2_derive", ] [[package]] name = "enumflags2_derive" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c785274071b1b420972453b306eeca06acf4633829db4223b58a2a8c5953bc4" +checksum = "de0d48a183585823424a4ce1aa132d174a6a81bd540895822eb4c8373a8e49e8" dependencies = [ "proc-macro2", "quote", @@ -1577,8 +1577,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "531e46835a22af56d1e3b66f04844bed63158bc094a628bec1d321d9b4c44bf2" dependencies = [ "bit-set", - "regex-automata 0.4.6", - "regex-syntax 0.8.3", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", ] [[package]] @@ -1592,7 +1592,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.12", - "hyper 0.14.28", + "hyper 0.14.29", "hyper-tls 0.5.0", "mime", "serde", 
@@ -2714,12 +2714,12 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", - "futures-core", + "futures-util", "http 1.1.0", "http-body 1.0.0", "pin-project-lite", @@ -2733,9 +2733,9 @@ checksum = "08a397c49fec283e3d6211adbe480be95aae5f304cfb923e9970e08956d5168a" [[package]] name = "httparse" -version = "1.8.0" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "d0e7a4dd27b9476dc40cb050d3632d3bba3a70ddbff012285f7f8559a1e7e545" [[package]] name = "httpdate" @@ -2745,9 +2745,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" dependencies = [ "bytes", "futures-channel", @@ -2794,7 +2794,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.28", + "hyper 0.14.29", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -2807,7 +2807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper 0.14.28", + "hyper 0.14.29", "native-tls", "tokio", "tokio-native-tls", @@ -3951,9 +3951,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.2" +version = "2.7.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" @@ -4317,9 +4317,9 @@ dependencies = [ [[package]] name = "object" -version = "0.35.0" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8ec7ab813848ba4522158d5517a6093db1ded27575b070f4177b8d12b41db5e" +checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" dependencies = [ "memchr", ] @@ -4597,7 +4597,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.1", + "redox_syscall 0.5.2", "smallvec", "windows-targets 0.52.5", ] @@ -5085,9 +5085,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ "bitflags 2.5.0", ] @@ -5117,8 +5117,8 @@ checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.6", - "regex-syntax 0.8.3", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", ] [[package]] @@ -5132,13 +5132,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.3", + "regex-syntax 0.8.4", ] [[package]] @@ -5149,9 +5149,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 
[[package]] name = "regex-syntax" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "reqwest" @@ -6155,7 +6155,7 @@ dependencies = [ "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.28", + "hyper 0.14.29", "hyper-timeout", "percent-encoding", "pin-project", @@ -6438,9 +6438,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "unicode-xid" @@ -6480,9 +6480,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "utf8parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "utoipa" diff --git a/Cargo.toml b/Cargo.toml index f247007f9..61598a4e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,7 +164,7 @@ clap_complete = "^4.5.5" # Forced by saffron/cron chrono = "^0.4.35" compact_jwt = { version = "^0.4.1", default-features = false } -concread = "^0.5.1" +concread = "^0.5.2" cron = "0.12.1" crossbeam = "0.8.4" criterion = "^0.5.1" diff --git a/examples/insecure_server.toml b/examples/insecure_server.toml index 5ab73e590..2a69a7cec 100644 --- a/examples/insecure_server.toml +++ b/examples/insecure_server.toml @@ -5,7 +5,7 @@ db_fs_type = "zfs" db_path = "/tmp/kanidm/kanidm.db" tls_chain = "/tmp/kanidm/chain.pem" 
tls_key = "/tmp/kanidm/key.pem" -tls_client_ca = "/tmp/kanidm/client_ca" +# tls_client_ca = "/tmp/kanidm/client_ca" # The log level of the server. May be one of info, debug, trace # diff --git a/libs/crypto/src/lib.rs b/libs/crypto/src/lib.rs index 1589093fb..6e7b8dc66 100644 --- a/libs/crypto/src/lib.rs +++ b/libs/crypto/src/lib.rs @@ -49,8 +49,8 @@ pub const PBKDF2_MIN_NIST_SALT_LEN: usize = 14; // Min number of rounds for a pbkdf2 pub const PBKDF2_MIN_NIST_COST: usize = 10000; -// 64 * u8 -> 512 bits of out. -const PBKDF2_KEY_LEN: usize = 64; +// 32 * u8 -> 256 bits of out. +const PBKDF2_KEY_LEN: usize = 32; const PBKDF2_MIN_NIST_KEY_LEN: usize = 32; const PBKDF2_SHA1_MIN_KEY_LEN: usize = 19; @@ -62,10 +62,10 @@ const DS_SHA512_HASH_LEN: usize = 64; // Taken from the argon2 library and rfc 9106 const ARGON2_VERSION: u32 = 19; const ARGON2_SALT_LEN: usize = 16; +// 32 * u8 -> 256 bits of out. const ARGON2_KEY_LEN: usize = 32; -// Default amount of ram we sacrifice per thread is 8MB +// Default amount of ram we sacrifice per thread const ARGON2_MIN_RAM_KIB: u32 = 8 * 1024; -// Max is 64MB. This may change in time. const ARGON2_MAX_RAM_KIB: u32 = 64 * 1024; // Amount of ram to subtract when we do a T cost iter. This // is because t=2 m=32 == t=3 m=20. 
So we just step down a little @@ -902,10 +902,9 @@ impl Password { fn bench_argon2id(params: Params) -> Option { let mut rng = rand::thread_rng(); - let salt: Vec = (0..PBKDF2_SALT_LEN).map(|_| rng.gen()).collect(); - let input: Vec = (0..PBKDF2_SALT_LEN).map(|_| rng.gen()).collect(); - // This is 512 bits of output - let mut key: Vec = (0..PBKDF2_KEY_LEN).map(|_| 0).collect(); + let salt: Vec = (0..ARGON2_SALT_LEN).map(|_| rng.gen()).collect(); + let input: Vec = (0..ARGON2_SALT_LEN).map(|_| rng.gen()).collect(); + let mut key: Vec = (0..ARGON2_KEY_LEN).map(|_| 0).collect(); let argon = Argon2::new(Algorithm::Argon2id, Version::V0x13, params); @@ -922,7 +921,6 @@ impl Password { let pbkdf2_cost = policy.pbkdf2_cost; let mut rng = rand::thread_rng(); let salt: Vec = (0..PBKDF2_SALT_LEN).map(|_| rng.gen()).collect(); - // This is 512 bits of output let mut key: Vec = (0..PBKDF2_KEY_LEN).map(|_| 0).collect(); pbkdf2_hmac( diff --git a/libs/sketching/src/lib.rs b/libs/sketching/src/lib.rs index 696d28e4a..eac1bb4b7 100644 --- a/libs/sketching/src/lib.rs +++ b/libs/sketching/src/lib.rs @@ -10,6 +10,7 @@ use tracing_forest::printer::TestCapturePrinter; use tracing_forest::tag::NoTag; use tracing_forest::util::*; use tracing_forest::Tag; +use tracing_subscriber::filter::Directive; use tracing_subscriber::prelude::*; pub mod macros; @@ -19,9 +20,10 @@ pub use {tracing, tracing_forest, tracing_subscriber}; /// Start up the logging for test mode. pub fn test_init() { - let filter = EnvFilter::from_default_env() + let filter = EnvFilter::builder() // Skipping trace on tests by default saves a *TON* of ram. - .add_directive(LevelFilter::INFO.into()) + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy() // escargot builds cargo packages while we integration test and is SUPER noisy. 
.add_directive( "escargot=ERROR" @@ -137,12 +139,12 @@ impl Display for LogLevel { } } -impl From for EnvFilter { +impl From for Directive { fn from(value: LogLevel) -> Self { match value { - LogLevel::Info => EnvFilter::new("info"), - LogLevel::Debug => EnvFilter::new("debug"), - LogLevel::Trace => EnvFilter::new("trace"), + LogLevel::Info => Directive::from(Level::INFO), + LogLevel::Debug => Directive::from(Level::DEBUG), + LogLevel::Trace => Directive::from(Level::TRACE), } } } diff --git a/libs/sketching/src/otel.rs b/libs/sketching/src/otel.rs index a833b8add..d9cad64b3 100644 --- a/libs/sketching/src/otel.rs +++ b/libs/sketching/src/otel.rs @@ -34,7 +34,9 @@ pub fn start_logging_pipeline( log_filter: crate::LogLevel, service_name: String, ) -> Result, String> { - let forest_filter: EnvFilter = log_filter.into(); + let forest_filter: EnvFilter = EnvFilter::builder() + .with_default_directive(log_filter.into()) + .from_env_lossy(); // TODO: work out how to do metrics things // let meter_provider = init_metrics() @@ -56,7 +58,9 @@ pub fn start_logging_pipeline( .expect("Failed to set hyper logging to info"), ); let forest_layer = tracing_forest::ForestLayer::default().with_filter(forest_filter); - let t_filter: EnvFilter = log_filter.into(); + let t_filter: EnvFilter = EnvFilter::builder() + .with_default_directive(log_filter.into()) + .from_env_lossy(); let tracer = opentelemetry_otlp::new_pipeline().tracing().with_exporter( opentelemetry_otlp::new_exporter() diff --git a/server/core/src/actors/internal.rs b/server/core/src/actors/internal.rs index bb1627132..22fcd9cc5 100644 --- a/server/core/src/actors/internal.rs +++ b/server/core/src/actors/internal.rs @@ -100,22 +100,56 @@ impl QueryServerWriteV1 { } } - pub(crate) async fn handle_delayedaction(&self, da: DelayedAction) { + pub(crate) async fn handle_delayedaction(&self, da_batch: &mut Vec) { let eventid = Uuid::new_v4(); let span = span!(Level::INFO, "process_delayed_action", uuid = ?eventid); + let mut 
retry = false; + async { let ct = duration_from_epoch_now(); let mut idms_prox_write = self.idms.proxy_write(ct).await; - if let Err(res) = idms_prox_write - .process_delayedaction(da, ct) - .and_then(|_| idms_prox_write.commit()) - { - info!(?res, "delayed action error"); + + for da in da_batch.iter() { + retry = idms_prox_write.process_delayedaction(da, ct).is_err(); + if retry { + // exit the loop + warn!("delayed action failed, will be retried individually."); + break; + } + } + + if let Err(res) = idms_prox_write.commit() { + retry = true; + error!(?res, "delayed action batch commit error"); + } } .instrument(span) - .await + .await; + + if retry { + // An error occurred, retry each operation one at a time. + for da in da_batch.iter() { + let eventid = Uuid::new_v4(); + let span = span!(Level::INFO, "process_delayed_action_retried", uuid = ?eventid); + + async { + let ct = duration_from_epoch_now(); + let mut idms_prox_write = self.idms.proxy_write(ct).await; + if let Err(res) = idms_prox_write + .process_delayedaction(da, ct) + .and_then(|_| idms_prox_write.commit()) + { + error!(?res, "delayed action commit error"); + } + } + .instrument(span) + .await + } + } + + // We're done, clear out the buffer. 
+ da_batch.clear(); } #[instrument( diff --git a/server/core/src/https/mod.rs b/server/core/src/https/mod.rs index 0fa938f2f..f33e8a7bc 100644 --- a/server/core/src/https/mod.rs +++ b/server/core/src/https/mod.rs @@ -48,6 +48,7 @@ use sketching::*; use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast, + task, }; use tokio_openssl::SslStream; use tower::Service; @@ -199,7 +200,7 @@ pub async fn create_https_server( qe_r_ref: &'static QueryServerReadV1, mut rx: broadcast::Receiver, server_message_tx: broadcast::Sender, -) -> Result, ()> { +) -> Result, ()> { let js_files = get_js_files(config.role)?; // set up the CSP headers // script-src 'self' @@ -343,7 +344,7 @@ pub async fn create_https_server( info!("Starting the web server..."); - Ok(tokio::spawn(async move { + Ok(task::spawn(async move { tokio::select! { Ok(action) = rx.recv() => { match action { @@ -362,10 +363,10 @@ pub async fn create_https_server( return } }; - tokio::spawn(server_loop(tls_param, listener, app)) + task::spawn(server_loop(tls_param, listener, app)) }, None => { - tokio::spawn(axum_server::bind(addr).serve(app)) + task::spawn(axum_server::bind(addr).serve(app)) } } => { match res { @@ -538,7 +539,7 @@ async fn server_loop( if let Ok((stream, addr)) = listener.accept().await { let tls_acceptor = tls_acceptor.clone(); let app = app.clone(); - tokio::spawn(handle_conn(tls_acceptor, stream, app, addr)); + task::spawn(handle_conn(tls_acceptor, stream, app, addr)); } } } diff --git a/server/core/src/lib.rs b/server/core/src/lib.rs index ea49bfe6d..21362c9ef 100644 --- a/server/core/src/lib.rs +++ b/server/core/src/lib.rs @@ -52,7 +52,7 @@ use kanidmd_lib::value::CredentialType; use libc::umask; use tokio::sync::broadcast; -use tokio::task::JoinHandle; +use tokio::task; use crate::actors::{QueryServerReadV1, QueryServerWriteV1}; use crate::admin::AdminActor; @@ -754,7 +754,7 @@ pub struct CoreHandle { clean_shutdown: bool, pub tx: broadcast::Sender, /// This stores a name for the handle, 
and the handle itself so we can tell which failed/succeeded at the end. - handles: Vec<(TaskName, tokio::task::JoinHandle<()>)>, + handles: Vec<(TaskName, task::JoinHandle<()>)>, } impl CoreHandle { @@ -954,21 +954,22 @@ pub async fn create_server_core( // Create the server async write entry point. let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone()); - let delayed_handle = tokio::spawn(async move { + let delayed_handle = task::spawn(async move { + let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE); loop { tokio::select! { + added = idms_delayed.recv_many(&mut buffer) => { + if added == 0 { + // Channel has closed, stop the task. + break + } + server_write_ref.handle_delayedaction(&mut buffer).await; + } Ok(action) = broadcast_rx.recv() => { match action { CoreAction::Shutdown => break, } } - delayed = idms_delayed.next() => { - match delayed { - Some(da) => server_write_ref.handle_delayedaction(da).await, - // Channel has closed, stop the task. - None => break, - } - } } } info!("Stopped {}", TaskName::DelayedActionActor); @@ -976,7 +977,7 @@ pub async fn create_server_core( let mut broadcast_rx = broadcast_tx.subscribe(); - let auditd_handle = tokio::spawn(async move { + let auditd_handle = task::spawn(async move { loop { tokio::select! { Ok(action) = broadcast_rx.recv() => { @@ -1078,7 +1079,7 @@ pub async fn create_server_core( admin_info!("This config rocks! 
🪨 "); None } else { - let h: tokio::task::JoinHandle<()> = match https::create_https_server( + let h: task::JoinHandle<()> = match https::create_https_server( config.clone(), jws_signer, status_ref, @@ -1121,7 +1122,7 @@ pub async fn create_server_core( None }; - let mut handles: Vec<(TaskName, JoinHandle<()>)> = vec![ + let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![ (TaskName::IntervalActor, interval_handle), (TaskName::DelayedActionActor, delayed_handle), (TaskName::AuditdActor, auditd_handle), diff --git a/server/daemon/Cargo.toml b/server/daemon/Cargo.toml index 07139881c..58f87cea6 100644 --- a/server/daemon/Cargo.toml +++ b/server/daemon/Cargo.toml @@ -39,10 +39,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } tokio-util = { workspace = true, features = ["codec"] } toml = { workspace = true } tempfile = { workspace = true } -tracing = { workspace = true, features = [ - "max_level_trace", - "release_max_level_debug", -] } +tracing = { workspace = true } serde_json.workspace = true [target.'cfg(target_os = "linux")'.dependencies] diff --git a/server/daemon/src/main.rs b/server/daemon/src/main.rs index aa22b64a1..a74bf9a9d 100644 --- a/server/daemon/src/main.rs +++ b/server/daemon/src/main.rs @@ -290,7 +290,7 @@ fn main() -> ExitCode { } #[cfg(feature = "dhat-heap")] - let _profiler = dhat::Profiler::new_heap(); + let _profiler = dhat::Profiler::builder().trim_backtraces(Some(40)).build(); let maybe_rt = tokio::runtime::Builder::new_multi_thread() .enable_all() diff --git a/server/lib/src/be/idl_arc_sqlite.rs b/server/lib/src/be/idl_arc_sqlite.rs index 46a0ef878..580f63241 100644 --- a/server/lib/src/be/idl_arc_sqlite.rs +++ b/server/lib/src/be/idl_arc_sqlite.rs @@ -33,7 +33,7 @@ const DEFAULT_CACHE_TARGET: usize = 2048; const DEFAULT_IDL_CACHE_RATIO: usize = 32; const DEFAULT_NAME_CACHE_RATIO: usize = 8; const DEFAULT_CACHE_RMISS: usize = 0; -const DEFAULT_CACHE_WMISS: usize = 4; +const DEFAULT_CACHE_WMISS: 
usize = 0; #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] enum NameCacheKey { diff --git a/server/lib/src/be/idl_sqlite.rs b/server/lib/src/be/idl_sqlite.rs index 7056a8907..2a0f805d1 100644 --- a/server/lib/src/be/idl_sqlite.rs +++ b/server/lib/src/be/idl_sqlite.rs @@ -1771,7 +1771,6 @@ impl IdlSqlite { "PRAGMA page_size={fs_page_size}; PRAGMA cache_size={cache_pages}; PRAGMA journal_mode=WAL; - PRAGMA synchronous=NORMAL; PRAGMA wal_autocheckpoint={checkpoint_pages}; PRAGMA wal_checkpoint(RESTART);" ) diff --git a/server/lib/src/constants/mod.rs b/server/lib/src/constants/mod.rs index 190d6b180..2af5b675f 100644 --- a/server/lib/src/constants/mod.rs +++ b/server/lib/src/constants/mod.rs @@ -99,6 +99,11 @@ pub const PURGE_FREQUENCY: u64 = 60; #[cfg(not(test))] pub const PURGE_FREQUENCY: u64 = 600; +/// The number of delayed actions to consider per write transaction. Higher +/// values allow more coalescing to occur, but may consume more ram and cause +/// some latency while dequeuing and writing those operations. +pub const DELAYED_ACTION_BATCH_SIZE: usize = 256; + #[cfg(test)] /// In test, we limit the changelog to 10 minutes. pub const CHANGELOG_MAX_AGE: u64 = 600; @@ -123,13 +128,14 @@ pub const PW_MIN_LENGTH: u32 = 10; pub const MAXIMUM_AUTH_SESSION_EXPIRY: u32 = u32::MAX; // Default - sessions last for 1 day pub const DEFAULT_AUTH_SESSION_EXPIRY: u32 = 86400; -pub const DEFAULT_AUTH_SESSION_LIMITED_EXPIRY: u32 = 3600; // Maximum - privileges last for 1 hour. pub const MAXIMUM_AUTH_PRIVILEGE_EXPIRY: u32 = 3600; // Default - privileges last for 10 minutes. pub const DEFAULT_AUTH_PRIVILEGE_EXPIRY: u32 = 600; +// Default - directly privileged sessions only last 1 hour. +pub const DEFAULT_AUTH_SESSION_LIMITED_EXPIRY: u32 = 3600; // Default - oauth refresh tokens last for 16 hours. 
-pub const OAUTH_REFRESH_TOKEN_EXPIRY: u64 = 3600 * 8; +pub const OAUTH_REFRESH_TOKEN_EXPIRY: u64 = 3600 * 16; // The time that a token can be used before session // status is enforced. This needs to be longer than diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs index 4c3965953..1ccdea2b0 100644 --- a/server/lib/src/entry.rs +++ b/server/lib/src/entry.rs @@ -1138,6 +1138,15 @@ impl Entry { ne.validate(schema).map(|()| ne) } + + /// Access a reference set in a directly mutable form. This is "safe" because + /// referential integrity will check the values added are valid, and because + /// this is strongly typed it can't violate syntax. + pub(crate) fn get_ava_refer_mut(&mut self, attr: Attribute) -> Option<&mut BTreeSet> { + self.attrs + .get_mut(attr.as_ref()) + .and_then(|vs| vs.as_refer_set_mut()) + } } impl Clone for Entry diff --git a/server/lib/src/filter.rs b/server/lib/src/filter.rs index 10a2df9e0..a106ff296 100644 --- a/server/lib/src/filter.rs +++ b/server/lib/src/filter.rs @@ -14,8 +14,9 @@ use std::fmt; use std::hash::Hash; use std::iter; use std::num::NonZeroU8; +use std::sync::Arc; -use concread::arcache::ARCacheReadTxn; +use concread::arcache::{ARCache, ARCacheReadTxn}; use hashbrown::HashMap; #[cfg(test)] use hashbrown::HashSet; @@ -32,6 +33,16 @@ use crate::prelude::*; use crate::schema::SchemaTransaction; use crate::value::{IndexType, PartialValue}; +pub type ResolveFilterCache = + ARCache<(IdentityId, Arc>), Arc>>; + +pub type ResolveFilterCacheReadTxn<'a> = ARCacheReadTxn< + 'a, + (IdentityId, Arc>), + Arc>, + (), +>; + // Default filter is safe, ignores all hidden types! 
// This is &Value so we can lazy const then clone, but perhaps we can reconsider @@ -445,32 +456,33 @@ impl Filter { &self, ev: &Identity, idxmeta: Option<&IdxMeta>, - mut rsv_cache: Option< - &mut ARCacheReadTxn< - '_, - (IdentityId, Filter), - Filter, - (), - >, - >, + mut rsv_cache: Option<&mut ResolveFilterCacheReadTxn<'_>>, ) -> Result, OperationError> { // Given a filter, resolve Not and SelfUuid to real terms. // // The benefit of moving optimisation to this step is from various inputs, we can // get to a resolved + optimised filter, and then we can cache those outputs in many - // cases! + // cases! The exception is *large* filters, especially from the memberof plugin. We + // want to skip these because they can really jam up the server. - // do we have a cache? - let cache_key = if let Some(rcache) = rsv_cache.as_mut() { - // construct the key. For now it's expensive because we have to clone, but ... eh. - let cache_key = (ev.get_event_origin_id(), self.clone()); - if let Some(f) = rcache.get(&cache_key) { - // Got it? Shortcut and return! - return Ok(f.clone()); - }; - // Not in cache? Set the cache_key. - Some(cache_key) + let cacheable = FilterResolved::resolve_cacheable(&self.state.inner); + + let cache_key = if cacheable { + // do we have a cache? + if let Some(rcache) = rsv_cache.as_mut() { + // construct the key. For now it's expensive because we have to clone, but ... eh. + let cache_key = (ev.get_event_origin_id(), Arc::new(self.clone())); + if let Some(f) = rcache.get(&cache_key) { + // Got it? Shortcut and return! + return Ok(f.as_ref().clone()); + }; + // Not in cache? Set the cache_key. + Some(cache_key) + } else { + None + } } else { + // Not cacheable, lets just bail. None }; @@ -496,10 +508,11 @@ impl Filter { }, }; - // Now it's computed, inject it. - if let Some(rcache) = rsv_cache.as_mut() { - if let Some(cache_key) = cache_key { - rcache.insert(cache_key, resolved_filt.clone()); + // Now it's computed, inject it. 
Remember, we won't have a cache_key here + // if cacheable == false. + if let Some(cache_key) = cache_key { + if let Some(rcache) = rsv_cache.as_mut() { + rcache.insert(cache_key, Arc::new(resolved_filt.clone())); } } @@ -1270,6 +1283,27 @@ impl FilterResolved { } } + fn resolve_cacheable(fc: &FilterComp) -> bool { + match fc { + FilterComp::Or(vs) | FilterComp::And(vs) | FilterComp::Inclusion(vs) => { + if vs.len() < 8 { + vs.iter().all(FilterResolved::resolve_cacheable) + } else { + // Too lorge. + false + } + } + FilterComp::AndNot(f) => FilterResolved::resolve_cacheable(f.as_ref()), + FilterComp::Eq(..) + | FilterComp::SelfUuid + | FilterComp::Cnt(..) + | FilterComp::Stw(..) + | FilterComp::Enw(..) + | FilterComp::Pres(_) + | FilterComp::LessThan(..) => true, + } + } + fn resolve_idx( fc: FilterComp, ev: &Identity, diff --git a/server/lib/src/idm/scim.rs b/server/lib/src/idm/scim.rs index c382a44f3..66e6dcc67 100644 --- a/server/lib/src/idm/scim.rs +++ b/server/lib/src/idm/scim.rs @@ -1177,7 +1177,8 @@ impl<'a> IdmServerProxyWriteTransaction<'a> { // Get all the classes. 
debug!("Schemas valid - Proceeding with entry {}", scim_ent.id); - let mut mods = Vec::with_capacity(0); + #[allow(clippy::vec_init_then_push)] + let mut mods = Vec::with_capacity(4); mods.push(Modify::Assert( Attribute::SyncParentUuid, diff --git a/server/lib/src/idm/server.rs b/server/lib/src/idm/server.rs index 4a8d7e793..2d1dc4357 100644 --- a/server/lib/src/idm/server.rs +++ b/server/lib/src/idm/server.rs @@ -275,7 +275,7 @@ impl IdmServer { da: DelayedAction, ) -> Result { let mut pw = self.proxy_write(ct).await; - pw.process_delayedaction(da, ct) + pw.process_delayedaction(&da, ct) .and_then(|_| pw.commit()) .map(|()| true) } @@ -335,8 +335,10 @@ impl IdmServerDelayed { } } - pub async fn next(&mut self) -> Option { - self.async_rx.recv().await + pub async fn recv_many(&mut self, buffer: &mut Vec) -> usize { + debug_assert!(buffer.is_empty()); + let limit = buffer.capacity(); + self.async_rx.recv_many(buffer, limit).await } } @@ -685,7 +687,7 @@ pub trait IdmServerTransaction<'a> { &mut self, client_cert_info: &ClientCertInfo, ) -> Result, OperationError> { - let pks256 = hex::encode(&client_cert_info.public_key_s256); + let pks256 = hex::encode(client_cert_info.public_key_s256); // Using the certificate hash, find our matching cert. 
let mut maybe_cert_entries = self.get_qs_txn().internal_search(filter!(f_eq( Attribute::Certificate, @@ -2031,15 +2033,15 @@ impl<'a> IdmServerProxyWriteTransaction<'a> { #[instrument(level = "debug", skip_all)] pub fn process_delayedaction( &mut self, - da: DelayedAction, + da: &DelayedAction, _ct: Duration, ) -> Result<(), OperationError> { match da { - DelayedAction::PwUpgrade(pwu) => self.process_pwupgrade(&pwu), - DelayedAction::UnixPwUpgrade(upwu) => self.process_unixpwupgrade(&upwu), - DelayedAction::WebauthnCounterIncrement(wci) => self.process_webauthncounterinc(&wci), - DelayedAction::BackupCodeRemoval(bcr) => self.process_backupcoderemoval(&bcr), - DelayedAction::AuthSessionRecord(asr) => self.process_authsessionrecord(&asr), + DelayedAction::PwUpgrade(pwu) => self.process_pwupgrade(pwu), + DelayedAction::UnixPwUpgrade(upwu) => self.process_unixpwupgrade(upwu), + DelayedAction::WebauthnCounterIncrement(wci) => self.process_webauthncounterinc(wci), + DelayedAction::BackupCodeRemoval(bcr) => self.process_backupcoderemoval(bcr), + DelayedAction::AuthSessionRecord(asr) => self.process_authsessionrecord(asr), } } diff --git a/server/lib/src/plugins/dyngroup.rs b/server/lib/src/plugins/dyngroup.rs index 184c82442..b70ef95b8 100644 --- a/server/lib/src/plugins/dyngroup.rs +++ b/server/lib/src/plugins/dyngroup.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use kanidm_proto::internal::Filter as ProtoFilter; @@ -15,14 +15,21 @@ pub struct DynGroupCache { pub struct DynGroup; impl DynGroup { + /// Determine if any dynamic groups changed as part of this operation. #[allow(clippy::too_many_arguments)] fn apply_dyngroup_change( qs: &mut QueryServerWriteTransaction, - candidate_tuples: &mut Vec<(Arc, EntryInvalidCommitted)>, - affected_uuids: &mut Vec, + // The uuids that are affected by the dyngroup change. This is both addition + // and removal of the uuids as members. 
+ affected_uuids: &mut BTreeSet, + // If we should error when a dyngroup we thought should be cached is in fact, + // not cached. expect: bool, + // The identity in use. ident_internal: &Identity, + // The dyn group cache dyn_groups: &mut DynGroupCache, + // The list of dyn groups that were in the change set n_dyn_groups: &[&Entry], ) -> Result<(), OperationError> { /* @@ -37,45 +44,53 @@ impl DynGroup { */ if qs.get_phase() < ServerPhase::SchemaReady { - trace!("Server is not ready to load dyngroups"); + debug!("Server is not ready to load dyngroups"); return Ok(()); } - // Search all the new groups first. + // Search all dyn groups that were involved in the operation. let filt = filter!(FC::Or( n_dyn_groups .iter() .map(|e| f_eq(Attribute::Uuid, PartialValue::Uuid(e.get_uuid()))) .collect() )); - let work_set = qs.internal_search_writeable(&filt)?; + // Load the dyn groups as a writeable set. + let mut work_set = qs.internal_search_writeable(&filt)?; - // Go through them all and update the new groups. - for (pre, mut nd_group) in work_set.into_iter() { + // Go through them all and update the groups. + for (ref pre, ref mut nd_group) in work_set.iter_mut() { + trace!(dyngroup_id = %nd_group.get_display_id()); + // Load the dyngroups filter let scope_f: ProtoFilter = nd_group .get_ava_single_protofilter(Attribute::DynGroupFilter) .cloned() .ok_or_else(|| { - admin_error!("Missing {}", Attribute::DynGroupFilter); + error!("Missing {}", Attribute::DynGroupFilter); OperationError::InvalidEntryState })?; let scope_i = Filter::from_rw(ident_internal, &scope_f, qs).map_err(|e| { - admin_error!("{} validation failed {:?}", Attribute::DynGroupFilter, e); + error!("{} validation failed {:?}", Attribute::DynGroupFilter, e); e })?; + trace!(dyngroup_filter = ?scope_i); + let uuid = pre.get_uuid(); // Add our uuid as affected. - affected_uuids.push(uuid); + affected_uuids.insert(uuid); - // Apply the filter and get all the uuids. 
+ // Apply the filter and get all the uuids that are members of this dyngroup. let entries = qs.internal_search(scope_i.clone()).map_err(|e| { - admin_error!("internal search failure -> {:?}", e); + error!("internal search failure -> {:?}", e); e })?; + trace!(entries_len = %entries.len()); + let members = ValueSetRefer::from_iter(entries.iter().map(|e| e.get_uuid())); + trace!(?members); if let Some(uuid_iter) = members.as_ref().and_then(|a| a.as_ref_uuid_iter()) { affected_uuids.extend(uuid_iter); @@ -94,14 +109,21 @@ impl DynGroup { nd_group.purge_ava(Attribute::DynMember); } - candidate_tuples.push((pre, nd_group)); - - // Insert to our new instances + // Insert it to the dyngroup cache with the compiled/resolved filter for + // fast matching in other paths. if dyn_groups.insts.insert(uuid, scope_i).is_none() == expect { - admin_error!("{} cache uuid conflict {}", Attribute::DynGroup, uuid); + error!("{} cache uuid conflict {}", Attribute::DynGroup, uuid); return Err(OperationError::InvalidState); } } + + if !work_set.is_empty() { + qs.internal_apply_writable(work_set).map_err(|e| { + error!("Failed to commit dyngroup set {:?}", e); + e + })?; + } + Ok(()) } @@ -111,7 +133,7 @@ impl DynGroup { // Internal search all our definitions. 
let filt = filter!(f_eq(Attribute::Class, EntryClass::DynGroup.into())); let entries = qs.internal_search(filt).map_err(|e| { - admin_error!("internal search failure -> {:?}", e); + error!("internal search failure -> {:?}", e); e })?; @@ -122,19 +144,19 @@ impl DynGroup { .get_ava_single_protofilter(Attribute::DynGroupFilter) .cloned() .ok_or_else(|| { - admin_error!("Missing {}", Attribute::DynGroupFilter); + error!("Missing {}", Attribute::DynGroupFilter); OperationError::InvalidEntryState })?; let scope_i = Filter::from_rw(&ident_internal, &scope_f, qs).map_err(|e| { - admin_error!("dyngroup_filter validation failed {:?}", e); + error!("dyngroup_filter validation failed {:?}", e); e })?; let uuid = nd_group.get_uuid(); if reload_groups.insert(uuid, scope_i).is_some() { - admin_error!("dyngroup cache uuid conflict {}", uuid); + error!("dyngroup cache uuid conflict {}", uuid); return Err(OperationError::InvalidState); } } @@ -150,8 +172,8 @@ impl DynGroup { qs: &mut QueryServerWriteTransaction, cand: &[Entry], _ident: &Identity, - ) -> Result, OperationError> { - let mut affected_uuids = Vec::with_capacity(cand.len()); + ) -> Result, OperationError> { + let mut affected_uuids = BTreeSet::new(); let ident_internal = Identity::from_internal(); @@ -172,7 +194,7 @@ impl DynGroup { // dyn groups will see the created entries on an internal search // so we don't need to reference them. - let mut candidate_tuples = Vec::with_capacity(dyn_groups.insts.len() + cand.len()); + let mut candidate_tuples = Vec::with_capacity(cand.len()); // Apply existing dyn_groups to entries. trace!(?dyn_groups.insts); @@ -208,14 +230,39 @@ impl DynGroup { .copied() .for_each(|u| d_group.add_ava(Attribute::DynMember, Value::Refer(u))); - affected_uuids.extend(matches.into_iter()); - affected_uuids.push(*dg_uuid); + // The *dyn group* isn't changing, it's that a member OF the dyn group + // is being added. 
This means the dyngroup isn't part of the set that + // needs update to MO, only the affected members do! + + let pre_dynmember = pre.get_ava_refer(Attribute::DynMember); + let post_dynmember = d_group.get_ava_refer(Attribute::DynMember); + + match (pre_dynmember, post_dynmember) { + (Some(pre_m), Some(post_m)) => { + // Show only the *changed* uuids. + affected_uuids.extend(pre_m.symmetric_difference(post_m)); + } + (Some(members), None) | (None, Some(members)) => { + // Doesn't matter what order, just that they are affected + affected_uuids.extend(members); + } + (None, None) => {} + }; candidate_tuples.push((pre, d_group)); } } } + // Write back the new changes. + // Write this stripe if populated. + if !candidate_tuples.is_empty() { + qs.internal_apply_writable(candidate_tuples).map_err(|e| { + error!("Failed to commit dyngroup set {:?}", e); + e + })?; + } + // If we created any dyn groups, populate them now. // if the event is not internal, reject (for now) @@ -223,7 +270,6 @@ impl DynGroup { trace!("considering new dyngroups"); Self::apply_dyngroup_change( qs, - &mut candidate_tuples, &mut affected_uuids, false, &ident_internal, @@ -232,15 +278,6 @@ impl DynGroup { )?; } - // Write back the new changes. - // Write this stripe if populated. - if !candidate_tuples.is_empty() { - qs.internal_apply_writable(candidate_tuples).map_err(|e| { - admin_error!("Failed to commit dyngroup set {:?}", e); - e - })?; - } - Ok(affected_uuids) } @@ -251,8 +288,8 @@ impl DynGroup { cand: &[Entry], _ident: &Identity, force_cand_updates: bool, - ) -> Result, OperationError> { - let mut affected_uuids = Vec::with_capacity(cand.len()); + ) -> Result, OperationError> { + let mut affected_uuids = BTreeSet::new(); let ident_internal = Identity::from_internal(); @@ -283,10 +320,8 @@ impl DynGroup { // changed in this op. 
if !n_dyn_groups.is_empty() { - trace!("considering modified dyngroups"); Self::apply_dyngroup_change( qs, - &mut candidate_tuples, &mut affected_uuids, true, &ident_internal, @@ -344,11 +379,23 @@ impl DynGroup { Err(u) => d_group.remove_ava(Attribute::DynMember, &PartialValue::Refer(u)), }); - affected_uuids.extend(matches.into_iter().map(|choice| match choice { - Ok(u) => u, - Err(u) => u, - })); - affected_uuids.push(*dg_uuid); + // The *dyn group* isn't changing, it's that a member OF the dyn group + // is being added. This means the dyngroup isn't part of the set that + // needs update to MO, only the affected members do! + let pre_dynmember = pre.get_ava_refer(Attribute::DynMember); + let post_dynmember = d_group.get_ava_refer(Attribute::DynMember); + + match (pre_dynmember, post_dynmember) { + (Some(pre_m), Some(post_m)) => { + // Show only the *changed* uuids. + affected_uuids.extend(pre_m.symmetric_difference(post_m)); + } + (Some(members), None) | (None, Some(members)) => { + // Doesn't matter what order, just that they are affected + affected_uuids.extend(members); + } + (None, None) => {} + }; candidate_tuples.push((pre, d_group)); } @@ -357,9 +404,10 @@ impl DynGroup { // Write back the new changes. // Write this stripe if populated. 
+ trace!(candidate_tuples_len = %candidate_tuples.len()); if !candidate_tuples.is_empty() { qs.internal_apply_writable(candidate_tuples).map_err(|e| { - admin_error!("Failed to commit dyngroup set {:?}", e); + error!("Failed to commit dyngroup set {:?}", e); e })?; } diff --git a/server/lib/src/plugins/memberof.rs b/server/lib/src/plugins/memberof.rs index 396440ebc..4658d539a 100644 --- a/server/lib/src/plugins/memberof.rs +++ b/server/lib/src/plugins/memberof.rs @@ -10,12 +10,10 @@ // As a result, we first need to run refint to clean up all dangling references, then memberof // fixes the graph of memberships -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; -use hashbrown::HashMap; - -use crate::entry::{Entry, EntryCommitted, EntrySealed, EntryTuple}; +use crate::entry::{Entry, EntryCommitted, EntrySealed}; use crate::event::{CreateEvent, DeleteEvent, ModifyEvent}; use crate::plugins::Plugin; use crate::prelude::*; @@ -23,7 +21,7 @@ use crate::value::PartialValue; pub struct MemberOf; -fn do_memberof( +fn do_group_memberof( qs: &mut QueryServerWriteTransaction, uuid: Uuid, tgte: &mut EntryInvalidCommitted, @@ -96,55 +94,255 @@ fn do_memberof( Ok(()) } +fn do_leaf_memberof( + qs: &mut QueryServerWriteTransaction, + all_affected_uuids: BTreeSet, +) -> Result<(), OperationError> { + trace!("---"); + + // We just put everything into the filter here, the query code will remove + // anything that is a group. + let all_affected_filter: Vec<_> = all_affected_uuids + .into_iter() + .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(u))) + .collect(); + + if all_affected_filter.is_empty() { + trace!("all affected filter is empty, return"); + return Ok(()); + } + + // These are all the affected entries. 
+ let leaf_entries = qs.internal_search_writeable(&filter!(f_and!([ + f_andnot(f_eq(Attribute::Class, EntryClass::Group.into())), + FC::Or(all_affected_filter) + ])))?; + + if leaf_entries.is_empty() { + trace!("leaf entries empty, return"); + return Ok(()); + } + + let mut leaf_entries: BTreeMap<_, _> = leaf_entries + .into_iter() + .map(|entry_tuple| (entry_tuple.0.get_uuid(), entry_tuple)) + .collect(); + + let mut changes = Vec::with_capacity(leaf_entries.len()); + + // Now that we know which *entries* changed, we actually have to load the groups *again* + // because the affected entries could still be a DMO/MO of a group that *wasn't* in the + // change set, and we still need to reflect that they exist. + + let mut groups_or = Vec::with_capacity(leaf_entries.len() * 2); + + for uuid in leaf_entries.keys().copied() { + groups_or.push(f_eq(Attribute::Member, PartialValue::Refer(uuid))); + groups_or.push(f_eq(Attribute::DynMember, PartialValue::Refer(uuid))); + } + + let all_groups = qs + .internal_search(filter!(f_and!([ + f_eq(Attribute::Class, EntryClass::Group.into()), + FC::Or(groups_or) + ]))) + .map_err(|err| { + error!(?err, "internal search failure"); + err + })?; + + /* + * Previously we went through the remaining items and processed them one at a time, but + * that has significant performance limits, since if we update a large dyn group, we then + * have to perform N searches for each affected member, which may be repeatedly searching + * for the same groups over and over again. + * + * Instead, at this point we know that in memberof application the entire group tree is + * now stable, and all we need to do is reflect those values into our entries. We can do + * this in two steps. First we load *all* the groups that relate to our leaf entries that + * we need to reflect. + * + * Then we can go through that in a single pass updating our entries that need to be + * updated. 
Since we know that the leaf entries aren't groups, we don't have a collision + * as a result of using internal_search_writeable. + */ + + // Clear the existing Mo and Dmo on the write stripe. + for (_pre, tgte) in leaf_entries.values_mut() { + // Ensure we are MO capable. We only add this if it's not already present. + tgte.add_ava_if_not_exist(Attribute::Class, EntryClass::MemberOf.into()); + // Clear the dmo + mos, we will recreate them now. + // This is how we handle deletes/etc. + tgte.purge_ava(Attribute::MemberOf); + tgte.purge_ava(Attribute::DirectMemberOf); + } + + // Now, we go through all the groups, and from each one we update the relevant + // target entry as needed. + for group in all_groups { + trace!(group_id = %group.get_display_id()); + // Our group uuid that we add to direct members. + let group_uuid = group.get_uuid(); + + let memberof_ref = group.get_ava_refer(Attribute::MemberOf); + + let member_ref = group.get_ava_refer(Attribute::Member); + let dynmember_ref = group.get_ava_refer(Attribute::DynMember); + + let dir_members = member_ref + .iter() + .flat_map(|set| set.iter()) + .chain(dynmember_ref.iter().flat_map(|set| set.iter())) + .copied(); + + // These are the entries that are direct members and need to reflect the group + // as mo and it's mo for indirect mo. + for dir_member in dir_members { + if let Some((_pre, tgte)) = leaf_entries.get_mut(&dir_member) { + trace!(?dir_member, entry_id = ?tgte.get_display_id()); + // We were in the group, lets update. + if let Some(dmo_set) = tgte.get_ava_refer_mut(Attribute::DirectMemberOf) { + dmo_set.insert(group_uuid); + } else { + let dmo = ValueSetRefer::new(group_uuid); + tgte.set_ava_set(Attribute::DirectMemberOf, dmo); + } + + // We're also in member of this group. 
+ if let Some(mo_set) = tgte.get_ava_refer_mut(Attribute::MemberOf) { + mo_set.insert(group_uuid); + } else { + let mo = ValueSetRefer::new(group_uuid); + tgte.set_ava_set(Attribute::MemberOf, mo); + } + + // If the group has memberOf attributes, we propogate these to + // our entry now. + if let Some(group_mo) = memberof_ref { + // IMPORTANT this can't be a NONE because we just create MO in + // the step above! + if let Some(mo_set) = tgte.get_ava_refer_mut(Attribute::MemberOf) { + mo_set.extend(group_mo.iter()) + } + } + + if cfg!(debug_assertions) { + if let Some(dmo) = group.get_ava_refer(Attribute::DirectMemberOf) { + if let Some(mo) = group.get_ava_refer(Attribute::MemberOf) { + debug_assert!(mo.is_superset(dmo)) + } + } + } + } + // Done updating that leaf entry. + // Remember in the None case it could be that the group has a member which *isn't* + // being altered as a leaf in this operation. + } + // Next group. + } + + // Now only write back leaf entries that actually were changed as a result of the memberof + // process. + leaf_entries + .into_iter() + .try_for_each(|(auuid, (pre, tgte))| { + // Only write if a change occurred. 
+ if pre.get_ava_set(Attribute::MemberOf) != tgte.get_ava_set(Attribute::MemberOf) + || pre.get_ava_set(Attribute::DirectMemberOf) + != tgte.get_ava_set(Attribute::DirectMemberOf) + { + trace!("=> processing affected uuid {:?}", auuid); + + if cfg!(debug_assertions) { + if let Some(dmo_set) = tgte.get_ava_refer(Attribute::DirectMemberOf) { + trace!(?dmo_set); + + if let Some(mo_set) = tgte.get_ava_refer(Attribute::MemberOf) { + trace!(?mo_set); + debug_assert!(mo_set.is_superset(dmo_set)); + } else { + unreachable!(); + } + } else { + trace!("NONE"); + }; + + if let Some(pre_dmo_set) = pre.get_ava_refer(Attribute::DirectMemberOf) { + trace!(?pre_dmo_set); + + if let Some(pre_mo_set) = pre.get_ava_refer(Attribute::MemberOf) { + trace!(?pre_mo_set); + debug_assert!(pre_mo_set.is_superset(pre_dmo_set)); + } else { + unreachable!(); + } + } else { + trace!("NONE"); + }; + }; + + changes.push((pre, tgte)); + } else { + trace!("=> ignoring unmodified uuid {:?}", auuid); + } + Ok(()) + })?; + + // Write the batch out in a single stripe. + qs.internal_apply_writable(changes) + // Done! 🎉 +} + // This is how you know the good code is here. #[allow(clippy::cognitive_complexity)] fn apply_memberof( qs: &mut QueryServerWriteTransaction, // TODO: Experiment with HashSet/BTreeSet here instead of vec. // May require https://github.com/rust-lang/rust/issues/62924 to allow popping - mut group_affect: Vec, + mut affected_uuids: BTreeSet, ) -> Result<(), OperationError> { trace!(" => entering apply_memberof"); - trace!(" => initial group_affect {:?}", group_affect); - // We can't cache groups, because we need to be continually writing - // and querying them. But we can cache anything we find in the process - // to speed up the later other_affect write op, and we can use this - // to avoid loading things that aren't groups. 
- // All other changed entries (mo, dmo cleared) - let mut other_cache: HashMap = HashMap::with_capacity(group_affect.len() * 2); - while !group_affect.is_empty() { - group_affect.sort(); - group_affect.dedup(); + // Because of how replication works, we don't send MO over a replication boundary. + // As a result, we always need to trigger for any changed uuid, so we keep the + // initial affected set for the leaf resolution. + // + // As we proceed, we'll also add the affected members of our groups that are + // changing. + let mut all_affected_uuids: BTreeSet<_> = affected_uuids.iter().copied().collect(); + + // While there are still affected uuids. + while !affected_uuids.is_empty() { + trace!(?affected_uuids); // Ignore recycled/tombstones - let filt = filter!(FC::Or( - group_affect - .drain(0..) - .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(u))) - .collect() - )); + let filt = filter!(f_and!([ + f_eq(Attribute::Class, EntryClass::Group.into()), + FC::Or( + affected_uuids + .iter() + .copied() + .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(u))) + .collect() + ) + ])); + + // Clear the set for the next iteration + affected_uuids.clear(); let work_set = qs.internal_search_writeable(&filt)?; - // Load the vecdeque with this batch. - let mut changes = Vec::with_capacity(work_set.len()); for (pre, mut tgte) in work_set.into_iter() { let guuid = pre.get_uuid(); - // load the entry from the db. - if !tgte.attribute_equality(Attribute::Class, &EntryClass::Group.into()) { - // It's not a group, we'll deal with you later. We should NOT - // have seen this UUID before, as either we are on the first - // iteration OR the checks belowe should have filtered it out. 
- trace!("not a group, delaying update to -> {:?}", guuid); - other_cache.insert(guuid, (pre, tgte)); - continue; - } - trace!("=> processing group update -> {:?}", guuid); + trace!( + "=> processing group update -> {:?} {}", + guuid, + tgte.get_display_id() + ); - do_memberof(qs, guuid, &mut tgte)?; + do_group_memberof(qs, guuid, &mut tgte)?; // Did we change? Note we don't check if the class changed, only if mo changed. if pre.get_ava_set(Attribute::MemberOf) != tgte.get_ava_set(Attribute::MemberOf) @@ -153,58 +351,82 @@ fn apply_memberof( { // Yes we changed - we now must process all our members, as they need to // inherit changes. Some of these members COULD be non groups, but we - // handle that in the dbload step. + // handle them in the subsequent steps. trace!( - "{:?} changed, flagging members as groups to change. ", - guuid + "{:?} {} changed, flagging members as groups to change. ", + guuid, + tgte.get_display_id() ); - if let Some(miter) = tgte.get_ava_as_refuuid(Attribute::Member) { - group_affect.extend(miter.filter(|m| !other_cache.contains_key(m))); + + // Since our groups memberof (and related, direct member of) has changed, we + // need to propogate these values forward into our members. At this point we + // mark all our members as being part of the affected set. 
+ let pre_member = pre.get_ava_refer(Attribute::Member); + let post_member = tgte.get_ava_refer(Attribute::Member); + + match (pre_member, post_member) { + (Some(pre_m), Some(post_m)) => { + affected_uuids.extend(pre_m); + affected_uuids.extend(post_m); + } + (Some(members), None) | (None, Some(members)) => { + // Doesn't matter what order, just that they are affected + affected_uuids.extend(members); + } + (None, None) => {} }; - if let Some(miter) = tgte.get_ava_as_refuuid(Attribute::DynMember) { - group_affect.extend(miter.filter(|m| !other_cache.contains_key(m))); + + let pre_dynmember = pre.get_ava_refer(Attribute::DynMember); + let post_dynmember = tgte.get_ava_refer(Attribute::DynMember); + + match (pre_dynmember, post_dynmember) { + (Some(pre_m), Some(post_m)) => { + affected_uuids.extend(pre_m); + affected_uuids.extend(post_m); + } + (Some(members), None) | (None, Some(members)) => { + // Doesn't matter what order, just that they are affected + affected_uuids.extend(members); + } + (None, None) => {} }; // push the entries to pre/cand changes.push((pre, tgte)); } else { - trace!("{:?} stable", guuid); + // If the group is stable, then we *only* need to update memberof + // on members that may have been added or removed. This exists to + // optimise when we add a member to a group, but without changing the + // group's mo/dmo to save re-writing mo to all the other members. + // + // If the group's memberof has been through the unstable state, + // all our members are already fully loaded into the affected sets. + // + // NOTE: This filtering of what members were actually impacted is + // performed in the call to post_modify_inner. + + trace!("{:?} {} stable", guuid, tgte.get_display_id()); } } // Write this stripe if populated. 
if !changes.is_empty() { - qs.internal_apply_writable(changes).map_err(|e| { - admin_error!("Failed to commit memberof group set {:?}", e); - e + trace!("wrote stripe {}", changes.len()); + qs.internal_apply_writable(changes).map_err(|err| { + error!(?err, "Failed to commit memberof group set"); + err })?; } + + // Reflect the full set of affected uuids into our all affected set. + all_affected_uuids.extend(affected_uuids.iter()); + // Next loop! + trace!("-------------------------------------"); } - // ALL GROUP MOS + DMOS ARE NOW STABLE. We can load these into other items directly. - let mut changes = Vec::with_capacity(other_cache.len()); - - other_cache - .into_iter() - .try_for_each(|(auuid, (pre, mut tgte))| { - trace!("=> processing affected uuid {:?}", auuid); - debug_assert!(!tgte.attribute_equality(Attribute::Class, &EntryClass::Group.into())); - do_memberof(qs, auuid, &mut tgte)?; - // Only write if a change occurred. - if pre.get_ava_set(Attribute::MemberOf) != tgte.get_ava_set(Attribute::MemberOf) - || pre.get_ava_set(Attribute::DirectMemberOf) - != tgte.get_ava_set(Attribute::DirectMemberOf) - { - changes.push((pre, tgte)); - } - Ok(()) - })?; - - // Turn the other_cache into a write set. - // Write the batch out in a single stripe. - qs.internal_apply_writable(changes) - // Done! 🎉 + // ALL GROUP MOS + DMOS ARE NOW STABLE. We can update oul leaf entries as required. + do_leaf_memberof(qs, all_affected_uuids) } impl Plugin for MemberOf { @@ -307,7 +529,7 @@ impl Plugin for MemberOf { ) -> Result<(), OperationError> { // Similar condition to create - we only trigger updates on groups's members, // so that they can find they are no longer a mo of what was deleted. - let group_affect = cand + let affected_uuids = cand .iter() .filter_map(|e| { // Is it a group? 
@@ -332,7 +554,7 @@ impl Plugin for MemberOf { ) .collect(); - apply_memberof(qs, group_affect) + apply_memberof(qs, affected_uuids) } #[instrument(level = "debug", name = "memberof::verify", skip_all)] @@ -360,6 +582,7 @@ impl Plugin for MemberOf { ]) ])); + // what groups is this entry a direct member of? let direct_memberof = match qs .internal_search(filt_in) .map_err(|_| ConsistencyError::QueryServerSearchFailure) @@ -367,8 +590,8 @@ impl Plugin for MemberOf { Ok(d_mo) => d_mo, Err(e) => return vec![Err(e)], }; - // for all direct -> add uuid to map + // for all direct -> add uuid to map let d_groups_set: BTreeSet = direct_memberof.iter().map(|e| e.get_uuid()).collect(); @@ -378,7 +601,11 @@ impl Plugin for MemberOf { Some(d_groups_set) }; - trace!("DMO search groups {:?} -> {:?}", e.get_uuid(), d_groups_set); + trace!( + "DMO search groups {:?} -> {:?}", + e.get_display_id(), + d_groups_set + ); match (e.get_ava_set(Attribute::DirectMemberOf), d_groups_set) { (Some(edmos), Some(b)) => { @@ -387,16 +614,19 @@ impl Plugin for MemberOf { Some(a) => { let diff: Vec<_> = a.symmetric_difference(&b).collect(); if !diff.is_empty() { - admin_error!( - "MemberOfInvalid: Entry {}, DMO has inconsistencies -> {:?}", - e, - diff + error!( + "MemberOfInvalid: Entry {}, DMO has inconsistencies", + e.get_display_id(), ); + trace!(entry_direct_member_of = ?a); + trace!(expected_direct_groups = ?b); + trace!(?diff); + r.push(Err(ConsistencyError::MemberOfInvalid(e.get_id()))); } } _ => { - admin_error!("MemberOfInvalid: Entry {}, DMO has incorrect syntax", e,); + error!("MemberOfInvalid: Entry {}, DMO has incorrect syntax - should be reference uuid set", e.get_display_id()); r.push(Err(ConsistencyError::MemberOfInvalid(e.get_id()))); } } @@ -404,14 +634,14 @@ impl Plugin for MemberOf { (None, None) => { // Ok } - (entry_dmo, d_groups) => { - admin_error!( + (entry_direct_member_of, expected_direct_groups) => { + error!( "MemberOfInvalid directmemberof set and DMO search set 
differ in size: {}", - e.get_uuid() + e.get_display_id() ); - trace!(?e); - trace!(?entry_dmo); - trace!(?d_groups); + // trace!(?e); + trace!(?entry_direct_member_of); + trace!(?expected_direct_groups); r.push(Err(ConsistencyError::MemberOfInvalid(e.get_id()))); } } @@ -442,10 +672,11 @@ impl MemberOf { ) -> Result<(), OperationError> { let dyngroup_change = super::dyngroup::DynGroup::post_create(qs, cand, ident)?; - let group_affect = cand + let affected_uuids = cand .iter() .map(|e| e.get_uuid()) .chain(dyngroup_change) + // In a create, we have to always examine our members as being affected. .chain( cand.iter() .filter_map(|e| { @@ -460,7 +691,7 @@ impl MemberOf { ) .collect(); - apply_memberof(qs, group_affect) + apply_memberof(qs, affected_uuids) } fn post_modify_inner( @@ -478,37 +709,48 @@ impl MemberOf { force_dyngroup_cand_update, )?; - // TODO: Limit this to when it's a class, member, mo, dmo change instead. - let group_affect = cand + let mut affected_uuids: BTreeSet<_> = cand .iter() .map(|post| post.get_uuid()) .chain(dyngroup_change) - .chain( - pre_cand - .iter() - .filter_map(|pre| { - if pre.attribute_equality(Attribute::Class, &EntryClass::Group.into()) { - pre.get_ava_as_refuuid(Attribute::Member) - } else { - None - } - }) - .flatten(), - ) - .chain( - cand.iter() - .filter_map(|post| { - if post.attribute_equality(Attribute::Class, &EntryClass::Group.into()) { - post.get_ava_as_refuuid(Attribute::Member) - } else { - None - } - }) - .flatten(), - ) .collect(); - apply_memberof(qs, group_affect) + for (pre, post) in pre_cand.iter().zip(cand.iter()).filter(|(pre, post)| { + post.attribute_equality(Attribute::Class, &EntryClass::Group.into()) + || pre.attribute_equality(Attribute::Class, &EntryClass::Group.into()) + }) { + let pre_member = pre.get_ava_refer(Attribute::Member); + let post_member = post.get_ava_refer(Attribute::Member); + + match (pre_member, post_member) { + (Some(pre_m), Some(post_m)) => { + // Show only the *changed* uuids for 
leaf resolution. + affected_uuids.extend(pre_m.symmetric_difference(post_m)); + } + (Some(members), None) | (None, Some(members)) => { + // Doesn't matter what order, just that they are affected + affected_uuids.extend(members); + } + (None, None) => {} + }; + + let pre_dynmember = pre.get_ava_refer(Attribute::DynMember); + let post_dynmember = post.get_ava_refer(Attribute::DynMember); + + match (pre_dynmember, post_dynmember) { + (Some(pre_m), Some(post_m)) => { + // Show only the *changed* uuids. + affected_uuids.extend(pre_m.symmetric_difference(post_m)); + } + (Some(members), None) | (None, Some(members)) => { + // Doesn't matter what order, just that they are affected + affected_uuids.extend(members); + } + (None, None) => {} + }; + } + + apply_memberof(qs, affected_uuids) } } diff --git a/server/lib/src/plugins/refint.rs b/server/lib/src/plugins/refint.rs index 5ec557997..4b458276b 100644 --- a/server/lib/src/plugins/refint.rs +++ b/server/lib/src/plugins/refint.rs @@ -12,17 +12,18 @@ use std::collections::BTreeSet; use std::sync::Arc; -use hashbrown::HashSet; +use hashbrown::{HashMap, HashSet}; use crate::event::{CreateEvent, DeleteEvent, ModifyEvent}; use crate::filter::{f_eq, FC}; use crate::plugins::Plugin; use crate::prelude::*; -use crate::schema::SchemaTransaction; +use crate::schema::{SchemaAttribute, SchemaTransaction}; pub struct ReferentialIntegrity; impl ReferentialIntegrity { + #[instrument(level = "debug", name = "check_uuids_exist_fast", skip_all)] fn check_uuids_exist_fast( qs: &mut QueryServerWriteTransaction, inner: &[Uuid], @@ -55,6 +56,7 @@ impl ReferentialIntegrity { } } + #[instrument(level = "debug", name = "check_uuids_exist_slow", skip_all)] fn check_uuids_exist_slow( qs: &mut QueryServerWriteTransaction, inner: &[Uuid], @@ -162,27 +164,27 @@ impl Plugin for ReferentialIntegrity { cand: &[Entry], _ce: &CreateEvent, ) -> Result<(), OperationError> { - Self::post_modify_inner(qs, cand) + Self::post_modify_inner(qs, None, cand) } 
#[instrument(level = "debug", name = "refint_post_modify", skip_all)] fn post_modify( qs: &mut QueryServerWriteTransaction, - _pre_cand: &[Arc>], + pre_cand: &[Arc], cand: &[Entry], _me: &ModifyEvent, ) -> Result<(), OperationError> { - Self::post_modify_inner(qs, cand) + Self::post_modify_inner(qs, Some(pre_cand), cand) } #[instrument(level = "debug", name = "refint_post_batch_modify", skip_all)] fn post_batch_modify( qs: &mut QueryServerWriteTransaction, - _pre_cand: &[Arc>], + pre_cand: &[Arc], cand: &[Entry], _me: &BatchModifyEvent, ) -> Result<(), OperationError> { - Self::post_modify_inner(qs, cand) + Self::post_modify_inner(qs, Some(pre_cand), cand) } #[instrument(level = "debug", name = "refint_post_repl_refresh", skip_all)] @@ -190,7 +192,7 @@ impl Plugin for ReferentialIntegrity { qs: &mut QueryServerWriteTransaction, cand: &[EntrySealedCommitted], ) -> Result<(), OperationError> { - Self::post_modify_inner(qs, cand) + Self::post_modify_inner(qs, None, cand) } #[instrument(level = "debug", name = "refint_post_repl_incremental", skip_all)] @@ -207,7 +209,7 @@ impl Plugin for ReferentialIntegrity { // // This also becomes a path to a "ref int fixup" too? 
- let uuids = Self::cand_references_to_uuid_filter(qs, cand)?; + let uuids = Self::cand_references_to_uuid_filter(qs, Some(pre_cand), cand)?; let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?; @@ -356,54 +358,45 @@ impl Plugin for ReferentialIntegrity { } } -impl ReferentialIntegrity { - fn cand_references_to_uuid_filter( - qs: &mut QueryServerWriteTransaction, - cand: &[EntrySealedCommitted], - ) -> Result, OperationError> { - let schema = qs.get_schema(); - let ref_types = schema.get_reference_types(); +fn update_reference_set<'a, I>( + ref_types: &HashMap, + entry_iter: I, + reference_set: &mut BTreeSet, +) -> Result<(), OperationError> +where + I: Iterator, +{ + for cand in entry_iter { + trace!(cand_id = %cand.get_display_id()); + // If it's dyngroup, skip member since this will be reset in the next step. + let dyn_group = cand.attribute_equality(Attribute::Class, &EntryClass::DynGroup.into()); - // Fast Path - let mut vsiter = cand.iter().flat_map(|c| { - // If it's dyngroup, skip member since this will be reset in the next step. - let dyn_group = c.attribute_equality(Attribute::Class, &EntryClass::DynGroup.into()); + // For all reference types that exist in the schema. + let cand_ref_valuesets = ref_types.values().filter_map(|rtype| { + // If the entry is a dyn-group, skip dyn member. + let skip_mb = dyn_group && rtype.name == Attribute::DynMember.as_ref(); + // MemberOf is always recalculated, so it can be skipped + let skip_mo = rtype.name == Attribute::MemberOf.as_ref(); - ref_types.values().filter_map(move |rtype| { - // Skip dynamic members, these are recalculated by the - // memberof plugin. - let skip_mb = dyn_group && rtype.name == Attribute::DynMember.as_ref(); - // Skip memberOf, also recalculated. 
- let skip_mo = rtype.name == Attribute::MemberOf.as_ref(); - if skip_mb || skip_mo { - None - } else { - trace!(rtype_name = ?rtype.name, "examining"); - c.get_ava_set( - (&rtype.name) - .try_into() - .map_err(|e| { - admin_error!(?e, "invalid attribute type {}", &rtype.name); - None:: - }) - .ok()?, - ) - } - }) + if skip_mb || skip_mo { + None + } else { + trace!(rtype_name = ?rtype.name, "examining"); + cand.get_ava_set( + (&rtype.name) + .try_into() + .map_err(|e| { + admin_error!(?e, "invalid attribute type {}", &rtype.name); + None:: + }) + .ok()?, + ) + } }); - // Could check len first? - let mut i = Vec::with_capacity(cand.len() * 4); - let mut dedup = HashSet::new(); - - vsiter.try_for_each(|vs| { + for vs in cand_ref_valuesets { if let Some(uuid_iter) = vs.as_ref_uuid_iter() { - uuid_iter.for_each(|u| { - // Returns true if the item is NEW in the set - if dedup.insert(u) { - i.push(u) - } - }); + reference_set.extend(uuid_iter); Ok(()) } else { admin_error!(?vs, "reference value could not convert to reference uuid."); @@ -411,17 +404,50 @@ impl ReferentialIntegrity { Err(OperationError::InvalidAttribute( "uuid could not become reference value".to_string(), )) - } - })?; + }?; + } + } + Ok(()) +} - Ok(i) +impl ReferentialIntegrity { + fn cand_references_to_uuid_filter( + qs: &mut QueryServerWriteTransaction, + pre_cand: Option<&[Arc]>, + post_cand: &[EntrySealedCommitted], + ) -> Result, OperationError> { + let schema = qs.get_schema(); + let ref_types = schema.get_reference_types(); + + let mut previous_reference_set = BTreeSet::new(); + let mut reference_set = BTreeSet::new(); + + if let Some(pre_cand) = pre_cand { + update_reference_set( + ref_types, + pre_cand.iter().map(|e| e.as_ref()), + &mut previous_reference_set, + )?; + } + + update_reference_set(ref_types, post_cand.iter(), &mut reference_set)?; + + // Now build the reference set from the current candidates. + + // Return only the values that are present in the new reference set. 
These are the values + // we actually need to check the integrity of. + Ok(reference_set + .difference(&previous_reference_set) + .copied() + .collect()) } fn post_modify_inner( qs: &mut QueryServerWriteTransaction, - cand: &[EntrySealedCommitted], + pre_cand: Option<&[Arc]>, + post_cand: &[EntrySealedCommitted], ) -> Result<(), OperationError> { - let uuids = Self::cand_references_to_uuid_filter(qs, cand)?; + let uuids = Self::cand_references_to_uuid_filter(qs, pre_cand, post_cand)?; let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?; diff --git a/server/lib/src/server/access/mod.rs b/server/lib/src/server/access/mod.rs index 046e4ebdd..ff8656879 100644 --- a/server/lib/src/server/access/mod.rs +++ b/server/lib/src/server/access/mod.rs @@ -19,13 +19,13 @@ use std::collections::BTreeSet; use std::ops::DerefMut; use std::sync::Arc; -use concread::arcache::{ARCache, ARCacheBuilder, ARCacheReadTxn}; +use concread::arcache::ARCacheBuilder; use concread::cowcell::*; use uuid::Uuid; use crate::entry::{Entry, EntryCommitted, EntryInit, EntryNew, EntryReduced}; use crate::event::{CreateEvent, DeleteEvent, ModifyEvent, SearchEvent}; -use crate::filter::{Filter, FilterValid, FilterValidResolved}; +use crate::filter::{Filter, FilterValid, ResolveFilterCache, ResolveFilterCacheReadTxn}; use crate::modify::Modify; use crate::prelude::*; @@ -41,8 +41,8 @@ use self::delete::{apply_delete_access, DeleteResult}; use self::modify::{apply_modify_access, ModifyResult}; use self::search::{apply_search_access, SearchResult}; -const ACP_RESOLVE_FILTER_CACHE_MAX: usize = 2048; -const ACP_RESOLVE_FILTER_CACHE_LOCAL: usize = 16; +const ACP_RESOLVE_FILTER_CACHE_MAX: usize = 256; +const ACP_RESOLVE_FILTER_CACHE_LOCAL: usize = 0; mod create; mod delete; @@ -100,8 +100,7 @@ struct AccessControlsInner { pub struct AccessControls { inner: CowCell, // acp_related_search_cache: ARCache>, - acp_resolve_filter_cache: - ARCache<(IdentityId, Filter), Filter>, + 
acp_resolve_filter_cache: ResolveFilterCache, } fn resolve_access_conditions( @@ -109,12 +108,7 @@ fn resolve_access_conditions( ident_memberof: Option<&BTreeSet>, receiver: &AccessControlReceiver, target: &AccessControlTarget, - acp_resolve_filter_cache: &mut ARCacheReadTxn< - '_, - (IdentityId, Filter), - Filter, - (), - >, + acp_resolve_filter_cache: &mut ResolveFilterCacheReadTxn<'_>, ) -> Option<(AccessControlReceiverCondition, AccessControlTargetCondition)> { let receiver_condition = match receiver { AccessControlReceiver::Group(groups) => { @@ -158,9 +152,7 @@ pub trait AccessControlsTransaction<'a> { fn get_sync_agreements(&self) -> &HashMap>; #[allow(clippy::mut_from_ref)] - fn get_acp_resolve_filter_cache( - &self, - ) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>; + fn get_acp_resolve_filter_cache(&self) -> &mut ResolveFilterCacheReadTxn<'a>; #[instrument(level = "trace", name = "access::search_related_acp", skip_all)] fn search_related_acp<'b>(&'b self, ident: &Identity) -> Vec> { @@ -873,9 +865,7 @@ pub trait AccessControlsTransaction<'a> { pub struct AccessControlsWriteTransaction<'a> { inner: CowCellWriteTxn<'a, AccessControlsInner>, - acp_resolve_filter_cache: Cell< - ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>, - >, + acp_resolve_filter_cache: Cell>, } impl<'a> AccessControlsWriteTransaction<'a> { @@ -949,19 +939,10 @@ impl<'a> AccessControlsTransaction<'a> for AccessControlsWriteTransaction<'a> { &self.inner.sync_agreements } - fn get_acp_resolve_filter_cache( - &self, - ) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()> - { + fn get_acp_resolve_filter_cache(&self) -> &mut ResolveFilterCacheReadTxn<'a> { unsafe { let mptr = self.acp_resolve_filter_cache.as_ptr(); - &mut (*mptr) - as &mut ARCacheReadTxn< - 'a, - (IdentityId, Filter), - Filter, - (), - > + &mut (*mptr) as &mut ResolveFilterCacheReadTxn<'a> } } } @@ -973,9 +954,7 @@ impl<'a> AccessControlsTransaction<'a> for AccessControlsWriteTransaction<'a> { pub 
struct AccessControlsReadTransaction<'a> { inner: CowCellReadTxn, // acp_related_search_cache: Cell>>, - acp_resolve_filter_cache: Cell< - ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>, - >, + acp_resolve_filter_cache: Cell>, } unsafe impl<'a> Sync for AccessControlsReadTransaction<'a> {} @@ -1003,19 +982,10 @@ impl<'a> AccessControlsTransaction<'a> for AccessControlsReadTransaction<'a> { &self.inner.sync_agreements } - fn get_acp_resolve_filter_cache( - &self, - ) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()> - { + fn get_acp_resolve_filter_cache(&self) -> &mut ResolveFilterCacheReadTxn<'a> { unsafe { let mptr = self.acp_resolve_filter_cache.as_ptr(); - &mut (*mptr) - as &mut ARCacheReadTxn< - 'a, - (IdentityId, Filter), - Filter, - (), - > + &mut (*mptr) as &mut ResolveFilterCacheReadTxn<'a> } } } diff --git a/server/lib/src/server/mod.rs b/server/lib/src/server/mod.rs index 49f065d67..be1828d32 100644 --- a/server/lib/src/server/mod.rs +++ b/server/lib/src/server/mod.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use std::sync::Arc; -use concread::arcache::{ARCache, ARCacheBuilder, ARCacheReadTxn}; +use concread::arcache::{ARCacheBuilder, ARCacheReadTxn}; use concread::cowcell::*; use hashbrown::{HashMap, HashSet}; use std::collections::BTreeSet; @@ -15,7 +15,10 @@ use kanidm_proto::internal::{DomainInfo as ProtoDomainInfo, UiHint}; use crate::be::{Backend, BackendReadTransaction, BackendTransaction, BackendWriteTransaction}; // We use so many, we just import them all ... 
-use crate::filter::{Filter, FilterInvalid, FilterValid, FilterValidResolved}; +use crate::filter::{ + Filter, FilterInvalid, FilterValid, FilterValidResolved, ResolveFilterCache, + ResolveFilterCacheReadTxn, +}; use crate::plugins::dyngroup::{DynGroup, DynGroupCache}; use crate::plugins::Plugins; use crate::prelude::*; @@ -52,11 +55,8 @@ pub(crate) mod migrations; pub mod modify; pub(crate) mod recycle; -const RESOLVE_FILTER_CACHE_MAX: usize = 4096; -const RESOLVE_FILTER_CACHE_LOCAL: usize = 0; - -pub type ResolveFilterCacheReadTxn<'a> = - ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>; +const RESOLVE_FILTER_CACHE_MAX: usize = 256; +const RESOLVE_FILTER_CACHE_LOCAL: usize = 8; #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq)] pub(crate) enum ServerPhase { @@ -92,9 +92,9 @@ pub struct QueryServer { schema: Arc, accesscontrols: Arc, db_tickets: Arc, + read_tickets: Arc, write_ticket: Arc, - resolve_filter_cache: - Arc), Filter>>, + resolve_filter_cache: Arc, dyngroup_cache: Arc>, cid_max: Arc>, key_providers: Arc, @@ -110,8 +110,8 @@ pub struct QueryServerReadTransaction<'a> { accesscontrols: AccessControlsReadTransaction<'a>, key_providers: KeyProvidersReadTransaction, _db_ticket: SemaphorePermit<'a>, - resolve_filter_cache: - ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>, + _read_ticket: SemaphorePermit<'a>, + resolve_filter_cache: ResolveFilterCacheReadTxn<'a>, // Future we may need this. 
// cid_max: CowCellReadTxn, trim_cid: Cid, @@ -155,8 +155,12 @@ pub struct QueryServerWriteTransaction<'a> { pub(super) changed_uuid: HashSet, _db_ticket: SemaphorePermit<'a>, _write_ticket: SemaphorePermit<'a>, - resolve_filter_cache: - ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>, + resolve_filter_cache: ARCacheReadTxn< + 'a, + (IdentityId, Arc>), + Arc>, + (), + >, dyngroup_cache: CowCellWriteTxn<'a, DynGroupCache>, } @@ -1047,10 +1051,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerReadTransaction<'a> { &self.key_providers } - fn get_resolve_filter_cache( - &mut self, - ) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()> - { + fn get_resolve_filter_cache(&mut self) -> &mut ResolveFilterCacheReadTxn<'a> { &mut self.resolve_filter_cache } @@ -1058,7 +1059,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerReadTransaction<'a> { &mut self, ) -> ( &mut BackendReadTransaction<'a>, - &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>, + &mut ResolveFilterCacheReadTxn<'a>, ) { (&mut self.be_txn, &mut self.resolve_filter_cache) } @@ -1201,10 +1202,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerWriteTransaction<'a> { &self.key_providers } - fn get_resolve_filter_cache( - &mut self, - ) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()> - { + fn get_resolve_filter_cache(&mut self) -> &mut ResolveFilterCacheReadTxn<'a> { &mut self.resolve_filter_cache } @@ -1212,7 +1210,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerWriteTransaction<'a> { &mut self, ) -> ( &mut BackendWriteTransaction<'a>, - &mut ARCacheReadTxn<'a, (IdentityId, Filter), Filter, ()>, + &mut ResolveFilterCacheReadTxn<'a>, ) { (&mut self.be_txn, &mut self.resolve_filter_cache) } @@ -1311,6 +1309,11 @@ impl QueryServer { let key_providers = Arc::new(KeyProviders::default()); + // These needs to be pool_size minus one to always leave a DB ticket + // for a writer. 
But it also needs to be at least one :) + debug_assert!(pool_size > 0); + let read_ticket_pool = std::cmp::max(pool_size - 1, 1); + Ok(QueryServer { phase, d_info, @@ -1319,6 +1322,7 @@ impl QueryServer { schema: Arc::new(schema), accesscontrols: Arc::new(AccessControls::default()), db_tickets: Arc::new(Semaphore::new(pool_size as usize)), + read_tickets: Arc::new(Semaphore::new(read_ticket_pool as usize)), write_ticket: Arc::new(Semaphore::new(1)), resolve_filter_cache, dyngroup_cache, @@ -1334,7 +1338,26 @@ impl QueryServer { } pub async fn read(&self) -> QueryServerReadTransaction<'_> { - // We need to ensure a db conn will be available + // Get a read ticket. Basically this forces us to queue with other readers, while preventing + us from competing with writers on the db tickets. This tilts us to write prioritising + on db operations by always making sure a writer can get a db ticket. + let read_ticket = if cfg!(test) { + #[allow(clippy::expect_used)] + self.read_tickets + .try_acquire() + .expect("unable to acquire db_ticket for qsr") + } else { + #[allow(clippy::expect_used)] + self.read_tickets + .acquire() + .await + .expect("unable to acquire db_ticket for qsr") + }; + + // We need to ensure a db conn will be available. At this point either a db ticket + *must* be available because pool_size >= 2 and the only other holders are write + and read ticket holders, OR pool_size == 1, and we are waiting on the writer to now + complete. let db_ticket = if cfg!(test) { #[allow(clippy::expect_used)] self.db_tickets @@ -1368,6 +1391,7 @@ impl QueryServer { accesscontrols: self.accesscontrols.read(), key_providers: self.key_providers.read(), _db_ticket: db_ticket, + _read_ticket: read_ticket, resolve_filter_cache: self.resolve_filter_cache.read(), trim_cid, } @@ -1387,7 +1411,9 @@ impl QueryServer { .expect("unable to acquire writer_ticket for qsw") }; - // We need to ensure a db conn will be available. 
At this point either a db ticket + *must* be available because pool_size >= 2 and the only other holders are readers, or + pool_size == 1 and we are waiting on a single reader to now complete let db_ticket = if cfg!(test) { #[allow(clippy::expect_used)] self.db_tickets @@ -1401,6 +1427,10 @@ impl QueryServer { .expect("unable to acquire db_ticket for qsw") }; + // Point of no return - we now have a DB thread AND the write ticket, we MUST complete + as soon as possible! The following locks and elements below are SYNCHRONOUS but + will never be contended at this point, and will always progress. + #[allow(clippy::expect_used)] let be_txn = self .be @@ -1858,6 +1888,7 @@ impl<'a> QueryServerWriteTransaction<'a> { } debug!(domain_previous_version = ?previous_version, domain_target_version = ?domain_info_version); + debug!(domain_previous_patch_level = ?previous_patch_level, domain_target_patch_level = ?domain_info_patch_level); if previous_version <= DOMAIN_LEVEL_2 && domain_info_version >= DOMAIN_LEVEL_3 { self.migrate_domain_2_to_3()?; @@ -1881,7 +1912,7 @@ impl<'a> QueryServerWriteTransaction<'a> { // Similar to the older system info migration handler, these allow "one shot" fixes // to be issued and run by bumping the patch level. - if previous_patch_level <= PATCH_LEVEL_1 && domain_info_patch_level >= PATCH_LEVEL_1 { + if previous_patch_level < PATCH_LEVEL_1 && domain_info_patch_level >= PATCH_LEVEL_1 { self.migrate_domain_patch_level_1()?; } @@ -2124,7 +2155,6 @@ impl<'a> QueryServerWriteTransaction<'a> { // Point of no return - everything has been validated and reloaded. 
// // = Lets commit = - schema .commit() .map(|_| d_info.commit()) diff --git a/server/lib/src/value.rs b/server/lib/src/value.rs index ae2e98f5b..9f08d14fa 100644 --- a/server/lib/src/value.rs +++ b/server/lib/src/value.rs @@ -1186,7 +1186,7 @@ pub enum Value { HexString(String), - Certificate(Certificate), + Certificate(Box), } impl PartialEq for Value { @@ -1478,7 +1478,10 @@ impl Value { } pub fn new_certificate_s(cert_str: &str) -> Option { - Certificate::from_pem(cert_str).map(Value::Certificate).ok() + Certificate::from_pem(cert_str) + .map(Box::new) + .map(Value::Certificate) + .ok() } /// Want a `Value::Image`? use this! diff --git a/server/lib/src/valueset/certificate.rs b/server/lib/src/valueset/certificate.rs index b4bbe2ecb..7dcbf393a 100644 --- a/server/lib/src/valueset/certificate.rs +++ b/server/lib/src/valueset/certificate.rs @@ -16,11 +16,11 @@ use kanidm_lib_crypto::{ #[derive(Debug, Clone)] pub struct ValueSetCertificate { - map: BTreeMap, + map: BTreeMap>, } impl ValueSetCertificate { - pub fn new(certificate: Certificate) -> Result, OperationError> { + pub fn new(certificate: Box) -> Result, OperationError> { let mut map = BTreeMap::new(); let pk_s256 = x509_public_key_s256(&certificate).ok_or_else(|| { @@ -49,8 +49,9 @@ impl ValueSetCertificate { match db_cert { DbValueCertificate::V1 { certificate_der } => { // Parse the DER - let certificate = - Certificate::from_der(&certificate_der).map_err(|x509_err| { + let certificate = Certificate::from_der(&certificate_der) + .map(Box::new) + .map_err(|x509_err| { error!(?x509_err, "Unable to restore certificate from DER"); OperationError::VS0003CertificateDerDecode })?; @@ -87,9 +88,10 @@ impl ValueSetCertificate { .collect() } + #[allow(clippy::should_implement_trait)] pub fn from_iter(iter: T) -> Option> where - T: IntoIterator, + T: IntoIterator>, { let mut map = BTreeMap::new(); @@ -129,7 +131,7 @@ impl ValueSetT for ValueSetCertificate { match pv { PartialValue::HexString(hs) => { let mut buf = 
Sha256Digest::default(); - if hex::decode_to_slice(&hs, &mut buf).is_ok() { + if hex::decode_to_slice(hs, &mut buf).is_ok() { self.map.remove(&buf).is_some() } else { false @@ -143,7 +145,7 @@ impl ValueSetT for ValueSetCertificate { match pv { PartialValue::HexString(hs) => { let mut buf = Sha256Digest::default(); - if hex::decode_to_slice(&hs, &mut buf).is_ok() { + if hex::decode_to_slice(hs, &mut buf).is_ok() { self.map.contains_key(&buf) } else { false @@ -236,13 +238,13 @@ impl ValueSetT for ValueSetCertificate { fn to_certificate_single(&self) -> Option<&Certificate> { if self.map.len() == 1 { - self.map.values().take(1).next() + self.map.values().take(1).map(|b| b.as_ref()).next() } else { None } } - fn as_certificate_set(&self) -> Option<&BTreeMap> { + fn as_certificate_set(&self) -> Option<&BTreeMap>> { Some(&self.map) } } diff --git a/server/lib/src/valueset/mod.rs b/server/lib/src/valueset/mod.rs index 1493f2a42..d1a8c654f 100644 --- a/server/lib/src/valueset/mod.rs +++ b/server/lib/src/valueset/mod.rs @@ -245,6 +245,11 @@ pub trait ValueSetT: std::fmt::Debug + DynClone { None } + fn as_refer_set_mut(&mut self) -> Option<&mut BTreeSet> { + debug_assert!(false); + None + } + fn as_bool_set(&self) -> Option<&SmolSet<[bool; 1]>> { debug_assert!(false); None @@ -620,7 +625,7 @@ pub trait ValueSetT: std::fmt::Debug + DynClone { None } - fn as_certificate_set(&self) -> Option<&BTreeMap> { + fn as_certificate_set(&self) -> Option<&BTreeMap>> { debug_assert!(false); None } diff --git a/server/lib/src/valueset/uuid.rs b/server/lib/src/valueset/uuid.rs index 15f5f52b6..694e460e4 100644 --- a/server/lib/src/valueset/uuid.rs +++ b/server/lib/src/valueset/uuid.rs @@ -338,6 +338,10 @@ impl ValueSetT for ValueSetRefer { Some(&self.set) } + fn as_refer_set_mut(&mut self) -> Option<&mut BTreeSet> { + Some(&mut self.set) + } + fn as_ref_uuid_iter(&self) -> Option + '_>> { Some(Box::new(self.set.iter().copied())) } diff --git a/tools/cli/src/cli/person.rs 
b/tools/cli/src/cli/person.rs index a1438934a..31287a50f 100644 --- a/tools/cli/src/cli/person.rs +++ b/tools/cli/src/cli/person.rs @@ -547,7 +547,7 @@ impl AccountCertificate { let client = copt.to_client(OpType::Write).await; if let Err(e) = client - .idm_person_certificate_create(&account_id, &pem_data) + .idm_person_certificate_create(account_id, &pem_data) .await { handle_client_error(e, copt.output_mode); diff --git a/tools/orca/src/populate.rs b/tools/orca/src/populate.rs index 7ab495765..51efc3066 100644 --- a/tools/orca/src/populate.rs +++ b/tools/orca/src/populate.rs @@ -1,6 +1,10 @@ use crate::error::Error; use crate::kani; use crate::state::*; +use std::collections::VecDeque; + +use std::sync::atomic::{AtomicU32, Ordering}; +use tokio::sync::Mutex; use std::sync::Arc; @@ -76,47 +80,63 @@ pub async fn preflight(state: State) -> Result<(), Error> { apply_flags(client.clone(), state.preflight_flags.as_slice()).await?; let state_persons_len = state.persons.len(); - - let mut tasks = Vec::with_capacity(state_persons_len); + let mut tasks = VecDeque::with_capacity(state_persons_len); // Create persons. for person in state.persons.into_iter() { let c = client.clone(); - // Write operations are single threaded in Kanidm, so we don't need to attempt - // to parallelise that here. - // tasks.push(tokio::spawn(preflight_person(c, person))) - tasks.push(preflight_person(c, person)) + // While writes are single threaded in Kanidm, searches (such as .exists) + // and credential updates are concurrent / parallel. So these parts can be + // called in parallel, so we divide up into workers. 
+ tasks.push_back(preflight_person(c, person)) } - let tasks_par = tasks.split_off(state_persons_len / 2); + let tasks = Arc::new(Mutex::new(tasks)); + let counter = Arc::new(AtomicU32::new(0)); + let par = std::thread::available_parallelism().unwrap(); - let left = tokio::spawn(async move { - for (i, task) in tasks.into_iter().enumerate() { - let _ = task.await; - if i % 500 == 0 { - eprint!("."); - } - } - }); - let right = tokio::spawn(async move { - for (i, task) in tasks_par.into_iter().enumerate() { - let _ = task.await; - if i % 500 == 0 { - eprint!("."); - } - } - }); + let handles: Vec<_> = (0..par.into()) + .map(|_| { + let tasks_q = tasks.clone(); + let counter_c = counter.clone(); + tokio::spawn(async move { + loop { + let maybe_task = async { + let mut guard = tasks_q.lock().await; + guard.pop_front() + } + .await; - left.await.map_err(|tokio_err| { - error!(?tokio_err, "Failed to join task"); - Error::Tokio - })?; - right.await.map_err(|tokio_err| { - error!(?tokio_err, "Failed to join task"); - Error::Tokio - })?; + if let Some(t) = maybe_task { + let _ = t.await; + let was = counter_c.fetch_add(1, Ordering::Relaxed); + if was % 1000 == 999 { + let order = was + 1; + eprint!("{}", order); + } else if was % 100 == 99 { + // Since we just added one, this just rolled over. + eprint!("."); + } + } else { + // queue drained. + break; + } + } + }) + }) + .collect(); + + for handle in handles { + handle.await.map_err(|tokio_err| { + error!(?tokio_err, "Failed to join task"); + Error::Tokio + })?; + } + + eprintln!("done"); // Create groups. 
+ let counter = Arc::new(AtomicU32::new(0)); let mut tasks = Vec::with_capacity(state.groups.len()); for group in state.groups.into_iter() { @@ -129,11 +149,18 @@ pub async fn preflight(state: State) -> Result<(), Error> { for task in tasks { task.await?; - /* - task.await - */ + let was = counter.fetch_add(1, Ordering::Relaxed); + if was % 1000 == 999 { + let order = was + 1; + eprint!("{}", order); + } else if was % 100 == 99 { + // Since we just added one, this just rolled over. + eprint!("."); + } } + eprintln!("done"); + // Create integrations. info!("Ready to 🛫");