Improve batch migrations

This improves batch migrations so that they operate on groups of entries rather
than iterating over one entry at a time. Generally this makes logs
clearer, and it also yields an ~8% performance improvement in test
execution time.
This commit is contained in:
William Brown 2025-02-23 13:14:04 +10:00
parent cb287eeb61
commit 97bf309b3a
3 changed files with 150 additions and 46 deletions
proto/src/internal
server/lib/src
migration_data/dl10
server

View file

@ -223,6 +223,7 @@ pub enum OperationError {
MG0007Oauth2StrictConstraintsNotMet,
MG0008SkipUpgradeAttempted,
MG0009InvalidTargetLevelForBootstrap,
MG0010MigrationDataMissingUuid,
//
KP0001KeyProviderNotLoaded,
KP0002KeyProviderInvalidClass,
@ -464,6 +465,7 @@ impl OperationError {
Self::MG0007Oauth2StrictConstraintsNotMet => Some("Migration Constraints Not Met - All OAuth2 clients must have strict-redirect-uri mode enabled.".into()),
Self::MG0008SkipUpgradeAttempted => Some("Skip Upgrade Attempted.".into()),
Self::MG0009InvalidTargetLevelForBootstrap => Some("The request target domain level was not valid for bootstrapping a new server instance".into()),
Self::MG0010MigrationDataMissingUuid => Some("A migration entry was found to be invalid and missing a uuid.".into()),
Self::PL0001GidOverlapsSystemRange => None,
Self::SC0001IncomingSshPublicKey => None,
Self::SC0002ReferenceSyntaxInvalid => Some("A SCIM Reference Set contained invalid syntax and can not be processed.".into()),

View file

@ -69,7 +69,6 @@ pub fn phase_1_schema_attrs() -> Vec<EntryInitNew> {
SCHEMA_ATTR_SYNC_TOKEN_SESSION.clone().into(),
SCHEMA_ATTR_UNIX_PASSWORD.clone().into(),
SCHEMA_ATTR_USER_AUTH_TOKEN_SESSION.clone().into(),
SCHEMA_ATTR_DENIED_NAME.clone().into(),
SCHEMA_ATTR_CREDENTIAL_TYPE_MINIMUM.clone().into(),
SCHEMA_ATTR_WEBAUTHN_ATTESTATION_CA_LIST.clone().into(),
// DL4

View file

@ -1,13 +1,12 @@
use crate::prelude::*;
use super::ServerPhase;
use crate::migration_data;
use crate::prelude::*;
use kanidm_proto::internal::{
DomainUpgradeCheckItem as ProtoDomainUpgradeCheckItem,
DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
DomainUpgradeCheckStatus as ProtoDomainUpgradeCheckStatus,
};
use super::ServerPhase;
use std::cmp::Ordering;
impl QueryServer {
#[instrument(level = "info", name = "system_initialisation", skip_all)]
@ -221,43 +220,158 @@ impl QueryServerWriteTransaction<'_> {
.and_then(|()| self.reload())
}
/// Given a set of entries, create entries that do not exist, and migrate entries
/// that do exist. This operation always applies the create step first, and
/// migrations second.
///
/// This means if you have *ordering* requirements for the entries in the migration
/// then you *MUST* express that ordering requirement in multiple subsequent calls
/// to this function.
#[instrument(level = "debug", skip_all)]
fn internal_migrate_or_create_batch(
&mut self,
msg: &str,
_msg: &str,
entries: Vec<EntryInitNew>,
) -> Result<(), OperationError> {
let r: Result<(), _> = entries
.into_iter()
.try_for_each(|entry| self.internal_migrate_or_create(entry));
// Pull out the uuids of all the entries.
let mut entries_w_uuid: Vec<(Uuid, EntryInitNew)> = Vec::with_capacity(entries.len());
if let Err(err) = r {
error!(?err, msg);
debug_assert!(false);
for entry in entries.into_iter() {
let entry_uuid = entry
.get_uuid()
.ok_or(OperationError::MG0010MigrationDataMissingUuid)?;
entries_w_uuid.push((entry_uuid, entry));
}
// Now we can search for the entries.
let inner: Vec<_> = entries_w_uuid
.iter()
.map(|(u, _)| f_eq(Attribute::Uuid, PartialValue::Uuid(*u)))
.collect();
let filter = filter!(f_or(inner));
let mut entries = self.internal_search(filter)?;
// Now we have to partition the entries_w_uuid into entries that
// need creation, and ones that need to be migrated.
//
// To do this, we sort both lists by uuid and then look through them.
entries.sort_unstable_by_key(|e| e.get_uuid());
entries_w_uuid.sort_unstable_by_key(|(u, _)| *u);
let mut entries_to_create = Vec::with_capacity(entries_w_uuid.len() - entries.len());
let mut entries_to_migrate = Vec::with_capacity(entries.len());
let mut entry_iter = entries.into_iter();
let mut next_entry = entry_iter.next();
// This looks at both lists like:
//
// migrate: [ A, B, C, D, E ]
// db: [ B, C, E ]
//
// As we iter we start in the migrate list:
//
// v
// migrate: [ A, B, C, D, E ]
// db: [ B, C, E ]
// ^
//
// Since uuid A != B, and A < B, we push A to the create set.
//
// v
// migrate: [ A, B, C, D, E ]
// db: [ B, C, E ]
// ^
//
// Now B == B, so we push this to the migrate set and advance both.
//
// This normally will continue, but we have to account for *one* case
// that should be impossible, but paranoia is good.
//
// That case is a uuid in db that isn't in migrate.
//
// v
// migrate: [ A, B, D, E ]
// db: [ B, C, E ]
// ^
// In this case, we have to advance db iter, but not the entries_w_uuid iter
// so that D is still considered correctly.
'outer: for (u, entry) in entries_w_uuid.into_iter() {
// There is something here to compare against - is it our entry?
'inner: loop {
if let Some(ref db_entry) = next_entry {
match u.cmp(&db_entry.get_uuid()) {
Ordering::Equal => {
// This entry needs migration.
entries_to_migrate.push((u, entry));
// Advanced the db_entry iter.
next_entry = entry_iter.next();
continue 'outer;
}
Ordering::Less => {
// Since our uuid is less than db_entry, iterate only the
// entries_w_uuid set
entries_to_create.push(entry);
continue 'outer;
}
Ordering::Greater => {
// IMPOSSIBLE CASE
debug_assert!(false);
next_entry = entry_iter.next();
continue 'inner;
}
}
} else {
// Nothing left to compare, must just need create
entries_to_create.push(entry);
continue 'outer;
}
}
}
// Given the set of entries to create, perform that now.
if !entries_to_create.is_empty() {
self.internal_create(entries_to_create)?;
}
// Apply batched modifications now.
if !entries_to_migrate.is_empty() {
let mut modifications: Vec<(Uuid, _)> = Vec::with_capacity(entries_to_migrate.len());
for (u, entry) in entries_to_migrate {
/*
// If we need to ignore attrs, do so here.
for attr in attrs.iter() {
e.remove_ava(attr);
}
*/
let modlist = entry
.gen_modlist_assert(&self.schema)
.map_err(OperationError::SchemaViolation)?;
modifications.push((u, modlist));
}
self.internal_batch_modify(modifications.into_iter())?;
}
// Complete!!!
Ok(())
}
#[instrument(level = "debug", skip_all)]
/// - If the thing exists:
///   - Ensure the set of attributes match and are present
///     (but don't delete multivalue, or extended attributes in the situation).
/// - If not:
///   - Create the entry
///
/// This will ignore the specified list of attributes, so that if an admin has
/// modified those values then we don't stomp them.
///
/// NOTE: `gen_modlist*` IS schema aware and will handle multivalue correctly!
fn internal_migrate_or_create(
    &mut self,
    e: Entry<EntryInit, EntryNew>,
) -> Result<(), OperationError> {
    // No attributes are ignored in the default case.
    self.internal_migrate_or_create_ignore_attrs(e, &[])
}
/// This is the same as [QueryServerWriteTransaction::internal_migrate_or_create] but it will ignore the specified
/// list of attributes, so that if an admin has modified those values then we don't
/// stomp them.
#[instrument(level = "trace", skip_all)]
fn internal_migrate_or_create_ignore_attrs(
&mut self,
@ -449,15 +563,10 @@ impl QueryServerWriteTransaction<'_> {
debug_assert!(*self.phase >= ServerPhase::SchemaReady);
let idm_data = migration_data::dl9::phase_7_builtin_access_control_profiles();
idm_data
.into_iter()
.try_for_each(|entry| self.internal_migrate_or_create(entry))
.map_err(|err| {
error!(?err, "migrate_domain_patch_level_2 -> Error");
err
})?;
self.internal_migrate_or_create_batch(
"patch level 2 - access control profiles",
migration_data::dl9::phase_7_builtin_access_control_profiles(),
)?;
self.reload()?;
@ -548,24 +657,18 @@ impl QueryServerWriteTransaction<'_> {
#[instrument(level = "info", skip_all)]
pub(crate) fn initialise_schema_core(&mut self) -> Result<(), OperationError> {
admin_debug!("initialise_schema_core -> start ...");
// Load in all the "core" schema, that we already have in "memory".
let entries = self.schema.to_entries();
// admin_debug!("Dumping schemas: {:?}", entries);
let r = self
.internal_migrate_or_create_batch("initialise schema core", self.schema.to_entries());
// internal_migrate_or_create.
let r: Result<_, _> = entries.into_iter().try_for_each(|e| {
trace!(?e, "init schema entry");
self.internal_migrate_or_create(e)
});
if r.is_ok() {
admin_debug!("initialise_schema_core -> Ok!");
if let Err(err) = &r {
error!(?err);
debug_assert!(false);
} else {
admin_error!(?r, "initialise_schema_core -> Error");
debug!("Ok!");
}
// why do we have error handling if it's always supposed to be `Ok`?
debug_assert!(r.is_ok());
r
}
}