From b3aed1df34bc38b5c2fa5479869b1a2261d7e336 Mon Sep 17 00:00:00 2001 From: Firstyear Date: Tue, 12 Sep 2023 08:50:51 +1000 Subject: [PATCH] 68 20230908 replication attrunique (#2086) Co-authored-by: James Hodgkinson --- .github/ISSUE_TEMPLATE/bug_report.md | 8 +- .../developers/designs/replication_coord.md | 8 + proto/src/v1.rs | 2 +- server/lib/src/entry.rs | 17 +- server/lib/src/plugins/attrunique.rs | 433 ++++++++++++------ server/lib/src/plugins/memberof.rs | 2 +- server/lib/src/plugins/mod.rs | 63 ++- server/lib/src/plugins/refint.rs | 10 +- server/lib/src/plugins/spn.rs | 3 +- server/lib/src/repl/consumer.rs | 77 +++- server/lib/src/repl/tests.rs | 334 +++++++++++++- server/lib/src/schema.rs | 4 +- 12 files changed, 781 insertions(+), 180 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 1a99f7a89..9f73e14e5 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -13,9 +13,9 @@ assignees: '' ### Kanidm version details -* Output of `kanidm(d) version`: -* Are you running it in a container? If so, which image/tag?: -* If not a container, how'd you install it: -* Operating System / Version (On Unix please post the output of `uname -a`): +- Output of `kanidm(d) version`: +- Are you running it in a container? If so, which image/tag?: +- If not a container, how'd you install it: +- Operating System / Version (On Unix please post the output of `uname -a`): ### Any other comments diff --git a/book/src/developers/designs/replication_coord.md b/book/src/developers/designs/replication_coord.md index c2a210884..015f83764 100644 --- a/book/src/developers/designs/replication_coord.md +++ b/book/src/developers/designs/replication_coord.md @@ -171,6 +171,14 @@ For B pulling from A. Notice that automatic refresh only goes from A -> B and not the other way around. This allows one server to be "authoritative". +TODO: The node configuration will also need to list nodes that can do certain tasks. An example of +these tasks is that to prevent "update storms" a limited set of nodes should be responsible for +recycling and tombstoning of entries. These should be defined as tasks in the replication +configuration, so that the KRC can later issue out which nodes are responsible for those processes. + +These are analogous to the AD FSMO roles, but I think we need a different name for them. Single Node +Origin Task? Single Node Operation Runner? Yes I'm trying to make silly acronyms. + ### KRC Configuration > Still not fully sure about the KRC config yet. More thinking needed! diff --git a/proto/src/v1.rs b/proto/src/v1.rs index 51b02ea43..ed6aae0e0 100644 --- a/proto/src/v1.rs +++ b/proto/src/v1.rs @@ -63,7 +63,7 @@ pub enum ConsistencyError { RefintNotUpheld(u64), MemberOfInvalid(u64), InvalidAttributeType(String), - DuplicateUniqueAttribute(String), + DuplicateUniqueAttribute, InvalidSpn(u64), SqliteIntegrityFailure, BackendAllIdsSync, diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs index 03032e0b5..ad9ce01c4 100644 --- a/server/lib/src/entry.rs +++ b/server/lib/src/entry.rs @@ -1172,7 +1172,20 @@ impl Entry { } } - /// Convert this entry into a recycled entry, that is "in the recycle bin". + /// Convert this entry into a conflict, declaring what entries it conflicted against. 
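+ /// A conflicted entry gains the `recycled` and `conflict` classes, and records the
+ /// uuids it conflicted against in `source_uuid` so replicas resolve the conflict
+ /// consistently.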
+ pub fn to_conflict(&mut self, iter: T) + where + T: IntoIterator, + { + self.add_ava(Attribute::Class.as_ref(), EntryClass::Recycled.into()); + self.add_ava(Attribute::Class.as_ref(), EntryClass::Conflict.into()); + // Add all the source uuids we conflicted against. + for source_uuid in iter { + self.add_ava(Attribute::SourceUuid.as_ref(), Value::Uuid(source_uuid)); + } + } + + /// Extract this entry from the recycle bin into a live state. pub fn to_revived(mut self) -> Self { // This will put the modify ahead of the revive transition. self.remove_ava(ATTR_CLASS, &EntryClass::Recycled.into()); @@ -2879,7 +2892,7 @@ impl Entry { /// Determine if this entry is recycled or a tombstone, and map that to "None". This allows /// filter_map to effectively remove entries that should not be considered as "alive". pub fn mask_recycled_ts(&self) -> Option<&Self> { - // Only when cls has ts/rc then None, else lways Some(self). + // Only when cls has ts/rc then None, else always Some(self). match self.attrs.get(Attribute::Class.as_ref()) { Some(cls) => { if cls.contains(&EntryClass::Tombstone.to_partialvalue()) diff --git a/server/lib/src/plugins/attrunique.rs b/server/lib/src/plugins/attrunique.rs index 522bbb226..7134f709c 100644 --- a/server/lib/src/plugins/attrunique.rs +++ b/server/lib/src/plugins/attrunique.rs @@ -4,8 +4,8 @@ // both change approaches. // // -use std::collections::BTreeMap; use std::collections::VecDeque; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use kanidm_proto::v1::{ConsistencyError, PluginError}; @@ -18,41 +18,49 @@ use crate::schema::SchemaTransaction; pub struct AttrUnique; -fn get_cand_attr_set( - cand: &[Entry], - attr: &str, -) -> Result, OperationError> { - // This is building both the set of values to search for uniqueness, but ALSO - // is detecting if any modified or current entries in the cand set also duplicated - // do to the ennforcing that the PartialValue must be unique in the cand_attr set. - let mut cand_attr: BTreeMap = BTreeMap::new(); +fn get_cand_attr_set<'a, VALID: 'a, STATE: 'a, T>( + // cand: &[Entry], + cand: T, + uniqueattrs: &[AttrString], +) -> Result>, OperationError> +where + T: IntoIterator>, +{ + let mut cand_attr: BTreeMap<(AttrString, PartialValue), Vec> = BTreeMap::new(); - cand.iter() + cand.into_iter() + // We don't need to consider recycled or tombstoned entries + .filter_map(|e| e.mask_recycled_ts()) .try_for_each(|e| { let uuid = e .get_ava_single_uuid("uuid") - .ok_or(OperationError::InvalidEntryState)?; - // Get the value and uuid - //for each value in the ava. - e.get_ava_set(attr) - .map(|vs| { - vs.to_partialvalue_iter() - .try_for_each(|v| match cand_attr.insert(v, uuid) { - None => Ok(()), - Some(vr) => { - admin_error!( - "ava already exists -> {:?}: {:?} conflicts to {:?}", - attr, - vr, - e.get_display_id() - ); - Err(OperationError::Plugin(PluginError::AttrUnique( - "ava already exists".to_string(), - ))) - } + .ok_or_else(|| { + error!("An entry is missing its uuid. This should be impossible!"); + OperationError::InvalidEntryState + })?; + + // Faster to iterate over the attr vec inside this loop. + for attr in uniqueattrs.iter() { + if let Some(vs) = e.get_ava_set(attr) { + for pv in vs.to_partialvalue_iter() { + let key = (attr.clone(), pv); + cand_attr.entry(key) + // Must have conflicted, lets append. 
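+ // Rather than rejecting the duplicate here, every uuid sharing this (attr, value)
+ // key is collected so the caller can decide how the duplicate is handled.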
+ .and_modify(|v| { + warn!( + "ava already exists -> {:?} on entry {:?} has conflicts within change set", + attr, + e.get_display_id() + ); + v.push(uuid) }) - }) - .unwrap_or(Ok(())) + // Not found, lets setup. + .or_insert_with(|| vec![uuid]); + } + } + } + + Ok(()) }) .map(|()| cand_attr) } @@ -60,37 +68,68 @@ fn get_cand_attr_set( fn enforce_unique( qs: &mut QueryServerWriteTransaction, cand: &[Entry], - attr: &str, ) -> Result<(), OperationError> { + let uniqueattrs = { + let schema = qs.get_schema(); + schema.get_attributes_unique() + }; + // Build a set of all the value -> uuid for the cands. // If already exist, reject due to dup. - let cand_attr = get_cand_attr_set(cand, attr).map_err(|e| { - admin_error!(err = ?e, ?attr, "failed to get cand attr set"); + let cand_attr_set = get_cand_attr_set(cand, uniqueattrs).map_err(|e| { + error!(err = ?e, "failed to get cand attr set"); e })?; // No candidates to check! - if cand_attr.is_empty() { + if cand_attr_set.is_empty() { return Ok(()); } + // Now we have to identify and error on anything that has multiple items. + let mut cand_attr = Vec::with_capacity(cand_attr_set.len()); + let mut err = false; + for (key, mut uuid_set) in cand_attr_set.into_iter() { + if let Some(uuid) = uuid_set.pop() { + if uuid_set.is_empty() { + // Good, only single uuid, this can proceed. + cand_attr.push((key, uuid)); + } else { + // Multiple uuid(s) may remain, this is a conflict. We already warned on it + // before in the processing. Do we need to warn again? + err = true; + } + } else { + // Corrupt? How did we even get here? + warn!("datastructure corruption occurred while processing candidate attribute set"); + debug_assert!(false); + return Err(OperationError::Plugin(PluginError::AttrUnique( + "corruption detected".to_string(), + ))); + } + } + + if err { + return Err(OperationError::Plugin(PluginError::AttrUnique( + "duplicate value detected".to_string(), + ))); + } + // Now do an internal search on name and !uuid for each + let cand_filters: Vec<_> = cand_attr + .iter() + .map(|((attr, v), uuid)| { + // and[ attr eq k, andnot [ uuid eq v ]] + // Basically this says where name but also not self. + f_and(vec![ + FC::Eq(attr, v.clone()), + f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(*uuid))), + ]) + }) + .collect(); // Or - let filt_in = filter!(f_or( - // for each cand_attr - cand_attr - .iter() - .map(|(v, uuid)| { - // and[ attr eq k, andnot [ uuid eq v ]] - // Basically this says where name but also not self. - f_and(vec![ - FC::Eq(attr, v.clone()), - f_andnot(FC::Eq("uuid", PartialValue::Uuid(*uuid))), - ]) - }) - .collect() - )); + let filt_in = filter!(f_or(cand_filters.clone())); trace!(?filt_in); @@ -113,19 +152,6 @@ fn enforce_unique( // We do a bisect rather than a linear one-at-a-time search because we want to try to somewhat minimise calls // through internal exists since that has a filter resolve and validate step. - // First create the vec of filters. - let mut cand_filters: Vec<_> = cand_attr - .into_iter() - .map(|(v, uuid)| { - // and[ attr eq k, andnot [ uuid eq v ]] - // Basically this says where name but also not self. - f_and(vec![ - FC::Eq(attr, v), - f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(uuid))), - ]) - }) - .collect(); - // Fast-ish path. There is 0 or 1 element, so we just fast return. if cand_filters.len() < 2 { error!( @@ -137,16 +163,16 @@ fn enforce_unique( // chunks here. 
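+ // Split the filter set in half and queue both halves. Each half is re-tested with
+ // an exists query; any half that still reports a conflict keeps splitting until the
+ // individual offending filters are isolated.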
let mid = cand_filters.len() / 2; - let right = cand_filters.split_off(mid); + let (left, right) = cand_filters.split_at(mid); let mut queue = VecDeque::new(); - queue.push_back(cand_filters); + queue.push_back(left); queue.push_back(right); // Ok! We are setup to go - while let Some(mut cand_query) = queue.pop_front() { - let filt_in = filter!(f_or(cand_query.clone())); + while let Some(cand_query) = queue.pop_front() { + let filt_in = filter!(f_or(cand_query.to_vec())); let conflict_cand = qs.internal_exists(filt_in).map_err(|e| { admin_error!("internal exists error {:?}", e); e @@ -157,8 +183,8 @@ fn enforce_unique( if cand_query.len() >= 2 { // Continue to split to isolate. let mid = cand_query.len() / 2; - let right = cand_query.split_off(mid); - queue.push_back(cand_query); + let (left, right) = cand_query.split_at(mid); + queue.push_back(left); queue.push_back(right); // Continue! } else { @@ -184,25 +210,13 @@ impl Plugin for AttrUnique { "plugin_attrunique" } - #[instrument( - level = "debug", - name = "attrunique_pre_create_transform", - skip(qs, _ce) - )] + #[instrument(level = "debug", name = "attrunique_pre_create_transform", skip_all)] fn pre_create_transform( qs: &mut QueryServerWriteTransaction, cand: &mut Vec>, _ce: &CreateEvent, ) -> Result<(), OperationError> { - let uniqueattrs = { - let schema = qs.get_schema(); - schema.get_attributes_unique() - }; - - let r: Result<(), OperationError> = uniqueattrs - .iter() - .try_for_each(|attr| enforce_unique(qs, cand, attr.as_str())); - r + enforce_unique(qs, cand) } #[instrument(level = "debug", name = "attrunique_pre_modify", skip_all)] @@ -212,15 +226,7 @@ impl Plugin for AttrUnique { cand: &mut Vec>, _me: &ModifyEvent, ) -> Result<(), OperationError> { - let uniqueattrs = { - let schema = qs.get_schema(); - schema.get_attributes_unique() - }; - - let r: Result<(), OperationError> = uniqueattrs - .iter() - .try_for_each(|attr| enforce_unique(qs, cand, attr.as_str())); - r + enforce_unique(qs, cand) } #[instrument(level = "debug", name = "attrunique_pre_batch_modify", skip_all)] @@ -230,53 +236,227 @@ impl Plugin for AttrUnique { cand: &mut Vec>, _me: &BatchModifyEvent, ) -> Result<(), OperationError> { - let uniqueattrs = { - let schema = qs.get_schema(); - schema.get_attributes_unique() - }; - - let r: Result<(), OperationError> = uniqueattrs - .iter() - .try_for_each(|attr| enforce_unique(qs, cand, attr.as_str())); - r + enforce_unique(qs, cand) } + #[instrument(level = "debug", name = "attrunique_pre_repl_refresh", skip_all)] fn pre_repl_refresh( qs: &mut QueryServerWriteTransaction, cand: &[EntryRefreshNew], ) -> Result<(), OperationError> { + enforce_unique(qs, cand) + } + + #[instrument(level = "debug", name = "attrunique_post_repl_incremental", skip_all)] + fn post_repl_incremental_conflict( + qs: &mut QueryServerWriteTransaction, + cand: &[(EntrySealedCommitted, Arc)], + conflict_uuids: &mut BTreeSet, + ) -> Result<(), OperationError> { + // We need to detect attribute unique violations here. This can *easily* happen in + // replication since we have two nodes where different entries can modify an attribute + // and on the next incremental replication the uniqueness violation occurs. + // + // Because of this we have some key properties that we can observe. + // + // Every node when it makes a change with regard to it's own content is already compliant + // to attribute uniqueness. This means the consumers db content before we begin is + // fully consistent. 
+ // + // As attributes can be updated multiple times before it is replicated the cid of the + // attribute may not be a true reflection of order of events when considering which + // attribute-value should survive/conflict. + // + // Attribute uniqueness constraints can *only* be violated on entries that have been + // replicated or are involved in replication (e.g. a conflict survivor entry). + // + // The content of the cand set may contain both replicated entries and conflict survivors + // that are in the process of being updated. Entries within the cand set *may* be in + // a conflict state with each other. + // + // Since this is a post operation, the content of these cand entries is *also* current + // in the database. + // + // This means that: + // * We can build a set of attr unique queries from the cand set. + // * We can ignore conflicts while building that set. + // * Any conflicts detected in the DB on executing that filter would be a super set of the + // conflicts that exist in reality. + // * All entries that are involved in the attr unique collision must become conflicts. + let uniqueattrs = { let schema = qs.get_schema(); schema.get_attributes_unique() }; - let r: Result<(), OperationError> = uniqueattrs + // Build a set of all the value -> uuid for the cands. + // If already exist, reject due to dup. + let cand_attr_set = + get_cand_attr_set(cand.iter().map(|(e, _)| e), uniqueattrs).map_err(|e| { + error!(err = ?e, "failed to get cand attr set"); + e + })?; + + // No candidates to check! + if cand_attr_set.is_empty() { + return Ok(()); + } + + // HAPPY FAST PATH - we do the fast existence query and if it passes + // we can *proceed*, nothing has conflicted. + let cand_filters: Vec<_> = cand_attr_set .iter() - .try_for_each(|attr| enforce_unique(qs, cand, attr.as_str())); - r - } + .flat_map(|((attr, v), uuids)| { + uuids.iter().map(|uuid| { + // and[ attr eq k, andnot [ uuid eq v ]] + // Basically this says where name but also not self. + f_and(vec![ + FC::Eq(attr, v.clone()), + f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(*uuid))), + ]) + }) + }) + .collect(); - fn pre_repl_incremental( - _qs: &mut QueryServerWriteTransaction, - _cand: &mut [(EntryIncrementalCommitted, Arc)], - ) -> Result<(), OperationError> { - admin_error!( - "plugin {} has an unimplemented pre_repl_incremental!", - Self::id() - ); + let filt_in = filter!(f_or(cand_filters)); - // Important! We need to also have a list of uuids that we conflicted AGAINST so that both - // this candidate *and* the existing item both move to conflict. This is because if we don't - // do it this way, then some nodes will conflict on potentially the inverse entries, which - // could end up pretty bad. + trace!(?filt_in); - // We also can't realllllyyyy rely on the cid here since it could have changed multiple times - // and may not truly reflect the accurate change times, so we have to conflict on both - // itemsthat hit the attrunique. + // If any results, reject. + let conflict_cand = qs.internal_exists(filt_in).map_err(|e| { + admin_error!("internal exists error {:?}", e); + e + })?; - // debug_assert!(false); - // Err(OperationError::InvalidState) - Ok(()) + if conflict_cand { + // Unlike enforce unique, we need to be more thorough here. Enforce unique + // just has to block the whole operation. We *can't* fail the operation + // in the same way, we need to individually isolate each collision to + // turn all the involved entries into conflicts. 
Because of this, rather + // than bisection like we do in enforce_unique to find violating entries + // for admins to read, we need to actually isolate each and every conflicting + // uuid. To achieve this we need to change the structure of the query we perform + // to actually get everything that has a conflict now. + + // For each uuid, show the set of uuids this conflicts with. + let mut conflict_uuid_map: BTreeMap> = BTreeMap::new(); + + // We need to invert this now to have a set of uuid: Vec<(attr, pv)> + // rather than the other direction which was optimised for the detection of + // candidate conflicts during updates. + + let mut cand_attr_map: BTreeMap> = BTreeMap::new(); + + cand_attr_set.into_iter().for_each(|(key, uuids)| { + uuids.into_iter().for_each(|uuid| { + cand_attr_map + .entry(uuid) + .and_modify(|set| { + set.insert(key.clone()); + }) + .or_insert_with(|| { + let mut set = BTreeSet::new(); + set.insert(key.clone()); + set + }); + }) + }); + + for (uuid, ava_set) in cand_attr_map.into_iter() { + let cand_filters: Vec<_> = ava_set + .iter() + .map(|(attr, pv)| { + f_and(vec![ + FC::Eq(attr, pv.clone()), + f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(uuid))), + ]) + }) + .collect(); + + let filt_in = filter!(f_or(cand_filters.clone())); + + let filt_conflicts = qs.internal_search(filt_in).map_err(|e| { + admin_error!("internal search error {:?}", e); + e + })?; + + // Important! This needs to conflict in *both directions*. We have to + // indicate that uuid has been conflicted by the entries in filt_conflicts, + // but also that the entries in filt_conflicts now conflict on us! Also remember + // that entries in either direction *may already* be in the conflict map, so we + // need to be very careful here not to stomp anything - append only! + if !filt_conflicts.is_empty() { + let mut conflict_uuid_set = BTreeSet::new(); + + for e in filt_conflicts { + // Mark that this entry conflicted to us. + conflict_uuid_set.insert(e.get_uuid()); + // Mark that the entry needs to conflict against us. + conflict_uuid_map + .entry(e.get_uuid()) + .and_modify(|set| { + set.insert(uuid); + }) + .or_insert_with(|| { + let mut set = BTreeSet::new(); + set.insert(uuid); + set + }); + } + + conflict_uuid_map + .entry(uuid) + .and_modify(|set| set.append(&mut conflict_uuid_set)) + .or_insert_with(|| conflict_uuid_set); + } + } + + trace!(?conflict_uuid_map); + + if conflict_uuid_map.is_empty() { + error!("Impossible state. Attribute unique conflicts were detected in fast path, but were not found in slow path."); + return Err(OperationError::InvalidState); + } + + // Now get all these values out with modify writable + + let filt = filter!(FC::Or( + conflict_uuid_map + .keys() + .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(*u))) + .collect() + )); + + let mut work_set = qs.internal_search_writeable(&filt)?; + + for (_, entry) in work_set.iter_mut() { + let Some(uuid) = entry.get_uuid() else { + error!("Impossible state. Entry that was declared in conflict map does not have a uuid."); + return Err(OperationError::InvalidState); + }; + + // Add the uuid to the conflict uuids now. + conflict_uuids.insert(uuid); + + if let Some(conflict_uuid_set) = conflict_uuid_map.get(&uuid) { + entry.to_conflict(conflict_uuid_set.iter().copied()) + } else { + error!("Impossible state. 
Entry that was declared in conflict map was not present in work set."); + return Err(OperationError::InvalidState); + } + } + + qs.internal_apply_writable(work_set).map_err(|e| { + admin_error!("Failed to commit memberof group set {:?}", e); + e + })?; + + // Okay we *finally got here. We are done! + Ok(()) + } else { + // 🎉 + Ok(()) + } } #[instrument(level = "debug", name = "attrunique::verify", skip_all)] @@ -301,13 +481,8 @@ impl Plugin for AttrUnique { let mut res: Vec> = Vec::new(); - for attr in uniqueattrs.iter() { - // We do a fully in memory check. - if get_cand_attr_set(&all_cand, attr.as_str()).is_err() { - res.push(Err(ConsistencyError::DuplicateUniqueAttribute( - attr.to_string(), - ))) - } + if get_cand_attr_set(&all_cand, uniqueattrs).is_err() { + res.push(Err(ConsistencyError::DuplicateUniqueAttribute)) } trace!(?res); diff --git a/server/lib/src/plugins/memberof.rs b/server/lib/src/plugins/memberof.rs index 2521bf2a7..7a80084a3 100644 --- a/server/lib/src/plugins/memberof.rs +++ b/server/lib/src/plugins/memberof.rs @@ -235,7 +235,7 @@ impl Plugin for MemberOf { qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], - _conflict_uuids: &[Uuid], + _conflict_uuids: &BTreeSet, ) -> Result<(), OperationError> { // IMPORTANT - we need this for now so that dyngroup doesn't error on us, since // repl is internal and dyngroup has a safety check to prevent external triggers. diff --git a/server/lib/src/plugins/mod.rs b/server/lib/src/plugins/mod.rs index bc5bba224..eec16fec2 100644 --- a/server/lib/src/plugins/mod.rs +++ b/server/lib/src/plugins/mod.rs @@ -3,6 +3,7 @@ //! helps to ensure that data is always in specific known states within the //! `QueryServer` +use std::collections::BTreeSet; use std::sync::Arc; use kanidm_proto::v1::{ConsistencyError, OperationError}; @@ -37,6 +38,7 @@ trait Plugin { "plugin {} has an unimplemented pre_create_transform!", Self::id() ); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -47,6 +49,7 @@ trait Plugin { _ce: &CreateEvent, ) -> Result<(), OperationError> { admin_error!("plugin {} has an unimplemented pre_create!", Self::id()); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -57,6 +60,7 @@ trait Plugin { _ce: &CreateEvent, ) -> Result<(), OperationError> { admin_error!("plugin {} has an unimplemented post_create!", Self::id()); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -67,6 +71,7 @@ trait Plugin { _me: &ModifyEvent, ) -> Result<(), OperationError> { admin_error!("plugin {} has an unimplemented pre_modify!", Self::id()); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -78,6 +83,7 @@ trait Plugin { _ce: &ModifyEvent, ) -> Result<(), OperationError> { admin_error!("plugin {} has an unimplemented post_modify!", Self::id()); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -91,6 +97,7 @@ trait Plugin { "plugin {} has an unimplemented pre_batch_modify!", Self::id() ); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -105,6 +112,7 @@ trait Plugin { "plugin {} has an unimplemented post_batch_modify!", Self::id() ); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -114,6 +122,7 @@ trait Plugin { _de: &DeleteEvent, ) -> Result<(), OperationError> { admin_error!("plugin {} has an unimplemented pre_delete!", Self::id()); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -124,6 +133,7 @@ trait Plugin { _ce: &DeleteEvent, ) -> Result<(), OperationError> { admin_error!("plugin {} has an unimplemented 
post_delete!", Self::id()); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -135,6 +145,7 @@ trait Plugin { "plugin {} has an unimplemented pre_repl_refresh!", Self::id() ); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -146,6 +157,7 @@ trait Plugin { "plugin {} has an unimplemented post_repl_refresh!", Self::id() ); + debug_assert!(false); Err(OperationError::InvalidState) } @@ -161,19 +173,31 @@ trait Plugin { Err(OperationError::InvalidState) } + fn post_repl_incremental_conflict( + _qs: &mut QueryServerWriteTransaction, + _cand: &[(EntrySealedCommitted, Arc)], + _conflict_uuids: &mut BTreeSet, + ) -> Result<(), OperationError> { + admin_error!( + "plugin {} has an unimplemented post_repl_incremental_conflict!", + Self::id() + ); + debug_assert!(false); + Err(OperationError::InvalidState) + } + fn post_repl_incremental( _qs: &mut QueryServerWriteTransaction, _pre_cand: &[Arc], _cand: &[EntrySealedCommitted], - _conflict_uuids: &[Uuid], + _conflict_uuids: &BTreeSet, ) -> Result<(), OperationError> { admin_error!( "plugin {} has an unimplemented post_repl_incremental!", Self::id() ); - // debug_assert!(false); - // Err(OperationError::InvalidState) - Ok(()) + debug_assert!(false); + Err(OperationError::InvalidState) } fn verify(_qs: &mut QueryServerReadTransaction) -> Vec> { @@ -337,14 +361,28 @@ impl Plugins { #[instrument(level = "debug", name = "plugins::run_pre_repl_incremental", skip_all)] pub fn run_pre_repl_incremental( - qs: &mut QueryServerWriteTransaction, - cand: &mut [(EntryIncrementalCommitted, Arc)], + _qs: &mut QueryServerWriteTransaction, + _cand: &mut [(EntryIncrementalCommitted, Arc)], ) -> Result<(), OperationError> { // Cleanup sessions on incoming replication? May not actually - // be needed ... + // be needed since each node will be session checking and replicating + // those cleanups as needed. // session::SessionConsistency::pre_repl_incremental(qs, cand)?; - // attr unique should always be last - attrunique::AttrUnique::pre_repl_incremental(qs, cand) + Ok(()) + } + + #[instrument( + level = "debug", + name = "plugins::run_post_repl_incremental_conflict", + skip_all + )] + pub fn run_post_repl_incremental_conflict( + qs: &mut QueryServerWriteTransaction, + cand: &[(EntrySealedCommitted, Arc)], + conflict_uuids: &mut BTreeSet, + ) -> Result<(), OperationError> { + // Attr unique MUST BE FIRST. + attrunique::AttrUnique::post_repl_incremental_conflict(qs, cand, conflict_uuids) } #[instrument(level = "debug", name = "plugins::run_post_repl_incremental", skip_all)] @@ -352,11 +390,14 @@ impl Plugins { qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], - conflict_uuids: &[Uuid], + conflict_uuids: &BTreeSet, ) -> Result<(), OperationError> { - domain::Domain::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; + // Nothing to do yet. + // domain::Domain::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; spn::Spn::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; + // refint MUST proceed memberof. refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; + // Memberof MUST BE LAST. 
memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand, conflict_uuids) } diff --git a/server/lib/src/plugins/refint.rs b/server/lib/src/plugins/refint.rs index 85690f794..d9901d17d 100644 --- a/server/lib/src/plugins/refint.rs +++ b/server/lib/src/plugins/refint.rs @@ -193,17 +193,13 @@ impl Plugin for ReferentialIntegrity { Self::post_modify_inner(qs, cand) } + #[instrument(level = "debug", name = "refint_post_repl_incremental", skip_all)] fn post_repl_incremental( qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], - conflict_uuids: &[Uuid], + conflict_uuids: &BTreeSet, ) -> Result<(), OperationError> { - admin_error!( - "plugin {} has an unimplemented post_repl_incremental!", - Self::id() - ); - // I think we need to check that all values in the ref type values here // exist, and if not, we *need to remove them*. We should probably rewrite // how we do modify/create inner to actually return missing uuids, so that @@ -264,7 +260,7 @@ impl Plugin for ReferentialIntegrity { // Now, we need to find for each of the missing uuids, which values had them. // We could use a clever query to internal_search_writeable? - missing_uuids.extend_from_slice(conflict_uuids); + missing_uuids.extend(conflict_uuids.iter().copied()); missing_uuids.extend_from_slice(&inactive_entries); if missing_uuids.is_empty() { diff --git a/server/lib/src/plugins/spn.rs b/server/lib/src/plugins/spn.rs index 153a67e10..359c18bbf 100644 --- a/server/lib/src/plugins/spn.rs +++ b/server/lib/src/plugins/spn.rs @@ -1,5 +1,6 @@ // Generate and manage spn's for all entries in the domain. Also deals with // the infrequent - but possible - case where a domain is renamed. +use std::collections::BTreeSet; use std::iter::once; use std::sync::Arc; @@ -77,7 +78,7 @@ impl Plugin for Spn { qs: &mut QueryServerWriteTransaction, pre_cand: &[Arc], cand: &[EntrySealedCommitted], - _conflict_uuids: &[Uuid], + _conflict_uuids: &BTreeSet, ) -> Result<(), OperationError> { Self::post_modify_inner(qs, pre_cand, cand) } diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index bb5dc38c5..60c8a2f7e 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -1,7 +1,7 @@ use super::proto::*; use crate::plugins::Plugins; use crate::prelude::*; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; pub enum ConsumerState { @@ -84,14 +84,15 @@ impl<'a> QueryServerWriteTransaction<'a> { ) .unzip(); - // ⚠️ If we end up with pre-repl returning a list of conflict uuids, we DON'T need to + // ⚠️ If we end up with plugins triggering other entries to conflicts, we DON'T need to // add them to this list. This is just for uuid conflicts, not higher level ones! // // ⚠️ We need to collect this from conflict_update since we may NOT be the originator // server for some conflicts, but we still need to know the UUID is IN the conflict // state for plugins. We also need to do this here before the conflict_update // set is consumed by later steps. - let conflict_uuids: Vec<_> = conflict_update.iter().map(|(_, e)| e.get_uuid()).collect(); + let mut conflict_uuids: BTreeSet<_> = + conflict_update.iter().map(|(_, e)| e.get_uuid()).collect(); // Filter out None from conflict_create let conflict_create: Vec = conflict_create.into_iter().flatten().collect(); @@ -108,13 +109,26 @@ impl<'a> QueryServerWriteTransaction<'a> { }) .collect(); - // To be consistent to Modify, we need to run pre-modify here. 
+ // We now merge the conflict updates and the updates that can proceed. This is correct + // since if an entry was conflicting by uuid then there is nothing for it to merge with + // so as a result we can just by pass that step. We now have all_updates which is + // the set of live entries to write back. let mut all_updates = conflict_update .into_iter() .chain(proceed_update) .collect::>(); - // Plugins can mark entries into a conflict status. + // ⚠️ This hook is probably not what you want to use for checking entries are consistent. + // + // The main issue is that at this point we have a set of entries that need to be + // created / marked into conflicts, and until that occurs it's hard to proceed with validations + // like attr unique because then we would need to walk the various sets to find cases where + // an attribute may not be unique "currently" but *would* be unique once the various entries + // have then been conflicted and updated. + // + // Instead we treat this like refint - we allow the database to "temporarily" become + // inconsistent, then we fix it immediately. This hook remains for cases in future + // where we may wish to have session cleanup performed for example. Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| { admin_error!( "Refresh operation failed (pre_repl_incremental plugin), {:?}", @@ -123,8 +137,9 @@ impl<'a> QueryServerWriteTransaction<'a> { e })?; - // Now we have to schema check our data and separate to schema_valid and - // invalid. + // Now we have to schema check our entries. Remember, here because this is + // using into_iter it's possible that entries may be conflicted due to becoming + // schema invalid during the merge process. let all_updates_valid = all_updates .into_iter() .map(|(ctx_ent, db_ent)| { @@ -139,17 +154,17 @@ impl<'a> QueryServerWriteTransaction<'a> { }) .collect::>(); - // We now have three sets! + // We now have two sets! // // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid) - // * schema_invalid - entries that were merged and their attribute state has now become invalid to schema. - // * schema_valid - entries that were merged and are schema valid. - // - // From these sets, we will move conflict_create and schema_invalid into the replication masked - // state. However schema_valid needs to be processed to check for plugin rules as well. If - // anything hits one of these states we need to have a way to handle this too in a consistent - // manner. + // these are only created on the entry origin node! + // * all_updates_valid - this has two types of entries + // * entries that have survived a uuid conflict and need inplace write. Unlikely to become invalid. + // * entries that were merged and are schema valid. + // * entries that were merged and their attribute state has now become invalid and are conflicts. // + // incremental_apply here handles both the creations and the update processes to ensure that + // everything is updated in a single consistent operation. 
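+ // (the conflict_create set is written as brand new entries, while each pair in
+ // all_updates_valid replaces its existing database entry.)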
self.be_txn .incremental_apply(&all_updates_valid, conflict_create) .map_err(|e| { @@ -157,20 +172,42 @@ impl<'a> QueryServerWriteTransaction<'a> { e })?; + Plugins::run_post_repl_incremental_conflict( + self, + all_updates_valid.as_slice(), + &mut conflict_uuids, + ) + .map_err(|e| { + error!( + "Refresh operation failed (post_repl_incremental_conflict plugin), {:?}", + e + ); + e + })?; + // Plugins need these unzipped - let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid.into_iter().unzip(); + // + let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid + .into_iter() + .filter(|(cand, _)| { + // Exclude anything that is conflicted as a result of the conflict plugins. + !conflict_uuids.contains(&cand.get_uuid()) + }) + .unzip(); // We don't need to process conflict_creates here, since they are all conflicting - // uuids which means that the uuids are all *here* so they will trigger anything - // that requires processing anyway. + // uuids which means that the conflict_uuids are all *here* so they will trigger anything + // that requires processing anyway. As well conflict_creates may not be the full + // set of conflict entries as we may not be the origin node! Conflict_creates is always + // a subset of the conflicts. Plugins::run_post_repl_incremental( self, pre_cand.as_slice(), cand.as_slice(), - conflict_uuids.as_slice(), + &conflict_uuids, ) .map_err(|e| { - admin_error!( + error!( "Refresh operation failed (post_repl_incremental plugin), {:?}", e ); diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 7bbef4155..ac890f005 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -2784,9 +2784,339 @@ async fn test_repl_increment_refint_delete_to_member_holder( drop(server_a_txn); } -// Test attrunique conflictns - +// Test attrunique conflicts // Test ref-int when attrunique makes a conflict +// Test memberof when attrunique makes a conflict +#[qs_pair_test] +async fn test_repl_increment_attrunique_conflict_basic( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + // To test memberof, we add a user who is MO A/B + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Account.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + let g_a_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup_a")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_a_uuid)), + (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + let g_b_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + 
(Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup_b")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_b_uuid)), + (Attribute::Member.as_ref(), Value::Refer(t_uuid)) + ),]) + .is_ok()); + + // To test ref-int, we make a third group that has both a and b as members. + let g_c_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testgroup_c")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_c_uuid)), + (Attribute::Member.as_ref(), Value::Refer(g_a_uuid)), + (Attribute::Member.as_ref(), Value::Refer(g_b_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // Now replicated A -> B + + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(g_a_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_a_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + let e1 = server_a_txn + .internal_search_all_uuid(g_b_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(g_b_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // At this point both sides now have the groups. Now on each node we will rename them + // so that they conflict. + + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + assert!(server_a_txn + .internal_modify_uuid( + g_a_uuid, + &ModifyList::new_purge_and_set( + Attribute::Name.as_ref(), + Value::new_iname("name_conflict") + ) + ) + .is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + assert!(server_b_txn + .internal_modify_uuid( + g_b_uuid, + &ModifyList::new_purge_and_set( + Attribute::Name.as_ref(), + Value::new_iname("name_conflict") + ) + ) + .is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Now each node has an entry, separate uuids, but a name that will violate attr + // unique on the next replicate. + // + // Order of replication doesn't matter here! Which ever one see's it first will + // conflict the entries. In this case, A will detect the attr unique violation + // and create the conflicts. + let mut server_b_txn = server_b.read().await; + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + // The conflict should now have occurred. + // Check both groups are conflicts. 
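+ // Both renamed groups should now be in the recycled + conflict state rather than live.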
+ let cnf_a = server_a_txn + .internal_search_conflict_uuid(g_a_uuid) + .expect("Unable to search conflict entries.") + .pop() + .expect("No conflict entries present"); + assert!(cnf_a.get_ava_single_iname("name") == Some("name_conflict")); + + let cnf_b = server_a_txn + .internal_search_conflict_uuid(g_b_uuid) + .expect("Unable to search conflict entries.") + .pop() + .expect("No conflict entries present"); + assert!(cnf_b.get_ava_single_iname("name") == Some("name_conflict")); + + // Check the person has MO A/B removed. + let e = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_a_uuid))); + assert!(!e.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_b_uuid))); + + // Check the group has M A/B removed. + let e = server_a_txn + .internal_search_all_uuid(g_c_uuid) + .expect("Unable to access entry."); + assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(g_a_uuid))); + assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(g_b_uuid))); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Reverse it - The conflicts will now be sent back A -> B, meaning that + // everything is consistent once more. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let cnf_a = server_b_txn + .internal_search_conflict_uuid(g_a_uuid) + .expect("Unable to search conflict entries.") + .pop() + .expect("No conflict entries present"); + assert!(cnf_a.get_ava_single_iname("name") == Some("name_conflict")); + + let cnf_b = server_b_txn + .internal_search_conflict_uuid(g_b_uuid) + .expect("Unable to search conflict entries.") + .pop() + .expect("No conflict entries present"); + assert!(cnf_b.get_ava_single_iname("name") == Some("name_conflict")); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} + +// Test a complex attr unique situation when the attrunique conflict would occur normally but is +// skipped because the entry it is going to conflict against is actually a uuid conflict. + +#[qs_pair_test] +async fn test_repl_increment_attrunique_conflict_complex( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + // Create two entries on A - The entry that will be an attrunique conflict + // and the entry that will UUID conflict to the second entry. 
The second entry + // should not attrunique conflict within server_a + + let g_a_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("name_conflict")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_a_uuid)) + ),]) + .is_ok()); + + let g_b_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("uuid_conflict")), + (Attribute::Uuid.as_ref(), Value::Uuid(g_b_uuid)) + ),]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + // Create an entry on B that is a uuid conflict to the second entry on A. This entry + // should *also* have an attr conflict to name on the first entry from A. + assert!(server_b_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Group.to_value()), + // Conflicting name + (Attribute::Name.as_ref(), Value::new_iname("name_conflict")), + // Conflicting uuid + (Attribute::Uuid.as_ref(), Value::Uuid(g_b_uuid)) + ),]) + .is_ok()); + + server_b_txn.commit().expect("Failed to commit"); + + // We have to replicate B -> A first. This is so that A will not load the conflict + // entry, and the entries g_a_uuid and g_b_uuid stay present. + let mut server_b_txn = server_b.read().await; + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + // Check these entries are still present and were NOT conflicted due to attrunique + let e = server_a_txn + .internal_search_all_uuid(g_a_uuid) + .expect("Unable to access entry."); + assert!(e.attribute_equality( + Attribute::Name.as_ref(), + &PartialValue::new_iname("name_conflict") + )); + + let e = server_a_txn + .internal_search_all_uuid(g_b_uuid) + .expect("Unable to access entry."); + assert!(e.attribute_equality( + Attribute::Name.as_ref(), + &PartialValue::new_iname("uuid_conflict") + )); + + // The other entry will not be conflicted here, since A is not the origin node. + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Replicate A -> B now. This will cause the entry to be persisted as a conflict + // as this is the origin node. We should end up with the two entries from + // server A remaining. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + // Check these entries are still present and were NOT conflicted due to attrunique + let e = server_b_txn + .internal_search_all_uuid(g_a_uuid) + .expect("Unable to access entry."); + assert!(e.attribute_equality( + Attribute::Name.as_ref(), + &PartialValue::new_iname("name_conflict") + )); + + let e = server_b_txn + .internal_search_all_uuid(g_b_uuid) + .expect("Unable to access entry."); + assert!(e.attribute_equality( + Attribute::Name.as_ref(), + &PartialValue::new_iname("uuid_conflict") + )); + + // Check the conflict was also now created. 
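+ // (uuid-conflict entries are only written out via conflict_create on their origin
+ // node, which is why A showed nothing earlier and B shows the conflict record now.)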
+    let cnf_a = server_b_txn
+        .internal_search_conflict_uuid(g_b_uuid)
+        .expect("Unable to search conflict entries.")
+        .pop()
+        .expect("No conflict entries present");
+    assert!(cnf_a.get_ava_single_iname("name") == Some("name_conflict"));
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+}

// Test change of domain version over incremental.
//
diff --git a/server/lib/src/schema.rs b/server/lib/src/schema.rs
index 0d4554cd8..c38d5b3fd 100644
--- a/server/lib/src/schema.rs
+++ b/server/lib/src/schema.rs
@@ -847,9 +847,9 @@ impl<'a> SchemaWriteTransaction<'a> {
                 name: Attribute::SourceUuid.into(),
                 uuid: UUID_SCHEMA_ATTR_SOURCE_UUID,
                 description: String::from(
-                    "The universal unique id of the source object where this conflict came from",
+                    "The universal unique id of the source object(s) which conflicted with this entry",
                 ),
-                multivalue: false,
+                multivalue: true,
                 // Uniqueness is handled by base.rs, not attrunique here due to
                 // needing to check recycled objects too.
                 unique: false,