From f5924443f08e462067937a5dd0e2c19e5e1255da Mon Sep 17 00:00:00 2001 From: Firstyear Date: Mon, 3 Jul 2023 12:20:11 +1000 Subject: [PATCH] Improve durability of migrations (#1804) --- server/lib/src/be/mod.rs | 17 ++++----- server/lib/src/server/migrations.rs | 56 ++++++++++++++--------------- server/lib/src/server/mod.rs | 10 ++++-- server/lib/src/valueset/jws.rs | 7 ++-- 4 files changed, 47 insertions(+), 43 deletions(-) diff --git a/server/lib/src/be/mod.rs b/server/lib/src/be/mod.rs index 8df271b5b..e13dc105e 100644 --- a/server/lib/src/be/mod.rs +++ b/server/lib/src/be/mod.rs @@ -164,9 +164,8 @@ unsafe impl<'a> Send for BackendReadTransaction<'a> {} pub struct BackendWriteTransaction<'a> { idlayer: IdlArcSqliteWriteTransaction<'a>, - idxmeta: CowCellReadTxn, - ruv: ReplicationUpdateVectorWriteTransaction<'a>, idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>, + ruv: ReplicationUpdateVectorWriteTransaction<'a>, } impl IdRawEntry { @@ -946,7 +945,7 @@ impl<'a> BackendTransaction for BackendWriteTransaction<'a> { } fn get_idxmeta_ref(&self) -> &IdxMeta { - &self.idxmeta + &self.idxmeta_wr } } @@ -1328,7 +1327,7 @@ impl<'a> BackendWriteTransaction<'a> { // this we discard the lifetime on idxmeta, because we know that it will // remain constant for the life of the operation. - let idxmeta = unsafe { &(*(&self.idxmeta.idxkeys as *const _)) }; + let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) }; let idx_diff = Entry::idx_diff(idxmeta, pre, post); @@ -1380,7 +1379,7 @@ impl<'a> BackendWriteTransaction<'a> { let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect(); let missing: Vec<_> = self - .idxmeta + .idxmeta_wr .idxkeys .keys() .filter_map(|ikey| { @@ -1412,7 +1411,7 @@ impl<'a> BackendWriteTransaction<'a> { trace!("Creating index -> uuid2rdn"); self.idlayer.create_uuid2rdn()?; - self.idxmeta + self.idxmeta_wr .idxkeys .keys() .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype)) @@ -1610,9 +1609,8 @@ impl<'a> BackendWriteTransaction<'a> { pub fn commit(self) -> Result<(), OperationError> { let BackendWriteTransaction { idlayer, - idxmeta: _, - ruv, idxmeta_wr, + ruv, } = self; idlayer.commit().map(|()| { @@ -1796,9 +1794,8 @@ impl Backend { pub fn write(&self) -> BackendWriteTransaction { BackendWriteTransaction { idlayer: self.idlayer.write(), - idxmeta: self.idxmeta.read(), - ruv: self.ruv.write(), idxmeta_wr: self.idxmeta.write(), + ruv: self.ruv.write(), } } diff --git a/server/lib/src/server/migrations.rs b/server/lib/src/server/migrations.rs index eb6368632..4f1a66702 100644 --- a/server/lib/src/server/migrations.rs +++ b/server/lib/src/server/migrations.rs @@ -8,6 +8,10 @@ use super::ServerPhase; impl QueryServer { #[instrument(level = "info", name = "system_initialisation", skip_all)] pub async fn initialise_helper(&self, ts: Duration) -> Result<(), OperationError> { + // We need to perform this in a single transaction pass to prevent tainting + // databases during upgrades. + let mut write_txn = self.write(ts).await; + // Check our database version - attempt to do an initial indexing // based on the in memory configuration // @@ -18,10 +22,7 @@ impl QueryServer { // A major reason here to split to multiple transactions is to allow schema // reloading to occur, which causes the idxmeta to update, and allows validation // of the schema in the subsequent steps as we proceed. - let mut reindex_write_1 = self.write(ts).await; - reindex_write_1 - .upgrade_reindex(SYSTEM_INDEX_VERSION) - .and_then(|_| reindex_write_1.commit())?; + write_txn.upgrade_reindex(SYSTEM_INDEX_VERSION)?; // Because we init the schema here, and commit, this reloads meaning // that the on-disk index meta has been loaded, so our subsequent @@ -32,32 +33,30 @@ impl QueryServer { // the schema to tell us what's indexed), but because we have the in // mem schema that defines how schema is structuded, and this is all // marked "system", then we won't have an issue here. - let mut ts_write_1 = self.write(ts).await; - ts_write_1 + write_txn .initialise_schema_core() - .and_then(|_| ts_write_1.commit())?; + .and_then(|_| write_txn.reload())?; - let mut ts_write_2 = self.write(ts).await; - ts_write_2 + write_txn .initialise_schema_idm() - .and_then(|_| ts_write_2.commit())?; + .and_then(|_| write_txn.reload())?; // reindex and set to version + 1, this way when we bump the version // we are essetially pushing this version id back up to step write_1 - let mut reindex_write_2 = self.write(ts).await; - reindex_write_2 + write_txn .upgrade_reindex(SYSTEM_INDEX_VERSION + 1) - .and_then(|_| reindex_write_2.commit())?; + .and_then(|_| write_txn.reload())?; // Force the schema to reload - this is so that any changes to index slope - // analysis are now reflected correctly. + // analysis that was performed during the reindex are now reflected correctly + // in the in-memory schema cache. // // A side effect of these reloads is that other plugins or elements that reload // on schema change are now setup. - let mut slope_reload = self.write(ts).await; - slope_reload.set_phase(ServerPhase::SchemaReady); - slope_reload.force_schema_reload(); - slope_reload.commit()?; + + write_txn.set_phase(ServerPhase::SchemaReady); + write_txn.force_schema_reload(); + write_txn.reload()?; // Now, based on the system version apply migrations. You may ask "should you not // be doing migrations before indexes?". And this is a very good question! The issue @@ -67,11 +66,11 @@ impl QueryServer { // the indexing subsystem is schema/value agnostic - the fact the values still let their keys // be extracted, means that the pres indexes will be valid even though the entries are pending // migration. We must be sure to NOT use EQ/SUB indexes in the migration code however! - let mut migrate_txn = self.write(ts).await; + // // If we are "in the process of being setup" this is 0, and the migrations will have no // effect as ... there is nothing to migrate! It allows reset of the version to 0 to force // db migrations to take place. - let system_info_version = match migrate_txn.internal_search_uuid(UUID_SYSTEM_INFO) { + let system_info_version = match write_txn.internal_search_uuid(UUID_SYSTEM_INFO) { Ok(e) => Ok(e.get_ava_single_uint32("version").unwrap_or(0)), Err(OperationError::NoMatchingEntries) => Ok(0), Err(r) => Err(r), @@ -86,29 +85,28 @@ impl QueryServer { } if system_info_version < 9 { - migrate_txn.migrate_8_to_9()?; + write_txn.migrate_8_to_9()?; } if system_info_version < 10 { - migrate_txn.migrate_9_to_10()?; + write_txn.migrate_9_to_10()?; } if system_info_version < 11 { - migrate_txn.migrate_10_to_11()?; + write_txn.migrate_10_to_11()?; } if system_info_version < 12 { - migrate_txn.migrate_11_to_12()?; + write_txn.migrate_11_to_12()?; } } - migrate_txn.commit()?; + write_txn.reload()?; // Migrations complete. Init idm will now set the version as needed. - let mut ts_write_3 = self.write(ts).await; - ts_write_3.initialise_idm().and_then(|_| { - ts_write_3.set_phase(ServerPhase::Running); - ts_write_3.commit() + write_txn.initialise_idm().and_then(|_| { + write_txn.set_phase(ServerPhase::Running); + write_txn.commit() })?; // Here is where in the future we will need to apply domain version increments. diff --git a/server/lib/src/server/mod.rs b/server/lib/src/server/mod.rs index 9526a15fe..6417221ba 100644 --- a/server/lib/src/server/mod.rs +++ b/server/lib/src/server/mod.rs @@ -1427,8 +1427,7 @@ impl<'a> QueryServerWriteTransaction<'a> { *self.phase = phase } - #[instrument(level = "info", skip_all)] - pub fn commit(mut self) -> Result<(), OperationError> { + pub(crate) fn reload(&mut self) -> Result<(), OperationError> { // This could be faster if we cache the set of classes changed // in an operation so we can check if we need to do the reload or not // @@ -1453,6 +1452,13 @@ impl<'a> QueryServerWriteTransaction<'a> { self.reload_domain_info()?; } + Ok(()) + } + + #[instrument(level = "info", skip_all)] + pub fn commit(mut self) -> Result<(), OperationError> { + self.reload()?; + // Now destructure the transaction ready to reset it. let QueryServerWriteTransaction { committed, diff --git a/server/lib/src/valueset/jws.rs b/server/lib/src/valueset/jws.rs index d204d86a4..b2aae8efa 100644 --- a/server/lib/src/valueset/jws.rs +++ b/server/lib/src/valueset/jws.rs @@ -90,8 +90,11 @@ impl ValueSetT for ValueSetJwsKeyEs256 { } } - fn contains(&self, _pv: &PartialValue) -> bool { - false + fn contains(&self, pv: &PartialValue) -> bool { + match pv { + PartialValue::Iutf8(kid) => self.set.iter().any(|k| k.get_kid() == kid), + _ => false, + } } fn substring(&self, _pv: &PartialValue) -> bool {