Improve durability of migrations (#1804)

This commit is contained in:
Firstyear 2023-07-03 12:20:11 +10:00 committed by William Brown
parent bf8b074389
commit f5924443f0
4 changed files with 47 additions and 43 deletions

View file

@ -164,9 +164,8 @@ unsafe impl<'a> Send for BackendReadTransaction<'a> {}
pub struct BackendWriteTransaction<'a> { pub struct BackendWriteTransaction<'a> {
idlayer: IdlArcSqliteWriteTransaction<'a>, idlayer: IdlArcSqliteWriteTransaction<'a>,
idxmeta: CowCellReadTxn<IdxMeta>,
ruv: ReplicationUpdateVectorWriteTransaction<'a>,
idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>, idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>,
ruv: ReplicationUpdateVectorWriteTransaction<'a>,
} }
impl IdRawEntry { impl IdRawEntry {
@ -946,7 +945,7 @@ impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
} }
fn get_idxmeta_ref(&self) -> &IdxMeta { fn get_idxmeta_ref(&self) -> &IdxMeta {
&self.idxmeta &self.idxmeta_wr
} }
} }
@ -1328,7 +1327,7 @@ impl<'a> BackendWriteTransaction<'a> {
// this we discard the lifetime on idxmeta, because we know that it will // this we discard the lifetime on idxmeta, because we know that it will
// remain constant for the life of the operation. // remain constant for the life of the operation.
let idxmeta = unsafe { &(*(&self.idxmeta.idxkeys as *const _)) }; let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) };
let idx_diff = Entry::idx_diff(idxmeta, pre, post); let idx_diff = Entry::idx_diff(idxmeta, pre, post);
@ -1380,7 +1379,7 @@ impl<'a> BackendWriteTransaction<'a> {
let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect(); let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect();
let missing: Vec<_> = self let missing: Vec<_> = self
.idxmeta .idxmeta_wr
.idxkeys .idxkeys
.keys() .keys()
.filter_map(|ikey| { .filter_map(|ikey| {
@ -1412,7 +1411,7 @@ impl<'a> BackendWriteTransaction<'a> {
trace!("Creating index -> uuid2rdn"); trace!("Creating index -> uuid2rdn");
self.idlayer.create_uuid2rdn()?; self.idlayer.create_uuid2rdn()?;
self.idxmeta self.idxmeta_wr
.idxkeys .idxkeys
.keys() .keys()
.try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype)) .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype))
@ -1610,9 +1609,8 @@ impl<'a> BackendWriteTransaction<'a> {
pub fn commit(self) -> Result<(), OperationError> { pub fn commit(self) -> Result<(), OperationError> {
let BackendWriteTransaction { let BackendWriteTransaction {
idlayer, idlayer,
idxmeta: _,
ruv,
idxmeta_wr, idxmeta_wr,
ruv,
} = self; } = self;
idlayer.commit().map(|()| { idlayer.commit().map(|()| {
@ -1796,9 +1794,8 @@ impl Backend {
pub fn write(&self) -> BackendWriteTransaction { pub fn write(&self) -> BackendWriteTransaction {
BackendWriteTransaction { BackendWriteTransaction {
idlayer: self.idlayer.write(), idlayer: self.idlayer.write(),
idxmeta: self.idxmeta.read(),
ruv: self.ruv.write(),
idxmeta_wr: self.idxmeta.write(), idxmeta_wr: self.idxmeta.write(),
ruv: self.ruv.write(),
} }
} }

View file

@ -8,6 +8,10 @@ use super::ServerPhase;
impl QueryServer { impl QueryServer {
#[instrument(level = "info", name = "system_initialisation", skip_all)] #[instrument(level = "info", name = "system_initialisation", skip_all)]
pub async fn initialise_helper(&self, ts: Duration) -> Result<(), OperationError> { pub async fn initialise_helper(&self, ts: Duration) -> Result<(), OperationError> {
// We need to perform this in a single transaction pass to prevent tainting
// databases during upgrades.
let mut write_txn = self.write(ts).await;
// Check our database version - attempt to do an initial indexing // Check our database version - attempt to do an initial indexing
// based on the in memory configuration // based on the in memory configuration
// //
@ -18,10 +22,7 @@ impl QueryServer {
// A major reason here to split to multiple transactions is to allow schema // A major reason here to split to multiple transactions is to allow schema
// reloading to occur, which causes the idxmeta to update, and allows validation // reloading to occur, which causes the idxmeta to update, and allows validation
// of the schema in the subsequent steps as we proceed. // of the schema in the subsequent steps as we proceed.
let mut reindex_write_1 = self.write(ts).await; write_txn.upgrade_reindex(SYSTEM_INDEX_VERSION)?;
reindex_write_1
.upgrade_reindex(SYSTEM_INDEX_VERSION)
.and_then(|_| reindex_write_1.commit())?;
// Because we init the schema here, and commit, this reloads meaning // Because we init the schema here, and commit, this reloads meaning
// that the on-disk index meta has been loaded, so our subsequent // that the on-disk index meta has been loaded, so our subsequent
@ -32,32 +33,30 @@ impl QueryServer {
// the schema to tell us what's indexed), but because we have the in // the schema to tell us what's indexed), but because we have the in
// mem schema that defines how schema is structured, and this is all // mem schema that defines how schema is structured, and this is all
// marked "system", then we won't have an issue here. // marked "system", then we won't have an issue here.
let mut ts_write_1 = self.write(ts).await; write_txn
ts_write_1
.initialise_schema_core() .initialise_schema_core()
.and_then(|_| ts_write_1.commit())?; .and_then(|_| write_txn.reload())?;
let mut ts_write_2 = self.write(ts).await; write_txn
ts_write_2
.initialise_schema_idm() .initialise_schema_idm()
.and_then(|_| ts_write_2.commit())?; .and_then(|_| write_txn.reload())?;
// reindex and set to version + 1, this way when we bump the version // reindex and set to version + 1, this way when we bump the version
// we are essentially pushing this version id back up to step write_1 // we are essentially pushing this version id back up to step write_1
let mut reindex_write_2 = self.write(ts).await; write_txn
reindex_write_2
.upgrade_reindex(SYSTEM_INDEX_VERSION + 1) .upgrade_reindex(SYSTEM_INDEX_VERSION + 1)
.and_then(|_| reindex_write_2.commit())?; .and_then(|_| write_txn.reload())?;
// Force the schema to reload - this is so that any changes to index slope // Force the schema to reload - this is so that any changes to index slope
// analysis are now reflected correctly. // analysis that was performed during the reindex are now reflected correctly
// in the in-memory schema cache.
// //
// A side effect of these reloads is that other plugins or elements that reload // A side effect of these reloads is that other plugins or elements that reload
// on schema change are now setup. // on schema change are now setup.
let mut slope_reload = self.write(ts).await;
slope_reload.set_phase(ServerPhase::SchemaReady); write_txn.set_phase(ServerPhase::SchemaReady);
slope_reload.force_schema_reload(); write_txn.force_schema_reload();
slope_reload.commit()?; write_txn.reload()?;
// Now, based on the system version apply migrations. You may ask "should you not // Now, based on the system version apply migrations. You may ask "should you not
// be doing migrations before indexes?". And this is a very good question! The issue // be doing migrations before indexes?". And this is a very good question! The issue
@ -67,11 +66,11 @@ impl QueryServer {
// the indexing subsystem is schema/value agnostic - the fact the values still let their keys // the indexing subsystem is schema/value agnostic - the fact the values still let their keys
// be extracted, means that the pres indexes will be valid even though the entries are pending // be extracted, means that the pres indexes will be valid even though the entries are pending
// migration. We must be sure to NOT use EQ/SUB indexes in the migration code however! // migration. We must be sure to NOT use EQ/SUB indexes in the migration code however!
let mut migrate_txn = self.write(ts).await; //
// If we are "in the process of being setup" this is 0, and the migrations will have no // If we are "in the process of being setup" this is 0, and the migrations will have no
// effect as ... there is nothing to migrate! It allows reset of the version to 0 to force // effect as ... there is nothing to migrate! It allows reset of the version to 0 to force
// db migrations to take place. // db migrations to take place.
let system_info_version = match migrate_txn.internal_search_uuid(UUID_SYSTEM_INFO) { let system_info_version = match write_txn.internal_search_uuid(UUID_SYSTEM_INFO) {
Ok(e) => Ok(e.get_ava_single_uint32("version").unwrap_or(0)), Ok(e) => Ok(e.get_ava_single_uint32("version").unwrap_or(0)),
Err(OperationError::NoMatchingEntries) => Ok(0), Err(OperationError::NoMatchingEntries) => Ok(0),
Err(r) => Err(r), Err(r) => Err(r),
@ -86,29 +85,28 @@ impl QueryServer {
} }
if system_info_version < 9 { if system_info_version < 9 {
migrate_txn.migrate_8_to_9()?; write_txn.migrate_8_to_9()?;
} }
if system_info_version < 10 { if system_info_version < 10 {
migrate_txn.migrate_9_to_10()?; write_txn.migrate_9_to_10()?;
} }
if system_info_version < 11 { if system_info_version < 11 {
migrate_txn.migrate_10_to_11()?; write_txn.migrate_10_to_11()?;
} }
if system_info_version < 12 { if system_info_version < 12 {
migrate_txn.migrate_11_to_12()?; write_txn.migrate_11_to_12()?;
} }
} }
migrate_txn.commit()?; write_txn.reload()?;
// Migrations complete. Init idm will now set the version as needed. // Migrations complete. Init idm will now set the version as needed.
let mut ts_write_3 = self.write(ts).await; write_txn.initialise_idm().and_then(|_| {
ts_write_3.initialise_idm().and_then(|_| { write_txn.set_phase(ServerPhase::Running);
ts_write_3.set_phase(ServerPhase::Running); write_txn.commit()
ts_write_3.commit()
})?; })?;
// Here is where in the future we will need to apply domain version increments. // Here is where in the future we will need to apply domain version increments.

View file

@ -1427,8 +1427,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
*self.phase = phase *self.phase = phase
} }
#[instrument(level = "info", skip_all)] pub(crate) fn reload(&mut self) -> Result<(), OperationError> {
pub fn commit(mut self) -> Result<(), OperationError> {
// This could be faster if we cache the set of classes changed // This could be faster if we cache the set of classes changed
// in an operation so we can check if we need to do the reload or not // in an operation so we can check if we need to do the reload or not
// //
@ -1453,6 +1452,13 @@ impl<'a> QueryServerWriteTransaction<'a> {
self.reload_domain_info()?; self.reload_domain_info()?;
} }
Ok(())
}
#[instrument(level = "info", skip_all)]
pub fn commit(mut self) -> Result<(), OperationError> {
self.reload()?;
// Now destructure the transaction ready to reset it. // Now destructure the transaction ready to reset it.
let QueryServerWriteTransaction { let QueryServerWriteTransaction {
committed, committed,

View file

@ -90,8 +90,11 @@ impl ValueSetT for ValueSetJwsKeyEs256 {
} }
} }
fn contains(&self, _pv: &PartialValue) -> bool { fn contains(&self, pv: &PartialValue) -> bool {
false match pv {
PartialValue::Iutf8(kid) => self.set.iter().any(|k| k.get_kid() == kid),
_ => false,
}
} }
fn substring(&self, _pv: &PartialValue) -> bool { fn substring(&self, _pv: &PartialValue) -> bool {