mirror of
https://github.com/kanidm/kanidm.git
synced 2025-02-23 04:27:02 +01:00
20240613 performance improvements (#2844)
Thanks to @Seba-T's work with Orca, we were able to identify a number of performance issues in certain high load conditions. This commit contains fixes for the following issues: * Unbounded Memory Growth - due to how ARCache works, to maintain temporal consistency it must retain copies of keys (not values) in a special data set for tracking. The Filter Resolve Cache was using unresolved filters as keys. This caused memory explosions when refint or memberof were updating a group with a large number of members because they would emit a query with hundreds of filter terms that would only be used once and never again, causing the ARCache haunted set to grow without bound. To limit this, we no longer cache large/complex queries for resolution, and in future we may implement some other methods to reduce this like sha256/hmac of the queries. * When creating a new account, dyngroups would be engaged to add the account as a member due to the matching scope. However the change to the dyngroup was triggering an update of all the dyngroups' *members* related memberof attributes. This would mean that adding an account would trigger every other account to be loaded and updated. * Memberof would iterate over leaf entries and update them one at a time. This meant a large number of small fragmented queries in the case of a lot of leaf entries being updated. Now leaf entries are updated in a single stripe once groups are stabilised. * Memberof would always trigger its members to update. Instead, we should only update members where a difference is observed, or all members if the group's memberof itself has changed since this needs to propagate to all leaf entries. This significantly reduces the amount of writes and operations to examine the changed memberof set. * Referential integrity would examine all reference uuids on entries for validity rather than just the reference uuids that were altered within the transaction. 
This change means that only uuids that were *added* are validated during an operation. * During async write backs (delayed actions) these were performed one at a time. Instead, when possible this should be done in a single transaction as the write transaction caches all writes in memory until the commit, meaning that by batching we reduce overall latency. * In the server there can only be one write transaction and many readers. These are guarded by tokio semaphores that act as fair queues - first in gets the lock next. Due to the design of the server, readers would be blocked on the *database* semaphore, and writers would block on the write semaphore and THEN the database semaphore. This arrangement was creating a situation which unfairly advantaged readers over writers, as any write would first have to become the head of its queue, and then compete with all readers to access a db transaction. Instead, we now have a reader semaphore with size threads minus 1, clamped at a minimum of 1. This means that provided there are two or more threads, then a writer will *always* have a database handle available, and readers will pre-queue with each other before queueing on the db ticket. If there is only one thread, then writes and reads will alternate between each other fairly.
This commit is contained in:
parent
3da8fdc2b1
commit
10e15fd6b3
98
Cargo.lock
generated
98
Cargo.lock
generated
|
@ -99,9 +99,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "anstyle-query"
|
||||
version = "1.0.3"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5"
|
||||
checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391"
|
||||
dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
@ -202,9 +202,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.10"
|
||||
version = "0.4.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c90a406b4495d129f00461241616194cb8a032c8d1c53c657f0961d5f8e0498"
|
||||
checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"futures-core",
|
||||
|
@ -310,7 +310,7 @@ dependencies = [
|
|||
"futures-util",
|
||||
"http 0.2.12",
|
||||
"http-body 0.4.6",
|
||||
"hyper 0.14.28",
|
||||
"hyper 0.14.29",
|
||||
"itoa",
|
||||
"matchit",
|
||||
"memchr",
|
||||
|
@ -470,9 +470,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.72"
|
||||
version = "0.3.73"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "17c6a35df3749d2e8bb1b7b21a976d82b15548788d2735b9d82f329268f71a11"
|
||||
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
|
||||
dependencies = [
|
||||
"addr2line",
|
||||
"cc",
|
||||
|
@ -654,7 +654,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"regex-automata 0.4.6",
|
||||
"regex-automata 0.4.7",
|
||||
"serde",
|
||||
]
|
||||
|
||||
|
@ -705,9 +705,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
|||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.98"
|
||||
version = "1.0.99"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f"
|
||||
checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695"
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
|
@ -834,9 +834,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.7.0"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
|
||||
checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70"
|
||||
|
||||
[[package]]
|
||||
name = "clru"
|
||||
|
@ -893,9 +893,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "concread"
|
||||
version = "0.5.1"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "23bef63c371d1b3da7e61e7b72e5757f070131a399f2eb60edc2d8bb8102249a"
|
||||
checksum = "732590a8115498adcbf1650c4e221ca359ac62b20d1468ec1fb99ac366a859f5"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"arc-swap",
|
||||
|
@ -1493,18 +1493,18 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "enumflags2"
|
||||
version = "0.7.9"
|
||||
version = "0.7.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3278c9d5fb675e0a51dabcf4c0d355f692b064171535ba72361be1528a9d8e8d"
|
||||
checksum = "d232db7f5956f3f14313dc2f87985c58bd2c695ce124c8cdd984e08e15ac133d"
|
||||
dependencies = [
|
||||
"enumflags2_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enumflags2_derive"
|
||||
version = "0.7.9"
|
||||
version = "0.7.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c785274071b1b420972453b306eeca06acf4633829db4223b58a2a8c5953bc4"
|
||||
checksum = "de0d48a183585823424a4ce1aa132d174a6a81bd540895822eb4c8373a8e49e8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
@ -1577,8 +1577,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "531e46835a22af56d1e3b66f04844bed63158bc094a628bec1d321d9b4c44bf2"
|
||||
dependencies = [
|
||||
"bit-set",
|
||||
"regex-automata 0.4.6",
|
||||
"regex-syntax 0.8.3",
|
||||
"regex-automata 0.4.7",
|
||||
"regex-syntax 0.8.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1592,7 +1592,7 @@ dependencies = [
|
|||
"futures-core",
|
||||
"futures-util",
|
||||
"http 0.2.12",
|
||||
"hyper 0.14.28",
|
||||
"hyper 0.14.29",
|
||||
"hyper-tls 0.5.0",
|
||||
"mime",
|
||||
"serde",
|
||||
|
@ -2714,12 +2714,12 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "http-body-util"
|
||||
version = "0.1.1"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d"
|
||||
checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"pin-project-lite",
|
||||
|
@ -2733,9 +2733,9 @@ checksum = "08a397c49fec283e3d6211adbe480be95aae5f304cfb923e9970e08956d5168a"
|
|||
|
||||
[[package]]
|
||||
name = "httparse"
|
||||
version = "1.8.0"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
|
||||
checksum = "d0e7a4dd27b9476dc40cb050d3632d3bba3a70ddbff012285f7f8559a1e7e545"
|
||||
|
||||
[[package]]
|
||||
name = "httpdate"
|
||||
|
@ -2745,9 +2745,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
|||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.28"
|
||||
version = "0.14.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80"
|
||||
checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
|
@ -2794,7 +2794,7 @@ version = "0.4.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
|
||||
dependencies = [
|
||||
"hyper 0.14.28",
|
||||
"hyper 0.14.29",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
|
@ -2807,7 +2807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"hyper 0.14.28",
|
||||
"hyper 0.14.29",
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
|
@ -3951,9 +3951,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.2"
|
||||
version = "2.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
|
||||
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
|
||||
|
||||
[[package]]
|
||||
name = "memmap2"
|
||||
|
@ -4317,9 +4317,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.35.0"
|
||||
version = "0.36.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8ec7ab813848ba4522158d5517a6093db1ded27575b070f4177b8d12b41db5e"
|
||||
checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
@ -4597,7 +4597,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
|||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall 0.5.1",
|
||||
"redox_syscall 0.5.2",
|
||||
"smallvec",
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
@ -5085,9 +5085,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.1"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e"
|
||||
checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd"
|
||||
dependencies = [
|
||||
"bitflags 2.5.0",
|
||||
]
|
||||
|
@ -5117,8 +5117,8 @@ checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f"
|
|||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-automata 0.4.6",
|
||||
"regex-syntax 0.8.3",
|
||||
"regex-automata 0.4.7",
|
||||
"regex-syntax 0.8.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5132,13 +5132,13 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.4.6"
|
||||
version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
|
||||
checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax 0.8.3",
|
||||
"regex-syntax 0.8.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5149,9 +5149,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
|||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.3"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
|
||||
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
|
||||
|
||||
[[package]]
|
||||
name = "reqwest"
|
||||
|
@ -6155,7 +6155,7 @@ dependencies = [
|
|||
"h2 0.3.26",
|
||||
"http 0.2.12",
|
||||
"http-body 0.4.6",
|
||||
"hyper 0.14.28",
|
||||
"hyper 0.14.29",
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
|
@ -6438,9 +6438,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
|
|||
|
||||
[[package]]
|
||||
name = "unicode-width"
|
||||
version = "0.1.12"
|
||||
version = "0.1.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6"
|
||||
checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-xid"
|
||||
|
@ -6480,9 +6480,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
|
|||
|
||||
[[package]]
|
||||
name = "utf8parse"
|
||||
version = "0.2.1"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||
|
||||
[[package]]
|
||||
name = "utoipa"
|
||||
|
|
|
@ -164,7 +164,7 @@ clap_complete = "^4.5.5"
|
|||
# Forced by saffron/cron
|
||||
chrono = "^0.4.35"
|
||||
compact_jwt = { version = "^0.4.1", default-features = false }
|
||||
concread = "^0.5.1"
|
||||
concread = "^0.5.2"
|
||||
cron = "0.12.1"
|
||||
crossbeam = "0.8.4"
|
||||
criterion = "^0.5.1"
|
||||
|
|
|
@ -5,7 +5,7 @@ db_fs_type = "zfs"
|
|||
db_path = "/tmp/kanidm/kanidm.db"
|
||||
tls_chain = "/tmp/kanidm/chain.pem"
|
||||
tls_key = "/tmp/kanidm/key.pem"
|
||||
tls_client_ca = "/tmp/kanidm/client_ca"
|
||||
# tls_client_ca = "/tmp/kanidm/client_ca"
|
||||
|
||||
# The log level of the server. May be one of info, debug, trace
|
||||
#
|
||||
|
|
|
@ -49,8 +49,8 @@ pub const PBKDF2_MIN_NIST_SALT_LEN: usize = 14;
|
|||
// Min number of rounds for a pbkdf2
|
||||
pub const PBKDF2_MIN_NIST_COST: usize = 10000;
|
||||
|
||||
// 64 * u8 -> 512 bits of out.
|
||||
const PBKDF2_KEY_LEN: usize = 64;
|
||||
// 32 * u8 -> 256 bits of out.
|
||||
const PBKDF2_KEY_LEN: usize = 32;
|
||||
const PBKDF2_MIN_NIST_KEY_LEN: usize = 32;
|
||||
const PBKDF2_SHA1_MIN_KEY_LEN: usize = 19;
|
||||
|
||||
|
@ -62,10 +62,10 @@ const DS_SHA512_HASH_LEN: usize = 64;
|
|||
// Taken from the argon2 library and rfc 9106
|
||||
const ARGON2_VERSION: u32 = 19;
|
||||
const ARGON2_SALT_LEN: usize = 16;
|
||||
// 32 * u8 -> 256 bits of out.
|
||||
const ARGON2_KEY_LEN: usize = 32;
|
||||
// Default amount of ram we sacrifice per thread is 8MB
|
||||
// Default amount of ram we sacrifice per thread
|
||||
const ARGON2_MIN_RAM_KIB: u32 = 8 * 1024;
|
||||
// Max is 64MB. This may change in time.
|
||||
const ARGON2_MAX_RAM_KIB: u32 = 64 * 1024;
|
||||
// Amount of ram to subtract when we do a T cost iter. This
|
||||
// is because t=2 m=32 == t=3 m=20. So we just step down a little
|
||||
|
@ -902,10 +902,9 @@ impl Password {
|
|||
|
||||
fn bench_argon2id(params: Params) -> Option<Duration> {
|
||||
let mut rng = rand::thread_rng();
|
||||
let salt: Vec<u8> = (0..PBKDF2_SALT_LEN).map(|_| rng.gen()).collect();
|
||||
let input: Vec<u8> = (0..PBKDF2_SALT_LEN).map(|_| rng.gen()).collect();
|
||||
// This is 512 bits of output
|
||||
let mut key: Vec<u8> = (0..PBKDF2_KEY_LEN).map(|_| 0).collect();
|
||||
let salt: Vec<u8> = (0..ARGON2_SALT_LEN).map(|_| rng.gen()).collect();
|
||||
let input: Vec<u8> = (0..ARGON2_SALT_LEN).map(|_| rng.gen()).collect();
|
||||
let mut key: Vec<u8> = (0..ARGON2_KEY_LEN).map(|_| 0).collect();
|
||||
|
||||
let argon = Argon2::new(Algorithm::Argon2id, Version::V0x13, params);
|
||||
|
||||
|
@ -922,7 +921,6 @@ impl Password {
|
|||
let pbkdf2_cost = policy.pbkdf2_cost;
|
||||
let mut rng = rand::thread_rng();
|
||||
let salt: Vec<u8> = (0..PBKDF2_SALT_LEN).map(|_| rng.gen()).collect();
|
||||
// This is 512 bits of output
|
||||
let mut key: Vec<u8> = (0..PBKDF2_KEY_LEN).map(|_| 0).collect();
|
||||
|
||||
pbkdf2_hmac(
|
||||
|
|
|
@ -10,6 +10,7 @@ use tracing_forest::printer::TestCapturePrinter;
|
|||
use tracing_forest::tag::NoTag;
|
||||
use tracing_forest::util::*;
|
||||
use tracing_forest::Tag;
|
||||
use tracing_subscriber::filter::Directive;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
pub mod macros;
|
||||
|
@ -19,9 +20,10 @@ pub use {tracing, tracing_forest, tracing_subscriber};
|
|||
|
||||
/// Start up the logging for test mode.
|
||||
pub fn test_init() {
|
||||
let filter = EnvFilter::from_default_env()
|
||||
let filter = EnvFilter::builder()
|
||||
// Skipping trace on tests by default saves a *TON* of ram.
|
||||
.add_directive(LevelFilter::INFO.into())
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy()
|
||||
// escargot builds cargo packages while we integration test and is SUPER noisy.
|
||||
.add_directive(
|
||||
"escargot=ERROR"
|
||||
|
@ -137,12 +139,12 @@ impl Display for LogLevel {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<LogLevel> for EnvFilter {
|
||||
impl From<LogLevel> for Directive {
|
||||
fn from(value: LogLevel) -> Self {
|
||||
match value {
|
||||
LogLevel::Info => EnvFilter::new("info"),
|
||||
LogLevel::Debug => EnvFilter::new("debug"),
|
||||
LogLevel::Trace => EnvFilter::new("trace"),
|
||||
LogLevel::Info => Directive::from(Level::INFO),
|
||||
LogLevel::Debug => Directive::from(Level::DEBUG),
|
||||
LogLevel::Trace => Directive::from(Level::TRACE),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,9 @@ pub fn start_logging_pipeline(
|
|||
log_filter: crate::LogLevel,
|
||||
service_name: String,
|
||||
) -> Result<Box<dyn Subscriber + Send + Sync>, String> {
|
||||
let forest_filter: EnvFilter = log_filter.into();
|
||||
let forest_filter: EnvFilter = EnvFilter::builder()
|
||||
.with_default_directive(log_filter.into())
|
||||
.from_env_lossy();
|
||||
|
||||
// TODO: work out how to do metrics things
|
||||
// let meter_provider = init_metrics()
|
||||
|
@ -56,7 +58,9 @@ pub fn start_logging_pipeline(
|
|||
.expect("Failed to set hyper logging to info"),
|
||||
);
|
||||
let forest_layer = tracing_forest::ForestLayer::default().with_filter(forest_filter);
|
||||
let t_filter: EnvFilter = log_filter.into();
|
||||
let t_filter: EnvFilter = EnvFilter::builder()
|
||||
.with_default_directive(log_filter.into())
|
||||
.from_env_lossy();
|
||||
|
||||
let tracer = opentelemetry_otlp::new_pipeline().tracing().with_exporter(
|
||||
opentelemetry_otlp::new_exporter()
|
||||
|
|
|
@ -100,22 +100,56 @@ impl QueryServerWriteV1 {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_delayedaction(&self, da: DelayedAction) {
|
||||
pub(crate) async fn handle_delayedaction(&self, da_batch: &mut Vec<DelayedAction>) {
|
||||
let eventid = Uuid::new_v4();
|
||||
let span = span!(Level::INFO, "process_delayed_action", uuid = ?eventid);
|
||||
|
||||
let mut retry = false;
|
||||
|
||||
async {
|
||||
let ct = duration_from_epoch_now();
|
||||
let mut idms_prox_write = self.idms.proxy_write(ct).await;
|
||||
if let Err(res) = idms_prox_write
|
||||
.process_delayedaction(da, ct)
|
||||
.and_then(|_| idms_prox_write.commit())
|
||||
{
|
||||
info!(?res, "delayed action error");
|
||||
|
||||
for da in da_batch.iter() {
|
||||
retry = idms_prox_write.process_delayedaction(da, ct).is_err();
|
||||
if retry {
|
||||
// exit the loop
|
||||
warn!("delayed action failed, will be retried individually.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(res) = idms_prox_write.commit() {
|
||||
retry = true;
|
||||
error!(?res, "delayed action batch commit error");
|
||||
}
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
.await;
|
||||
|
||||
if retry {
|
||||
// An error occured, retry each operation one at a time.
|
||||
for da in da_batch.iter() {
|
||||
let eventid = Uuid::new_v4();
|
||||
let span = span!(Level::INFO, "process_delayed_action_retried", uuid = ?eventid);
|
||||
|
||||
async {
|
||||
let ct = duration_from_epoch_now();
|
||||
let mut idms_prox_write = self.idms.proxy_write(ct).await;
|
||||
if let Err(res) = idms_prox_write
|
||||
.process_delayedaction(da, ct)
|
||||
.and_then(|_| idms_prox_write.commit())
|
||||
{
|
||||
error!(?res, "delayed action commit error");
|
||||
}
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
// We're done, clear out the buffer.
|
||||
da_batch.clear();
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
|
|
|
@ -48,6 +48,7 @@ use sketching::*;
|
|||
use tokio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::broadcast,
|
||||
task,
|
||||
};
|
||||
use tokio_openssl::SslStream;
|
||||
use tower::Service;
|
||||
|
@ -199,7 +200,7 @@ pub async fn create_https_server(
|
|||
qe_r_ref: &'static QueryServerReadV1,
|
||||
mut rx: broadcast::Receiver<CoreAction>,
|
||||
server_message_tx: broadcast::Sender<CoreAction>,
|
||||
) -> Result<tokio::task::JoinHandle<()>, ()> {
|
||||
) -> Result<task::JoinHandle<()>, ()> {
|
||||
let js_files = get_js_files(config.role)?;
|
||||
// set up the CSP headers
|
||||
// script-src 'self'
|
||||
|
@ -343,7 +344,7 @@ pub async fn create_https_server(
|
|||
|
||||
info!("Starting the web server...");
|
||||
|
||||
Ok(tokio::spawn(async move {
|
||||
Ok(task::spawn(async move {
|
||||
tokio::select! {
|
||||
Ok(action) = rx.recv() => {
|
||||
match action {
|
||||
|
@ -362,10 +363,10 @@ pub async fn create_https_server(
|
|||
return
|
||||
}
|
||||
};
|
||||
tokio::spawn(server_loop(tls_param, listener, app))
|
||||
task::spawn(server_loop(tls_param, listener, app))
|
||||
},
|
||||
None => {
|
||||
tokio::spawn(axum_server::bind(addr).serve(app))
|
||||
task::spawn(axum_server::bind(addr).serve(app))
|
||||
}
|
||||
} => {
|
||||
match res {
|
||||
|
@ -538,7 +539,7 @@ async fn server_loop(
|
|||
if let Ok((stream, addr)) = listener.accept().await {
|
||||
let tls_acceptor = tls_acceptor.clone();
|
||||
let app = app.clone();
|
||||
tokio::spawn(handle_conn(tls_acceptor, stream, app, addr));
|
||||
task::spawn(handle_conn(tls_acceptor, stream, app, addr));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ use kanidmd_lib::value::CredentialType;
|
|||
use libc::umask;
|
||||
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::task;
|
||||
|
||||
use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
|
||||
use crate::admin::AdminActor;
|
||||
|
@ -754,7 +754,7 @@ pub struct CoreHandle {
|
|||
clean_shutdown: bool,
|
||||
pub tx: broadcast::Sender<CoreAction>,
|
||||
/// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
|
||||
handles: Vec<(TaskName, tokio::task::JoinHandle<()>)>,
|
||||
handles: Vec<(TaskName, task::JoinHandle<()>)>,
|
||||
}
|
||||
|
||||
impl CoreHandle {
|
||||
|
@ -954,21 +954,22 @@ pub async fn create_server_core(
|
|||
// Create the server async write entry point.
|
||||
let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
|
||||
|
||||
let delayed_handle = tokio::spawn(async move {
|
||||
let delayed_handle = task::spawn(async move {
|
||||
let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
|
||||
loop {
|
||||
tokio::select! {
|
||||
added = idms_delayed.recv_many(&mut buffer) => {
|
||||
if added == 0 {
|
||||
// Channel has closed, stop the task.
|
||||
break
|
||||
}
|
||||
server_write_ref.handle_delayedaction(&mut buffer).await;
|
||||
}
|
||||
Ok(action) = broadcast_rx.recv() => {
|
||||
match action {
|
||||
CoreAction::Shutdown => break,
|
||||
}
|
||||
}
|
||||
delayed = idms_delayed.next() => {
|
||||
match delayed {
|
||||
Some(da) => server_write_ref.handle_delayedaction(da).await,
|
||||
// Channel has closed, stop the task.
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Stopped {}", TaskName::DelayedActionActor);
|
||||
|
@ -976,7 +977,7 @@ pub async fn create_server_core(
|
|||
|
||||
let mut broadcast_rx = broadcast_tx.subscribe();
|
||||
|
||||
let auditd_handle = tokio::spawn(async move {
|
||||
let auditd_handle = task::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Ok(action) = broadcast_rx.recv() => {
|
||||
|
@ -1078,7 +1079,7 @@ pub async fn create_server_core(
|
|||
admin_info!("This config rocks! 🪨 ");
|
||||
None
|
||||
} else {
|
||||
let h: tokio::task::JoinHandle<()> = match https::create_https_server(
|
||||
let h: task::JoinHandle<()> = match https::create_https_server(
|
||||
config.clone(),
|
||||
jws_signer,
|
||||
status_ref,
|
||||
|
@ -1121,7 +1122,7 @@ pub async fn create_server_core(
|
|||
None
|
||||
};
|
||||
|
||||
let mut handles: Vec<(TaskName, JoinHandle<()>)> = vec![
|
||||
let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
|
||||
(TaskName::IntervalActor, interval_handle),
|
||||
(TaskName::DelayedActionActor, delayed_handle),
|
||||
(TaskName::AuditdActor, auditd_handle),
|
||||
|
|
|
@ -39,10 +39,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
|
|||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
toml = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tracing = { workspace = true, features = [
|
||||
"max_level_trace",
|
||||
"release_max_level_debug",
|
||||
] }
|
||||
tracing = { workspace = true }
|
||||
serde_json.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
|
|
@ -290,7 +290,7 @@ fn main() -> ExitCode {
|
|||
}
|
||||
|
||||
#[cfg(feature = "dhat-heap")]
|
||||
let _profiler = dhat::Profiler::new_heap();
|
||||
let _profiler = dhat::Profiler::builder().trim_backtraces(Some(40)).build();
|
||||
|
||||
let maybe_rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
|
|
|
@ -33,7 +33,7 @@ const DEFAULT_CACHE_TARGET: usize = 2048;
|
|||
const DEFAULT_IDL_CACHE_RATIO: usize = 32;
|
||||
const DEFAULT_NAME_CACHE_RATIO: usize = 8;
|
||||
const DEFAULT_CACHE_RMISS: usize = 0;
|
||||
const DEFAULT_CACHE_WMISS: usize = 4;
|
||||
const DEFAULT_CACHE_WMISS: usize = 0;
|
||||
|
||||
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
|
||||
enum NameCacheKey {
|
||||
|
|
|
@ -1771,7 +1771,6 @@ impl IdlSqlite {
|
|||
"PRAGMA page_size={fs_page_size};
|
||||
PRAGMA cache_size={cache_pages};
|
||||
PRAGMA journal_mode=WAL;
|
||||
PRAGMA synchronous=NORMAL;
|
||||
PRAGMA wal_autocheckpoint={checkpoint_pages};
|
||||
PRAGMA wal_checkpoint(RESTART);"
|
||||
)
|
||||
|
|
|
@ -99,6 +99,11 @@ pub const PURGE_FREQUENCY: u64 = 60;
|
|||
#[cfg(not(test))]
|
||||
pub const PURGE_FREQUENCY: u64 = 600;
|
||||
|
||||
/// The number of delayed actions to consider per write transaction. Higher
|
||||
/// values allow more coalescing to occur, but may consume more ram and cause
|
||||
/// some latency while dequeuing and writing those operations.
|
||||
pub const DELAYED_ACTION_BATCH_SIZE: usize = 256;
|
||||
|
||||
#[cfg(test)]
|
||||
/// In test, we limit the changelog to 10 minutes.
|
||||
pub const CHANGELOG_MAX_AGE: u64 = 600;
|
||||
|
@ -123,13 +128,14 @@ pub const PW_MIN_LENGTH: u32 = 10;
|
|||
pub const MAXIMUM_AUTH_SESSION_EXPIRY: u32 = u32::MAX;
|
||||
// Default - sessions last for 1 day
|
||||
pub const DEFAULT_AUTH_SESSION_EXPIRY: u32 = 86400;
|
||||
pub const DEFAULT_AUTH_SESSION_LIMITED_EXPIRY: u32 = 3600;
|
||||
// Maximum - privileges last for 1 hour.
|
||||
pub const MAXIMUM_AUTH_PRIVILEGE_EXPIRY: u32 = 3600;
|
||||
// Default - privileges last for 10 minutes.
|
||||
pub const DEFAULT_AUTH_PRIVILEGE_EXPIRY: u32 = 600;
|
||||
// Default - directly privileged sessions only last 1 hour.
|
||||
pub const DEFAULT_AUTH_SESSION_LIMITED_EXPIRY: u32 = 3600;
|
||||
// Default - oauth refresh tokens last for 16 hours.
|
||||
pub const OAUTH_REFRESH_TOKEN_EXPIRY: u64 = 3600 * 8;
|
||||
pub const OAUTH_REFRESH_TOKEN_EXPIRY: u64 = 3600 * 16;
|
||||
|
||||
// The time that a token can be used before session
|
||||
// status is enforced. This needs to be longer than
|
||||
|
|
|
@ -1138,6 +1138,15 @@ impl<STATE> Entry<EntryInvalid, STATE> {
|
|||
|
||||
ne.validate(schema).map(|()| ne)
|
||||
}
|
||||
|
||||
/// Access a reference set in a directly mutable form. This is "safe" because
|
||||
/// referential integrity will check the values added are valid, and because
|
||||
/// this is strongly typed it can't violate syntax.
|
||||
pub(crate) fn get_ava_refer_mut(&mut self, attr: Attribute) -> Option<&mut BTreeSet<Uuid>> {
|
||||
self.attrs
|
||||
.get_mut(attr.as_ref())
|
||||
.and_then(|vs| vs.as_refer_set_mut())
|
||||
}
|
||||
}
|
||||
|
||||
impl<VALID, STATE> Clone for Entry<VALID, STATE>
|
||||
|
|
|
@ -14,8 +14,9 @@ use std::fmt;
|
|||
use std::hash::Hash;
|
||||
use std::iter;
|
||||
use std::num::NonZeroU8;
|
||||
use std::sync::Arc;
|
||||
|
||||
use concread::arcache::ARCacheReadTxn;
|
||||
use concread::arcache::{ARCache, ARCacheReadTxn};
|
||||
use hashbrown::HashMap;
|
||||
#[cfg(test)]
|
||||
use hashbrown::HashSet;
|
||||
|
@ -32,6 +33,16 @@ use crate::prelude::*;
|
|||
use crate::schema::SchemaTransaction;
|
||||
use crate::value::{IndexType, PartialValue};
|
||||
|
||||
pub type ResolveFilterCache =
|
||||
ARCache<(IdentityId, Arc<Filter<FilterValid>>), Arc<Filter<FilterValidResolved>>>;
|
||||
|
||||
pub type ResolveFilterCacheReadTxn<'a> = ARCacheReadTxn<
|
||||
'a,
|
||||
(IdentityId, Arc<Filter<FilterValid>>),
|
||||
Arc<Filter<FilterValidResolved>>,
|
||||
(),
|
||||
>;
|
||||
|
||||
// Default filter is safe, ignores all hidden types!
|
||||
|
||||
// This is &Value so we can lazy const then clone, but perhaps we can reconsider
|
||||
|
@ -445,32 +456,33 @@ impl Filter<FilterValid> {
|
|||
&self,
|
||||
ev: &Identity,
|
||||
idxmeta: Option<&IdxMeta>,
|
||||
mut rsv_cache: Option<
|
||||
&mut ARCacheReadTxn<
|
||||
'_,
|
||||
(IdentityId, Filter<FilterValid>),
|
||||
Filter<FilterValidResolved>,
|
||||
(),
|
||||
>,
|
||||
>,
|
||||
mut rsv_cache: Option<&mut ResolveFilterCacheReadTxn<'_>>,
|
||||
) -> Result<Filter<FilterValidResolved>, OperationError> {
|
||||
// Given a filter, resolve Not and SelfUuid to real terms.
|
||||
//
|
||||
// The benefit of moving optimisation to this step is from various inputs, we can
|
||||
// get to a resolved + optimised filter, and then we can cache those outputs in many
|
||||
// cases!
|
||||
// cases! The exception is *large* filters, especially from the memberof plugin. We
|
||||
// want to skip these because they can really jam up the server.
|
||||
|
||||
// do we have a cache?
|
||||
let cache_key = if let Some(rcache) = rsv_cache.as_mut() {
|
||||
// construct the key. For now it's expensive because we have to clone, but ... eh.
|
||||
let cache_key = (ev.get_event_origin_id(), self.clone());
|
||||
if let Some(f) = rcache.get(&cache_key) {
|
||||
// Got it? Shortcut and return!
|
||||
return Ok(f.clone());
|
||||
};
|
||||
// Not in cache? Set the cache_key.
|
||||
Some(cache_key)
|
||||
let cacheable = FilterResolved::resolve_cacheable(&self.state.inner);
|
||||
|
||||
let cache_key = if cacheable {
|
||||
// do we have a cache?
|
||||
if let Some(rcache) = rsv_cache.as_mut() {
|
||||
// construct the key. For now it's expensive because we have to clone, but ... eh.
|
||||
let cache_key = (ev.get_event_origin_id(), Arc::new(self.clone()));
|
||||
if let Some(f) = rcache.get(&cache_key) {
|
||||
// Got it? Shortcut and return!
|
||||
return Ok(f.as_ref().clone());
|
||||
};
|
||||
// Not in cache? Set the cache_key.
|
||||
Some(cache_key)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
// Not cacheable, lets just bail.
|
||||
None
|
||||
};
|
||||
|
||||
|
@ -496,10 +508,11 @@ impl Filter<FilterValid> {
|
|||
},
|
||||
};
|
||||
|
||||
// Now it's computed, inject it.
|
||||
if let Some(rcache) = rsv_cache.as_mut() {
|
||||
if let Some(cache_key) = cache_key {
|
||||
rcache.insert(cache_key, resolved_filt.clone());
|
||||
// Now it's computed, inject it. Remember, we won't have a cache_key here
|
||||
// if cacheable == false.
|
||||
if let Some(cache_key) = cache_key {
|
||||
if let Some(rcache) = rsv_cache.as_mut() {
|
||||
rcache.insert(cache_key, Arc::new(resolved_filt.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1270,6 +1283,27 @@ impl FilterResolved {
|
|||
}
|
||||
}
|
||||
|
||||
fn resolve_cacheable(fc: &FilterComp) -> bool {
|
||||
match fc {
|
||||
FilterComp::Or(vs) | FilterComp::And(vs) | FilterComp::Inclusion(vs) => {
|
||||
if vs.len() < 8 {
|
||||
vs.iter().all(FilterResolved::resolve_cacheable)
|
||||
} else {
|
||||
// Too lorge.
|
||||
false
|
||||
}
|
||||
}
|
||||
FilterComp::AndNot(f) => FilterResolved::resolve_cacheable(f.as_ref()),
|
||||
FilterComp::Eq(..)
|
||||
| FilterComp::SelfUuid
|
||||
| FilterComp::Cnt(..)
|
||||
| FilterComp::Stw(..)
|
||||
| FilterComp::Enw(..)
|
||||
| FilterComp::Pres(_)
|
||||
| FilterComp::LessThan(..) => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_idx(
|
||||
fc: FilterComp,
|
||||
ev: &Identity,
|
||||
|
|
|
@ -1177,7 +1177,8 @@ impl<'a> IdmServerProxyWriteTransaction<'a> {
|
|||
// Get all the classes.
|
||||
debug!("Schemas valid - Proceeding with entry {}", scim_ent.id);
|
||||
|
||||
let mut mods = Vec::with_capacity(0);
|
||||
#[allow(clippy::vec_init_then_push)]
|
||||
let mut mods = Vec::with_capacity(4);
|
||||
|
||||
mods.push(Modify::Assert(
|
||||
Attribute::SyncParentUuid,
|
||||
|
|
|
@ -275,7 +275,7 @@ impl IdmServer {
|
|||
da: DelayedAction,
|
||||
) -> Result<bool, OperationError> {
|
||||
let mut pw = self.proxy_write(ct).await;
|
||||
pw.process_delayedaction(da, ct)
|
||||
pw.process_delayedaction(&da, ct)
|
||||
.and_then(|_| pw.commit())
|
||||
.map(|()| true)
|
||||
}
|
||||
|
@ -335,8 +335,10 @@ impl IdmServerDelayed {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Option<DelayedAction> {
|
||||
self.async_rx.recv().await
|
||||
pub async fn recv_many(&mut self, buffer: &mut Vec<DelayedAction>) -> usize {
|
||||
debug_assert!(buffer.is_empty());
|
||||
let limit = buffer.capacity();
|
||||
self.async_rx.recv_many(buffer, limit).await
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -685,7 +687,7 @@ pub trait IdmServerTransaction<'a> {
|
|||
&mut self,
|
||||
client_cert_info: &ClientCertInfo,
|
||||
) -> Result<Arc<EntrySealedCommitted>, OperationError> {
|
||||
let pks256 = hex::encode(&client_cert_info.public_key_s256);
|
||||
let pks256 = hex::encode(client_cert_info.public_key_s256);
|
||||
// Using the certificate hash, find our matching cert.
|
||||
let mut maybe_cert_entries = self.get_qs_txn().internal_search(filter!(f_eq(
|
||||
Attribute::Certificate,
|
||||
|
@ -2031,15 +2033,15 @@ impl<'a> IdmServerProxyWriteTransaction<'a> {
|
|||
#[instrument(level = "debug", skip_all)]
|
||||
pub fn process_delayedaction(
|
||||
&mut self,
|
||||
da: DelayedAction,
|
||||
da: &DelayedAction,
|
||||
_ct: Duration,
|
||||
) -> Result<(), OperationError> {
|
||||
match da {
|
||||
DelayedAction::PwUpgrade(pwu) => self.process_pwupgrade(&pwu),
|
||||
DelayedAction::UnixPwUpgrade(upwu) => self.process_unixpwupgrade(&upwu),
|
||||
DelayedAction::WebauthnCounterIncrement(wci) => self.process_webauthncounterinc(&wci),
|
||||
DelayedAction::BackupCodeRemoval(bcr) => self.process_backupcoderemoval(&bcr),
|
||||
DelayedAction::AuthSessionRecord(asr) => self.process_authsessionrecord(&asr),
|
||||
DelayedAction::PwUpgrade(pwu) => self.process_pwupgrade(pwu),
|
||||
DelayedAction::UnixPwUpgrade(upwu) => self.process_unixpwupgrade(upwu),
|
||||
DelayedAction::WebauthnCounterIncrement(wci) => self.process_webauthncounterinc(wci),
|
||||
DelayedAction::BackupCodeRemoval(bcr) => self.process_backupcoderemoval(bcr),
|
||||
DelayedAction::AuthSessionRecord(asr) => self.process_authsessionrecord(asr),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use kanidm_proto::internal::Filter as ProtoFilter;
|
||||
|
@ -15,14 +15,21 @@ pub struct DynGroupCache {
|
|||
pub struct DynGroup;
|
||||
|
||||
impl DynGroup {
|
||||
/// Determine if any dynamic groups changed as part of this operation.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn apply_dyngroup_change(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
candidate_tuples: &mut Vec<(Arc<EntrySealedCommitted>, EntryInvalidCommitted)>,
|
||||
affected_uuids: &mut Vec<Uuid>,
|
||||
// The uuids that are affected by the dyngroup change. This is both addition
|
||||
// and removal of the uuids as members.
|
||||
affected_uuids: &mut BTreeSet<Uuid>,
|
||||
// If we should error when a dyngroup we thought should be cached is in fact,
|
||||
// not cached.
|
||||
expect: bool,
|
||||
// The identity in use.
|
||||
ident_internal: &Identity,
|
||||
// The dyn group cache
|
||||
dyn_groups: &mut DynGroupCache,
|
||||
// The list of dyn groups that were in the change set
|
||||
n_dyn_groups: &[&Entry<EntrySealed, EntryCommitted>],
|
||||
) -> Result<(), OperationError> {
|
||||
/*
|
||||
|
@ -37,45 +44,53 @@ impl DynGroup {
|
|||
*/
|
||||
|
||||
if qs.get_phase() < ServerPhase::SchemaReady {
|
||||
trace!("Server is not ready to load dyngroups");
|
||||
debug!("Server is not ready to load dyngroups");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Search all the new groups first.
|
||||
// Search all dyn groups that were involved in the operation.
|
||||
let filt = filter!(FC::Or(
|
||||
n_dyn_groups
|
||||
.iter()
|
||||
.map(|e| f_eq(Attribute::Uuid, PartialValue::Uuid(e.get_uuid())))
|
||||
.collect()
|
||||
));
|
||||
let work_set = qs.internal_search_writeable(&filt)?;
|
||||
// Load the dyn groups as a writeable set.
|
||||
let mut work_set = qs.internal_search_writeable(&filt)?;
|
||||
|
||||
// Go through them all and update the new groups.
|
||||
for (pre, mut nd_group) in work_set.into_iter() {
|
||||
// Go through them all and update the groups.
|
||||
for (ref pre, ref mut nd_group) in work_set.iter_mut() {
|
||||
trace!(dyngroup_id = %nd_group.get_display_id());
|
||||
// Load the dyngroups filter
|
||||
let scope_f: ProtoFilter = nd_group
|
||||
.get_ava_single_protofilter(Attribute::DynGroupFilter)
|
||||
.cloned()
|
||||
.ok_or_else(|| {
|
||||
admin_error!("Missing {}", Attribute::DynGroupFilter);
|
||||
error!("Missing {}", Attribute::DynGroupFilter);
|
||||
OperationError::InvalidEntryState
|
||||
})?;
|
||||
|
||||
let scope_i = Filter::from_rw(ident_internal, &scope_f, qs).map_err(|e| {
|
||||
admin_error!("{} validation failed {:?}", Attribute::DynGroupFilter, e);
|
||||
error!("{} validation failed {:?}", Attribute::DynGroupFilter, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
trace!(dyngroup_filter = ?scope_i);
|
||||
|
||||
let uuid = pre.get_uuid();
|
||||
// Add our uuid as affected.
|
||||
affected_uuids.push(uuid);
|
||||
affected_uuids.insert(uuid);
|
||||
|
||||
// Apply the filter and get all the uuids.
|
||||
// Apply the filter and get all the uuids that are members of this dyngroup.
|
||||
let entries = qs.internal_search(scope_i.clone()).map_err(|e| {
|
||||
admin_error!("internal search failure -> {:?}", e);
|
||||
error!("internal search failure -> {:?}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
trace!(entries_len = %entries.len());
|
||||
|
||||
let members = ValueSetRefer::from_iter(entries.iter().map(|e| e.get_uuid()));
|
||||
trace!(?members);
|
||||
|
||||
if let Some(uuid_iter) = members.as_ref().and_then(|a| a.as_ref_uuid_iter()) {
|
||||
affected_uuids.extend(uuid_iter);
|
||||
|
@ -94,14 +109,21 @@ impl DynGroup {
|
|||
nd_group.purge_ava(Attribute::DynMember);
|
||||
}
|
||||
|
||||
candidate_tuples.push((pre, nd_group));
|
||||
|
||||
// Insert to our new instances
|
||||
// Insert it to the dyngroup cache with the compiled/resolved filter for
|
||||
// fast matching in other paths.
|
||||
if dyn_groups.insts.insert(uuid, scope_i).is_none() == expect {
|
||||
admin_error!("{} cache uuid conflict {}", Attribute::DynGroup, uuid);
|
||||
error!("{} cache uuid conflict {}", Attribute::DynGroup, uuid);
|
||||
return Err(OperationError::InvalidState);
|
||||
}
|
||||
}
|
||||
|
||||
if !work_set.is_empty() {
|
||||
qs.internal_apply_writable(work_set).map_err(|e| {
|
||||
error!("Failed to commit dyngroup set {:?}", e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -111,7 +133,7 @@ impl DynGroup {
|
|||
// Internal search all our definitions.
|
||||
let filt = filter!(f_eq(Attribute::Class, EntryClass::DynGroup.into()));
|
||||
let entries = qs.internal_search(filt).map_err(|e| {
|
||||
admin_error!("internal search failure -> {:?}", e);
|
||||
error!("internal search failure -> {:?}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
|
@ -122,19 +144,19 @@ impl DynGroup {
|
|||
.get_ava_single_protofilter(Attribute::DynGroupFilter)
|
||||
.cloned()
|
||||
.ok_or_else(|| {
|
||||
admin_error!("Missing {}", Attribute::DynGroupFilter);
|
||||
error!("Missing {}", Attribute::DynGroupFilter);
|
||||
OperationError::InvalidEntryState
|
||||
})?;
|
||||
|
||||
let scope_i = Filter::from_rw(&ident_internal, &scope_f, qs).map_err(|e| {
|
||||
admin_error!("dyngroup_filter validation failed {:?}", e);
|
||||
error!("dyngroup_filter validation failed {:?}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
let uuid = nd_group.get_uuid();
|
||||
|
||||
if reload_groups.insert(uuid, scope_i).is_some() {
|
||||
admin_error!("dyngroup cache uuid conflict {}", uuid);
|
||||
error!("dyngroup cache uuid conflict {}", uuid);
|
||||
return Err(OperationError::InvalidState);
|
||||
}
|
||||
}
|
||||
|
@ -150,8 +172,8 @@ impl DynGroup {
|
|||
qs: &mut QueryServerWriteTransaction,
|
||||
cand: &[Entry<EntrySealed, EntryCommitted>],
|
||||
_ident: &Identity,
|
||||
) -> Result<Vec<Uuid>, OperationError> {
|
||||
let mut affected_uuids = Vec::with_capacity(cand.len());
|
||||
) -> Result<BTreeSet<Uuid>, OperationError> {
|
||||
let mut affected_uuids = BTreeSet::new();
|
||||
|
||||
let ident_internal = Identity::from_internal();
|
||||
|
||||
|
@ -172,7 +194,7 @@ impl DynGroup {
|
|||
// dyn groups will see the created entries on an internal search
|
||||
// so we don't need to reference them.
|
||||
|
||||
let mut candidate_tuples = Vec::with_capacity(dyn_groups.insts.len() + cand.len());
|
||||
let mut candidate_tuples = Vec::with_capacity(cand.len());
|
||||
|
||||
// Apply existing dyn_groups to entries.
|
||||
trace!(?dyn_groups.insts);
|
||||
|
@ -208,14 +230,39 @@ impl DynGroup {
|
|||
.copied()
|
||||
.for_each(|u| d_group.add_ava(Attribute::DynMember, Value::Refer(u)));
|
||||
|
||||
affected_uuids.extend(matches.into_iter());
|
||||
affected_uuids.push(*dg_uuid);
|
||||
// The *dyn group* isn't changing, it's that a member OF the dyn group
|
||||
// is being added. This means the dyngroup isn't part of the set that
|
||||
// needs update to MO, only the affected members do!
|
||||
|
||||
let pre_dynmember = pre.get_ava_refer(Attribute::DynMember);
|
||||
let post_dynmember = d_group.get_ava_refer(Attribute::DynMember);
|
||||
|
||||
match (pre_dynmember, post_dynmember) {
|
||||
(Some(pre_m), Some(post_m)) => {
|
||||
// Show only the *changed* uuids.
|
||||
affected_uuids.extend(pre_m.symmetric_difference(post_m));
|
||||
}
|
||||
(Some(members), None) | (None, Some(members)) => {
|
||||
// Doesn't matter what order, just that they are affected
|
||||
affected_uuids.extend(members);
|
||||
}
|
||||
(None, None) => {}
|
||||
};
|
||||
|
||||
candidate_tuples.push((pre, d_group));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write back the new changes.
|
||||
// Write this stripe if populated.
|
||||
if !candidate_tuples.is_empty() {
|
||||
qs.internal_apply_writable(candidate_tuples).map_err(|e| {
|
||||
error!("Failed to commit dyngroup set {:?}", e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
// If we created any dyn groups, populate them now.
|
||||
// if the event is not internal, reject (for now)
|
||||
|
||||
|
@ -223,7 +270,6 @@ impl DynGroup {
|
|||
trace!("considering new dyngroups");
|
||||
Self::apply_dyngroup_change(
|
||||
qs,
|
||||
&mut candidate_tuples,
|
||||
&mut affected_uuids,
|
||||
false,
|
||||
&ident_internal,
|
||||
|
@ -232,15 +278,6 @@ impl DynGroup {
|
|||
)?;
|
||||
}
|
||||
|
||||
// Write back the new changes.
|
||||
// Write this stripe if populated.
|
||||
if !candidate_tuples.is_empty() {
|
||||
qs.internal_apply_writable(candidate_tuples).map_err(|e| {
|
||||
admin_error!("Failed to commit dyngroup set {:?}", e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(affected_uuids)
|
||||
}
|
||||
|
||||
|
@ -251,8 +288,8 @@ impl DynGroup {
|
|||
cand: &[Entry<EntrySealed, EntryCommitted>],
|
||||
_ident: &Identity,
|
||||
force_cand_updates: bool,
|
||||
) -> Result<Vec<Uuid>, OperationError> {
|
||||
let mut affected_uuids = Vec::with_capacity(cand.len());
|
||||
) -> Result<BTreeSet<Uuid>, OperationError> {
|
||||
let mut affected_uuids = BTreeSet::new();
|
||||
|
||||
let ident_internal = Identity::from_internal();
|
||||
|
||||
|
@ -283,10 +320,8 @@ impl DynGroup {
|
|||
// changed in this op.
|
||||
|
||||
if !n_dyn_groups.is_empty() {
|
||||
trace!("considering modified dyngroups");
|
||||
Self::apply_dyngroup_change(
|
||||
qs,
|
||||
&mut candidate_tuples,
|
||||
&mut affected_uuids,
|
||||
true,
|
||||
&ident_internal,
|
||||
|
@ -344,11 +379,23 @@ impl DynGroup {
|
|||
Err(u) => d_group.remove_ava(Attribute::DynMember, &PartialValue::Refer(u)),
|
||||
});
|
||||
|
||||
affected_uuids.extend(matches.into_iter().map(|choice| match choice {
|
||||
Ok(u) => u,
|
||||
Err(u) => u,
|
||||
}));
|
||||
affected_uuids.push(*dg_uuid);
|
||||
// The *dyn group* isn't changing, it's that a member OF the dyn group
|
||||
// is being added. This means the dyngroup isn't part of the set that
|
||||
// needs update to MO, only the affected members do!
|
||||
let pre_dynmember = pre.get_ava_refer(Attribute::DynMember);
|
||||
let post_dynmember = d_group.get_ava_refer(Attribute::DynMember);
|
||||
|
||||
match (pre_dynmember, post_dynmember) {
|
||||
(Some(pre_m), Some(post_m)) => {
|
||||
// Show only the *changed* uuids.
|
||||
affected_uuids.extend(pre_m.symmetric_difference(post_m));
|
||||
}
|
||||
(Some(members), None) | (None, Some(members)) => {
|
||||
// Doesn't matter what order, just that they are affected
|
||||
affected_uuids.extend(members);
|
||||
}
|
||||
(None, None) => {}
|
||||
};
|
||||
|
||||
candidate_tuples.push((pre, d_group));
|
||||
}
|
||||
|
@ -357,9 +404,10 @@ impl DynGroup {
|
|||
|
||||
// Write back the new changes.
|
||||
// Write this stripe if populated.
|
||||
trace!(candidate_tuples_len = %candidate_tuples.len());
|
||||
if !candidate_tuples.is_empty() {
|
||||
qs.internal_apply_writable(candidate_tuples).map_err(|e| {
|
||||
admin_error!("Failed to commit dyngroup set {:?}", e);
|
||||
error!("Failed to commit dyngroup set {:?}", e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
|
|
@ -10,12 +10,10 @@
|
|||
// As a result, we first need to run refint to clean up all dangling references, then memberof
|
||||
// fixes the graph of memberships
|
||||
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
|
||||
use crate::entry::{Entry, EntryCommitted, EntrySealed, EntryTuple};
|
||||
use crate::entry::{Entry, EntryCommitted, EntrySealed};
|
||||
use crate::event::{CreateEvent, DeleteEvent, ModifyEvent};
|
||||
use crate::plugins::Plugin;
|
||||
use crate::prelude::*;
|
||||
|
@ -23,7 +21,7 @@ use crate::value::PartialValue;
|
|||
|
||||
pub struct MemberOf;
|
||||
|
||||
fn do_memberof(
|
||||
fn do_group_memberof(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
uuid: Uuid,
|
||||
tgte: &mut EntryInvalidCommitted,
|
||||
|
@ -96,55 +94,255 @@ fn do_memberof(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn do_leaf_memberof(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
all_affected_uuids: BTreeSet<Uuid>,
|
||||
) -> Result<(), OperationError> {
|
||||
trace!("---");
|
||||
|
||||
// We just put everything into the filter here, the query code will remove
|
||||
// anything that is a group.
|
||||
let all_affected_filter: Vec<_> = all_affected_uuids
|
||||
.into_iter()
|
||||
.map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(u)))
|
||||
.collect();
|
||||
|
||||
if all_affected_filter.is_empty() {
|
||||
trace!("all affected filter is empty, return");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// These are all the affected entries.
|
||||
let leaf_entries = qs.internal_search_writeable(&filter!(f_and!([
|
||||
f_andnot(f_eq(Attribute::Class, EntryClass::Group.into())),
|
||||
FC::Or(all_affected_filter)
|
||||
])))?;
|
||||
|
||||
if leaf_entries.is_empty() {
|
||||
trace!("leaf entries empty, return");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut leaf_entries: BTreeMap<_, _> = leaf_entries
|
||||
.into_iter()
|
||||
.map(|entry_tuple| (entry_tuple.0.get_uuid(), entry_tuple))
|
||||
.collect();
|
||||
|
||||
let mut changes = Vec::with_capacity(leaf_entries.len());
|
||||
|
||||
// Now that we know which *entries* changed, we actually have to load the groups *again*
|
||||
// because the affected entries could still be a DMO/MO of a group that *wasn't* in the
|
||||
// change set, and we still need to reflect that they exist.
|
||||
|
||||
let mut groups_or = Vec::with_capacity(leaf_entries.len() * 2);
|
||||
|
||||
for uuid in leaf_entries.keys().copied() {
|
||||
groups_or.push(f_eq(Attribute::Member, PartialValue::Refer(uuid)));
|
||||
groups_or.push(f_eq(Attribute::DynMember, PartialValue::Refer(uuid)));
|
||||
}
|
||||
|
||||
let all_groups = qs
|
||||
.internal_search(filter!(f_and!([
|
||||
f_eq(Attribute::Class, EntryClass::Group.into()),
|
||||
FC::Or(groups_or)
|
||||
])))
|
||||
.map_err(|err| {
|
||||
error!(?err, "internal search failure");
|
||||
err
|
||||
})?;
|
||||
|
||||
/*
|
||||
* Previously we went through the remaining items and processed them one at a time, but
|
||||
* that has significant performance limits, since if we update a large dyn group, we then
|
||||
* have to perform N searches for each affected member, which may be repeatedly searching
|
||||
* for the same groups over and over again.
|
||||
*
|
||||
* Instead, at this point we know that in memberof application the entire group tree is
|
||||
* now stable, and all we need to do is reflect those values into our entries. We can do
|
||||
* this in two steps. First we load *all* the groups that relate to our leaf entries that
|
||||
* we need to reflect.
|
||||
*
|
||||
* Then we can go through that in a single pass updating our entries that need to be
|
||||
* updated. Since we know that the leaf entries aren't groups, we don't have a collision
|
||||
* as a result of using internal_search_writeable.
|
||||
*/
|
||||
|
||||
// Clear the existing Mo and Dmo on the write stripe.
|
||||
for (_pre, tgte) in leaf_entries.values_mut() {
|
||||
// Ensure we are MO capable. We only add this if it's not already present.
|
||||
tgte.add_ava_if_not_exist(Attribute::Class, EntryClass::MemberOf.into());
|
||||
// Clear the dmo + mos, we will recreate them now.
|
||||
// This is how we handle deletes/etc.
|
||||
tgte.purge_ava(Attribute::MemberOf);
|
||||
tgte.purge_ava(Attribute::DirectMemberOf);
|
||||
}
|
||||
|
||||
// Now, we go through all the groups, and from each one we update the relevant
|
||||
// target entry as needed.
|
||||
for group in all_groups {
|
||||
trace!(group_id = %group.get_display_id());
|
||||
// Our group uuid that we add to direct members.
|
||||
let group_uuid = group.get_uuid();
|
||||
|
||||
let memberof_ref = group.get_ava_refer(Attribute::MemberOf);
|
||||
|
||||
let member_ref = group.get_ava_refer(Attribute::Member);
|
||||
let dynmember_ref = group.get_ava_refer(Attribute::DynMember);
|
||||
|
||||
let dir_members = member_ref
|
||||
.iter()
|
||||
.flat_map(|set| set.iter())
|
||||
.chain(dynmember_ref.iter().flat_map(|set| set.iter()))
|
||||
.copied();
|
||||
|
||||
// These are the entries that are direct members and need to reflect the group
|
||||
// as mo and it's mo for indirect mo.
|
||||
for dir_member in dir_members {
|
||||
if let Some((_pre, tgte)) = leaf_entries.get_mut(&dir_member) {
|
||||
trace!(?dir_member, entry_id = ?tgte.get_display_id());
|
||||
// We were in the group, lets update.
|
||||
if let Some(dmo_set) = tgte.get_ava_refer_mut(Attribute::DirectMemberOf) {
|
||||
dmo_set.insert(group_uuid);
|
||||
} else {
|
||||
let dmo = ValueSetRefer::new(group_uuid);
|
||||
tgte.set_ava_set(Attribute::DirectMemberOf, dmo);
|
||||
}
|
||||
|
||||
// We're also in member of this group.
|
||||
if let Some(mo_set) = tgte.get_ava_refer_mut(Attribute::MemberOf) {
|
||||
mo_set.insert(group_uuid);
|
||||
} else {
|
||||
let mo = ValueSetRefer::new(group_uuid);
|
||||
tgte.set_ava_set(Attribute::MemberOf, mo);
|
||||
}
|
||||
|
||||
// If the group has memberOf attributes, we propogate these to
|
||||
// our entry now.
|
||||
if let Some(group_mo) = memberof_ref {
|
||||
// IMPORTANT this can't be a NONE because we just create MO in
|
||||
// the step above!
|
||||
if let Some(mo_set) = tgte.get_ava_refer_mut(Attribute::MemberOf) {
|
||||
mo_set.extend(group_mo.iter())
|
||||
}
|
||||
}
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
if let Some(dmo) = group.get_ava_refer(Attribute::DirectMemberOf) {
|
||||
if let Some(mo) = group.get_ava_refer(Attribute::MemberOf) {
|
||||
debug_assert!(mo.is_superset(dmo))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Done updating that leaf entry.
|
||||
// Remember in the None case it could be that the group has a member which *isn't*
|
||||
// being altered as a leaf in this operation.
|
||||
}
|
||||
// Next group.
|
||||
}
|
||||
|
||||
// Now only write back leaf entries that actually were changed as a result of the memberof
|
||||
// process.
|
||||
leaf_entries
|
||||
.into_iter()
|
||||
.try_for_each(|(auuid, (pre, tgte))| {
|
||||
// Only write if a change occurred.
|
||||
if pre.get_ava_set(Attribute::MemberOf) != tgte.get_ava_set(Attribute::MemberOf)
|
||||
|| pre.get_ava_set(Attribute::DirectMemberOf)
|
||||
!= tgte.get_ava_set(Attribute::DirectMemberOf)
|
||||
{
|
||||
trace!("=> processing affected uuid {:?}", auuid);
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
if let Some(dmo_set) = tgte.get_ava_refer(Attribute::DirectMemberOf) {
|
||||
trace!(?dmo_set);
|
||||
|
||||
if let Some(mo_set) = tgte.get_ava_refer(Attribute::MemberOf) {
|
||||
trace!(?mo_set);
|
||||
debug_assert!(mo_set.is_superset(dmo_set));
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
} else {
|
||||
trace!("NONE");
|
||||
};
|
||||
|
||||
if let Some(pre_dmo_set) = pre.get_ava_refer(Attribute::DirectMemberOf) {
|
||||
trace!(?pre_dmo_set);
|
||||
|
||||
if let Some(pre_mo_set) = pre.get_ava_refer(Attribute::MemberOf) {
|
||||
trace!(?pre_mo_set);
|
||||
debug_assert!(pre_mo_set.is_superset(pre_dmo_set));
|
||||
} else {
|
||||
unreachable!();
|
||||
}
|
||||
} else {
|
||||
trace!("NONE");
|
||||
};
|
||||
};
|
||||
|
||||
changes.push((pre, tgte));
|
||||
} else {
|
||||
trace!("=> ignoring unmodified uuid {:?}", auuid);
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// Write the batch out in a single stripe.
|
||||
qs.internal_apply_writable(changes)
|
||||
// Done! 🎉
|
||||
}
|
||||
|
||||
// This is how you know the good code is here.
|
||||
#[allow(clippy::cognitive_complexity)]
|
||||
fn apply_memberof(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
// TODO: Experiment with HashSet/BTreeSet here instead of vec.
|
||||
// May require https://github.com/rust-lang/rust/issues/62924 to allow popping
|
||||
mut group_affect: Vec<Uuid>,
|
||||
mut affected_uuids: BTreeSet<Uuid>,
|
||||
) -> Result<(), OperationError> {
|
||||
trace!(" => entering apply_memberof");
|
||||
trace!(" => initial group_affect {:?}", group_affect);
|
||||
|
||||
// We can't cache groups, because we need to be continually writing
|
||||
// and querying them. But we can cache anything we find in the process
|
||||
// to speed up the later other_affect write op, and we can use this
|
||||
// to avoid loading things that aren't groups.
|
||||
// All other changed entries (mo, dmo cleared)
|
||||
let mut other_cache: HashMap<Uuid, EntryTuple> = HashMap::with_capacity(group_affect.len() * 2);
|
||||
while !group_affect.is_empty() {
|
||||
group_affect.sort();
|
||||
group_affect.dedup();
|
||||
// Because of how replication works, we don't send MO over a replication boundary.
|
||||
// As a result, we always need to trigger for any changed uuid, so we keep the
|
||||
// initial affected set for the leaf resolution.
|
||||
//
|
||||
// As we proceed, we'll also add the affected members of our groups that are
|
||||
// changing.
|
||||
let mut all_affected_uuids: BTreeSet<_> = affected_uuids.iter().copied().collect();
|
||||
|
||||
// While there are still affected uuids.
|
||||
while !affected_uuids.is_empty() {
|
||||
trace!(?affected_uuids);
|
||||
|
||||
// Ignore recycled/tombstones
|
||||
let filt = filter!(FC::Or(
|
||||
group_affect
|
||||
.drain(0..)
|
||||
.map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(u)))
|
||||
.collect()
|
||||
));
|
||||
let filt = filter!(f_and!([
|
||||
f_eq(Attribute::Class, EntryClass::Group.into()),
|
||||
FC::Or(
|
||||
affected_uuids
|
||||
.iter()
|
||||
.copied()
|
||||
.map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(u)))
|
||||
.collect()
|
||||
)
|
||||
]));
|
||||
|
||||
// Clear the set for the next iteration
|
||||
affected_uuids.clear();
|
||||
|
||||
let work_set = qs.internal_search_writeable(&filt)?;
|
||||
// Load the vecdeque with this batch.
|
||||
|
||||
let mut changes = Vec::with_capacity(work_set.len());
|
||||
|
||||
for (pre, mut tgte) in work_set.into_iter() {
|
||||
let guuid = pre.get_uuid();
|
||||
// load the entry from the db.
|
||||
if !tgte.attribute_equality(Attribute::Class, &EntryClass::Group.into()) {
|
||||
// It's not a group, we'll deal with you later. We should NOT
|
||||
// have seen this UUID before, as either we are on the first
|
||||
// iteration OR the checks belowe should have filtered it out.
|
||||
trace!("not a group, delaying update to -> {:?}", guuid);
|
||||
other_cache.insert(guuid, (pre, tgte));
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("=> processing group update -> {:?}", guuid);
|
||||
trace!(
|
||||
"=> processing group update -> {:?} {}",
|
||||
guuid,
|
||||
tgte.get_display_id()
|
||||
);
|
||||
|
||||
do_memberof(qs, guuid, &mut tgte)?;
|
||||
do_group_memberof(qs, guuid, &mut tgte)?;
|
||||
|
||||
// Did we change? Note we don't check if the class changed, only if mo changed.
|
||||
if pre.get_ava_set(Attribute::MemberOf) != tgte.get_ava_set(Attribute::MemberOf)
|
||||
|
@ -153,58 +351,82 @@ fn apply_memberof(
|
|||
{
|
||||
// Yes we changed - we now must process all our members, as they need to
|
||||
// inherit changes. Some of these members COULD be non groups, but we
|
||||
// handle that in the dbload step.
|
||||
// handle them in the subsequent steps.
|
||||
trace!(
|
||||
"{:?} changed, flagging members as groups to change. ",
|
||||
guuid
|
||||
"{:?} {} changed, flagging members as groups to change. ",
|
||||
guuid,
|
||||
tgte.get_display_id()
|
||||
);
|
||||
if let Some(miter) = tgte.get_ava_as_refuuid(Attribute::Member) {
|
||||
group_affect.extend(miter.filter(|m| !other_cache.contains_key(m)));
|
||||
|
||||
// Since our groups memberof (and related, direct member of) has changed, we
|
||||
// need to propogate these values forward into our members. At this point we
|
||||
// mark all our members as being part of the affected set.
|
||||
let pre_member = pre.get_ava_refer(Attribute::Member);
|
||||
let post_member = tgte.get_ava_refer(Attribute::Member);
|
||||
|
||||
match (pre_member, post_member) {
|
||||
(Some(pre_m), Some(post_m)) => {
|
||||
affected_uuids.extend(pre_m);
|
||||
affected_uuids.extend(post_m);
|
||||
}
|
||||
(Some(members), None) | (None, Some(members)) => {
|
||||
// Doesn't matter what order, just that they are affected
|
||||
affected_uuids.extend(members);
|
||||
}
|
||||
(None, None) => {}
|
||||
};
|
||||
if let Some(miter) = tgte.get_ava_as_refuuid(Attribute::DynMember) {
|
||||
group_affect.extend(miter.filter(|m| !other_cache.contains_key(m)));
|
||||
|
||||
let pre_dynmember = pre.get_ava_refer(Attribute::DynMember);
|
||||
let post_dynmember = tgte.get_ava_refer(Attribute::DynMember);
|
||||
|
||||
match (pre_dynmember, post_dynmember) {
|
||||
(Some(pre_m), Some(post_m)) => {
|
||||
affected_uuids.extend(pre_m);
|
||||
affected_uuids.extend(post_m);
|
||||
}
|
||||
(Some(members), None) | (None, Some(members)) => {
|
||||
// Doesn't matter what order, just that they are affected
|
||||
affected_uuids.extend(members);
|
||||
}
|
||||
(None, None) => {}
|
||||
};
|
||||
|
||||
// push the entries to pre/cand
|
||||
changes.push((pre, tgte));
|
||||
} else {
|
||||
trace!("{:?} stable", guuid);
|
||||
// If the group is stable, then we *only* need to update memberof
|
||||
// on members that may have been added or removed. This exists to
|
||||
// optimise when we add a member to a group, but without changing the
|
||||
// group's mo/dmo to save re-writing mo to all the other members.
|
||||
//
|
||||
// If the group's memberof has been through the unstable state,
|
||||
// all our members are already fully loaded into the affected sets.
|
||||
//
|
||||
// NOTE: This filtering of what members were actually impacted is
|
||||
// performed in the call to post_modify_inner.
|
||||
|
||||
trace!("{:?} {} stable", guuid, tgte.get_display_id());
|
||||
}
|
||||
}
|
||||
|
||||
// Write this stripe if populated.
|
||||
if !changes.is_empty() {
|
||||
qs.internal_apply_writable(changes).map_err(|e| {
|
||||
admin_error!("Failed to commit memberof group set {:?}", e);
|
||||
e
|
||||
trace!("wrote stripe {}", changes.len());
|
||||
qs.internal_apply_writable(changes).map_err(|err| {
|
||||
error!(?err, "Failed to commit memberof group set");
|
||||
err
|
||||
})?;
|
||||
}
|
||||
|
||||
// Reflect the full set of affected uuids into our all affected set.
|
||||
all_affected_uuids.extend(affected_uuids.iter());
|
||||
|
||||
// Next loop!
|
||||
trace!("-------------------------------------");
|
||||
}
|
||||
|
||||
// ALL GROUP MOS + DMOS ARE NOW STABLE. We can load these into other items directly.
|
||||
let mut changes = Vec::with_capacity(other_cache.len());
|
||||
|
||||
other_cache
|
||||
.into_iter()
|
||||
.try_for_each(|(auuid, (pre, mut tgte))| {
|
||||
trace!("=> processing affected uuid {:?}", auuid);
|
||||
debug_assert!(!tgte.attribute_equality(Attribute::Class, &EntryClass::Group.into()));
|
||||
do_memberof(qs, auuid, &mut tgte)?;
|
||||
// Only write if a change occurred.
|
||||
if pre.get_ava_set(Attribute::MemberOf) != tgte.get_ava_set(Attribute::MemberOf)
|
||||
|| pre.get_ava_set(Attribute::DirectMemberOf)
|
||||
!= tgte.get_ava_set(Attribute::DirectMemberOf)
|
||||
{
|
||||
changes.push((pre, tgte));
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// Turn the other_cache into a write set.
|
||||
// Write the batch out in a single stripe.
|
||||
qs.internal_apply_writable(changes)
|
||||
// Done! 🎉
|
||||
// ALL GROUP MOS + DMOS ARE NOW STABLE. We can update oul leaf entries as required.
|
||||
do_leaf_memberof(qs, all_affected_uuids)
|
||||
}
|
||||
|
||||
impl Plugin for MemberOf {
|
||||
|
@ -307,7 +529,7 @@ impl Plugin for MemberOf {
|
|||
) -> Result<(), OperationError> {
|
||||
// Similar condition to create - we only trigger updates on groups's members,
|
||||
// so that they can find they are no longer a mo of what was deleted.
|
||||
let group_affect = cand
|
||||
let affected_uuids = cand
|
||||
.iter()
|
||||
.filter_map(|e| {
|
||||
// Is it a group?
|
||||
|
@ -332,7 +554,7 @@ impl Plugin for MemberOf {
|
|||
)
|
||||
.collect();
|
||||
|
||||
apply_memberof(qs, group_affect)
|
||||
apply_memberof(qs, affected_uuids)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "memberof::verify", skip_all)]
|
||||
|
@ -360,6 +582,7 @@ impl Plugin for MemberOf {
|
|||
])
|
||||
]));
|
||||
|
||||
// what groups is this entry a direct member of?
|
||||
let direct_memberof = match qs
|
||||
.internal_search(filt_in)
|
||||
.map_err(|_| ConsistencyError::QueryServerSearchFailure)
|
||||
|
@ -367,8 +590,8 @@ impl Plugin for MemberOf {
|
|||
Ok(d_mo) => d_mo,
|
||||
Err(e) => return vec![Err(e)],
|
||||
};
|
||||
// for all direct -> add uuid to map
|
||||
|
||||
// for all direct -> add uuid to map
|
||||
let d_groups_set: BTreeSet<Uuid> =
|
||||
direct_memberof.iter().map(|e| e.get_uuid()).collect();
|
||||
|
||||
|
@ -378,7 +601,11 @@ impl Plugin for MemberOf {
|
|||
Some(d_groups_set)
|
||||
};
|
||||
|
||||
trace!("DMO search groups {:?} -> {:?}", e.get_uuid(), d_groups_set);
|
||||
trace!(
|
||||
"DMO search groups {:?} -> {:?}",
|
||||
e.get_display_id(),
|
||||
d_groups_set
|
||||
);
|
||||
|
||||
match (e.get_ava_set(Attribute::DirectMemberOf), d_groups_set) {
|
||||
(Some(edmos), Some(b)) => {
|
||||
|
@ -387,16 +614,19 @@ impl Plugin for MemberOf {
|
|||
Some(a) => {
|
||||
let diff: Vec<_> = a.symmetric_difference(&b).collect();
|
||||
if !diff.is_empty() {
|
||||
admin_error!(
|
||||
"MemberOfInvalid: Entry {}, DMO has inconsistencies -> {:?}",
|
||||
e,
|
||||
diff
|
||||
error!(
|
||||
"MemberOfInvalid: Entry {}, DMO has inconsistencies",
|
||||
e.get_display_id(),
|
||||
);
|
||||
trace!(entry_direct_member_of = ?a);
|
||||
trace!(expected_direct_groups = ?b);
|
||||
trace!(?diff);
|
||||
|
||||
r.push(Err(ConsistencyError::MemberOfInvalid(e.get_id())));
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
admin_error!("MemberOfInvalid: Entry {}, DMO has incorrect syntax", e,);
|
||||
error!("MemberOfInvalid: Entry {}, DMO has incorrect syntax - should be reference uuid set", e.get_display_id());
|
||||
r.push(Err(ConsistencyError::MemberOfInvalid(e.get_id())));
|
||||
}
|
||||
}
|
||||
|
@ -404,14 +634,14 @@ impl Plugin for MemberOf {
|
|||
(None, None) => {
|
||||
// Ok
|
||||
}
|
||||
(entry_dmo, d_groups) => {
|
||||
admin_error!(
|
||||
(entry_direct_member_of, expected_direct_groups) => {
|
||||
error!(
|
||||
"MemberOfInvalid directmemberof set and DMO search set differ in size: {}",
|
||||
e.get_uuid()
|
||||
e.get_display_id()
|
||||
);
|
||||
trace!(?e);
|
||||
trace!(?entry_dmo);
|
||||
trace!(?d_groups);
|
||||
// trace!(?e);
|
||||
trace!(?entry_direct_member_of);
|
||||
trace!(?expected_direct_groups);
|
||||
r.push(Err(ConsistencyError::MemberOfInvalid(e.get_id())));
|
||||
}
|
||||
}
|
||||
|
@ -442,10 +672,11 @@ impl MemberOf {
|
|||
) -> Result<(), OperationError> {
|
||||
let dyngroup_change = super::dyngroup::DynGroup::post_create(qs, cand, ident)?;
|
||||
|
||||
let group_affect = cand
|
||||
let affected_uuids = cand
|
||||
.iter()
|
||||
.map(|e| e.get_uuid())
|
||||
.chain(dyngroup_change)
|
||||
// In a create, we have to always examine our members as being affected.
|
||||
.chain(
|
||||
cand.iter()
|
||||
.filter_map(|e| {
|
||||
|
@ -460,7 +691,7 @@ impl MemberOf {
|
|||
)
|
||||
.collect();
|
||||
|
||||
apply_memberof(qs, group_affect)
|
||||
apply_memberof(qs, affected_uuids)
|
||||
}
|
||||
|
||||
fn post_modify_inner(
|
||||
|
@ -478,37 +709,48 @@ impl MemberOf {
|
|||
force_dyngroup_cand_update,
|
||||
)?;
|
||||
|
||||
// TODO: Limit this to when it's a class, member, mo, dmo change instead.
|
||||
let group_affect = cand
|
||||
let mut affected_uuids: BTreeSet<_> = cand
|
||||
.iter()
|
||||
.map(|post| post.get_uuid())
|
||||
.chain(dyngroup_change)
|
||||
.chain(
|
||||
pre_cand
|
||||
.iter()
|
||||
.filter_map(|pre| {
|
||||
if pre.attribute_equality(Attribute::Class, &EntryClass::Group.into()) {
|
||||
pre.get_ava_as_refuuid(Attribute::Member)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flatten(),
|
||||
)
|
||||
.chain(
|
||||
cand.iter()
|
||||
.filter_map(|post| {
|
||||
if post.attribute_equality(Attribute::Class, &EntryClass::Group.into()) {
|
||||
post.get_ava_as_refuuid(Attribute::Member)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flatten(),
|
||||
)
|
||||
.collect();
|
||||
|
||||
apply_memberof(qs, group_affect)
|
||||
for (pre, post) in pre_cand.iter().zip(cand.iter()).filter(|(pre, post)| {
|
||||
post.attribute_equality(Attribute::Class, &EntryClass::Group.into())
|
||||
|| pre.attribute_equality(Attribute::Class, &EntryClass::Group.into())
|
||||
}) {
|
||||
let pre_member = pre.get_ava_refer(Attribute::Member);
|
||||
let post_member = post.get_ava_refer(Attribute::Member);
|
||||
|
||||
match (pre_member, post_member) {
|
||||
(Some(pre_m), Some(post_m)) => {
|
||||
// Show only the *changed* uuids for leaf resolution.
|
||||
affected_uuids.extend(pre_m.symmetric_difference(post_m));
|
||||
}
|
||||
(Some(members), None) | (None, Some(members)) => {
|
||||
// Doesn't matter what order, just that they are affected
|
||||
affected_uuids.extend(members);
|
||||
}
|
||||
(None, None) => {}
|
||||
};
|
||||
|
||||
let pre_dynmember = pre.get_ava_refer(Attribute::DynMember);
|
||||
let post_dynmember = post.get_ava_refer(Attribute::DynMember);
|
||||
|
||||
match (pre_dynmember, post_dynmember) {
|
||||
(Some(pre_m), Some(post_m)) => {
|
||||
// Show only the *changed* uuids.
|
||||
affected_uuids.extend(pre_m.symmetric_difference(post_m));
|
||||
}
|
||||
(Some(members), None) | (None, Some(members)) => {
|
||||
// Doesn't matter what order, just that they are affected
|
||||
affected_uuids.extend(members);
|
||||
}
|
||||
(None, None) => {}
|
||||
};
|
||||
}
|
||||
|
||||
apply_memberof(qs, affected_uuids)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,17 +12,18 @@
|
|||
use std::collections::BTreeSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use hashbrown::HashSet;
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
|
||||
use crate::event::{CreateEvent, DeleteEvent, ModifyEvent};
|
||||
use crate::filter::{f_eq, FC};
|
||||
use crate::plugins::Plugin;
|
||||
use crate::prelude::*;
|
||||
use crate::schema::SchemaTransaction;
|
||||
use crate::schema::{SchemaAttribute, SchemaTransaction};
|
||||
|
||||
pub struct ReferentialIntegrity;
|
||||
|
||||
impl ReferentialIntegrity {
|
||||
#[instrument(level = "debug", name = "check_uuids_exist_fast", skip_all)]
|
||||
fn check_uuids_exist_fast(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
inner: &[Uuid],
|
||||
|
@ -55,6 +56,7 @@ impl ReferentialIntegrity {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "check_uuids_exist_slow", skip_all)]
|
||||
fn check_uuids_exist_slow(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
inner: &[Uuid],
|
||||
|
@ -162,27 +164,27 @@ impl Plugin for ReferentialIntegrity {
|
|||
cand: &[Entry<EntrySealed, EntryCommitted>],
|
||||
_ce: &CreateEvent,
|
||||
) -> Result<(), OperationError> {
|
||||
Self::post_modify_inner(qs, cand)
|
||||
Self::post_modify_inner(qs, None, cand)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "refint_post_modify", skip_all)]
|
||||
fn post_modify(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
_pre_cand: &[Arc<Entry<EntrySealed, EntryCommitted>>],
|
||||
pre_cand: &[Arc<EntrySealedCommitted>],
|
||||
cand: &[Entry<EntrySealed, EntryCommitted>],
|
||||
_me: &ModifyEvent,
|
||||
) -> Result<(), OperationError> {
|
||||
Self::post_modify_inner(qs, cand)
|
||||
Self::post_modify_inner(qs, Some(pre_cand), cand)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "refint_post_batch_modify", skip_all)]
|
||||
fn post_batch_modify(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
_pre_cand: &[Arc<Entry<EntrySealed, EntryCommitted>>],
|
||||
pre_cand: &[Arc<EntrySealedCommitted>],
|
||||
cand: &[Entry<EntrySealed, EntryCommitted>],
|
||||
_me: &BatchModifyEvent,
|
||||
) -> Result<(), OperationError> {
|
||||
Self::post_modify_inner(qs, cand)
|
||||
Self::post_modify_inner(qs, Some(pre_cand), cand)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "refint_post_repl_refresh", skip_all)]
|
||||
|
@ -190,7 +192,7 @@ impl Plugin for ReferentialIntegrity {
|
|||
qs: &mut QueryServerWriteTransaction,
|
||||
cand: &[EntrySealedCommitted],
|
||||
) -> Result<(), OperationError> {
|
||||
Self::post_modify_inner(qs, cand)
|
||||
Self::post_modify_inner(qs, None, cand)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "refint_post_repl_incremental", skip_all)]
|
||||
|
@ -207,7 +209,7 @@ impl Plugin for ReferentialIntegrity {
|
|||
//
|
||||
// This also becomes a path to a "ref int fixup" too?
|
||||
|
||||
let uuids = Self::cand_references_to_uuid_filter(qs, cand)?;
|
||||
let uuids = Self::cand_references_to_uuid_filter(qs, Some(pre_cand), cand)?;
|
||||
|
||||
let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?;
|
||||
|
||||
|
@ -356,54 +358,45 @@ impl Plugin for ReferentialIntegrity {
|
|||
}
|
||||
}
|
||||
|
||||
impl ReferentialIntegrity {
|
||||
fn cand_references_to_uuid_filter(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
cand: &[EntrySealedCommitted],
|
||||
) -> Result<Vec<Uuid>, OperationError> {
|
||||
let schema = qs.get_schema();
|
||||
let ref_types = schema.get_reference_types();
|
||||
fn update_reference_set<'a, I>(
|
||||
ref_types: &HashMap<AttrString, SchemaAttribute>,
|
||||
entry_iter: I,
|
||||
reference_set: &mut BTreeSet<Uuid>,
|
||||
) -> Result<(), OperationError>
|
||||
where
|
||||
I: Iterator<Item = &'a EntrySealedCommitted>,
|
||||
{
|
||||
for cand in entry_iter {
|
||||
trace!(cand_id = %cand.get_display_id());
|
||||
// If it's dyngroup, skip member since this will be reset in the next step.
|
||||
let dyn_group = cand.attribute_equality(Attribute::Class, &EntryClass::DynGroup.into());
|
||||
|
||||
// Fast Path
|
||||
let mut vsiter = cand.iter().flat_map(|c| {
|
||||
// If it's dyngroup, skip member since this will be reset in the next step.
|
||||
let dyn_group = c.attribute_equality(Attribute::Class, &EntryClass::DynGroup.into());
|
||||
// For all reference types that exist in the schema.
|
||||
let cand_ref_valuesets = ref_types.values().filter_map(|rtype| {
|
||||
// If the entry is a dyn-group, skip dyn member.
|
||||
let skip_mb = dyn_group && rtype.name == Attribute::DynMember.as_ref();
|
||||
// MemberOf is always recalculated, so it can be skipped
|
||||
let skip_mo = rtype.name == Attribute::MemberOf.as_ref();
|
||||
|
||||
ref_types.values().filter_map(move |rtype| {
|
||||
// Skip dynamic members, these are recalculated by the
|
||||
// memberof plugin.
|
||||
let skip_mb = dyn_group && rtype.name == Attribute::DynMember.as_ref();
|
||||
// Skip memberOf, also recalculated.
|
||||
let skip_mo = rtype.name == Attribute::MemberOf.as_ref();
|
||||
if skip_mb || skip_mo {
|
||||
None
|
||||
} else {
|
||||
trace!(rtype_name = ?rtype.name, "examining");
|
||||
c.get_ava_set(
|
||||
(&rtype.name)
|
||||
.try_into()
|
||||
.map_err(|e| {
|
||||
admin_error!(?e, "invalid attribute type {}", &rtype.name);
|
||||
None::<Attribute>
|
||||
})
|
||||
.ok()?,
|
||||
)
|
||||
}
|
||||
})
|
||||
if skip_mb || skip_mo {
|
||||
None
|
||||
} else {
|
||||
trace!(rtype_name = ?rtype.name, "examining");
|
||||
cand.get_ava_set(
|
||||
(&rtype.name)
|
||||
.try_into()
|
||||
.map_err(|e| {
|
||||
admin_error!(?e, "invalid attribute type {}", &rtype.name);
|
||||
None::<Attribute>
|
||||
})
|
||||
.ok()?,
|
||||
)
|
||||
}
|
||||
});
|
||||
|
||||
// Could check len first?
|
||||
let mut i = Vec::with_capacity(cand.len() * 4);
|
||||
let mut dedup = HashSet::new();
|
||||
|
||||
vsiter.try_for_each(|vs| {
|
||||
for vs in cand_ref_valuesets {
|
||||
if let Some(uuid_iter) = vs.as_ref_uuid_iter() {
|
||||
uuid_iter.for_each(|u| {
|
||||
// Returns true if the item is NEW in the set
|
||||
if dedup.insert(u) {
|
||||
i.push(u)
|
||||
}
|
||||
});
|
||||
reference_set.extend(uuid_iter);
|
||||
Ok(())
|
||||
} else {
|
||||
admin_error!(?vs, "reference value could not convert to reference uuid.");
|
||||
|
@ -411,17 +404,50 @@ impl ReferentialIntegrity {
|
|||
Err(OperationError::InvalidAttribute(
|
||||
"uuid could not become reference value".to_string(),
|
||||
))
|
||||
}
|
||||
})?;
|
||||
}?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Ok(i)
|
||||
impl ReferentialIntegrity {
|
||||
fn cand_references_to_uuid_filter(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
pre_cand: Option<&[Arc<EntrySealedCommitted>]>,
|
||||
post_cand: &[EntrySealedCommitted],
|
||||
) -> Result<Vec<Uuid>, OperationError> {
|
||||
let schema = qs.get_schema();
|
||||
let ref_types = schema.get_reference_types();
|
||||
|
||||
let mut previous_reference_set = BTreeSet::new();
|
||||
let mut reference_set = BTreeSet::new();
|
||||
|
||||
if let Some(pre_cand) = pre_cand {
|
||||
update_reference_set(
|
||||
ref_types,
|
||||
pre_cand.iter().map(|e| e.as_ref()),
|
||||
&mut previous_reference_set,
|
||||
)?;
|
||||
}
|
||||
|
||||
update_reference_set(ref_types, post_cand.iter(), &mut reference_set)?;
|
||||
|
||||
// Now build the reference set from the current candidates.
|
||||
|
||||
// Return only the values that are present in the new reference set. These are the values
|
||||
// we actually need to check the integrity of.
|
||||
Ok(reference_set
|
||||
.difference(&previous_reference_set)
|
||||
.copied()
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn post_modify_inner(
|
||||
qs: &mut QueryServerWriteTransaction,
|
||||
cand: &[EntrySealedCommitted],
|
||||
pre_cand: Option<&[Arc<EntrySealedCommitted>]>,
|
||||
post_cand: &[EntrySealedCommitted],
|
||||
) -> Result<(), OperationError> {
|
||||
let uuids = Self::cand_references_to_uuid_filter(qs, cand)?;
|
||||
let uuids = Self::cand_references_to_uuid_filter(qs, pre_cand, post_cand)?;
|
||||
|
||||
let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?;
|
||||
|
||||
|
|
|
@ -19,13 +19,13 @@ use std::collections::BTreeSet;
|
|||
use std::ops::DerefMut;
|
||||
use std::sync::Arc;
|
||||
|
||||
use concread::arcache::{ARCache, ARCacheBuilder, ARCacheReadTxn};
|
||||
use concread::arcache::ARCacheBuilder;
|
||||
use concread::cowcell::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::entry::{Entry, EntryCommitted, EntryInit, EntryNew, EntryReduced};
|
||||
use crate::event::{CreateEvent, DeleteEvent, ModifyEvent, SearchEvent};
|
||||
use crate::filter::{Filter, FilterValid, FilterValidResolved};
|
||||
use crate::filter::{Filter, FilterValid, ResolveFilterCache, ResolveFilterCacheReadTxn};
|
||||
use crate::modify::Modify;
|
||||
use crate::prelude::*;
|
||||
|
||||
|
@ -41,8 +41,8 @@ use self::delete::{apply_delete_access, DeleteResult};
|
|||
use self::modify::{apply_modify_access, ModifyResult};
|
||||
use self::search::{apply_search_access, SearchResult};
|
||||
|
||||
const ACP_RESOLVE_FILTER_CACHE_MAX: usize = 2048;
|
||||
const ACP_RESOLVE_FILTER_CACHE_LOCAL: usize = 16;
|
||||
const ACP_RESOLVE_FILTER_CACHE_MAX: usize = 256;
|
||||
const ACP_RESOLVE_FILTER_CACHE_LOCAL: usize = 0;
|
||||
|
||||
mod create;
|
||||
mod delete;
|
||||
|
@ -100,8 +100,7 @@ struct AccessControlsInner {
|
|||
pub struct AccessControls {
|
||||
inner: CowCell<AccessControlsInner>,
|
||||
// acp_related_search_cache: ARCache<Uuid, Vec<Uuid>>,
|
||||
acp_resolve_filter_cache:
|
||||
ARCache<(IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>>,
|
||||
acp_resolve_filter_cache: ResolveFilterCache,
|
||||
}
|
||||
|
||||
fn resolve_access_conditions(
|
||||
|
@ -109,12 +108,7 @@ fn resolve_access_conditions(
|
|||
ident_memberof: Option<&BTreeSet<Uuid>>,
|
||||
receiver: &AccessControlReceiver,
|
||||
target: &AccessControlTarget,
|
||||
acp_resolve_filter_cache: &mut ARCacheReadTxn<
|
||||
'_,
|
||||
(IdentityId, Filter<FilterValid>),
|
||||
Filter<FilterValidResolved>,
|
||||
(),
|
||||
>,
|
||||
acp_resolve_filter_cache: &mut ResolveFilterCacheReadTxn<'_>,
|
||||
) -> Option<(AccessControlReceiverCondition, AccessControlTargetCondition)> {
|
||||
let receiver_condition = match receiver {
|
||||
AccessControlReceiver::Group(groups) => {
|
||||
|
@ -158,9 +152,7 @@ pub trait AccessControlsTransaction<'a> {
|
|||
fn get_sync_agreements(&self) -> &HashMap<Uuid, BTreeSet<String>>;
|
||||
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn get_acp_resolve_filter_cache(
|
||||
&self,
|
||||
) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>;
|
||||
fn get_acp_resolve_filter_cache(&self) -> &mut ResolveFilterCacheReadTxn<'a>;
|
||||
|
||||
#[instrument(level = "trace", name = "access::search_related_acp", skip_all)]
|
||||
fn search_related_acp<'b>(&'b self, ident: &Identity) -> Vec<AccessControlSearchResolved<'b>> {
|
||||
|
@ -873,9 +865,7 @@ pub trait AccessControlsTransaction<'a> {
|
|||
|
||||
pub struct AccessControlsWriteTransaction<'a> {
|
||||
inner: CowCellWriteTxn<'a, AccessControlsInner>,
|
||||
acp_resolve_filter_cache: Cell<
|
||||
ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
>,
|
||||
acp_resolve_filter_cache: Cell<ResolveFilterCacheReadTxn<'a>>,
|
||||
}
|
||||
|
||||
impl<'a> AccessControlsWriteTransaction<'a> {
|
||||
|
@ -949,19 +939,10 @@ impl<'a> AccessControlsTransaction<'a> for AccessControlsWriteTransaction<'a> {
|
|||
&self.inner.sync_agreements
|
||||
}
|
||||
|
||||
fn get_acp_resolve_filter_cache(
|
||||
&self,
|
||||
) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>
|
||||
{
|
||||
fn get_acp_resolve_filter_cache(&self) -> &mut ResolveFilterCacheReadTxn<'a> {
|
||||
unsafe {
|
||||
let mptr = self.acp_resolve_filter_cache.as_ptr();
|
||||
&mut (*mptr)
|
||||
as &mut ARCacheReadTxn<
|
||||
'a,
|
||||
(IdentityId, Filter<FilterValid>),
|
||||
Filter<FilterValidResolved>,
|
||||
(),
|
||||
>
|
||||
&mut (*mptr) as &mut ResolveFilterCacheReadTxn<'a>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -973,9 +954,7 @@ impl<'a> AccessControlsTransaction<'a> for AccessControlsWriteTransaction<'a> {
|
|||
pub struct AccessControlsReadTransaction<'a> {
|
||||
inner: CowCellReadTxn<AccessControlsInner>,
|
||||
// acp_related_search_cache: Cell<ARCacheReadTxn<'a, Uuid, Vec<Uuid>>>,
|
||||
acp_resolve_filter_cache: Cell<
|
||||
ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
>,
|
||||
acp_resolve_filter_cache: Cell<ResolveFilterCacheReadTxn<'a>>,
|
||||
}
|
||||
|
||||
unsafe impl<'a> Sync for AccessControlsReadTransaction<'a> {}
|
||||
|
@ -1003,19 +982,10 @@ impl<'a> AccessControlsTransaction<'a> for AccessControlsReadTransaction<'a> {
|
|||
&self.inner.sync_agreements
|
||||
}
|
||||
|
||||
fn get_acp_resolve_filter_cache(
|
||||
&self,
|
||||
) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>
|
||||
{
|
||||
fn get_acp_resolve_filter_cache(&self) -> &mut ResolveFilterCacheReadTxn<'a> {
|
||||
unsafe {
|
||||
let mptr = self.acp_resolve_filter_cache.as_ptr();
|
||||
&mut (*mptr)
|
||||
as &mut ARCacheReadTxn<
|
||||
'a,
|
||||
(IdentityId, Filter<FilterValid>),
|
||||
Filter<FilterValidResolved>,
|
||||
(),
|
||||
>
|
||||
&mut (*mptr) as &mut ResolveFilterCacheReadTxn<'a>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use concread::arcache::{ARCache, ARCacheBuilder, ARCacheReadTxn};
|
||||
use concread::arcache::{ARCacheBuilder, ARCacheReadTxn};
|
||||
use concread::cowcell::*;
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use std::collections::BTreeSet;
|
||||
|
@ -15,7 +15,10 @@ use kanidm_proto::internal::{DomainInfo as ProtoDomainInfo, UiHint};
|
|||
|
||||
use crate::be::{Backend, BackendReadTransaction, BackendTransaction, BackendWriteTransaction};
|
||||
// We use so many, we just import them all ...
|
||||
use crate::filter::{Filter, FilterInvalid, FilterValid, FilterValidResolved};
|
||||
use crate::filter::{
|
||||
Filter, FilterInvalid, FilterValid, FilterValidResolved, ResolveFilterCache,
|
||||
ResolveFilterCacheReadTxn,
|
||||
};
|
||||
use crate::plugins::dyngroup::{DynGroup, DynGroupCache};
|
||||
use crate::plugins::Plugins;
|
||||
use crate::prelude::*;
|
||||
|
@ -52,11 +55,8 @@ pub(crate) mod migrations;
|
|||
pub mod modify;
|
||||
pub(crate) mod recycle;
|
||||
|
||||
const RESOLVE_FILTER_CACHE_MAX: usize = 4096;
|
||||
const RESOLVE_FILTER_CACHE_LOCAL: usize = 0;
|
||||
|
||||
pub type ResolveFilterCacheReadTxn<'a> =
|
||||
ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>;
|
||||
const RESOLVE_FILTER_CACHE_MAX: usize = 256;
|
||||
const RESOLVE_FILTER_CACHE_LOCAL: usize = 8;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq)]
|
||||
pub(crate) enum ServerPhase {
|
||||
|
@ -92,9 +92,9 @@ pub struct QueryServer {
|
|||
schema: Arc<Schema>,
|
||||
accesscontrols: Arc<AccessControls>,
|
||||
db_tickets: Arc<Semaphore>,
|
||||
read_tickets: Arc<Semaphore>,
|
||||
write_ticket: Arc<Semaphore>,
|
||||
resolve_filter_cache:
|
||||
Arc<ARCache<(IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>>>,
|
||||
resolve_filter_cache: Arc<ResolveFilterCache>,
|
||||
dyngroup_cache: Arc<CowCell<DynGroupCache>>,
|
||||
cid_max: Arc<CowCell<Cid>>,
|
||||
key_providers: Arc<KeyProviders>,
|
||||
|
@ -110,8 +110,8 @@ pub struct QueryServerReadTransaction<'a> {
|
|||
accesscontrols: AccessControlsReadTransaction<'a>,
|
||||
key_providers: KeyProvidersReadTransaction,
|
||||
_db_ticket: SemaphorePermit<'a>,
|
||||
resolve_filter_cache:
|
||||
ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
_read_ticket: SemaphorePermit<'a>,
|
||||
resolve_filter_cache: ResolveFilterCacheReadTxn<'a>,
|
||||
// Future we may need this.
|
||||
// cid_max: CowCellReadTxn<Cid>,
|
||||
trim_cid: Cid,
|
||||
|
@ -155,8 +155,12 @@ pub struct QueryServerWriteTransaction<'a> {
|
|||
pub(super) changed_uuid: HashSet<Uuid>,
|
||||
_db_ticket: SemaphorePermit<'a>,
|
||||
_write_ticket: SemaphorePermit<'a>,
|
||||
resolve_filter_cache:
|
||||
ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
resolve_filter_cache: ARCacheReadTxn<
|
||||
'a,
|
||||
(IdentityId, Arc<Filter<FilterValid>>),
|
||||
Arc<Filter<FilterValidResolved>>,
|
||||
(),
|
||||
>,
|
||||
dyngroup_cache: CowCellWriteTxn<'a, DynGroupCache>,
|
||||
}
|
||||
|
||||
|
@ -1047,10 +1051,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerReadTransaction<'a> {
|
|||
&self.key_providers
|
||||
}
|
||||
|
||||
fn get_resolve_filter_cache(
|
||||
&mut self,
|
||||
) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>
|
||||
{
|
||||
fn get_resolve_filter_cache(&mut self) -> &mut ResolveFilterCacheReadTxn<'a> {
|
||||
&mut self.resolve_filter_cache
|
||||
}
|
||||
|
||||
|
@ -1058,7 +1059,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerReadTransaction<'a> {
|
|||
&mut self,
|
||||
) -> (
|
||||
&mut BackendReadTransaction<'a>,
|
||||
&mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
&mut ResolveFilterCacheReadTxn<'a>,
|
||||
) {
|
||||
(&mut self.be_txn, &mut self.resolve_filter_cache)
|
||||
}
|
||||
|
@ -1201,10 +1202,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerWriteTransaction<'a> {
|
|||
&self.key_providers
|
||||
}
|
||||
|
||||
fn get_resolve_filter_cache(
|
||||
&mut self,
|
||||
) -> &mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>
|
||||
{
|
||||
fn get_resolve_filter_cache(&mut self) -> &mut ResolveFilterCacheReadTxn<'a> {
|
||||
&mut self.resolve_filter_cache
|
||||
}
|
||||
|
||||
|
@ -1212,7 +1210,7 @@ impl<'a> QueryServerTransaction<'a> for QueryServerWriteTransaction<'a> {
|
|||
&mut self,
|
||||
) -> (
|
||||
&mut BackendWriteTransaction<'a>,
|
||||
&mut ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
&mut ResolveFilterCacheReadTxn<'a>,
|
||||
) {
|
||||
(&mut self.be_txn, &mut self.resolve_filter_cache)
|
||||
}
|
||||
|
@ -1311,6 +1309,11 @@ impl QueryServer {
|
|||
|
||||
let key_providers = Arc::new(KeyProviders::default());
|
||||
|
||||
// These needs to be pool_size minus one to always leave a DB ticket
|
||||
// for a writer. But it also needs to be at least one :)
|
||||
debug_assert!(pool_size > 0);
|
||||
let read_ticket_pool = std::cmp::max(pool_size - 1, 1);
|
||||
|
||||
Ok(QueryServer {
|
||||
phase,
|
||||
d_info,
|
||||
|
@ -1319,6 +1322,7 @@ impl QueryServer {
|
|||
schema: Arc::new(schema),
|
||||
accesscontrols: Arc::new(AccessControls::default()),
|
||||
db_tickets: Arc::new(Semaphore::new(pool_size as usize)),
|
||||
read_tickets: Arc::new(Semaphore::new(read_ticket_pool as usize)),
|
||||
write_ticket: Arc::new(Semaphore::new(1)),
|
||||
resolve_filter_cache,
|
||||
dyngroup_cache,
|
||||
|
@ -1334,7 +1338,26 @@ impl QueryServer {
|
|||
}
|
||||
|
||||
pub async fn read(&self) -> QueryServerReadTransaction<'_> {
|
||||
// We need to ensure a db conn will be available
|
||||
// Get a read ticket. Basicly this forces us to queue with other readers, while preventing
|
||||
// us from competing with writers on the db tickets. This tilts us to write prioritising
|
||||
// on db operations by always making sure a writer can get a db ticket.
|
||||
let read_ticket = if cfg!(test) {
|
||||
#[allow(clippy::expect_used)]
|
||||
self.read_tickets
|
||||
.try_acquire()
|
||||
.expect("unable to acquire db_ticket for qsr")
|
||||
} else {
|
||||
#[allow(clippy::expect_used)]
|
||||
self.read_tickets
|
||||
.acquire()
|
||||
.await
|
||||
.expect("unable to acquire db_ticket for qsr")
|
||||
};
|
||||
|
||||
// We need to ensure a db conn will be available. At this point either a db ticket
|
||||
// *must* be available because pool_size >= 2 and the only other holders are write
|
||||
// and read ticket holders, OR pool_size == 1, and we are waiting on the writer to now
|
||||
// complete.
|
||||
let db_ticket = if cfg!(test) {
|
||||
#[allow(clippy::expect_used)]
|
||||
self.db_tickets
|
||||
|
@ -1368,6 +1391,7 @@ impl QueryServer {
|
|||
accesscontrols: self.accesscontrols.read(),
|
||||
key_providers: self.key_providers.read(),
|
||||
_db_ticket: db_ticket,
|
||||
_read_ticket: read_ticket,
|
||||
resolve_filter_cache: self.resolve_filter_cache.read(),
|
||||
trim_cid,
|
||||
}
|
||||
|
@ -1387,7 +1411,9 @@ impl QueryServer {
|
|||
.expect("unable to acquire writer_ticket for qsw")
|
||||
};
|
||||
|
||||
// We need to ensure a db conn will be available
|
||||
// We need to ensure a db conn will be available. At this point either a db ticket
|
||||
// *must* be available because pool_size >= 2 and the only other are readers, or
|
||||
// pool_size == 1 and we are waiting on a single reader to now complete
|
||||
let db_ticket = if cfg!(test) {
|
||||
#[allow(clippy::expect_used)]
|
||||
self.db_tickets
|
||||
|
@ -1401,6 +1427,10 @@ impl QueryServer {
|
|||
.expect("unable to acquire db_ticket for qsw")
|
||||
};
|
||||
|
||||
// Point of no return - we now have a DB thread AND the write ticket, we MUST complete
|
||||
// as soon as possible! The following locks and elements below are SYNCHRONOUS but
|
||||
// will never be contented at this point, and will always progress.
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
let be_txn = self
|
||||
.be
|
||||
|
@ -1858,6 +1888,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
}
|
||||
|
||||
debug!(domain_previous_version = ?previous_version, domain_target_version = ?domain_info_version);
|
||||
debug!(domain_previous_patch_level = ?previous_patch_level, domain_target_patch_level = ?domain_info_patch_level);
|
||||
|
||||
if previous_version <= DOMAIN_LEVEL_2 && domain_info_version >= DOMAIN_LEVEL_3 {
|
||||
self.migrate_domain_2_to_3()?;
|
||||
|
@ -1881,7 +1912,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
|
||||
// Similar to the older system info migration handler, these allow "one shot" fixes
|
||||
// to be issued and run by bumping the patch level.
|
||||
if previous_patch_level <= PATCH_LEVEL_1 && domain_info_patch_level >= PATCH_LEVEL_1 {
|
||||
if previous_patch_level < PATCH_LEVEL_1 && domain_info_patch_level >= PATCH_LEVEL_1 {
|
||||
self.migrate_domain_patch_level_1()?;
|
||||
}
|
||||
|
||||
|
@ -2124,7 +2155,6 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
// Point of no return - everything has been validated and reloaded.
|
||||
//
|
||||
// = Lets commit =
|
||||
|
||||
schema
|
||||
.commit()
|
||||
.map(|_| d_info.commit())
|
||||
|
|
|
@ -1186,7 +1186,7 @@ pub enum Value {
|
|||
|
||||
HexString(String),
|
||||
|
||||
Certificate(Certificate),
|
||||
Certificate(Box<Certificate>),
|
||||
}
|
||||
|
||||
impl PartialEq for Value {
|
||||
|
@ -1478,7 +1478,10 @@ impl Value {
|
|||
}
|
||||
|
||||
pub fn new_certificate_s(cert_str: &str) -> Option<Self> {
|
||||
Certificate::from_pem(cert_str).map(Value::Certificate).ok()
|
||||
Certificate::from_pem(cert_str)
|
||||
.map(Box::new)
|
||||
.map(Value::Certificate)
|
||||
.ok()
|
||||
}
|
||||
|
||||
/// Want a `Value::Image`? use this!
|
||||
|
|
|
@ -16,11 +16,11 @@ use kanidm_lib_crypto::{
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ValueSetCertificate {
|
||||
map: BTreeMap<Sha256Digest, Certificate>,
|
||||
map: BTreeMap<Sha256Digest, Box<Certificate>>,
|
||||
}
|
||||
|
||||
impl ValueSetCertificate {
|
||||
pub fn new(certificate: Certificate) -> Result<Box<Self>, OperationError> {
|
||||
pub fn new(certificate: Box<Certificate>) -> Result<Box<Self>, OperationError> {
|
||||
let mut map = BTreeMap::new();
|
||||
|
||||
let pk_s256 = x509_public_key_s256(&certificate).ok_or_else(|| {
|
||||
|
@ -49,8 +49,9 @@ impl ValueSetCertificate {
|
|||
match db_cert {
|
||||
DbValueCertificate::V1 { certificate_der } => {
|
||||
// Parse the DER
|
||||
let certificate =
|
||||
Certificate::from_der(&certificate_der).map_err(|x509_err| {
|
||||
let certificate = Certificate::from_der(&certificate_der)
|
||||
.map(Box::new)
|
||||
.map_err(|x509_err| {
|
||||
error!(?x509_err, "Unable to restore certificate from DER");
|
||||
OperationError::VS0003CertificateDerDecode
|
||||
})?;
|
||||
|
@ -87,9 +88,10 @@ impl ValueSetCertificate {
|
|||
.collect()
|
||||
}
|
||||
|
||||
#[allow(clippy::should_implement_trait)]
|
||||
pub fn from_iter<T>(iter: T) -> Option<Box<Self>>
|
||||
where
|
||||
T: IntoIterator<Item = Certificate>,
|
||||
T: IntoIterator<Item = Box<Certificate>>,
|
||||
{
|
||||
let mut map = BTreeMap::new();
|
||||
|
||||
|
@ -129,7 +131,7 @@ impl ValueSetT for ValueSetCertificate {
|
|||
match pv {
|
||||
PartialValue::HexString(hs) => {
|
||||
let mut buf = Sha256Digest::default();
|
||||
if hex::decode_to_slice(&hs, &mut buf).is_ok() {
|
||||
if hex::decode_to_slice(hs, &mut buf).is_ok() {
|
||||
self.map.remove(&buf).is_some()
|
||||
} else {
|
||||
false
|
||||
|
@ -143,7 +145,7 @@ impl ValueSetT for ValueSetCertificate {
|
|||
match pv {
|
||||
PartialValue::HexString(hs) => {
|
||||
let mut buf = Sha256Digest::default();
|
||||
if hex::decode_to_slice(&hs, &mut buf).is_ok() {
|
||||
if hex::decode_to_slice(hs, &mut buf).is_ok() {
|
||||
self.map.contains_key(&buf)
|
||||
} else {
|
||||
false
|
||||
|
@ -236,13 +238,13 @@ impl ValueSetT for ValueSetCertificate {
|
|||
|
||||
fn to_certificate_single(&self) -> Option<&Certificate> {
|
||||
if self.map.len() == 1 {
|
||||
self.map.values().take(1).next()
|
||||
self.map.values().take(1).map(|b| b.as_ref()).next()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn as_certificate_set(&self) -> Option<&BTreeMap<Sha256Digest, Certificate>> {
|
||||
fn as_certificate_set(&self) -> Option<&BTreeMap<Sha256Digest, Box<Certificate>>> {
|
||||
Some(&self.map)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -245,6 +245,11 @@ pub trait ValueSetT: std::fmt::Debug + DynClone {
|
|||
None
|
||||
}
|
||||
|
||||
fn as_refer_set_mut(&mut self) -> Option<&mut BTreeSet<Uuid>> {
|
||||
debug_assert!(false);
|
||||
None
|
||||
}
|
||||
|
||||
fn as_bool_set(&self) -> Option<&SmolSet<[bool; 1]>> {
|
||||
debug_assert!(false);
|
||||
None
|
||||
|
@ -620,7 +625,7 @@ pub trait ValueSetT: std::fmt::Debug + DynClone {
|
|||
None
|
||||
}
|
||||
|
||||
fn as_certificate_set(&self) -> Option<&BTreeMap<Sha256Digest, Certificate>> {
|
||||
fn as_certificate_set(&self) -> Option<&BTreeMap<Sha256Digest, Box<Certificate>>> {
|
||||
debug_assert!(false);
|
||||
None
|
||||
}
|
||||
|
|
|
@ -338,6 +338,10 @@ impl ValueSetT for ValueSetRefer {
|
|||
Some(&self.set)
|
||||
}
|
||||
|
||||
fn as_refer_set_mut(&mut self) -> Option<&mut BTreeSet<Uuid>> {
|
||||
Some(&mut self.set)
|
||||
}
|
||||
|
||||
fn as_ref_uuid_iter(&self) -> Option<Box<dyn Iterator<Item = Uuid> + '_>> {
|
||||
Some(Box::new(self.set.iter().copied()))
|
||||
}
|
||||
|
|
|
@ -547,7 +547,7 @@ impl AccountCertificate {
|
|||
let client = copt.to_client(OpType::Write).await;
|
||||
|
||||
if let Err(e) = client
|
||||
.idm_person_certificate_create(&account_id, &pem_data)
|
||||
.idm_person_certificate_create(account_id, &pem_data)
|
||||
.await
|
||||
{
|
||||
handle_client_error(e, copt.output_mode);
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
use crate::error::Error;
|
||||
use crate::kani;
|
||||
use crate::state::*;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -76,47 +80,63 @@ pub async fn preflight(state: State) -> Result<(), Error> {
|
|||
apply_flags(client.clone(), state.preflight_flags.as_slice()).await?;
|
||||
|
||||
let state_persons_len = state.persons.len();
|
||||
|
||||
let mut tasks = Vec::with_capacity(state_persons_len);
|
||||
let mut tasks = VecDeque::with_capacity(state_persons_len);
|
||||
|
||||
// Create persons.
|
||||
for person in state.persons.into_iter() {
|
||||
let c = client.clone();
|
||||
// Write operations are single threaded in Kanidm, so we don't need to attempt
|
||||
// to parallelise that here.
|
||||
// tasks.push(tokio::spawn(preflight_person(c, person)))
|
||||
tasks.push(preflight_person(c, person))
|
||||
// While writes are single threaded in Kanidm, searches (such as .exists)
|
||||
// and credential updates are concurrent / parallel. So these parts can be
|
||||
// called in parallel, so we divide up into workers.
|
||||
tasks.push_back(preflight_person(c, person))
|
||||
}
|
||||
|
||||
let tasks_par = tasks.split_off(state_persons_len / 2);
|
||||
let tasks = Arc::new(Mutex::new(tasks));
|
||||
let counter = Arc::new(AtomicU32::new(0));
|
||||
let par = std::thread::available_parallelism().unwrap();
|
||||
|
||||
let left = tokio::spawn(async move {
|
||||
for (i, task) in tasks.into_iter().enumerate() {
|
||||
let _ = task.await;
|
||||
if i % 500 == 0 {
|
||||
eprint!(".");
|
||||
}
|
||||
}
|
||||
});
|
||||
let right = tokio::spawn(async move {
|
||||
for (i, task) in tasks_par.into_iter().enumerate() {
|
||||
let _ = task.await;
|
||||
if i % 500 == 0 {
|
||||
eprint!(".");
|
||||
}
|
||||
}
|
||||
});
|
||||
let handles: Vec<_> = (0..par.into())
|
||||
.map(|_| {
|
||||
let tasks_q = tasks.clone();
|
||||
let counter_c = counter.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let maybe_task = async {
|
||||
let mut guard = tasks_q.lock().await;
|
||||
guard.pop_front()
|
||||
}
|
||||
.await;
|
||||
|
||||
left.await.map_err(|tokio_err| {
|
||||
error!(?tokio_err, "Failed to join task");
|
||||
Error::Tokio
|
||||
})?;
|
||||
right.await.map_err(|tokio_err| {
|
||||
error!(?tokio_err, "Failed to join task");
|
||||
Error::Tokio
|
||||
})?;
|
||||
if let Some(t) = maybe_task {
|
||||
let _ = t.await;
|
||||
let was = counter_c.fetch_add(1, Ordering::Relaxed);
|
||||
if was % 1000 == 999 {
|
||||
let order = was + 1;
|
||||
eprint!("{}", order);
|
||||
} else if was % 100 == 99 {
|
||||
// Since we just added one, this just rolled over.
|
||||
eprint!(".");
|
||||
}
|
||||
} else {
|
||||
// queue drained.
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
for handle in handles {
|
||||
handle.await.map_err(|tokio_err| {
|
||||
error!(?tokio_err, "Failed to join task");
|
||||
Error::Tokio
|
||||
})?;
|
||||
}
|
||||
|
||||
eprintln!("done");
|
||||
|
||||
// Create groups.
|
||||
let counter = Arc::new(AtomicU32::new(0));
|
||||
let mut tasks = Vec::with_capacity(state.groups.len());
|
||||
|
||||
for group in state.groups.into_iter() {
|
||||
|
@ -129,11 +149,18 @@ pub async fn preflight(state: State) -> Result<(), Error> {
|
|||
|
||||
for task in tasks {
|
||||
task.await?;
|
||||
/*
|
||||
task.await
|
||||
*/
|
||||
let was = counter.fetch_add(1, Ordering::Relaxed);
|
||||
if was % 1000 == 999 {
|
||||
let order = was + 1;
|
||||
eprint!("{}", order);
|
||||
} else if was % 100 == 99 {
|
||||
// Since we just added one, this just rolled over.
|
||||
eprint!(".");
|
||||
}
|
||||
}
|
||||
|
||||
eprintln!("done");
|
||||
|
||||
// Create integrations.
|
||||
|
||||
info!("Ready to 🛫");
|
||||
|
|
Loading…
Reference in a new issue