From 8f282e3a30da892ad57de69a0bd8bb4dc8ce207c Mon Sep 17 00:00:00 2001 From: Firstyear Date: Thu, 27 Jul 2023 12:30:22 +1000 Subject: [PATCH] 68 20230720 replication improvements (#1905) --- server/lib/src/be/mod.rs | 1 - server/lib/src/constants/uuids.rs | 2 + server/lib/src/constants/values.rs | 1 + server/lib/src/entry.rs | 228 ++++++++-- server/lib/src/repl/consumer.rs | 38 +- server/lib/src/repl/entry.rs | 5 + server/lib/src/repl/tests.rs | 549 +++++++++++++++++++++++++ server/lib/src/schema.rs | 33 +- server/lib/src/server/access/search.rs | 6 +- server/lib/src/server/mod.rs | 23 ++ 10 files changed, 821 insertions(+), 65 deletions(-) diff --git a/server/lib/src/be/mod.rs b/server/lib/src/be/mod.rs index 1244ba7b0..1c863c3b7 100644 --- a/server/lib/src/be/mod.rs +++ b/server/lib/src/be/mod.rs @@ -1379,7 +1379,6 @@ impl<'a> BackendWriteTransaction<'a> { // Update the names/uuid maps. These have to mask out entries // that are recycled or tombstones, so these pretend as "deleted" // and can trigger correct actions. - // let mask_pre = pre.and_then(|e| e.mask_recycled_ts()); let mask_pre = if !uuid_same { diff --git a/server/lib/src/constants/uuids.rs b/server/lib/src/constants/uuids.rs index 2cca82cbd..5447a5caa 100644 --- a/server/lib/src/constants/uuids.rs +++ b/server/lib/src/constants/uuids.rs @@ -234,6 +234,8 @@ pub const UUID_SCHEMA_ATTR_SYNC_CREDENTIAL_PORTAL: Uuid = pub const UUID_SCHEMA_CLASS_OAUTH2_RS_PUBLIC: Uuid = uuid!("00000000-0000-0000-0000-ffff00000137"); pub const UUID_SCHEMA_ATTR_SYNC_YIELD_AUTHORITY: Uuid = uuid!("00000000-0000-0000-0000-ffff00000138"); +pub const UUID_SCHEMA_CLASS_CONFLICT: Uuid = uuid!("00000000-0000-0000-0000-ffff00000139"); +pub const UUID_SCHEMA_ATTR_SOURCE_UUID: Uuid = uuid!("00000000-0000-0000-0000-ffff00000140"); // System and domain infos // I'd like to strongly criticise william of the past for making poor choices about these allocations. diff --git a/server/lib/src/constants/values.rs b/server/lib/src/constants/values.rs index 6813d79c6..306c01fb0 100644 --- a/server/lib/src/constants/values.rs +++ b/server/lib/src/constants/values.rs @@ -16,6 +16,7 @@ lazy_static! { pub static ref PVCLASS_ACP: PartialValue = PartialValue::new_class("access_control_profile"); pub static ref PVCLASS_ATTRIBUTETYPE: PartialValue = PartialValue::new_class("attributetype"); pub static ref PVCLASS_CLASSTYPE: PartialValue = PartialValue::new_class("classtype"); + pub static ref PVCLASS_CONFLICT: PartialValue = PartialValue::new_class("conflict"); pub static ref PVCLASS_DOMAIN_INFO: PartialValue = PartialValue::new_class("domain_info"); pub static ref PVCLASS_DYNGROUP: PartialValue = PartialValue::new_class("dyngroup"); pub static ref PVCLASS_EXTENSIBLE: PartialValue = PartialValue::new_class("extensibleobject"); diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs index 7a6dfec85..d54cf250e 100644 --- a/server/lib/src/entry.rs +++ b/server/lib/src/entry.rs @@ -106,7 +106,6 @@ pub struct EntryInit; #[derive(Clone, Debug)] pub struct EntryInvalid { cid: Cid, - // eclog: EntryChangelog, ecstate: EntryChangeState, } @@ -133,7 +132,6 @@ pub struct EntryIncremental { pub struct EntryValid { // Asserted with schema, so we know it has a UUID now ... uuid: Uuid, - // eclog: EntryChangelog, ecstate: EntryChangeState, } @@ -146,7 +144,6 @@ pub struct EntryValid { #[derive(Clone, Debug)] pub struct EntrySealed { uuid: Uuid, - // eclog: EntryChangelog, ecstate: EntryChangeState, } @@ -232,19 +229,28 @@ where } } -impl std::fmt::Display for Entry { +impl std::fmt::Display for Entry +where + STATE: Clone, +{ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{}", self.get_uuid()) } } -impl std::fmt::Display for Entry { +impl std::fmt::Display for Entry +where + STATE: Clone, +{ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "Entry in initial state") } } -impl Entry { +impl Entry +where + STATE: Clone, +{ /// Get the uuid of this entry. pub(crate) fn get_uuid(&self) -> Option { self.attrs.get("uuid").and_then(|vs| vs.to_uuid_single()) @@ -490,26 +496,25 @@ impl Entry { /// Assign the Change Identifier to this Entry, allowing it to be modified and then /// written to the `Backend` pub fn assign_cid( - mut self, + self, cid: Cid, schema: &dyn SchemaTransaction, ) -> Entry { - /* setup our last changed time */ - self.set_last_changed(cid.clone()); - /* * Create the change log. This must be the last thing BEFORE we return! * This is because we need to capture the set_last_changed attribute in * the create transition. */ - // let eclog = EntryChangelog::new(cid.clone(), self.attrs.clone(), schema); let ecstate = EntryChangeState::new(&cid, &self.attrs, schema); - Entry { + let mut ent = Entry { valid: EntryInvalid { cid, ecstate }, state: EntryNew, attrs: self.attrs, - } + }; + // trace!("trigger_last_changed - assign_cid"); + ent.trigger_last_changed(); + ent } /// Compare this entry to another. @@ -681,6 +686,131 @@ impl Entry { } } + pub(crate) fn resolve_add_conflict( + &self, + cid: &Cid, + db_ent: &EntrySealedCommitted, + ) -> (Option, EntryIncrementalCommitted) { + use crate::repl::entry::State; + debug_assert!(self.valid.uuid == db_ent.valid.uuid); + let self_cs = &self.valid.ecstate; + let db_cs = db_ent.get_changestate(); + + match (self_cs.current(), db_cs.current()) { + ( + State::Live { + at: at_left, + changes: _changes_left, + }, + State::Live { + at: at_right, + changes: _changes_right, + }, + ) => { + debug_assert!(at_left != at_right); + // Determine which of the entries must become the conflict + // and which will now persist. There are three possible cases. + // + // 1. The incoming ReplIncremental is after DBentry. This means RI is the + // conflicting node. We take no action and just return the db_ent + // as the valid state. + if at_left > at_right { + trace!("RI > DE, return DE"); + ( + None, + Entry { + valid: EntryIncremental { + uuid: db_ent.valid.uuid, + ecstate: db_cs.clone(), + }, + state: EntryCommitted { + id: db_ent.state.id, + }, + attrs: db_ent.attrs.clone(), + }, + ) + } + // + // 2. The incoming ReplIncremental is before DBentry. This means our + // DE is the conflicting note. There are now two choices: + // a. We are the origin of the DE, and thus must create the conflict + // entry for replication (to guarantee single create) + // b. We are not the origin of the DE and so do not create a conflict + // entry. + // In both cases we update the DE with the state of RI after we have + // followed the above logic. + else { + trace!("RI < DE, return RI"); + // Are we the origin? + let conflict = if at_right.s_uuid == cid.s_uuid { + trace!("Origin process conflict entry"); + // We are making a new entry! + + let mut cnf_ent = Entry { + valid: EntryInvalid { + cid: cid.clone(), + ecstate: db_cs.clone(), + }, + state: EntryNew, + attrs: db_ent.attrs.clone(), + }; + + // Setup the last changed to now. + cnf_ent.trigger_last_changed(); + + // Move the current uuid to source_uuid + cnf_ent.add_ava("source_uuid", Value::Uuid(db_ent.valid.uuid)); + + // We need to make a random uuid in the conflict gen process. + let new_uuid = Uuid::new_v4(); + cnf_ent.purge_ava("uuid"); + cnf_ent.add_ava("uuid", Value::Uuid(new_uuid)); + cnf_ent.add_ava("class", Value::new_class("recycled")); + cnf_ent.add_ava("class", Value::new_class("conflict")); + + // Now we have to internally bypass some states. + // This is okay because conflict entries aren't subject + // to schema anyway. + let Entry { + valid: EntryInvalid { cid: _, ecstate }, + state, + attrs, + } = cnf_ent; + + let cnf_ent = Entry { + valid: EntrySealed { + uuid: new_uuid, + ecstate, + }, + state, + attrs, + }; + + Some(cnf_ent) + } else { + None + }; + + ( + conflict, + Entry { + valid: EntryIncremental { + uuid: self.valid.uuid, + ecstate: self_cs.clone(), + }, + state: EntryCommitted { + id: db_ent.state.id, + }, + attrs: self.attrs.clone(), + }, + ) + } + } + // Can never get here due to is_add_conflict above. + _ => unreachable!(), + } + } + pub(crate) fn merge_state( &self, db_ent: &EntrySealedCommitted, @@ -738,6 +868,8 @@ impl Entry { #[allow(clippy::todo)] if let Some(_attr_state) = vs_left.repl_merge_valueset(vs_right) { + // TODO note: This is for special attr types that need to merge + // rather than choose content. todo!(); } else { changes.insert(attr_name.clone(), cid_left.clone()); @@ -748,6 +880,8 @@ impl Entry { #[allow(clippy::todo)] if let Some(_attr_state) = vs_right.repl_merge_valueset(vs_left) { + // TODO note: This is for special attr types that need to merge + // rather than choose content. todo!(); } else { changes.insert(attr_name.clone(), cid_right.clone()); @@ -909,11 +1043,11 @@ impl Entry { attrs: self.attrs, }; - #[allow(clippy::todo)] if let Err(e) = ne.validate(schema) { warn!(uuid = ?self.valid.uuid, err = ?e, "Entry failed schema check, moving to a conflict state"); + ne.add_ava_int("class", Value::new_class("recycled")); ne.add_ava_int("class", Value::new_class("conflict")); - todo!(); + ne.add_ava_int("source_uuid", Value::Uuid(self.valid.uuid)); } ne } @@ -1006,6 +1140,8 @@ impl Entry { pub fn to_revived(mut self) -> Self { // This will put the modify ahead of the revive transition. self.remove_ava("class", &PVCLASS_RECYCLED); + self.remove_ava("class", &PVCLASS_CONFLICT); + self.purge_ava("source_uuid"); // Change state repl doesn't need this flag // self.valid.ecstate.revive(&self.valid.cid); @@ -1139,6 +1275,11 @@ impl Entry { } impl Entry { + #[cfg(test)] + pub(crate) fn get_last_changed(&self) -> Cid { + self.valid.ecstate.get_tail_cid() + } + #[cfg(test)] pub unsafe fn into_sealed_committed(self) -> Entry { // NO-OP to satisfy macros. @@ -1748,7 +1889,14 @@ impl Entry { return Err(SchemaError::NoClassFound); } - // Do we have extensible? + if self.attribute_equality("class", &PVCLASS_CONFLICT) { + // Conflict entries are exempt from schema enforcement. Return true. + trace!("Skipping schema validation on conflict entry"); + return Ok(()); + }; + + // Do we have extensible? We still validate syntax of attrs but don't + // check for valid object structures. let extensible = self.attribute_equality("class", &PVCLASS_EXTENSIBLE); let entry_classes = self.get_ava_set("class").ok_or_else(|| { @@ -1988,11 +2136,11 @@ impl Entry { } } -impl Entry { - pub fn invalidate(mut self, cid: Cid) -> Entry { - /* Setup our last changed time. */ - self.set_last_changed(cid.clone()); - +impl Entry +where + STATE: Clone, +{ + pub fn invalidate(self, cid: Cid) -> Entry { Entry { valid: EntryInvalid { cid, @@ -2001,18 +2149,22 @@ impl Entry { state: self.state, attrs: self.attrs, } + /* Setup our last changed time. */ + // We can't actually trigger last changed here. This creates a replication loop + // inside of memberof plugin which invalidates. For now we treat last_changed + // more as "create" so we only trigger it via assign_cid in the create path + // and in conflict entry creation. + /* + trace!("trigger_last_changed - invalidate"); + ent.trigger_last_changed(); + ent + */ } pub fn get_uuid(&self) -> Uuid { self.valid.uuid } - /* - pub fn get_changelog(&self) -> &EntryChangelog { - &self.valid.eclog - } - */ - pub fn get_changestate(&self) -> &EntryChangeState { &self.valid.ecstate } @@ -2185,19 +2337,12 @@ impl Entry { } /// Update the last_changed flag of this entry to the given change identifier. + #[cfg(test)] fn set_last_changed(&mut self, cid: Cid) { let cv = vs_cid![cid]; let _ = self.attrs.insert(AttrString::from("last_modified_cid"), cv); } - #[cfg(test)] - pub(crate) fn get_last_changed(&self) -> Cid { - self.attrs - .get("last_modified_cid") - .and_then(|vs| vs.to_cid_single()) - .unwrap() - } - pub(crate) fn get_display_id(&self) -> String { self.attrs .get("spn") @@ -2704,17 +2849,20 @@ impl Entry where STATE: Clone, { + fn trigger_last_changed(&mut self) { + self.valid + .ecstate + .change_ava(&self.valid.cid, "last_modified_cid"); + let cv = vs_cid![self.valid.cid.clone()]; + let _ = self.attrs.insert(AttrString::from("last_modified_cid"), cv); + } + // This should always work? It's only on validate that we'll build // a list of syntax violations ... // If this already exists, we silently drop the event. This is because // we need this to be *state* based where we assert presence. pub fn add_ava(&mut self, attr: &str, value: Value) { self.valid.ecstate.change_ava(&self.valid.cid, attr); - /* - self.valid - .eclog - .add_ava_iter(&self.valid.cid, attr, std::iter::once(value.clone())); - */ self.add_ava_int(attr, value); } diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index 5acd36abf..8c82ead2b 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -16,7 +16,7 @@ impl<'a> QueryServerWriteTransaction<'a> { &mut self, ctx_entries: &[ReplIncrementalEntryV1], ) -> Result<(), OperationError> { - trace!(?ctx_entries); + // trace!(?ctx_entries); // No action needed for this if the entries are empty. if ctx_entries.is_empty() { @@ -43,7 +43,6 @@ impl<'a> QueryServerWriteTransaction<'a> { e })?; - trace!("==========================================="); trace!(?ctx_entries); let db_entries = self.be_txn.incremental_prepare(&ctx_entries).map_err(|e| { @@ -69,28 +68,23 @@ impl<'a> QueryServerWriteTransaction<'a> { // /- entries that need to be created as conflicts. // | /- entries that survive and need update to the db in place. // v v - - #[allow(clippy::todo)] let (conflict_create, conflict_update): ( - Vec, + Vec>, Vec<(EntryIncrementalCommitted, Arc)>, ) = conflicts .into_iter() - .map(|(_ctx_ent, _db_ent)| { - // Determine which of the entries must become the conflict - // and which will now persist. There are two possible cases. - // - // 1. The ReplIncremental is after the DBEntry, and becomes the conflict. - // This means we just update the db entry with itself. - // - // 2. The ReplIncremental is before the DBEntry, and becomes live. - // This means we have to take the DBEntry as it exists, convert - // it to a new entry. Then we have to take the repl incremental - // entry and place it into the update queue. - todo!(); - }) + .map( + |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc)| { + let (opt_create, ent) = + ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref()); + (opt_create, (ent, db_ent)) + }, + ) .unzip(); + // Filter out None from conflict_create + let conflict_create: Vec = conflict_create.into_iter().flatten().collect(); + let proceed_update: Vec<(EntryIncrementalCommitted, Arc)> = proceed .into_iter() .map(|(ctx_ent, db_ent)| { @@ -219,9 +213,10 @@ impl<'a> QueryServerWriteTransaction<'a> { error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly."); Ok(ConsumerState::RefreshRequired) } - #[allow(clippy::todo)] ReplIncrementalContext::UnwillingToSupply => { - todo!(); + warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption."); + error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly."); + Ok(ConsumerState::Ok) } ReplIncrementalContext::V1 { domain_version, @@ -273,6 +268,7 @@ impl<'a> QueryServerWriteTransaction<'a> { // == ⚠️ Below this point we begin to make changes! == + debug!("Applying schema entries"); // Apply the schema entries first. self.consumer_incremental_apply_entries(ctx_schema_entries) .map_err(|e| { @@ -286,6 +282,7 @@ impl<'a> QueryServerWriteTransaction<'a> { e })?; + debug!("Applying meta entries"); // Apply meta entries now. self.consumer_incremental_apply_entries(ctx_meta_entries) .map_err(|e| { @@ -305,6 +302,7 @@ impl<'a> QueryServerWriteTransaction<'a> { self.changed_schema = true; self.changed_domain = true; + debug!("Applying all context entries"); // Update all other entries now. self.consumer_incremental_apply_entries(ctx_entries) .map_err(|e| { diff --git a/server/lib/src/repl/entry.rs b/server/lib/src/repl/entry.rs index 7708ce194..448a7781f 100644 --- a/server/lib/src/repl/entry.rs +++ b/server/lib/src/repl/entry.rs @@ -137,6 +137,11 @@ impl EntryChangeState { } } + #[cfg(test)] + pub(crate) fn get_tail_cid(&self) -> Cid { + self.cid_iter().pop().cloned().unwrap() + } + pub fn cid_iter(&self) -> Vec<&Cid> { match &self.st { State::Live { at: _, changes } => { diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 6672e24b8..43da9eb51 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -72,6 +72,8 @@ fn repl_incremental( .supplier_provide_changes(a_ruv_range) .expect("Unable to generate supplier changes"); + trace!(?changes, "supplying changes"); + // Check the changes = should be empty. to.consumer_apply_changes(&changes) .expect("Unable to apply changes to consumer."); @@ -1122,13 +1124,560 @@ async fn test_repl_increment_basic_bidirectional_tombstone( // conflict cases. // both add entry with same uuid - only one can win! +#[qs_pair_test] +async fn test_repl_increment_creation_uuid_conflict( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Now create the same entry on both servers. + let t_uuid = Uuid::new_v4(); + let e_init = entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ); + + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.internal_create(vec![e_init.clone(),]).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Get a new time. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Replicate A to B. B should ignore. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + trace!("{:?}", e1.get_last_changed()); + trace!("{:?}", e2.get_last_changed()); + // e2 from b will be smaller as it's the older entry. + assert!(e1.get_last_changed() > e2.get_last_changed()); + + // Check that no conflict entries exist yet. + let cnf_a = server_a_txn + .internal_search_conflict_uuid(t_uuid) + .expect("Unable to conflict entries."); + assert!(cnf_a.is_empty()); + let cnf_b = server_b_txn + .internal_search_conflict_uuid(t_uuid) + .expect("Unable to conflict entries."); + assert!(cnf_b.is_empty()); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // Replicate B to A. A should replace with B, and create the + // conflict entry as it's the origin of the conflict. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1.get_last_changed() == e2.get_last_changed()); + + let cnf_a = server_a_txn + .internal_search_conflict_uuid(t_uuid) + .expect("Unable to conflict entries.") + // Should be a vec. + .pop() + .expect("No conflict entries present"); + assert!(cnf_a.get_ava_single_iname("name") == Some("testperson1")); + + let cnf_b = server_b_txn + .internal_search_conflict_uuid(t_uuid) + .expect("Unable to conflict entries."); + assert!(cnf_b.is_empty()); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // At this point server a now has the conflict entry, and we have to confirm + // it can be sent to b. + + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + // Now the repl should have caused the conflict to be on both sides. + let cnf_a = server_a_txn + .internal_search_conflict_uuid(t_uuid) + .expect("Unable to conflict entries.") + // Should be a vec. + .pop() + .expect("No conflict entries present"); + + let cnf_b = server_b_txn + .internal_search_conflict_uuid(t_uuid) + .expect("Unable to conflict entries.") + // Should be a vec. + .pop() + .expect("No conflict entries present"); + + assert!(cnf_a.get_last_changed() == cnf_b.get_last_changed()); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} // both add entry with same uuid, but one becomes ts - ts always wins. +#[qs_pair_test] +async fn test_repl_increment_create_tombstone_uuid_conflict( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Now create the same entry on both servers. + let t_uuid = Uuid::new_v4(); + let e_init = entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ); + + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.internal_create(vec![e_init.clone(),]).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Since A was added second, this should normal be the entry that loses in the + // conflict resolve case, but here because it's tombstoned, we actually see it + // persist + + // Get a new time. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok()); + // Immediately send it to the shadow realm + assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Tombstone the entry. + let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.purge_recycled().is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Do B -> A - no change on A. Normally this would create the conflict + // on A since it's the origin, but here since it's a TS it now takes + // precedence. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + assert!(e1 != e2); + // E1 from A is a ts + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + // E2 from B is not a TS + assert!(!e2.attribute_equality("class", &PVCLASS_TOMBSTONE)); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Now A -> B - this should cause B to become a TS even though it's AT is + // earlier. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + assert!(e1 == e2); + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} // both add entry with same uuid, both become ts - merge, take lowest AT. +#[qs_pair_test] +async fn test_repl_increment_create_tombstone_conflict( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Now create the same entry on both servers. + let t_uuid = Uuid::new_v4(); + let e_init = entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ); + + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.internal_create(vec![e_init.clone(),]).is_ok()); + // Immediately send it to the shadow realm + assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Get a new time. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok()); + // Immediately send it to the shadow realm + assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Tombstone on both sides. + let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1); + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.purge_recycled().is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 2); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.purge_recycled().is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Since B was tombstoned first, it is the tombstone that should persist. + + // This means A -> B - no change on B, it's the persisting tombstone. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1.get_last_changed() > e2.get_last_changed()); + // Yet, they are both TS. Curious. + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE)); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // B -> A - A should now have the lower AT reflected. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); +} + +// Test schema conflict state - add attr A on one side, and then remove the supporting +// class on the other. On repl both sides move to conflict. +#[qs_pair_test] +async fn test_repl_increment_schema_conflict(server_a: &QueryServer, server_b: &QueryServer) { + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Setup the entry we plan to break. + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ),]) + .is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Now at this point we need to write to both sides. The order *does* matter + // here because we need the displayname write to happen *after* the purge + // on the B node. + + // This is a really rare/wild change to swap an object out to a group but it + // works well for our test here. + let ct = ct + Duration::from_secs(1); + let mut server_b_txn = server_b.write(ct).await; + let modlist = ModifyList::new_list(vec![ + Modify::Removed("class".into(), PVCLASS_PERSON.clone()), + Modify::Present("class".into(), CLASS_GROUP.clone()), + Modify::Purged("displayname".into()), + ]); + assert!(server_b_txn.internal_modify_uuid(t_uuid, &modlist).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // On A we'll change the displayname which is predicated on being a person still + let ct = ct + Duration::from_secs(1); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn + .internal_modify_uuid( + t_uuid, + &ModifyList::new_purge_and_set( + "displayname", + Value::Utf8("Updated displayname".to_string()) + ) + ) + .is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Now we have to replicate again. It shouldn't matter *which* direction we go first + // because *both* should end in the conflict state. + // + // B -> A + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + + assert!(e1.attribute_equality("class", &PVCLASS_CONFLICT)); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // A -> B + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e2.attribute_equality("class", &PVCLASS_CONFLICT)); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} // Test RUV content when a server's changes have been trimmed out and are not present // in a refresh. This is not about tombstones, this is about attribute state. +#[qs_pair_test] +async fn test_repl_increment_consumer_lagging_attributes( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Add an entry. + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ),]) + .is_ok()); + + server_b_txn.commit().expect("Failed to commit"); + + // Now setup bidirectional replication. We only need to trigger B -> A + // here because that's all that has changes. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Okay, now we do a change on B and then we'll push time ahead of changelog + // ruv trim. This should mean that the indexes to find those changes are lost. + let ct = ct + Duration::from_secs(1); + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn + .internal_modify_uuid( + t_uuid, + &ModifyList::new_purge_and_set( + "displayname", + Value::Utf8("Updated displayname".to_string()) + ) + ) + .is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Now we advance the time. + let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE + 1); + + // And setup the ruv trim. This is triggered by purge/reap tombstones. + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.purge_tombstones().is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Okay, ready to go. When we do A -> B or B -> A we should get appropriate + // errors regarding the delay state. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + let a_ruv_range = server_a_txn + .consumer_get_state() + .expect("Unable to access RUV range"); + + let changes = server_b_txn + .supplier_provide_changes(a_ruv_range) + .expect("Unable to generate supplier changes"); + + assert!(matches!(changes, ReplIncrementalContext::RefreshRequired)); + + let result = server_a_txn + .consumer_apply_changes(&changes) + .expect("Unable to apply changes to consumer."); + + assert!(matches!(result, ConsumerState::RefreshRequired)); + + drop(server_a_txn); + drop(server_b_txn); + + // Reverse it! + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + let b_ruv_range = server_b_txn + .consumer_get_state() + .expect("Unable to access RUV range"); + + let changes = server_a_txn + .supplier_provide_changes(b_ruv_range) + .expect("Unable to generate supplier changes"); + + assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply)); + + let result = server_b_txn + .consumer_apply_changes(&changes) + .expect("Unable to apply changes to consumer."); + + assert!(matches!(result, ConsumerState::Ok)); + + drop(server_a_txn); + drop(server_b_txn); +} // Test change of a domain name over incremental. diff --git a/server/lib/src/schema.rs b/server/lib/src/schema.rs index 6d62dbce6..8308e1a34 100644 --- a/server/lib/src/schema.rs +++ b/server/lib/src/schema.rs @@ -98,7 +98,6 @@ pub struct SchemaAttribute { impl SchemaAttribute { pub fn try_from(value: &Entry) -> Result { // Convert entry to a schema attribute. - trace!("Converting -> {}", value); // uuid let uuid = value.get_uuid(); @@ -722,6 +721,25 @@ impl<'a> SchemaWriteTransaction<'a> { syntax: SyntaxType::Uuid, }, ); + self.attributes.insert( + AttrString::from("source_uuid"), + SchemaAttribute { + name: AttrString::from("source_uuid"), + uuid: UUID_SCHEMA_ATTR_SOURCE_UUID, + description: String::from( + "The universal unique id of the source object where this conflict came from", + ), + multivalue: false, + // Uniqueness is handled by base.rs, not attrunique here due to + // needing to check recycled objects too. + unique: false, + phantom: false, + sync_allowed: false, + replicated: true, + index: vec![IndexType::Equality, IndexType::Presence], + syntax: SyntaxType::Uuid, + }, + ); self.attributes.insert( AttrString::from("last_modified_cid"), SchemaAttribute { @@ -1731,6 +1749,19 @@ impl<'a> SchemaWriteTransaction<'a> { .. Default::default() }, ); + self.classes.insert( + AttrString::from("conflict"), + SchemaClass { + name: AttrString::from("conflict"), + uuid: UUID_SCHEMA_CLASS_CONFLICT, + description: String::from( + "An entry representing conflicts that occurred during replication", + ), + systemmust: vec![AttrString::from("source_uuid")], + systemsupplements: vec![AttrString::from("recycled")], + ..Default::default() + }, + ); // sysinfo self.classes.insert( AttrString::from("system_info"), diff --git a/server/lib/src/server/access/search.rs b/server/lib/src/server/access/search.rs index 17e6c049a..dd7389c87 100644 --- a/server/lib/src/server/access/search.rs +++ b/server/lib/src/server/access/search.rs @@ -77,12 +77,12 @@ fn search_filter_entry<'a>( // If this is an internal search, return our working set. match &ident.origin { IdentType::Internal => { - trace!("Internal operation, bypassing access check"); + trace!(uuid = ?entry.get_display_id(), "Internal operation, bypassing access check"); // No need to check ACS return AccessResult::Grant; } IdentType::Synch(_) => { - security_critical!("Blocking sync check"); + security_critical!(uuid = ?entry.get_display_id(), "Blocking sync check"); return AccessResult::Denied; } IdentType::User(_) => {} @@ -109,7 +109,7 @@ fn search_filter_entry<'a>( Some(acs.attrs.iter().map(|s| s.as_str())) } else { // should this be `security_access`? - security_debug!(entry = ?entry.get_uuid(), acs = %acs.acp.name, "entry DOES NOT match acs"); + security_debug!(entry = ?entry.get_display_id(), acs = %acs.acp.name, "entry DOES NOT match acs"); None } }) diff --git a/server/lib/src/server/mod.rs b/server/lib/src/server/mod.rs index b147e0c28..14dabf9e2 100644 --- a/server/lib/src/server/mod.rs +++ b/server/lib/src/server/mod.rs @@ -445,6 +445,25 @@ pub trait QueryServerTransaction<'a> { } } + /// Get all conflict entries that originated from a source uuid. + #[instrument(level = "debug", skip_all)] + fn internal_search_conflict_uuid( + &mut self, + uuid: Uuid, + ) -> Result>, OperationError> { + let filter = filter_all!(f_and(vec![ + f_eq("source_uuid", PartialValue::Uuid(uuid)), + f_eq("class", PVCLASS_CONFLICT.clone()) + ])); + let f_valid = filter.validate(self.get_schema()).map_err(|e| { + error!(?e, "Filter Validate - SchemaViolation"); + OperationError::SchemaViolation(e) + })?; + let se = SearchEvent::new_internal(f_valid); + + self.search(&se) + } + #[instrument(level = "debug", skip_all)] fn impersonate_search_ext_uuid( &mut self, @@ -1171,6 +1190,10 @@ impl<'a> QueryServerWriteTransaction<'a> { self.curtime } + pub(crate) fn get_cid(&self) -> &Cid { + &self.cid + } + pub(crate) fn get_dyngroup_cache(&mut self) -> &mut DynGroupCache { &mut self.dyngroup_cache }