From d1fe7b9127a1fb718b02fdedc7983802e615535e Mon Sep 17 00:00:00 2001 From: Firstyear Date: Tue, 5 Sep 2023 21:30:51 +1000 Subject: [PATCH] 68 20230829 replication referential integrity (#2048) * Member of works! * Hooray, refint over replication works. --- server/lib/src/plugins/attrunique.rs | 23 ++ server/lib/src/plugins/memberof.rs | 1 + server/lib/src/plugins/mod.rs | 15 +- server/lib/src/plugins/refint.rs | 301 ++++++++++---- server/lib/src/plugins/spn.rs | 1 + server/lib/src/repl/consumer.rs | 36 +- server/lib/src/repl/tests.rs | 570 ++++++++++++++++++++++++++- 7 files changed, 852 insertions(+), 95 deletions(-) diff --git a/server/lib/src/plugins/attrunique.rs b/server/lib/src/plugins/attrunique.rs index 7841505b4..522bbb226 100644 --- a/server/lib/src/plugins/attrunique.rs +++ b/server/lib/src/plugins/attrunique.rs @@ -256,6 +256,29 @@ impl Plugin for AttrUnique { r } + fn pre_repl_incremental( + _qs: &mut QueryServerWriteTransaction, + _cand: &mut [(EntryIncrementalCommitted, Arc)], + ) -> Result<(), OperationError> { + admin_error!( + "plugin {} has an unimplemented pre_repl_incremental!", + Self::id() + ); + + // Important! We need to also have a list of uuids that we conflicted AGAINST so that both + // this candidate *and* the existing item both move to conflict. This is because if we don't + // do it this way, then some nodes will conflict on potentially the inverse entries, which + // could end up pretty bad. + + // We also can't realllllyyyy rely on the cid here since it could have changed multiple times + // and may not truly reflect the accurate change times, so we have to conflict on both + // itemsthat hit the attrunique. + + // debug_assert!(false); + // Err(OperationError::InvalidState) + Ok(()) + } + #[instrument(level = "debug", name = "attrunique::verify", skip_all)] fn verify(qs: &mut QueryServerReadTransaction) -> Vec> { // Only check live entries, not recycled. diff --git a/server/lib/src/plugins/memberof.rs b/server/lib/src/plugins/memberof.rs index baeb31ebd..2521bf2a7 100644 --- a/server/lib/src/plugins/memberof.rs +++ b/server/lib/src/plugins/memberof.rs @@ -235,6 +235,7 @@ impl Plugin for MemberOf { qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], + _conflict_uuids: &[Uuid], ) -> Result<(), OperationError> { // IMPORTANT - we need this for now so that dyngroup doesn't error on us, since // repl is internal and dyngroup has a safety check to prevent external triggers. diff --git a/server/lib/src/plugins/mod.rs b/server/lib/src/plugins/mod.rs index c469fca3f..bc5bba224 100644 --- a/server/lib/src/plugins/mod.rs +++ b/server/lib/src/plugins/mod.rs @@ -157,15 +157,15 @@ trait Plugin { "plugin {} has an unimplemented pre_repl_incremental!", Self::id() ); - // debug_assert!(false); - // Err(OperationError::InvalidState) - Ok(()) + debug_assert!(false); + Err(OperationError::InvalidState) } fn post_repl_incremental( _qs: &mut QueryServerWriteTransaction, _pre_cand: &[Arc], _cand: &[EntrySealedCommitted], + _conflict_uuids: &[Uuid], ) -> Result<(), OperationError> { admin_error!( "plugin {} has an unimplemented post_repl_incremental!", @@ -352,11 +352,12 @@ impl Plugins { qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], + conflict_uuids: &[Uuid], ) -> Result<(), OperationError> { - domain::Domain::post_repl_incremental(qs, pre_cand, cand)?; - spn::Spn::post_repl_incremental(qs, pre_cand, cand)?; - refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)?; - memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand) + domain::Domain::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; + spn::Spn::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; + refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; + memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand, conflict_uuids) } #[instrument(level = "debug", name = "plugins::run_verify", skip_all)] diff --git a/server/lib/src/plugins/refint.rs b/server/lib/src/plugins/refint.rs index c5e9dfbc6..fccf0d631 100644 --- a/server/lib/src/plugins/refint.rs +++ b/server/lib/src/plugins/refint.rs @@ -12,11 +12,11 @@ use std::collections::BTreeSet; use std::sync::Arc; -use hashbrown::HashSet as Set; -use kanidm_proto::v1::{ConsistencyError, PluginError}; +use hashbrown::HashSet; +use kanidm_proto::v1::ConsistencyError; use crate::event::{CreateEvent, DeleteEvent, ModifyEvent}; -use crate::filter::f_eq; +use crate::filter::{f_eq, FC}; use crate::plugins::Plugin; use crate::prelude::*; use crate::schema::SchemaTransaction; @@ -24,19 +24,19 @@ use crate::schema::SchemaTransaction; pub struct ReferentialIntegrity; impl ReferentialIntegrity { - fn check_uuids_exist( + fn check_uuids_exist_fast( qs: &mut QueryServerWriteTransaction, - inner: Vec, - ) -> Result<(), OperationError> { + inner: &[Uuid], + ) -> Result { if inner.is_empty() { // There is nothing to check! Move on. trace!("no reference types modified, skipping check"); - return Ok(()); + return Ok(true); } - let inner = inner - .into_iter() - .map(|pv| f_eq(Attribute::Uuid, pv)) + let inner: Vec<_> = inner + .iter() + .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(*u))) .collect(); // F_inc(lusion). All items of inner must be 1 or more, or the filter @@ -50,16 +50,91 @@ impl ReferentialIntegrity { // Is the existence of all id's confirmed? if b { - Ok(()) + Ok(true) } else { - admin_error!( - "UUID reference set size differs from query result size " - ); - Err(OperationError::Plugin(PluginError::ReferentialIntegrity( - "Uuid referenced not found in database".to_string(), - ))) + Ok(false) } } + + fn check_uuids_exist_slow( + qs: &mut QueryServerWriteTransaction, + inner: &[Uuid], + ) -> Result, OperationError> { + if inner.is_empty() { + // There is nothing to check! Move on. + // Should be unreachable. + trace!("no reference types modified, skipping check"); + return Ok(Vec::with_capacity(0)); + } + + let mut missing = Vec::with_capacity(inner.len()); + for u in inner { + let filt_in = filter!(f_eq(Attribute::Uuid, PartialValue::Uuid(*u))); + let b = qs.internal_exists(filt_in).map_err(|e| { + admin_error!(err = ?e, "internal exists failure"); + e + })?; + + // If it's missing, we push it to the missing set. + if !b { + missing.push(*u) + } + } + + Ok(missing) + } + + fn remove_references( + qs: &mut QueryServerWriteTransaction, + uuids: Vec, + ) -> Result<(), OperationError> { + trace!(?uuids); + + // Find all reference types in the schema + let schema = qs.get_schema(); + let ref_types = schema.get_reference_types(); + + let removed_ids: BTreeSet<_> = uuids.iter().map(|u| PartialValue::Refer(*u)).collect(); + + // Generate a filter which is the set of all schema reference types + // as EQ to all uuid of all entries in delete. - this INCLUDES recycled + // types too! + let filt = filter_all!(FC::Or( + uuids + .into_iter() + .flat_map(|u| ref_types.values().filter_map(move |r_type| { + let value_attribute = r_type.name.to_string(); + // For everything that references the uuid's in the deleted set. + let val: Result = value_attribute.try_into(); + // error!("{:?}", val); + let res = match val { + Ok(val) => { + let res = f_eq(val, PartialValue::Refer(u)); + Some(res) + } + Err(err) => { + // we shouldn't be able to get here... + admin_error!("post_delete invalid attribute specified - please log this as a bug! {:?}", err); + None + } + }; + res + })) + .collect(), + )); + + trace!("refint post_delete filter {:?}", filt); + + let mut work_set = qs.internal_search_writeable(&filt)?; + + work_set.iter_mut().for_each(|(_, post)| { + ref_types + .values() + .for_each(|attr| post.remove_avas(attr.name.as_str(), &removed_ids)); + }); + + qs.internal_apply_writable(work_set) + } } impl Plugin for ReferentialIntegrity { @@ -118,6 +193,99 @@ impl Plugin for ReferentialIntegrity { Self::post_modify_inner(qs, cand) } + fn post_repl_incremental( + qs: &mut QueryServerWriteTransaction, + pre_cand: &[Arc], + cand: &[EntrySealedCommitted], + conflict_uuids: &[Uuid], + ) -> Result<(), OperationError> { + admin_error!( + "plugin {} has an unimplemented post_repl_incremental!", + Self::id() + ); + + // I think we need to check that all values in the ref type values here + // exist, and if not, we *need to remove them*. We should probably rewrite + // how we do modify/create inner to actually return missing uuids, so that + // this fn can delete, and the other parts can report what's missing. + // + // This also becomes a path to a "ref int fixup" too? + + let uuids = Self::cand_references_to_uuid_filter(qs, cand)?; + + let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?; + + let mut missing_uuids = if !all_exist_fast { + debug!("Not all uuids referenced by these candidates exist. Slow path to remove them."); + Self::check_uuids_exist_slow(qs, uuids.as_slice())? + } else { + debug!("All references are valid!"); + Vec::with_capacity(0) + }; + + // If the entry has moved from a live to a deleted state we need to clean it's reference's + // that *may* have been added on this server - the same that other references would be + // deleted. + let inactive_entries: Vec<_> = std::iter::zip(pre_cand, cand) + .filter_map(|(pre, post)| { + let pre_live = pre.mask_recycled_ts().is_some(); + let post_live = post.mask_recycled_ts().is_some(); + + if !post_live && (pre_live != post_live) { + // We have moved from live to recycled/tombstoned. We need to + // ensure that these references are masked. + Some(post.get_uuid()) + } else { + None + } + }) + .collect(); + + if event_enabled!(tracing::Level::DEBUG) { + debug!("Removing the following reference uuids for entries that have become recycled or tombstoned"); + for missing in &inactive_entries { + debug!(?missing); + } + } + + // We can now combine this with the confict uuids from the incoming set. + + // In a conflict case, we need to also add these uuids to the delete logic + // since on the originator node the original uuid will still persist + // meaning the member won't be removed. + // However, on a non-primary conflict handler it will remove the member + // as well. This is annoyingly a worst case, since then *every* node will + // attempt to update the cid of this group. But I think the potential cost + // in the short term will be worth consistent references. + + if !conflict_uuids.is_empty() { + warn!("conflict uuids have been found, and must be cleaned from existing references. This is to prevent group memberships leaking to un-intended recipients."); + } + + // Now, we need to find for each of the missing uuids, which values had them. + // We could use a clever query to internal_search_writeable? + missing_uuids.extend_from_slice(conflict_uuids); + missing_uuids.extend_from_slice(&inactive_entries); + + if missing_uuids.is_empty() { + trace!("Nothing to do, shortcut"); + return Ok(()); + } + + if event_enabled!(tracing::Level::DEBUG) { + debug!("Removing the following missing reference uuids"); + for missing in &missing_uuids { + debug!(?missing); + } + } + + // Now we have to look them up and clean it up. Turns out this is the + // same code path as "post delete" so we can share that! + Self::remove_references(qs, missing_uuids) + + // Complete! + } + #[instrument(level = "debug", name = "refint_post_delete", skip_all)] fn post_delete( qs: &mut QueryServerWriteTransaction, @@ -128,56 +296,10 @@ impl Plugin for ReferentialIntegrity { // actually the bulk of the work we'll do to clean up references // when they are deleted. - // Find all reference types in the schema - let schema = qs.get_schema(); - let ref_types = schema.get_reference_types(); - // Get the UUID of all entries we are deleting let uuids: Vec = cand.iter().map(|e| e.get_uuid()).collect(); - // Generate a filter which is the set of all schema reference types - // as EQ to all uuid of all entries in delete. - this INCLUDES recycled - // types too! - let filt = filter_all!(FC::Or( - uuids - .into_iter() - .flat_map(|u| ref_types.values().filter_map(move |r_type| { - let value_attribute = r_type.name.to_string(); - // For everything that references the uuid's in the deleted set. - let val: Result = value_attribute.try_into(); - // error!("{:?}", val); - let res = match val { - Ok(val) => { - let res = f_eq(val, PartialValue::Refer(u)); - Some(res) - } - Err(err) => { - // we shouldn't be able to get here... - admin_error!("post_delete invalid attribute specified - please log this as a bug! {:?}", err); - None - } - }; - res - })) - .collect(), - )); - - trace!("refint post_delete filter {:?}", filt); - - let removed_ids: BTreeSet<_> = cand - .iter() - .map(|e| PartialValue::Refer(e.get_uuid())) - .collect(); - - let mut work_set = qs.internal_search_writeable(&filt)?; - - work_set.iter_mut().for_each(|(_, post)| { - ref_types - .values() - .for_each(|attr| post.remove_avas(attr.name.as_str(), &removed_ids)); - }); - - qs.internal_apply_writable(work_set) + Self::remove_references(qs, uuids) } #[instrument(level = "debug", name = "refint::verify", skip_all)] @@ -194,7 +316,7 @@ impl Plugin for ReferentialIntegrity { Err(e) => return vec![e], }; - let acu_map: Set = all_cand.iter().map(|e| e.get_uuid()).collect(); + let acu_map: HashSet = all_cand.iter().map(|e| e.get_uuid()).collect(); let schema = qs.get_schema(); let ref_types = schema.get_reference_types(); @@ -228,10 +350,10 @@ impl Plugin for ReferentialIntegrity { } impl ReferentialIntegrity { - fn post_modify_inner( + fn cand_references_to_uuid_filter( qs: &mut QueryServerWriteTransaction, - cand: &[Entry], - ) -> Result<(), OperationError> { + cand: &[EntrySealedCommitted], + ) -> Result, OperationError> { let schema = qs.get_schema(); let ref_types = schema.get_reference_types(); @@ -242,9 +364,10 @@ impl ReferentialIntegrity { c.attribute_equality(Attribute::Class.as_ref(), &EntryClass::DynGroup.into()); ref_types.values().filter_map(move |rtype| { - // Skip dynamic members + // Skip dynamic members, these are recalculated by the + // memberof plugin. let skip_mb = dyn_group && rtype.name == "dynmember"; - // Skip memberOf. + // Skip memberOf, also recalculated. let skip_mo = rtype.name == "memberof"; if skip_mb || skip_mo { None @@ -256,12 +379,16 @@ impl ReferentialIntegrity { }); // Could check len first? - let mut i = Vec::with_capacity(cand.len() * 2); + let mut i = Vec::with_capacity(cand.len() * 4); + let mut dedup = HashSet::new(); vsiter.try_for_each(|vs| { if let Some(uuid_iter) = vs.as_ref_uuid_iter() { uuid_iter.for_each(|u| { - i.push(PartialValue::Uuid(u)) + // Returns true if the item is NEW in the set + if dedup.insert(u) { + i.push(u) + } }); Ok(()) } else { @@ -273,7 +400,35 @@ impl ReferentialIntegrity { } })?; - Self::check_uuids_exist(qs, i) + Ok(i) + } + + fn post_modify_inner( + qs: &mut QueryServerWriteTransaction, + cand: &[EntrySealedCommitted], + ) -> Result<(), OperationError> { + let uuids = Self::cand_references_to_uuid_filter(qs, cand)?; + + let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?; + + if all_exist_fast { + // All good! + return Ok(()); + } + + // Okay taking the slow path now ... + let missing_uuids = Self::check_uuids_exist_slow(qs, uuids.as_slice())?; + + error!("some uuids that were referenced in this operation do not exist."); + for missing in missing_uuids { + error!(?missing); + } + + Err(OperationError::Plugin( + kanidm_proto::v1::PluginError::ReferentialIntegrity( + "Uuid referenced not found in database".to_string(), + ), + )) } } diff --git a/server/lib/src/plugins/spn.rs b/server/lib/src/plugins/spn.rs index 1b4238dea..153a67e10 100644 --- a/server/lib/src/plugins/spn.rs +++ b/server/lib/src/plugins/spn.rs @@ -77,6 +77,7 @@ impl Plugin for Spn { qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], + _conflict_uuids: &[Uuid], ) -> Result<(), OperationError> { Self::post_modify_inner(qs, pre_cand, cand) } diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index c2ea4d4ad..bb5dc38c5 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -63,6 +63,8 @@ impl<'a> QueryServerWriteTransaction<'a> { .zip(db_entries) .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref())); + debug!(conflicts = %conflicts.len(), proceed = %proceed.len()); + // Now we have a set of conflicts and a set of entries to proceed. // // /- entries that need to be created as conflicts. @@ -82,6 +84,15 @@ impl<'a> QueryServerWriteTransaction<'a> { ) .unzip(); + // ⚠️ If we end up with pre-repl returning a list of conflict uuids, we DON'T need to + // add them to this list. This is just for uuid conflicts, not higher level ones! + // + // ⚠️ We need to collect this from conflict_update since we may NOT be the originator + // server for some conflicts, but we still need to know the UUID is IN the conflict + // state for plugins. We also need to do this here before the conflict_update + // set is consumed by later steps. + let conflict_uuids: Vec<_> = conflict_update.iter().map(|(_, e)| e.get_uuid()).collect(); + // Filter out None from conflict_create let conflict_create: Vec = conflict_create.into_iter().flatten().collect(); @@ -139,11 +150,6 @@ impl<'a> QueryServerWriteTransaction<'a> { // anything hits one of these states we need to have a way to handle this too in a consistent // manner. // - - // Then similar to modify, we need the pre and post candidates. - - // We need to unzip the schema_valid and invalid entries. - self.be_txn .incremental_apply(&all_updates_valid, conflict_create) .map_err(|e| { @@ -157,15 +163,19 @@ impl<'a> QueryServerWriteTransaction<'a> { // We don't need to process conflict_creates here, since they are all conflicting // uuids which means that the uuids are all *here* so they will trigger anything // that requires processing anyway. - Plugins::run_post_repl_incremental(self, pre_cand.as_slice(), cand.as_slice()).map_err( - |e| { - admin_error!( - "Refresh operation failed (post_repl_incremental plugin), {:?}", - e - ); + Plugins::run_post_repl_incremental( + self, + pre_cand.as_slice(), + cand.as_slice(), + conflict_uuids.as_slice(), + ) + .map_err(|e| { + admin_error!( + "Refresh operation failed (post_repl_incremental plugin), {:?}", e - }, - )?; + ); + e + })?; self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid())); diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 74d223d8a..27f8dad27 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -2080,12 +2080,578 @@ async fn test_repl_increment_schema_dynamic(server_a: &QueryServer, server_b: &Q drop(server_a_txn); } +// Test memberof over replication boundaries. +#[qs_pair_test] +async fn test_repl_increment_memberof_basic(server_a: &QueryServer, server_b: &QueryServer) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Since memberof isn't replicated, we have to check that when a group with + // a member is sent over, it's re-calced on the other side. + + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + let g_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup1")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)), + (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // Now replicated A -> B + + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + assert!(e1.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_uuid))); + // We should also check dyngroups too here :) + assert!(e1.attribute_equality( + Attribute::MemberOf.as_ref(), + &PartialValue::Refer(UUID_IDM_ALL_ACCOUNTS) + )); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} + // Test when a group has a member A, and then the group is conflicted, that when -// group is moved to conflict the memberShip of A is removed. +// group is moved to conflict the memberShip of A is removed. The conflict must be +// a non group, or a group that doesn't have the member A. +#[qs_pair_test] +async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: &QueryServer) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // First, we need to create a group on b that will conflict + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + let g_uuid = Uuid::new_v4(); + + assert!(server_b_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + ( + Attribute::Name.as_ref(), + Value::new_iname("testgroup_conflict") + ), + (Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)) + ),]) + .is_ok()); + + server_b_txn.commit().expect("Failed to commit"); + + // Now on a, use the same uuid, make the user and a group as it's member. + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup1")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)), + (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // Now do A -> B. B should show that the second group was a conflict and + // the membership drops. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + assert!(e.attribute_equality( + Attribute::Name.as_ref(), + &PartialValue::new_iname("testgroup_conflict") + )); + + let e = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_uuid))); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // Now B -> A. A will now reflect the conflict as well. + let mut server_b_txn = server_b.read().await; + let mut server_a_txn = server_a.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + assert!(e1.attribute_equality( + Attribute::Name.as_ref(), + &PartialValue::new_iname("testgroup_conflict") + )); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + assert!(!e1.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_uuid))); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); +} // Ref int deletes references when tombstone is replicated over. May need consumer // to have some extra groups that need cleanup +#[qs_pair_test] +async fn test_repl_increment_refint_tombstone(server_a: &QueryServer, server_b: &QueryServer) { + let ct = duration_from_epoch_now(); -// Test memberof over replication boundaries. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Create a person / group on a. Don't add membership yet. + let mut server_a_txn = server_a.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + let g_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup1")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)) // Don't add the membership yet! + // (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // A -> B repl. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // On B, delete the person. + let ct = duration_from_epoch_now(); + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // On A, add person to group. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn + .internal_modify_uuid( + g_uuid, + &ModifyList::new_purge_and_set(Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ) + .is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // A -> B - B should remove the reference. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + // Assert on B that Member is now gone. + let e = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // B -> A - A should remove the reference, everything is consistent again. + let ct = duration_from_epoch_now(); + let mut server_b_txn = server_b.read().await; + let mut server_a_txn = server_a.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + + let e1_cs = e1.get_changestate(); + let e2_cs = e2.get_changestate(); + + assert!(e1_cs == e2_cs); + + assert!(e1 == e2); + assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); +} + +#[qs_pair_test] +async fn test_repl_increment_refint_conflict(server_a: &QueryServer, server_b: &QueryServer) { + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // On B, create a conflicting person. + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + ( + Attribute::Name.as_ref(), + Value::new_iname("testperson_conflict") + ), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Create a person / group on a. Add person to group. + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + let g_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup1")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)), + (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // A -> B - B should remove the reference. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + // Note that in the case an entry conflicts we remove references to the entry that + // had the collision. This is because we don't know if our references are reflecting + // the true intent of the situation now. + // + // In this example, the users created on server A was intended to be a member of + // the group, but the user on server B *was not* intended to be a member. Therfore + // it's wrong that we retain the user from Server B *while* also the membership + // that was intended for the user on A. + let e = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // B -> A - A should remove the reference. + let mut server_b_txn = server_b.read().await; + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + + let e1_cs = e1.get_changestate(); + let e2_cs = e2.get_changestate(); + assert!(e1_cs == e2_cs); + + assert!(e1 == e2); + assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); +} + +// Ref int when we transmit a delete over the boundary. This is the opposite order to +// a previous test, where the delete is sent to the member holder first. +#[qs_pair_test] +async fn test_repl_increment_refint_delete_to_member_holder( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Create a person / group on a. Don't add membership yet. + let mut server_a_txn = server_a.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + let g_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup1")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)) // Don't add the membership yet! + // (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // A -> B repl. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // On A, add person to group. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn + .internal_modify_uuid( + g_uuid, + &ModifyList::new_purge_and_set(Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ) + .is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // On B, delete the person. + let ct = duration_from_epoch_now(); + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // B -> A - A should remove the reference, everything is consistent again. + let ct = duration_from_epoch_now(); + let mut server_b_txn = server_b.read().await; + let mut server_a_txn = server_a.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e = server_a_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // A -> B - Should just reflect what happened on A. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_uuid) + .expect("Unable to access entry."); + + let e1_cs = e1.get_changestate(); + let e2_cs = e2.get_changestate(); + + assert!(e1_cs == e2_cs); + assert!(e1 == e2); + assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid))); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} + +// Test attrunique conflictns + +// Test ref-int when attrunique makes a conflict // Test change of domain version over incremental. +// +// todo when I have domain version migrations working.