kanidm (mirror of https://github.com/kanidm/kanidm.git)
Improve durability of migrations (#1804)

commit 83e4d3a85e (parent cd7f1781ad)
@@ -166,9 +166,8 @@ unsafe impl<'a> Send for BackendReadTransaction<'a> {}

 pub struct BackendWriteTransaction<'a> {
     idlayer: IdlArcSqliteWriteTransaction<'a>,
-    idxmeta: CowCellReadTxn<IdxMeta>,
-    ruv: ReplicationUpdateVectorWriteTransaction<'a>,
     idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>,
+    ruv: ReplicationUpdateVectorWriteTransaction<'a>,
 }

 impl IdRawEntry {
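The write transaction now owns the index metadata through a CowCellWriteTxn rather than a read snapshot. As a minimal sketch of the copy-on-write cell semantics this relies on, assuming the concread crate's CowCell API (which kanidm uses); the Vec payload here is purely illustrative:

use concread::cowcell::CowCell;

fn main() {
    let cell = CowCell::new(vec!["name".to_string()]);

    // Readers take a point-in-time snapshot that never changes under them.
    let snapshot = cell.read();

    // A single writer mutates a private copy through DerefMut.
    let mut wr = cell.write();
    wr.push("class".to_string());

    // Uncommitted changes are invisible to existing and new readers...
    assert_eq!(snapshot.len(), 1);
    assert_eq!(cell.read().len(), 1);

    // ...until the writer commits, publishing the new generation.
    wr.commit();
    assert_eq!(cell.read().len(), 2);
}

Because the writer's view includes its own pending changes, later steps inside the same backend write transaction can observe index metadata updated earlier in that transaction, before anything is committed.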
@@ -968,7 +967,7 @@ impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
     }

     fn get_idxmeta_ref(&self) -> &IdxMeta {
-        &self.idxmeta
+        &self.idxmeta_wr
     }
 }
@@ -1483,7 +1482,7 @@ impl<'a> BackendWriteTransaction<'a> {
         // this we discard the lifetime on idxmeta, because we know that it will
         // remain constant for the life of the operation.

-        let idxmeta = unsafe { &(*(&self.idxmeta.idxkeys as *const _)) };
+        let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) };

         let idx_diff = Entry::idx_diff(idxmeta, pre, post);
@@ -1535,7 +1534,7 @@ impl<'a> BackendWriteTransaction<'a> {
         let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect();

         let missing: Vec<_> = self
-            .idxmeta
+            .idxmeta_wr
             .idxkeys
             .keys()
             .filter_map(|ikey| {
@@ -1567,7 +1566,7 @@ impl<'a> BackendWriteTransaction<'a> {
         trace!("Creating index -> uuid2rdn");
         self.idlayer.create_uuid2rdn()?;

-        self.idxmeta
+        self.idxmeta_wr
             .idxkeys
             .keys()
             .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype))
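The index creation above relies on Iterator::try_for_each short-circuiting: the first create_idx failure aborts the pass and propagates the error. A standalone sketch of that behaviour, with made-up attribute names and error type:

fn create_idx(attr: &str) -> Result<(), String> {
    if attr.is_empty() {
        return Err("refusing to index an empty attribute".to_string());
    }
    println!("created index for {attr}");
    Ok(())
}

fn main() -> Result<(), String> {
    // Stops at the first Err and returns it, like the
    // index-creation loop in the hunk above.
    ["name", "class", "uuid"]
        .iter()
        .try_for_each(|attr| create_idx(attr))
}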
@@ -1766,9 +1765,8 @@ impl<'a> BackendWriteTransaction<'a> {
     pub fn commit(self) -> Result<(), OperationError> {
         let BackendWriteTransaction {
             idlayer,
-            idxmeta: _,
-            ruv,
             idxmeta_wr,
+            ruv,
         } = self;

         idlayer.commit().map(|()| {
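Note that commit consumes the transaction by value and destructures it, so each owned layer can be committed exactly once and the transaction is unusable afterwards. A toy sketch of the idiom with hypothetical stand-in types (not the kanidm ones):

struct SubTxn;

impl SubTxn {
    // Consuming self means a layer cannot be committed twice.
    fn commit(self) -> Result<(), &'static str> {
        Ok(())
    }
}

struct WriteTxn {
    idlayer: SubTxn,
    ruv: SubTxn,
}

impl WriteTxn {
    fn commit(self) -> Result<(), &'static str> {
        // Destructuring moves each field out; the borrow checker then
        // forbids any further use of the transaction value.
        let WriteTxn { idlayer, ruv } = self;
        ruv.commit()?;
        idlayer.commit()
    }
}

fn main() -> Result<(), &'static str> {
    WriteTxn { idlayer: SubTxn, ruv: SubTxn }.commit()
}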
@@ -1952,9 +1950,8 @@ impl Backend {
     pub fn write(&self) -> BackendWriteTransaction {
         BackendWriteTransaction {
             idlayer: self.idlayer.write(),
-            idxmeta: self.idxmeta.read(),
-            ruv: self.ruv.write(),
             idxmeta_wr: self.idxmeta.write(),
+            ruv: self.ruv.write(),
         }
     }
@@ -8,6 +8,10 @@ use super::ServerPhase;
 impl QueryServer {
     #[instrument(level = "info", name = "system_initialisation", skip_all)]
     pub async fn initialise_helper(&self, ts: Duration) -> Result<(), OperationError> {
+        // We need to perform this in a single transaction pass to prevent tainting
+        // databases during upgrades.
+        let mut write_txn = self.write(ts).await;
+
         // Check our database version - attempt to do an initial indexing
         // based on the in memory configuration
         //
@@ -18,10 +22,7 @@ impl QueryServer {
         // A major reason here to split to multiple transactions is to allow schema
         // reloading to occur, which causes the idxmeta to update, and allows validation
         // of the schema in the subsequent steps as we proceed.
-        let mut reindex_write_1 = self.write(ts).await;
-        reindex_write_1
-            .upgrade_reindex(SYSTEM_INDEX_VERSION)
-            .and_then(|_| reindex_write_1.commit())?;
+        write_txn.upgrade_reindex(SYSTEM_INDEX_VERSION)?;

         // Because we init the schema here, and commit, this reloads meaning
         // that the on-disk index meta has been loaded, so our subsequent
@@ -32,32 +33,30 @@ impl QueryServer {
         // the schema to tell us what's indexed), but because we have the in
         // mem schema that defines how schema is structuded, and this is all
         // marked "system", then we won't have an issue here.
-        let mut ts_write_1 = self.write(ts).await;
-        ts_write_1
+        write_txn
             .initialise_schema_core()
-            .and_then(|_| ts_write_1.commit())?;
+            .and_then(|_| write_txn.reload())?;

-        let mut ts_write_2 = self.write(ts).await;
-        ts_write_2
+        write_txn
             .initialise_schema_idm()
-            .and_then(|_| ts_write_2.commit())?;
+            .and_then(|_| write_txn.reload())?;

         // reindex and set to version + 1, this way when we bump the version
         // we are essetially pushing this version id back up to step write_1
-        let mut reindex_write_2 = self.write(ts).await;
-        reindex_write_2
+        write_txn
             .upgrade_reindex(SYSTEM_INDEX_VERSION + 1)
-            .and_then(|_| reindex_write_2.commit())?;
+            .and_then(|_| write_txn.reload())?;

         // Force the schema to reload - this is so that any changes to index slope
-        // analysis are now reflected correctly.
+        // analysis that was performed during the reindex are now reflected correctly
+        // in the in-memory schema cache.
         //
         // A side effect of these reloads is that other plugins or elements that reload
         // on schema change are now setup.
-        let mut slope_reload = self.write(ts).await;
-        slope_reload.set_phase(ServerPhase::SchemaReady);
-        slope_reload.force_schema_reload();
-        slope_reload.commit()?;
+        write_txn.set_phase(ServerPhase::SchemaReady);
+        write_txn.force_schema_reload();
+        write_txn.reload()?;

         // Now, based on the system version apply migrations. You may ask "should you not
         // be doing migrations before indexes?". And this is a very good question! The issue
@@ -67,11 +66,11 @@ impl QueryServer {
         // the indexing subsystem is schema/value agnostic - the fact the values still let their keys
         // be extracted, means that the pres indexes will be valid even though the entries are pending
         // migration. We must be sure to NOT use EQ/SUB indexes in the migration code however!
-        let mut migrate_txn = self.write(ts).await;
+        //
+        // If we are "in the process of being setup" this is 0, and the migrations will have no
+        // effect as ... there is nothing to migrate! It allows reset of the version to 0 to force
+        // db migrations to take place.

-        let system_info_version = match migrate_txn.internal_search_uuid(UUID_SYSTEM_INFO) {
+        let system_info_version = match write_txn.internal_search_uuid(UUID_SYSTEM_INFO) {
             Ok(e) => Ok(e.get_ava_single_uint32("version").unwrap_or(0)),
             Err(OperationError::NoMatchingEntries) => Ok(0),
             Err(r) => Err(r),
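The version read here drives the guarded migration ladder in the next hunk: each step runs only while the recorded version is below its target, so a fresh database at version 0 runs every step and a current one runs none. A toy sketch of the pattern (step bodies and per-step version bumps are illustrative; in the real code the version is set once at the end):

fn apply_migrations(mut version: u32) -> Result<u32, &'static str> {
    // Each guard is skipped once the recorded version passes it, so the
    // ladder is safe to run on every startup.
    if version < 9 {
        // migrate_8_to_9() would run here
        version = 9;
    }
    if version < 10 {
        // migrate_9_to_10() would run here
        version = 10;
    }
    Ok(version)
}

fn main() {
    assert_eq!(apply_migrations(0), Ok(10)); // fresh install: every step runs
    assert_eq!(apply_migrations(9), Ok(10)); // upgrade: only the newer steps
    assert_eq!(apply_migrations(10), Ok(10)); // current: nothing to do
}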
@@ -86,29 +85,28 @@ impl QueryServer {
             }

             if system_info_version < 9 {
-                migrate_txn.migrate_8_to_9()?;
+                write_txn.migrate_8_to_9()?;
             }

             if system_info_version < 10 {
-                migrate_txn.migrate_9_to_10()?;
+                write_txn.migrate_9_to_10()?;
             }

             if system_info_version < 11 {
-                migrate_txn.migrate_10_to_11()?;
+                write_txn.migrate_10_to_11()?;
             }

             if system_info_version < 12 {
-                migrate_txn.migrate_11_to_12()?;
+                write_txn.migrate_11_to_12()?;
             }
         }

-        migrate_txn.commit()?;
+        write_txn.reload()?;
         // Migrations complete. Init idm will now set the version as needed.

-        let mut ts_write_3 = self.write(ts).await;
-        ts_write_3.initialise_idm().and_then(|_| {
-            ts_write_3.set_phase(ServerPhase::Running);
-            ts_write_3.commit()
+        write_txn.initialise_idm().and_then(|_| {
+            write_txn.set_phase(ServerPhase::Running);
+            write_txn.commit()
         })?;

         // Here is where in the future we will need to apply domain version increments.
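Taken together, these hunks change initialise_helper from committing and reopening a transaction at every step to driving one write transaction that calls reload() between steps and commits exactly once at the end. That is the durability improvement in the title: a failure at any step discards the whole pass instead of leaving a partially upgraded database. A schematic sketch of the resulting shape, with hypothetical stand-in types rather than the kanidm API:

struct Txn;

impl Txn {
    fn step(&mut self, name: &str) -> Result<(), String> {
        println!("running {name}");
        Ok(())
    }

    // Rebuild in-memory caches (schema, idxmeta, ...) from this
    // transaction's own uncommitted state so later steps observe them.
    fn reload(&mut self) -> Result<(), String> {
        Ok(())
    }

    fn commit(self) -> Result<(), String> {
        Ok(())
    }
}

fn initialise(mut txn: Txn) -> Result<(), String> {
    txn.step("reindex")?;
    txn.step("initialise schema core")?;
    txn.reload()?;
    txn.step("initialise schema idm")?;
    txn.reload()?;
    txn.step("migrations")?;
    // The single durable point: everything above lands, or none of it does.
    txn.commit()
}

fn main() -> Result<(), String> {
    initialise(Txn)
}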
@@ -1486,8 +1486,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
         *self.phase = phase
     }

-    #[instrument(level = "info", skip_all)]
-    pub fn commit(mut self) -> Result<(), OperationError> {
+    pub(crate) fn reload(&mut self) -> Result<(), OperationError> {
         // This could be faster if we cache the set of classes changed
         // in an operation so we can check if we need to do the reload or not
         //
@@ -1512,6 +1511,13 @@ impl<'a> QueryServerWriteTransaction<'a> {
             self.reload_domain_info()?;
         }

+        Ok(())
+    }
+
+    #[instrument(level = "info", skip_all)]
+    pub fn commit(mut self) -> Result<(), OperationError> {
+        self.reload()?;
+
         // Now destructure the transaction ready to reset it.
         let QueryServerWriteTransaction {
             committed,
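This pair of hunks factors the cache-rebuild logic out of commit into a pub(crate) reload(&mut self) that migrations can call mid-transaction, with commit simply running reload one final time before destructuring. A minimal sketch of the shape (the dirty-flag check is a stand-in for the real class-change tracking):

struct WriteTxn {
    changed_schema: bool,
}

impl WriteTxn {
    // Callable mid-transaction (e.g. between migration steps) as well as
    // from commit itself.
    pub(crate) fn reload(&mut self) -> Result<(), &'static str> {
        if self.changed_schema {
            // reload schema, access controls, domain info, ...
            self.changed_schema = false;
        }
        Ok(())
    }

    pub fn commit(mut self) -> Result<(), &'static str> {
        self.reload()?;
        // ...then destructure and commit the underlying layers.
        Ok(())
    }
}

fn main() -> Result<(), &'static str> {
    WriteTxn { changed_schema: true }.commit()
}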
@@ -90,8 +90,11 @@ impl ValueSetT for ValueSetJwsKeyEs256 {
         }
     }

-    fn contains(&self, _pv: &PartialValue) -> bool {
-        false
+    fn contains(&self, pv: &PartialValue) -> bool {
+        match pv {
+            PartialValue::Iutf8(kid) => self.set.iter().any(|k| k.get_kid() == kid),
+            _ => false,
+        }
     }

     fn substring(&self, _pv: &PartialValue) -> bool {
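contains for the ES256 key set now matches a PartialValue::Iutf8 against the key id (kid) of each stored key instead of always returning false. A self-contained sketch of the new semantics, using toy stand-ins for the kanidm types:

enum PartialValue {
    Iutf8(String),
    Uint32(u32),
}

struct JwsSigner {
    kid: String,
}

impl JwsSigner {
    fn get_kid(&self) -> &str {
        &self.kid
    }
}

fn contains(set: &[JwsSigner], pv: &PartialValue) -> bool {
    match pv {
        // A key set contains a value if any stored key's kid matches.
        PartialValue::Iutf8(kid) => set.iter().any(|k| k.get_kid() == kid.as_str()),
        _ => false,
    }
}

fn main() {
    let set = vec![JwsSigner { kid: "abc123".to_string() }];
    assert!(contains(&set, &PartialValue::Iutf8("abc123".to_string())));
    assert!(!contains(&set, &PartialValue::Uint32(7)));
}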