68 20230908 replication attrunique (#2086)

Co-authored-by: James Hodgkinson <james@terminaloutcomes.com>
This commit is contained in:
Firstyear 2023-09-12 08:50:51 +10:00 committed by GitHub
parent 38b3c51862
commit b3aed1df34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 781 additions and 180 deletions

View file

@ -13,9 +13,9 @@ assignees: ''
### Kanidm version details ### Kanidm version details
* Output of `kanidm(d) version`: - Output of `kanidm(d) version`:
* Are you running it in a container? If so, which image/tag?: - Are you running it in a container? If so, which image/tag?:
* If not a container, how'd you install it: - If not a container, how'd you install it:
* Operating System / Version (On Unix please post the output of `uname -a`): - Operating System / Version (On Unix please post the output of `uname -a`):
### Any other comments ### Any other comments

View file

@ -171,6 +171,14 @@ For B pulling from A.
Notice that automatic refresh only goes from A -> B and not the other way around. This allows one Notice that automatic refresh only goes from A -> B and not the other way around. This allows one
server to be "authoritative". server to be "authoritative".
TODO: The node configuration will also need to list nodes that can do certain tasks. An example of
these tasks is that to prevent "update storms" a limited set of nodes should be responsible for
recycling and tombstoning of entries. These should be defined as tasks in the replication
configuration, so that the KRC can later issue out which nodes are responsible for those processes.
These are analogous to the AD FSMO roles, but I think we need a different name for them. Single Node
Origin Task? Single Node Operation Runner? Yes I'm trying to make silly acronyms.
### KRC Configuration ### KRC Configuration
> Still not fully sure about the KRC config yet. More thinking needed! > Still not fully sure about the KRC config yet. More thinking needed!

View file

@ -63,7 +63,7 @@ pub enum ConsistencyError {
RefintNotUpheld(u64), RefintNotUpheld(u64),
MemberOfInvalid(u64), MemberOfInvalid(u64),
InvalidAttributeType(String), InvalidAttributeType(String),
DuplicateUniqueAttribute(String), DuplicateUniqueAttribute,
InvalidSpn(u64), InvalidSpn(u64),
SqliteIntegrityFailure, SqliteIntegrityFailure,
BackendAllIdsSync, BackendAllIdsSync,

View file

@ -1172,7 +1172,20 @@ impl Entry<EntryInvalid, EntryCommitted> {
} }
} }
/// Convert this entry into a recycled entry, that is "in the recycle bin". /// Convert this entry into a conflict, declaring what entries it conflicted against.
pub fn to_conflict<T>(&mut self, iter: T)
where
T: IntoIterator<Item = Uuid>,
{
self.add_ava(Attribute::Class.as_ref(), EntryClass::Recycled.into());
self.add_ava(Attribute::Class.as_ref(), EntryClass::Conflict.into());
// Add all the source uuids we conflicted against.
for source_uuid in iter {
self.add_ava(Attribute::SourceUuid.as_ref(), Value::Uuid(source_uuid));
}
}
/// Extract this entry from the recycle bin into a live state.
pub fn to_revived(mut self) -> Self { pub fn to_revived(mut self) -> Self {
// This will put the modify ahead of the revive transition. // This will put the modify ahead of the revive transition.
self.remove_ava(ATTR_CLASS, &EntryClass::Recycled.into()); self.remove_ava(ATTR_CLASS, &EntryClass::Recycled.into());
@ -2879,7 +2892,7 @@ impl<VALID, STATE> Entry<VALID, STATE> {
/// Determine if this entry is recycled or a tombstone, and map that to "None". This allows /// Determine if this entry is recycled or a tombstone, and map that to "None". This allows
/// filter_map to effectively remove entries that should not be considered as "alive". /// filter_map to effectively remove entries that should not be considered as "alive".
pub fn mask_recycled_ts(&self) -> Option<&Self> { pub fn mask_recycled_ts(&self) -> Option<&Self> {
// Only when cls has ts/rc then None, else lways Some(self). // Only when cls has ts/rc then None, else always Some(self).
match self.attrs.get(Attribute::Class.as_ref()) { match self.attrs.get(Attribute::Class.as_ref()) {
Some(cls) => { Some(cls) => {
if cls.contains(&EntryClass::Tombstone.to_partialvalue()) if cls.contains(&EntryClass::Tombstone.to_partialvalue())

View file

@ -4,8 +4,8 @@
// both change approaches. // both change approaches.
// //
// //
use std::collections::BTreeMap;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc; use std::sync::Arc;
use kanidm_proto::v1::{ConsistencyError, PluginError}; use kanidm_proto::v1::{ConsistencyError, PluginError};
@ -18,41 +18,49 @@ use crate::schema::SchemaTransaction;
pub struct AttrUnique; pub struct AttrUnique;
fn get_cand_attr_set<VALID, STATE>( fn get_cand_attr_set<'a, VALID: 'a, STATE: 'a, T>(
cand: &[Entry<VALID, STATE>], // cand: &[Entry<VALID, STATE>],
attr: &str, cand: T,
) -> Result<BTreeMap<PartialValue, Uuid>, OperationError> { uniqueattrs: &[AttrString],
// This is building both the set of values to search for uniqueness, but ALSO ) -> Result<BTreeMap<(AttrString, PartialValue), Vec<Uuid>>, OperationError>
// is detecting if any modified or current entries in the cand set also duplicated where
// do to the ennforcing that the PartialValue must be unique in the cand_attr set. T: IntoIterator<Item = &'a Entry<VALID, STATE>>,
let mut cand_attr: BTreeMap<PartialValue, Uuid> = BTreeMap::new(); {
let mut cand_attr: BTreeMap<(AttrString, PartialValue), Vec<Uuid>> = BTreeMap::new();
cand.iter() cand.into_iter()
// We don't need to consider recycled or tombstoned entries
.filter_map(|e| e.mask_recycled_ts())
.try_for_each(|e| { .try_for_each(|e| {
let uuid = e let uuid = e
.get_ava_single_uuid("uuid") .get_ava_single_uuid("uuid")
.ok_or(OperationError::InvalidEntryState)?; .ok_or_else(|| {
// Get the value and uuid error!("An entry is missing its uuid. This should be impossible!");
//for each value in the ava. OperationError::InvalidEntryState
e.get_ava_set(attr) })?;
.map(|vs| {
vs.to_partialvalue_iter() // Faster to iterate over the attr vec inside this loop.
.try_for_each(|v| match cand_attr.insert(v, uuid) { for attr in uniqueattrs.iter() {
None => Ok(()), if let Some(vs) = e.get_ava_set(attr) {
Some(vr) => { for pv in vs.to_partialvalue_iter() {
admin_error!( let key = (attr.clone(), pv);
"ava already exists -> {:?}: {:?} conflicts to {:?}", cand_attr.entry(key)
attr, // Must have conflicted, lets append.
vr, .and_modify(|v| {
e.get_display_id() warn!(
); "ava already exists -> {:?} on entry {:?} has conflicts within change set",
Err(OperationError::Plugin(PluginError::AttrUnique( attr,
"ava already exists".to_string(), e.get_display_id()
))) );
} v.push(uuid)
}) })
}) // Not found, lets setup.
.unwrap_or(Ok(())) .or_insert_with(|| vec![uuid]);
}
}
}
Ok(())
}) })
.map(|()| cand_attr) .map(|()| cand_attr)
} }
@ -60,37 +68,68 @@ fn get_cand_attr_set<VALID, STATE>(
fn enforce_unique<VALID, STATE>( fn enforce_unique<VALID, STATE>(
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
cand: &[Entry<VALID, STATE>], cand: &[Entry<VALID, STATE>],
attr: &str,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
let uniqueattrs = {
let schema = qs.get_schema();
schema.get_attributes_unique()
};
// Build a set of all the value -> uuid for the cands. // Build a set of all the value -> uuid for the cands.
// If already exist, reject due to dup. // If already exist, reject due to dup.
let cand_attr = get_cand_attr_set(cand, attr).map_err(|e| { let cand_attr_set = get_cand_attr_set(cand, uniqueattrs).map_err(|e| {
admin_error!(err = ?e, ?attr, "failed to get cand attr set"); error!(err = ?e, "failed to get cand attr set");
e e
})?; })?;
// No candidates to check! // No candidates to check!
if cand_attr.is_empty() { if cand_attr_set.is_empty() {
return Ok(()); return Ok(());
} }
// Now we have to identify and error on anything that has multiple items.
let mut cand_attr = Vec::with_capacity(cand_attr_set.len());
let mut err = false;
for (key, mut uuid_set) in cand_attr_set.into_iter() {
if let Some(uuid) = uuid_set.pop() {
if uuid_set.is_empty() {
// Good, only single uuid, this can proceed.
cand_attr.push((key, uuid));
} else {
// Multiple uuid(s) may remain, this is a conflict. We already warned on it
// before in the processing. Do we need to warn again?
err = true;
}
} else {
// Corrupt? How did we even get here?
warn!("datastructure corruption occurred while processing candidate attribute set");
debug_assert!(false);
return Err(OperationError::Plugin(PluginError::AttrUnique(
"corruption detected".to_string(),
)));
}
}
if err {
return Err(OperationError::Plugin(PluginError::AttrUnique(
"duplicate value detected".to_string(),
)));
}
// Now do an internal search on name and !uuid for each // Now do an internal search on name and !uuid for each
let cand_filters: Vec<_> = cand_attr
.iter()
.map(|((attr, v), uuid)| {
// and[ attr eq k, andnot [ uuid eq v ]]
// Basically this says where name but also not self.
f_and(vec![
FC::Eq(attr, v.clone()),
f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(*uuid))),
])
})
.collect();
// Or // Or
let filt_in = filter!(f_or( let filt_in = filter!(f_or(cand_filters.clone()));
// for each cand_attr
cand_attr
.iter()
.map(|(v, uuid)| {
// and[ attr eq k, andnot [ uuid eq v ]]
// Basically this says where name but also not self.
f_and(vec![
FC::Eq(attr, v.clone()),
f_andnot(FC::Eq("uuid", PartialValue::Uuid(*uuid))),
])
})
.collect()
));
trace!(?filt_in); trace!(?filt_in);
@ -113,19 +152,6 @@ fn enforce_unique<VALID, STATE>(
// We do a bisect rather than a linear one-at-a-time search because we want to try to somewhat minimise calls // We do a bisect rather than a linear one-at-a-time search because we want to try to somewhat minimise calls
// through internal exists since that has a filter resolve and validate step. // through internal exists since that has a filter resolve and validate step.
// First create the vec of filters.
let mut cand_filters: Vec<_> = cand_attr
.into_iter()
.map(|(v, uuid)| {
// and[ attr eq k, andnot [ uuid eq v ]]
// Basically this says where name but also not self.
f_and(vec![
FC::Eq(attr, v),
f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(uuid))),
])
})
.collect();
// Fast-ish path. There is 0 or 1 element, so we just fast return. // Fast-ish path. There is 0 or 1 element, so we just fast return.
if cand_filters.len() < 2 { if cand_filters.len() < 2 {
error!( error!(
@ -137,16 +163,16 @@ fn enforce_unique<VALID, STATE>(
// chunks here. // chunks here.
let mid = cand_filters.len() / 2; let mid = cand_filters.len() / 2;
let right = cand_filters.split_off(mid); let (left, right) = cand_filters.split_at(mid);
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
queue.push_back(cand_filters); queue.push_back(left);
queue.push_back(right); queue.push_back(right);
// Ok! We are setup to go // Ok! We are setup to go
while let Some(mut cand_query) = queue.pop_front() { while let Some(cand_query) = queue.pop_front() {
let filt_in = filter!(f_or(cand_query.clone())); let filt_in = filter!(f_or(cand_query.to_vec()));
let conflict_cand = qs.internal_exists(filt_in).map_err(|e| { let conflict_cand = qs.internal_exists(filt_in).map_err(|e| {
admin_error!("internal exists error {:?}", e); admin_error!("internal exists error {:?}", e);
e e
@ -157,8 +183,8 @@ fn enforce_unique<VALID, STATE>(
if cand_query.len() >= 2 { if cand_query.len() >= 2 {
// Continue to split to isolate. // Continue to split to isolate.
let mid = cand_query.len() / 2; let mid = cand_query.len() / 2;
let right = cand_query.split_off(mid); let (left, right) = cand_query.split_at(mid);
queue.push_back(cand_query); queue.push_back(left);
queue.push_back(right); queue.push_back(right);
// Continue! // Continue!
} else { } else {
@ -184,25 +210,13 @@ impl Plugin for AttrUnique {
"plugin_attrunique" "plugin_attrunique"
} }
#[instrument( #[instrument(level = "debug", name = "attrunique_pre_create_transform", skip_all)]
level = "debug",
name = "attrunique_pre_create_transform",
skip(qs, _ce)
)]
fn pre_create_transform( fn pre_create_transform(
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
cand: &mut Vec<Entry<EntryInvalid, EntryNew>>, cand: &mut Vec<Entry<EntryInvalid, EntryNew>>,
_ce: &CreateEvent, _ce: &CreateEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
let uniqueattrs = { enforce_unique(qs, cand)
let schema = qs.get_schema();
schema.get_attributes_unique()
};
let r: Result<(), OperationError> = uniqueattrs
.iter()
.try_for_each(|attr| enforce_unique(qs, cand, attr.as_str()));
r
} }
#[instrument(level = "debug", name = "attrunique_pre_modify", skip_all)] #[instrument(level = "debug", name = "attrunique_pre_modify", skip_all)]
@ -212,15 +226,7 @@ impl Plugin for AttrUnique {
cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>, cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>,
_me: &ModifyEvent, _me: &ModifyEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
let uniqueattrs = { enforce_unique(qs, cand)
let schema = qs.get_schema();
schema.get_attributes_unique()
};
let r: Result<(), OperationError> = uniqueattrs
.iter()
.try_for_each(|attr| enforce_unique(qs, cand, attr.as_str()));
r
} }
#[instrument(level = "debug", name = "attrunique_pre_batch_modify", skip_all)] #[instrument(level = "debug", name = "attrunique_pre_batch_modify", skip_all)]
@ -230,53 +236,227 @@ impl Plugin for AttrUnique {
cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>, cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>,
_me: &BatchModifyEvent, _me: &BatchModifyEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
let uniqueattrs = { enforce_unique(qs, cand)
let schema = qs.get_schema();
schema.get_attributes_unique()
};
let r: Result<(), OperationError> = uniqueattrs
.iter()
.try_for_each(|attr| enforce_unique(qs, cand, attr.as_str()));
r
} }
#[instrument(level = "debug", name = "attrunique_pre_repl_refresh", skip_all)]
fn pre_repl_refresh( fn pre_repl_refresh(
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
cand: &[EntryRefreshNew], cand: &[EntryRefreshNew],
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
enforce_unique(qs, cand)
}
#[instrument(level = "debug", name = "attrunique_post_repl_incremental", skip_all)]
fn post_repl_incremental_conflict(
qs: &mut QueryServerWriteTransaction,
cand: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
conflict_uuids: &mut BTreeSet<Uuid>,
) -> Result<(), OperationError> {
// We need to detect attribute unique violations here. This can *easily* happen in
// replication since we have two nodes where different entries can modify an attribute
// and on the next incremental replication the uniqueness violation occurs.
//
// Because of this we have some key properties that we can observe.
//
    // Every node when it makes a change with regard to its own content is already compliant
// to attribute uniqueness. This means the consumers db content before we begin is
// fully consistent.
//
    // As an attribute can be updated multiple times before it is replicated, the cid of
    // the attribute may not be a true reflection of the order of events when considering
    // which attribute-value should survive/conflict.
//
// Attribute uniqueness constraints can *only* be violated on entries that have been
// replicated or are involved in replication (e.g. a conflict survivor entry).
//
// The content of the cand set may contain both replicated entries and conflict survivors
// that are in the process of being updated. Entries within the cand set *may* be in
// a conflict state with each other.
//
// Since this is a post operation, the content of these cand entries is *also* current
// in the database.
//
// This means that:
// * We can build a set of attr unique queries from the cand set.
// * We can ignore conflicts while building that set.
// * Any conflicts detected in the DB on executing that filter would be a super set of the
// conflicts that exist in reality.
// * All entries that are involved in the attr unique collision must become conflicts.
let uniqueattrs = { let uniqueattrs = {
let schema = qs.get_schema(); let schema = qs.get_schema();
schema.get_attributes_unique() schema.get_attributes_unique()
}; };
let r: Result<(), OperationError> = uniqueattrs // Build a set of all the value -> uuid for the cands.
// If already exist, reject due to dup.
let cand_attr_set =
get_cand_attr_set(cand.iter().map(|(e, _)| e), uniqueattrs).map_err(|e| {
error!(err = ?e, "failed to get cand attr set");
e
})?;
// No candidates to check!
if cand_attr_set.is_empty() {
return Ok(());
}
// HAPPY FAST PATH - we do the fast existence query and if it passes
// we can *proceed*, nothing has conflicted.
let cand_filters: Vec<_> = cand_attr_set
.iter() .iter()
.try_for_each(|attr| enforce_unique(qs, cand, attr.as_str())); .flat_map(|((attr, v), uuids)| {
r uuids.iter().map(|uuid| {
} // and[ attr eq k, andnot [ uuid eq v ]]
// Basically this says where name but also not self.
f_and(vec![
FC::Eq(attr, v.clone()),
f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(*uuid))),
])
})
})
.collect();
fn pre_repl_incremental( let filt_in = filter!(f_or(cand_filters));
_qs: &mut QueryServerWriteTransaction,
_cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
) -> Result<(), OperationError> {
admin_error!(
"plugin {} has an unimplemented pre_repl_incremental!",
Self::id()
);
// Important! We need to also have a list of uuids that we conflicted AGAINST so that both trace!(?filt_in);
// this candidate *and* the existing item both move to conflict. This is because if we don't
// do it this way, then some nodes will conflict on potentially the inverse entries, which
// could end up pretty bad.
// We also can't realllllyyyy rely on the cid here since it could have changed multiple times // If any results, reject.
// and may not truly reflect the accurate change times, so we have to conflict on both let conflict_cand = qs.internal_exists(filt_in).map_err(|e| {
// itemsthat hit the attrunique. admin_error!("internal exists error {:?}", e);
e
})?;
// debug_assert!(false); if conflict_cand {
// Err(OperationError::InvalidState) // Unlike enforce unique, we need to be more thorough here. Enforce unique
Ok(()) // just has to block the whole operation. We *can't* fail the operation
// in the same way, we need to individually isolate each collision to
// turn all the involved entries into conflicts. Because of this, rather
// than bisection like we do in enforce_unique to find violating entries
// for admins to read, we need to actually isolate each and every conflicting
// uuid. To achieve this we need to change the structure of the query we perform
// to actually get everything that has a conflict now.
// For each uuid, show the set of uuids this conflicts with.
let mut conflict_uuid_map: BTreeMap<Uuid, BTreeSet<Uuid>> = BTreeMap::new();
// We need to invert this now to have a set of uuid: Vec<(attr, pv)>
// rather than the other direction which was optimised for the detection of
// candidate conflicts during updates.
let mut cand_attr_map: BTreeMap<Uuid, BTreeSet<_>> = BTreeMap::new();
cand_attr_set.into_iter().for_each(|(key, uuids)| {
uuids.into_iter().for_each(|uuid| {
cand_attr_map
.entry(uuid)
.and_modify(|set| {
set.insert(key.clone());
})
.or_insert_with(|| {
let mut set = BTreeSet::new();
set.insert(key.clone());
set
});
})
});
for (uuid, ava_set) in cand_attr_map.into_iter() {
let cand_filters: Vec<_> = ava_set
.iter()
.map(|(attr, pv)| {
f_and(vec![
FC::Eq(attr, pv.clone()),
f_andnot(FC::Eq(ATTR_UUID, PartialValue::Uuid(uuid))),
])
})
.collect();
let filt_in = filter!(f_or(cand_filters.clone()));
let filt_conflicts = qs.internal_search(filt_in).map_err(|e| {
admin_error!("internal search error {:?}", e);
e
})?;
// Important! This needs to conflict in *both directions*. We have to
// indicate that uuid has been conflicted by the entries in filt_conflicts,
// but also that the entries in filt_conflicts now conflict on us! Also remember
// that entries in either direction *may already* be in the conflict map, so we
// need to be very careful here not to stomp anything - append only!
if !filt_conflicts.is_empty() {
let mut conflict_uuid_set = BTreeSet::new();
for e in filt_conflicts {
// Mark that this entry conflicted to us.
conflict_uuid_set.insert(e.get_uuid());
// Mark that the entry needs to conflict against us.
conflict_uuid_map
.entry(e.get_uuid())
.and_modify(|set| {
set.insert(uuid);
})
.or_insert_with(|| {
let mut set = BTreeSet::new();
set.insert(uuid);
set
});
}
conflict_uuid_map
.entry(uuid)
.and_modify(|set| set.append(&mut conflict_uuid_set))
.or_insert_with(|| conflict_uuid_set);
}
}
trace!(?conflict_uuid_map);
if conflict_uuid_map.is_empty() {
error!("Impossible state. Attribute unique conflicts were detected in fast path, but were not found in slow path.");
return Err(OperationError::InvalidState);
}
// Now get all these values out with modify writable
let filt = filter!(FC::Or(
conflict_uuid_map
.keys()
.map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(*u)))
.collect()
));
let mut work_set = qs.internal_search_writeable(&filt)?;
for (_, entry) in work_set.iter_mut() {
let Some(uuid) = entry.get_uuid() else {
error!("Impossible state. Entry that was declared in conflict map does not have a uuid.");
return Err(OperationError::InvalidState);
};
// Add the uuid to the conflict uuids now.
conflict_uuids.insert(uuid);
if let Some(conflict_uuid_set) = conflict_uuid_map.get(&uuid) {
entry.to_conflict(conflict_uuid_set.iter().copied())
} else {
error!("Impossible state. Entry that was declared in conflict map was not present in work set.");
return Err(OperationError::InvalidState);
}
}
qs.internal_apply_writable(work_set).map_err(|e| {
admin_error!("Failed to commit memberof group set {:?}", e);
e
})?;
            // Okay, we *finally* got here. We are done!
Ok(())
} else {
// 🎉
Ok(())
}
} }
#[instrument(level = "debug", name = "attrunique::verify", skip_all)] #[instrument(level = "debug", name = "attrunique::verify", skip_all)]
@ -301,13 +481,8 @@ impl Plugin for AttrUnique {
let mut res: Vec<Result<(), ConsistencyError>> = Vec::new(); let mut res: Vec<Result<(), ConsistencyError>> = Vec::new();
for attr in uniqueattrs.iter() { if get_cand_attr_set(&all_cand, uniqueattrs).is_err() {
// We do a fully in memory check. res.push(Err(ConsistencyError::DuplicateUniqueAttribute))
if get_cand_attr_set(&all_cand, attr.as_str()).is_err() {
res.push(Err(ConsistencyError::DuplicateUniqueAttribute(
attr.to_string(),
)))
}
} }
trace!(?res); trace!(?res);

View file

@ -235,7 +235,7 @@ impl Plugin for MemberOf {
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>], pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted], cand: &[EntrySealedCommitted],
_conflict_uuids: &[Uuid], _conflict_uuids: &BTreeSet<Uuid>,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
// IMPORTANT - we need this for now so that dyngroup doesn't error on us, since // IMPORTANT - we need this for now so that dyngroup doesn't error on us, since
// repl is internal and dyngroup has a safety check to prevent external triggers. // repl is internal and dyngroup has a safety check to prevent external triggers.

View file

@ -3,6 +3,7 @@
//! helps to ensure that data is always in specific known states within the //! helps to ensure that data is always in specific known states within the
//! `QueryServer` //! `QueryServer`
use std::collections::BTreeSet;
use std::sync::Arc; use std::sync::Arc;
use kanidm_proto::v1::{ConsistencyError, OperationError}; use kanidm_proto::v1::{ConsistencyError, OperationError};
@ -37,6 +38,7 @@ trait Plugin {
"plugin {} has an unimplemented pre_create_transform!", "plugin {} has an unimplemented pre_create_transform!",
Self::id() Self::id()
); );
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -47,6 +49,7 @@ trait Plugin {
_ce: &CreateEvent, _ce: &CreateEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!("plugin {} has an unimplemented pre_create!", Self::id()); admin_error!("plugin {} has an unimplemented pre_create!", Self::id());
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -57,6 +60,7 @@ trait Plugin {
_ce: &CreateEvent, _ce: &CreateEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!("plugin {} has an unimplemented post_create!", Self::id()); admin_error!("plugin {} has an unimplemented post_create!", Self::id());
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -67,6 +71,7 @@ trait Plugin {
_me: &ModifyEvent, _me: &ModifyEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!("plugin {} has an unimplemented pre_modify!", Self::id()); admin_error!("plugin {} has an unimplemented pre_modify!", Self::id());
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -78,6 +83,7 @@ trait Plugin {
_ce: &ModifyEvent, _ce: &ModifyEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!("plugin {} has an unimplemented post_modify!", Self::id()); admin_error!("plugin {} has an unimplemented post_modify!", Self::id());
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -91,6 +97,7 @@ trait Plugin {
"plugin {} has an unimplemented pre_batch_modify!", "plugin {} has an unimplemented pre_batch_modify!",
Self::id() Self::id()
); );
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -105,6 +112,7 @@ trait Plugin {
"plugin {} has an unimplemented post_batch_modify!", "plugin {} has an unimplemented post_batch_modify!",
Self::id() Self::id()
); );
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -114,6 +122,7 @@ trait Plugin {
_de: &DeleteEvent, _de: &DeleteEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!("plugin {} has an unimplemented pre_delete!", Self::id()); admin_error!("plugin {} has an unimplemented pre_delete!", Self::id());
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -124,6 +133,7 @@ trait Plugin {
_ce: &DeleteEvent, _ce: &DeleteEvent,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!("plugin {} has an unimplemented post_delete!", Self::id()); admin_error!("plugin {} has an unimplemented post_delete!", Self::id());
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -135,6 +145,7 @@ trait Plugin {
"plugin {} has an unimplemented pre_repl_refresh!", "plugin {} has an unimplemented pre_repl_refresh!",
Self::id() Self::id()
); );
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -146,6 +157,7 @@ trait Plugin {
"plugin {} has an unimplemented post_repl_refresh!", "plugin {} has an unimplemented post_repl_refresh!",
Self::id() Self::id()
); );
debug_assert!(false);
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
@ -161,19 +173,31 @@ trait Plugin {
Err(OperationError::InvalidState) Err(OperationError::InvalidState)
} }
fn post_repl_incremental_conflict(
_qs: &mut QueryServerWriteTransaction,
_cand: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
_conflict_uuids: &mut BTreeSet<Uuid>,
) -> Result<(), OperationError> {
admin_error!(
"plugin {} has an unimplemented post_repl_incremental_conflict!",
Self::id()
);
debug_assert!(false);
Err(OperationError::InvalidState)
}
fn post_repl_incremental( fn post_repl_incremental(
_qs: &mut QueryServerWriteTransaction, _qs: &mut QueryServerWriteTransaction,
_pre_cand: &[Arc<EntrySealedCommitted>], _pre_cand: &[Arc<EntrySealedCommitted>],
_cand: &[EntrySealedCommitted], _cand: &[EntrySealedCommitted],
_conflict_uuids: &[Uuid], _conflict_uuids: &BTreeSet<Uuid>,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!( admin_error!(
"plugin {} has an unimplemented post_repl_incremental!", "plugin {} has an unimplemented post_repl_incremental!",
Self::id() Self::id()
); );
// debug_assert!(false); debug_assert!(false);
// Err(OperationError::InvalidState) Err(OperationError::InvalidState)
Ok(())
} }
fn verify(_qs: &mut QueryServerReadTransaction) -> Vec<Result<(), ConsistencyError>> { fn verify(_qs: &mut QueryServerReadTransaction) -> Vec<Result<(), ConsistencyError>> {
@ -337,14 +361,28 @@ impl Plugins {
#[instrument(level = "debug", name = "plugins::run_pre_repl_incremental", skip_all)] #[instrument(level = "debug", name = "plugins::run_pre_repl_incremental", skip_all)]
pub fn run_pre_repl_incremental( pub fn run_pre_repl_incremental(
qs: &mut QueryServerWriteTransaction, _qs: &mut QueryServerWriteTransaction,
cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)], _cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
// Cleanup sessions on incoming replication? May not actually // Cleanup sessions on incoming replication? May not actually
// be needed ... // be needed since each node will be session checking and replicating
// those cleanups as needed.
// session::SessionConsistency::pre_repl_incremental(qs, cand)?; // session::SessionConsistency::pre_repl_incremental(qs, cand)?;
// attr unique should always be last Ok(())
attrunique::AttrUnique::pre_repl_incremental(qs, cand) }
#[instrument(
level = "debug",
name = "plugins::run_post_repl_incremental_conflict",
skip_all
)]
pub fn run_post_repl_incremental_conflict(
qs: &mut QueryServerWriteTransaction,
cand: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
conflict_uuids: &mut BTreeSet<Uuid>,
) -> Result<(), OperationError> {
// Attr unique MUST BE FIRST.
attrunique::AttrUnique::post_repl_incremental_conflict(qs, cand, conflict_uuids)
} }
#[instrument(level = "debug", name = "plugins::run_post_repl_incremental", skip_all)] #[instrument(level = "debug", name = "plugins::run_post_repl_incremental", skip_all)]
@ -352,11 +390,14 @@ impl Plugins {
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>], pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted], cand: &[EntrySealedCommitted],
conflict_uuids: &[Uuid], conflict_uuids: &BTreeSet<Uuid>,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
domain::Domain::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; // Nothing to do yet.
// domain::Domain::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?;
spn::Spn::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; spn::Spn::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?;
// refint MUST proceed memberof.
refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?; refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?;
// Memberof MUST BE LAST.
memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand, conflict_uuids) memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)
} }

View file

@ -193,17 +193,13 @@ impl Plugin for ReferentialIntegrity {
Self::post_modify_inner(qs, cand) Self::post_modify_inner(qs, cand)
} }
#[instrument(level = "debug", name = "refint_post_repl_incremental", skip_all)]
fn post_repl_incremental( fn post_repl_incremental(
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>], pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted], cand: &[EntrySealedCommitted],
conflict_uuids: &[Uuid], conflict_uuids: &BTreeSet<Uuid>,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
admin_error!(
"plugin {} has an unimplemented post_repl_incremental!",
Self::id()
);
// I think we need to check that all values in the ref type values here // I think we need to check that all values in the ref type values here
// exist, and if not, we *need to remove them*. We should probably rewrite // exist, and if not, we *need to remove them*. We should probably rewrite
// how we do modify/create inner to actually return missing uuids, so that // how we do modify/create inner to actually return missing uuids, so that
@ -264,7 +260,7 @@ impl Plugin for ReferentialIntegrity {
// Now, we need to find for each of the missing uuids, which values had them. // Now, we need to find for each of the missing uuids, which values had them.
// We could use a clever query to internal_search_writeable? // We could use a clever query to internal_search_writeable?
missing_uuids.extend_from_slice(conflict_uuids); missing_uuids.extend(conflict_uuids.iter().copied());
missing_uuids.extend_from_slice(&inactive_entries); missing_uuids.extend_from_slice(&inactive_entries);
if missing_uuids.is_empty() { if missing_uuids.is_empty() {

View file

@ -1,5 +1,6 @@
// Generate and manage spn's for all entries in the domain. Also deals with // Generate and manage spn's for all entries in the domain. Also deals with
// the infrequent - but possible - case where a domain is renamed. // the infrequent - but possible - case where a domain is renamed.
use std::collections::BTreeSet;
use std::iter::once; use std::iter::once;
use std::sync::Arc; use std::sync::Arc;
@ -77,7 +78,7 @@ impl Plugin for Spn {
qs: &mut QueryServerWriteTransaction, qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>], pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted], cand: &[EntrySealedCommitted],
_conflict_uuids: &[Uuid], _conflict_uuids: &BTreeSet<Uuid>,
) -> Result<(), OperationError> { ) -> Result<(), OperationError> {
Self::post_modify_inner(qs, pre_cand, cand) Self::post_modify_inner(qs, pre_cand, cand)
} }

View file

@ -1,7 +1,7 @@
use super::proto::*; use super::proto::*;
use crate::plugins::Plugins; use crate::plugins::Plugins;
use crate::prelude::*; use crate::prelude::*;
use std::collections::BTreeMap; use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc; use std::sync::Arc;
pub enum ConsumerState { pub enum ConsumerState {
@ -84,14 +84,15 @@ impl<'a> QueryServerWriteTransaction<'a> {
) )
.unzip(); .unzip();
// ⚠️ If we end up with pre-repl returning a list of conflict uuids, we DON'T need to // ⚠️ If we end up with plugins triggering other entries to conflicts, we DON'T need to
// add them to this list. This is just for uuid conflicts, not higher level ones! // add them to this list. This is just for uuid conflicts, not higher level ones!
// //
// ⚠️ We need to collect this from conflict_update since we may NOT be the originator // ⚠️ We need to collect this from conflict_update since we may NOT be the originator
// server for some conflicts, but we still need to know the UUID is IN the conflict // server for some conflicts, but we still need to know the UUID is IN the conflict
// state for plugins. We also need to do this here before the conflict_update // state for plugins. We also need to do this here before the conflict_update
// set is consumed by later steps. // set is consumed by later steps.
let conflict_uuids: Vec<_> = conflict_update.iter().map(|(_, e)| e.get_uuid()).collect(); let mut conflict_uuids: BTreeSet<_> =
conflict_update.iter().map(|(_, e)| e.get_uuid()).collect();
// Filter out None from conflict_create // Filter out None from conflict_create
let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect(); let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
@ -108,13 +109,26 @@ impl<'a> QueryServerWriteTransaction<'a> {
}) })
.collect(); .collect();
// To be consistent to Modify, we need to run pre-modify here. // We now merge the conflict updates and the updates that can proceed. This is correct
// since if an entry was conflicting by uuid then there is nothing for it to merge with
// so as a result we can just by pass that step. We now have all_updates which is
// the set of live entries to write back.
let mut all_updates = conflict_update let mut all_updates = conflict_update
.into_iter() .into_iter()
.chain(proceed_update) .chain(proceed_update)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Plugins can mark entries into a conflict status. // ⚠️ This hook is probably not what you want to use for checking entries are consistent.
//
// The main issue is that at this point we have a set of entries that need to be
// created / marked into conflicts, and until that occurs it's hard to proceed with validations
// like attr unique because then we would need to walk the various sets to find cases where
// an attribute may not be unique "currently" but *would* be unique once the various entries
// have then been conflicted and updated.
//
// Instead we treat this like refint - we allow the database to "temporarily" become
// inconsistent, then we fix it immediately. This hook remains for cases in future
// where we may wish to have session cleanup performed for example.
Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| { Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
admin_error!( admin_error!(
"Refresh operation failed (pre_repl_incremental plugin), {:?}", "Refresh operation failed (pre_repl_incremental plugin), {:?}",
@ -123,8 +137,9 @@ impl<'a> QueryServerWriteTransaction<'a> {
e e
})?; })?;
// Now we have to schema check our data and separate to schema_valid and // Now we have to schema check our entries. Remember, here because this is
// invalid. // using into_iter it's possible that entries may be conflicted due to becoming
// schema invalid during the merge process.
let all_updates_valid = all_updates let all_updates_valid = all_updates
.into_iter() .into_iter()
.map(|(ctx_ent, db_ent)| { .map(|(ctx_ent, db_ent)| {
@ -139,17 +154,17 @@ impl<'a> QueryServerWriteTransaction<'a> {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// We now have three sets! // We now have two sets!
// //
// * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid) // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid)
// * schema_invalid - entries that were merged and their attribute state has now become invalid to schema. // these are only created on the entry origin node!
// * schema_valid - entries that were merged and are schema valid. // * all_updates_valid - this has two types of entries
// // * entries that have survived a uuid conflict and need inplace write. Unlikely to become invalid.
// From these sets, we will move conflict_create and schema_invalid into the replication masked // * entries that were merged and are schema valid.
// state. However schema_valid needs to be processed to check for plugin rules as well. If // * entries that were merged and their attribute state has now become invalid and are conflicts.
// anything hits one of these states we need to have a way to handle this too in a consistent
// manner.
// //
// incremental_apply here handles both the creations and the update processes to ensure that
// everything is updated in a single consistent operation.
self.be_txn self.be_txn
.incremental_apply(&all_updates_valid, conflict_create) .incremental_apply(&all_updates_valid, conflict_create)
.map_err(|e| { .map_err(|e| {
@ -157,20 +172,42 @@ impl<'a> QueryServerWriteTransaction<'a> {
e e
})?; })?;
Plugins::run_post_repl_incremental_conflict(
self,
all_updates_valid.as_slice(),
&mut conflict_uuids,
)
.map_err(|e| {
error!(
"Refresh operation failed (post_repl_incremental_conflict plugin), {:?}",
e
);
e
})?;
// Plugins need these unzipped // Plugins need these unzipped
let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid.into_iter().unzip(); //
let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
.into_iter()
.filter(|(cand, _)| {
// Exclude anything that is conflicted as a result of the conflict plugins.
!conflict_uuids.contains(&cand.get_uuid())
})
.unzip();
// We don't need to process conflict_creates here, since they are all conflicting // We don't need to process conflict_creates here, since they are all conflicting
// uuids which means that the uuids are all *here* so they will trigger anything // uuids which means that the conflict_uuids are all *here* so they will trigger anything
// that requires processing anyway. // that requires processing anyway. As well conflict_creates may not be the full
// set of conflict entries as we may not be the origin node! Conflict_creates is always
// a subset of the conflicts.
Plugins::run_post_repl_incremental( Plugins::run_post_repl_incremental(
self, self,
pre_cand.as_slice(), pre_cand.as_slice(),
cand.as_slice(), cand.as_slice(),
conflict_uuids.as_slice(), &conflict_uuids,
) )
.map_err(|e| { .map_err(|e| {
admin_error!( error!(
"Refresh operation failed (post_repl_incremental plugin), {:?}", "Refresh operation failed (post_repl_incremental plugin), {:?}",
e e
); );

View file

@ -2784,9 +2784,339 @@ async fn test_repl_increment_refint_delete_to_member_holder(
drop(server_a_txn); drop(server_a_txn);
} }
// Test attrunique conflictns // Test attrunique conflicts
// Test ref-int when attrunique makes a conflict // Test ref-int when attrunique makes a conflict
// Test memberof when attrunique makes a conflict
// Test that an attrunique violation introduced by incremental replication is
// resolved by moving BOTH offending entries into the conflict state, and that
// the refint and memberof plugins then strip references to the conflicted
// entries so the database returns to a consistent state.
//
// Scenario: two distinct groups (separate uuids) are renamed to the same
// `name` on different nodes. Each node is locally valid, but the merged state
// after replication violates the `name` attrunique constraint.
#[qs_pair_test]
async fn test_repl_increment_attrunique_conflict_basic(
    server_a: &QueryServer,
    server_b: &QueryServer,
) {
    // Initialise B from A so both nodes share a common replication base.
    let ct = duration_from_epoch_now();
    let mut server_a_txn = server_a.write(ct).await;
    let mut server_b_txn = server_b.read().await;
    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
        .and_then(|_| server_a_txn.commit())
        .is_ok());
    drop(server_b_txn);

    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;

    // To test memberof, we add a user who is MO A/B
    let t_uuid = Uuid::new_v4();
    assert!(server_a_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Account.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Person.to_value()),
            (Attribute::Name.as_ref(), Value::new_iname("testperson1")),
            (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
            (
                Attribute::Description.as_ref(),
                Value::new_utf8s("testperson1")
            ),
            (
                Attribute::DisplayName.as_ref(),
                Value::new_utf8s("testperson1")
            )
        ),])
        .is_ok());

    // Group A - will later be renamed on server A to the conflicting name.
    let g_a_uuid = Uuid::new_v4();
    assert!(server_a_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Group.to_value()),
            (Attribute::Name.as_ref(), Value::new_iname("testgroup_a")),
            (Attribute::Uuid.as_ref(), Value::Uuid(g_a_uuid)),
            (Attribute::Member.as_ref(), Value::Refer(t_uuid))
        ),])
        .is_ok());

    // Group B - will later be renamed on server B to the same name as group A.
    let g_b_uuid = Uuid::new_v4();
    assert!(server_a_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Group.to_value()),
            (Attribute::Name.as_ref(), Value::new_iname("testgroup_b")),
            (Attribute::Uuid.as_ref(), Value::Uuid(g_b_uuid)),
            (Attribute::Member.as_ref(), Value::Refer(t_uuid))
        ),])
        .is_ok());

    // To test ref-int, we make a third group that has both a and b as members.
    let g_c_uuid = Uuid::new_v4();
    assert!(server_a_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Group.to_value()),
            (Attribute::Name.as_ref(), Value::new_iname("testgroup_c")),
            (Attribute::Uuid.as_ref(), Value::Uuid(g_c_uuid)),
            (Attribute::Member.as_ref(), Value::Refer(g_a_uuid)),
            (Attribute::Member.as_ref(), Value::Refer(g_b_uuid))
        ),])
        .is_ok());

    server_a_txn.commit().expect("Failed to commit");

    // Now replicated A -> B
    let mut server_a_txn = server_a.read().await;
    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;

    trace!("========================================");
    repl_incremental(&mut server_a_txn, &mut server_b_txn);

    // Both nodes must agree on both groups before we create the conflict.
    let e1 = server_a_txn
        .internal_search_all_uuid(g_a_uuid)
        .expect("Unable to access new entry.");
    let e2 = server_b_txn
        .internal_search_all_uuid(g_a_uuid)
        .expect("Unable to access entry.");
    assert!(e1 == e2);

    let e1 = server_a_txn
        .internal_search_all_uuid(g_b_uuid)
        .expect("Unable to access new entry.");
    let e2 = server_b_txn
        .internal_search_all_uuid(g_b_uuid)
        .expect("Unable to access entry.");
    assert!(e1 == e2);

    server_b_txn.commit().expect("Failed to commit");
    drop(server_a_txn);

    // At this point both sides now have the groups. Now on each node we will rename them
    // so that they conflict.
    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
    assert!(server_a_txn
        .internal_modify_uuid(
            g_a_uuid,
            &ModifyList::new_purge_and_set(
                Attribute::Name.as_ref(),
                Value::new_iname("name_conflict")
            )
        )
        .is_ok());
    server_a_txn.commit().expect("Failed to commit");

    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
    assert!(server_b_txn
        .internal_modify_uuid(
            g_b_uuid,
            &ModifyList::new_purge_and_set(
                Attribute::Name.as_ref(),
                Value::new_iname("name_conflict")
            )
        )
        .is_ok());
    server_b_txn.commit().expect("Failed to commit");

    // Now each node has an entry, separate uuids, but a name that will violate attr
    // unique on the next replicate.
    //
    // Order of replication doesn't matter here! Whichever one sees it first will
    // conflict the entries. In this case, A will detect the attr unique violation
    // and create the conflicts.
    let mut server_b_txn = server_b.read().await;
    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;

    trace!("========================================");
    repl_incremental(&mut server_b_txn, &mut server_a_txn);

    // The conflict should now have occurred.
    // Check both groups are conflicts - both keep the colliding name on their
    // conflict entries.
    let cnf_a = server_a_txn
        .internal_search_conflict_uuid(g_a_uuid)
        .expect("Unable to search conflict entries.")
        .pop()
        .expect("No conflict entries present");
    assert!(cnf_a.get_ava_single_iname("name") == Some("name_conflict"));

    let cnf_b = server_a_txn
        .internal_search_conflict_uuid(g_b_uuid)
        .expect("Unable to search conflict entries.")
        .pop()
        .expect("No conflict entries present");
    assert!(cnf_b.get_ava_single_iname("name") == Some("name_conflict"));

    // Check the person has MO A/B removed.
    // (memberof must drop the conflicted groups from the user's memberOf)
    let e = server_a_txn
        .internal_search_all_uuid(t_uuid)
        .expect("Unable to access entry.");
    assert!(!e.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_a_uuid)));
    assert!(!e.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_b_uuid)));

    // Check the group has M A/B removed.
    // (refint must strip member references to the conflicted groups)
    let e = server_a_txn
        .internal_search_all_uuid(g_c_uuid)
        .expect("Unable to access entry.");
    assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(g_a_uuid)));
    assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(g_b_uuid)));

    server_a_txn.commit().expect("Failed to commit");
    drop(server_b_txn);

    // Reverse it - The conflicts will now be sent back A -> B, meaning that
    // everything is consistent once more.
    let mut server_a_txn = server_a.read().await;
    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;

    trace!("========================================");
    repl_incremental(&mut server_a_txn, &mut server_b_txn);

    // B must now hold the same conflict entries that A created.
    let cnf_a = server_b_txn
        .internal_search_conflict_uuid(g_a_uuid)
        .expect("Unable to search conflict entries.")
        .pop()
        .expect("No conflict entries present");
    assert!(cnf_a.get_ava_single_iname("name") == Some("name_conflict"));

    let cnf_b = server_b_txn
        .internal_search_conflict_uuid(g_b_uuid)
        .expect("Unable to search conflict entries.")
        .pop()
        .expect("No conflict entries present");
    assert!(cnf_b.get_ava_single_iname("name") == Some("name_conflict"));

    server_b_txn.commit().expect("Failed to commit");
    drop(server_a_txn);
}
// Test a complex attr unique situation when the attrunique conflict would occur normally but is
// skipped because the entry it is going to conflict against is actually a uuid conflict.
//
// In other words: an incoming entry collides on BOTH name (attrunique) and uuid.
// The uuid conflict takes precedence, so the attrunique machinery must NOT also
// conflict the surviving entries. Note the replication direction matters here -
// only the origin node of a uuid-conflicting entry persists the conflict.
#[qs_pair_test]
async fn test_repl_increment_attrunique_conflict_complex(
    server_a: &QueryServer,
    server_b: &QueryServer,
) {
    // Initialise B from A so both nodes share a common replication base.
    let ct = duration_from_epoch_now();
    let mut server_a_txn = server_a.write(ct).await;
    let mut server_b_txn = server_b.read().await;
    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
        .and_then(|_| server_a_txn.commit())
        .is_ok());
    drop(server_b_txn);

    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
    // Create two entries on A - The entry that will be an attrunique conflict
    // and the entry that will UUID conflict to the second entry. The second entry
    // should not attrunique conflict within server_a
    let g_a_uuid = Uuid::new_v4();
    assert!(server_a_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Group.to_value()),
            (Attribute::Name.as_ref(), Value::new_iname("name_conflict")),
            (Attribute::Uuid.as_ref(), Value::Uuid(g_a_uuid))
        ),])
        .is_ok());

    let g_b_uuid = Uuid::new_v4();
    assert!(server_a_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Group.to_value()),
            (Attribute::Name.as_ref(), Value::new_iname("uuid_conflict")),
            (Attribute::Uuid.as_ref(), Value::Uuid(g_b_uuid))
        ),])
        .is_ok());
    server_a_txn.commit().expect("Failed to commit");

    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
    // Create an entry on B that is a uuid conflict to the second entry on A. This entry
    // should *also* have an attr conflict to name on the first entry from A.
    assert!(server_b_txn
        .internal_create(vec![entry_init!(
            (Attribute::Class.as_ref(), EntryClass::Object.to_value()),
            (Attribute::Class.as_ref(), EntryClass::Group.to_value()),
            // Conflicting name
            (Attribute::Name.as_ref(), Value::new_iname("name_conflict")),
            // Conflicting uuid
            (Attribute::Uuid.as_ref(), Value::Uuid(g_b_uuid))
        ),])
        .is_ok());
    server_b_txn.commit().expect("Failed to commit");

    // We have to replicate B -> A first. This is so that A will not load the conflict
    // entry, and the entries g_a_uuid and g_b_uuid stay present.
    let mut server_b_txn = server_b.read().await;
    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;

    trace!("========================================");
    repl_incremental(&mut server_b_txn, &mut server_a_txn);

    // Check these entries are still present and were NOT conflicted due to attrunique
    let e = server_a_txn
        .internal_search_all_uuid(g_a_uuid)
        .expect("Unable to access entry.");
    assert!(e.attribute_equality(
        Attribute::Name.as_ref(),
        &PartialValue::new_iname("name_conflict")
    ));

    let e = server_a_txn
        .internal_search_all_uuid(g_b_uuid)
        .expect("Unable to access entry.");
    assert!(e.attribute_equality(
        Attribute::Name.as_ref(),
        &PartialValue::new_iname("uuid_conflict")
    ));

    // The other entry will not be conflicted here, since A is not the origin node.
    server_a_txn.commit().expect("Failed to commit");
    drop(server_b_txn);

    // Replicate A -> B now. This will cause the entry to be persisted as a conflict
    // as this is the origin node. We should end up with the two entries from
    // server A remaining.
    let mut server_a_txn = server_a.read().await;
    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;

    trace!("========================================");
    repl_incremental(&mut server_a_txn, &mut server_b_txn);

    // Check these entries are still present and were NOT conflicted due to attrunique
    let e = server_b_txn
        .internal_search_all_uuid(g_a_uuid)
        .expect("Unable to access entry.");
    assert!(e.attribute_equality(
        Attribute::Name.as_ref(),
        &PartialValue::new_iname("name_conflict")
    ));

    let e = server_b_txn
        .internal_search_all_uuid(g_b_uuid)
        .expect("Unable to access entry.");
    assert!(e.attribute_equality(
        Attribute::Name.as_ref(),
        &PartialValue::new_iname("uuid_conflict")
    ));

    // Check the conflict was also now created.
    // (B's locally created entry lost the uuid conflict and is preserved as a
    // conflict entry carrying its original name.)
    let cnf_a = server_b_txn
        .internal_search_conflict_uuid(g_b_uuid)
        .expect("Unable to search conflict entries.")
        .pop()
        .expect("No conflict entries present");
    assert!(cnf_a.get_ava_single_iname("name") == Some("name_conflict"));

    server_b_txn.commit().expect("Failed to commit");
    drop(server_a_txn);
}
// Test change of domain version over incremental. // Test change of domain version over incremental.
// //

View file

@ -847,9 +847,9 @@ impl<'a> SchemaWriteTransaction<'a> {
name: Attribute::SourceUuid.into(), name: Attribute::SourceUuid.into(),
uuid: UUID_SCHEMA_ATTR_SOURCE_UUID, uuid: UUID_SCHEMA_ATTR_SOURCE_UUID,
description: String::from( description: String::from(
"The universal unique id of the source object where this conflict came from", "The universal unique id of the source object(s) which conflicted with this entry",
), ),
multivalue: false, multivalue: true,
// Uniqueness is handled by base.rs, not attrunique here due to // Uniqueness is handled by base.rs, not attrunique here due to
// needing to check recycled objects too. // needing to check recycled objects too.
unique: false, unique: false,