68 20230829 replication referential integrity (#2048)

* Member of works!
* Hooray, refint over replication works.
This commit is contained in:
Firstyear 2023-09-05 21:30:51 +10:00 committed by GitHub
parent d5d76d1a3c
commit d1fe7b9127
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 852 additions and 95 deletions

View file

@ -256,6 +256,29 @@ impl Plugin for AttrUnique {
r
}
fn pre_repl_incremental(
_qs: &mut QueryServerWriteTransaction,
_cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
) -> Result<(), OperationError> {
admin_error!(
"plugin {} has an unimplemented pre_repl_incremental!",
Self::id()
);
// Important! We need to also have a list of uuids that we conflicted AGAINST so that both
// this candidate *and* the existing item both move to conflict. This is because if we don't
// do it this way, then some nodes will conflict on potentially the inverse entries, which
// could end up pretty bad.
// We also can't realllllyyyy rely on the cid here since it could have changed multiple times
// and may not truly reflect the accurate change times, so we have to conflict on both
// items that hit the attrunique.
// debug_assert!(false);
// Err(OperationError::InvalidState)
Ok(())
}
#[instrument(level = "debug", name = "attrunique::verify", skip_all)]
fn verify(qs: &mut QueryServerReadTransaction) -> Vec<Result<(), ConsistencyError>> {
// Only check live entries, not recycled.

View file

@ -235,6 +235,7 @@ impl Plugin for MemberOf {
qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted],
_conflict_uuids: &[Uuid],
) -> Result<(), OperationError> {
// IMPORTANT - we need this for now so that dyngroup doesn't error on us, since
// repl is internal and dyngroup has a safety check to prevent external triggers.

View file

@ -157,15 +157,15 @@ trait Plugin {
"plugin {} has an unimplemented pre_repl_incremental!",
Self::id()
);
// debug_assert!(false);
// Err(OperationError::InvalidState)
Ok(())
debug_assert!(false);
Err(OperationError::InvalidState)
}
fn post_repl_incremental(
_qs: &mut QueryServerWriteTransaction,
_pre_cand: &[Arc<EntrySealedCommitted>],
_cand: &[EntrySealedCommitted],
_conflict_uuids: &[Uuid],
) -> Result<(), OperationError> {
admin_error!(
"plugin {} has an unimplemented post_repl_incremental!",
@ -352,11 +352,12 @@ impl Plugins {
qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted],
conflict_uuids: &[Uuid],
) -> Result<(), OperationError> {
domain::Domain::post_repl_incremental(qs, pre_cand, cand)?;
spn::Spn::post_repl_incremental(qs, pre_cand, cand)?;
refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)?;
memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand)
domain::Domain::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?;
spn::Spn::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?;
refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)?;
memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand, conflict_uuids)
}
#[instrument(level = "debug", name = "plugins::run_verify", skip_all)]

View file

@ -12,11 +12,11 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use hashbrown::HashSet as Set;
use kanidm_proto::v1::{ConsistencyError, PluginError};
use hashbrown::HashSet;
use kanidm_proto::v1::ConsistencyError;
use crate::event::{CreateEvent, DeleteEvent, ModifyEvent};
use crate::filter::f_eq;
use crate::filter::{f_eq, FC};
use crate::plugins::Plugin;
use crate::prelude::*;
use crate::schema::SchemaTransaction;
@ -24,19 +24,19 @@ use crate::schema::SchemaTransaction;
pub struct ReferentialIntegrity;
impl ReferentialIntegrity {
fn check_uuids_exist(
fn check_uuids_exist_fast(
qs: &mut QueryServerWriteTransaction,
inner: Vec<PartialValue>,
) -> Result<(), OperationError> {
inner: &[Uuid],
) -> Result<bool, OperationError> {
if inner.is_empty() {
// There is nothing to check! Move on.
trace!("no reference types modified, skipping check");
return Ok(());
return Ok(true);
}
let inner = inner
.into_iter()
.map(|pv| f_eq(Attribute::Uuid, pv))
let inner: Vec<_> = inner
.iter()
.map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(*u)))
.collect();
// F_inc(lusion). All items of inner must be 1 or more, or the filter
@ -50,16 +50,91 @@ impl ReferentialIntegrity {
// Is the existence of all id's confirmed?
if b {
Ok(())
Ok(true)
} else {
admin_error!(
"UUID reference set size differs from query result size <fast path, no uuid info available>"
);
Err(OperationError::Plugin(PluginError::ReferentialIntegrity(
"Uuid referenced not found in database".to_string(),
)))
Ok(false)
}
}
fn check_uuids_exist_slow(
qs: &mut QueryServerWriteTransaction,
inner: &[Uuid],
) -> Result<Vec<Uuid>, OperationError> {
if inner.is_empty() {
// There is nothing to check! Move on.
// Should be unreachable.
trace!("no reference types modified, skipping check");
return Ok(Vec::with_capacity(0));
}
let mut missing = Vec::with_capacity(inner.len());
for u in inner {
let filt_in = filter!(f_eq(Attribute::Uuid, PartialValue::Uuid(*u)));
let b = qs.internal_exists(filt_in).map_err(|e| {
admin_error!(err = ?e, "internal exists failure");
e
})?;
// If it's missing, we push it to the missing set.
if !b {
missing.push(*u)
}
}
Ok(missing)
}
fn remove_references(
qs: &mut QueryServerWriteTransaction,
uuids: Vec<Uuid>,
) -> Result<(), OperationError> {
trace!(?uuids);
// Find all reference types in the schema
let schema = qs.get_schema();
let ref_types = schema.get_reference_types();
let removed_ids: BTreeSet<_> = uuids.iter().map(|u| PartialValue::Refer(*u)).collect();
// Generate a filter which is the set of all schema reference types
// as EQ to all uuid of all entries in delete. - this INCLUDES recycled
// types too!
let filt = filter_all!(FC::Or(
uuids
.into_iter()
.flat_map(|u| ref_types.values().filter_map(move |r_type| {
let value_attribute = r_type.name.to_string();
// For everything that references the uuid's in the deleted set.
let val: Result<Attribute, OperationError> = value_attribute.try_into();
// error!("{:?}", val);
let res = match val {
Ok(val) => {
let res = f_eq(val, PartialValue::Refer(u));
Some(res)
}
Err(err) => {
// we shouldn't be able to get here...
admin_error!("post_delete invalid attribute specified - please log this as a bug! {:?}", err);
None
}
};
res
}))
.collect(),
));
trace!("refint post_delete filter {:?}", filt);
let mut work_set = qs.internal_search_writeable(&filt)?;
work_set.iter_mut().for_each(|(_, post)| {
ref_types
.values()
.for_each(|attr| post.remove_avas(attr.name.as_str(), &removed_ids));
});
qs.internal_apply_writable(work_set)
}
}
impl Plugin for ReferentialIntegrity {
@ -118,6 +193,99 @@ impl Plugin for ReferentialIntegrity {
Self::post_modify_inner(qs, cand)
}
fn post_repl_incremental(
qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted],
conflict_uuids: &[Uuid],
) -> Result<(), OperationError> {
admin_error!(
"plugin {} has an unimplemented post_repl_incremental!",
Self::id()
);
// I think we need to check that all values in the ref type values here
// exist, and if not, we *need to remove them*. We should probably rewrite
// how we do modify/create inner to actually return missing uuids, so that
// this fn can delete, and the other parts can report what's missing.
//
// This also becomes a path to a "ref int fixup" too?
let uuids = Self::cand_references_to_uuid_filter(qs, cand)?;
let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?;
let mut missing_uuids = if !all_exist_fast {
debug!("Not all uuids referenced by these candidates exist. Slow path to remove them.");
Self::check_uuids_exist_slow(qs, uuids.as_slice())?
} else {
debug!("All references are valid!");
Vec::with_capacity(0)
};
// If the entry has moved from a live to a deleted state we need to clean its references
// that *may* have been added on this server - the same that other references would be
// deleted.
let inactive_entries: Vec<_> = std::iter::zip(pre_cand, cand)
.filter_map(|(pre, post)| {
let pre_live = pre.mask_recycled_ts().is_some();
let post_live = post.mask_recycled_ts().is_some();
if !post_live && (pre_live != post_live) {
// We have moved from live to recycled/tombstoned. We need to
// ensure that these references are masked.
Some(post.get_uuid())
} else {
None
}
})
.collect();
if event_enabled!(tracing::Level::DEBUG) {
debug!("Removing the following reference uuids for entries that have become recycled or tombstoned");
for missing in &inactive_entries {
debug!(?missing);
}
}
// We can now combine this with the conflict uuids from the incoming set.
// In a conflict case, we need to also add these uuids to the delete logic
// since on the originator node the original uuid will still persist
// meaning the member won't be removed.
// However, on a non-primary conflict handler it will remove the member
// as well. This is annoyingly a worst case, since then *every* node will
// attempt to update the cid of this group. But I think the potential cost
// in the short term will be worth consistent references.
if !conflict_uuids.is_empty() {
warn!("conflict uuids have been found, and must be cleaned from existing references. This is to prevent group memberships leaking to un-intended recipients.");
}
// Now, we need to find for each of the missing uuids, which values had them.
// We could use a clever query to internal_search_writeable?
missing_uuids.extend_from_slice(conflict_uuids);
missing_uuids.extend_from_slice(&inactive_entries);
if missing_uuids.is_empty() {
trace!("Nothing to do, shortcut");
return Ok(());
}
if event_enabled!(tracing::Level::DEBUG) {
debug!("Removing the following missing reference uuids");
for missing in &missing_uuids {
debug!(?missing);
}
}
// Now we have to look them up and clean it up. Turns out this is the
// same code path as "post delete" so we can share that!
Self::remove_references(qs, missing_uuids)
// Complete!
}
#[instrument(level = "debug", name = "refint_post_delete", skip_all)]
fn post_delete(
qs: &mut QueryServerWriteTransaction,
@ -128,56 +296,10 @@ impl Plugin for ReferentialIntegrity {
// actually the bulk of the work we'll do to clean up references
// when they are deleted.
// Find all reference types in the schema
let schema = qs.get_schema();
let ref_types = schema.get_reference_types();
// Get the UUID of all entries we are deleting
let uuids: Vec<Uuid> = cand.iter().map(|e| e.get_uuid()).collect();
// Generate a filter which is the set of all schema reference types
// as EQ to all uuid of all entries in delete. - this INCLUDES recycled
// types too!
let filt = filter_all!(FC::Or(
uuids
.into_iter()
.flat_map(|u| ref_types.values().filter_map(move |r_type| {
let value_attribute = r_type.name.to_string();
// For everything that references the uuid's in the deleted set.
let val: Result<Attribute, OperationError> = value_attribute.try_into();
// error!("{:?}", val);
let res = match val {
Ok(val) => {
let res = f_eq(val, PartialValue::Refer(u));
Some(res)
}
Err(err) => {
// we shouldn't be able to get here...
admin_error!("post_delete invalid attribute specified - please log this as a bug! {:?}", err);
None
}
};
res
}))
.collect(),
));
trace!("refint post_delete filter {:?}", filt);
let removed_ids: BTreeSet<_> = cand
.iter()
.map(|e| PartialValue::Refer(e.get_uuid()))
.collect();
let mut work_set = qs.internal_search_writeable(&filt)?;
work_set.iter_mut().for_each(|(_, post)| {
ref_types
.values()
.for_each(|attr| post.remove_avas(attr.name.as_str(), &removed_ids));
});
qs.internal_apply_writable(work_set)
Self::remove_references(qs, uuids)
}
#[instrument(level = "debug", name = "refint::verify", skip_all)]
@ -194,7 +316,7 @@ impl Plugin for ReferentialIntegrity {
Err(e) => return vec![e],
};
let acu_map: Set<Uuid> = all_cand.iter().map(|e| e.get_uuid()).collect();
let acu_map: HashSet<Uuid> = all_cand.iter().map(|e| e.get_uuid()).collect();
let schema = qs.get_schema();
let ref_types = schema.get_reference_types();
@ -228,10 +350,10 @@ impl Plugin for ReferentialIntegrity {
}
impl ReferentialIntegrity {
fn post_modify_inner(
fn cand_references_to_uuid_filter(
qs: &mut QueryServerWriteTransaction,
cand: &[Entry<EntrySealed, EntryCommitted>],
) -> Result<(), OperationError> {
cand: &[EntrySealedCommitted],
) -> Result<Vec<Uuid>, OperationError> {
let schema = qs.get_schema();
let ref_types = schema.get_reference_types();
@ -242,9 +364,10 @@ impl ReferentialIntegrity {
c.attribute_equality(Attribute::Class.as_ref(), &EntryClass::DynGroup.into());
ref_types.values().filter_map(move |rtype| {
// Skip dynamic members
// Skip dynamic members, these are recalculated by the
// memberof plugin.
let skip_mb = dyn_group && rtype.name == "dynmember";
// Skip memberOf.
// Skip memberOf, also recalculated.
let skip_mo = rtype.name == "memberof";
if skip_mb || skip_mo {
None
@ -256,12 +379,16 @@ impl ReferentialIntegrity {
});
// Could check len first?
let mut i = Vec::with_capacity(cand.len() * 2);
let mut i = Vec::with_capacity(cand.len() * 4);
let mut dedup = HashSet::new();
vsiter.try_for_each(|vs| {
if let Some(uuid_iter) = vs.as_ref_uuid_iter() {
uuid_iter.for_each(|u| {
i.push(PartialValue::Uuid(u))
// Returns true if the item is NEW in the set
if dedup.insert(u) {
i.push(u)
}
});
Ok(())
} else {
@ -273,7 +400,35 @@ impl ReferentialIntegrity {
}
})?;
Self::check_uuids_exist(qs, i)
Ok(i)
}
fn post_modify_inner(
qs: &mut QueryServerWriteTransaction,
cand: &[EntrySealedCommitted],
) -> Result<(), OperationError> {
let uuids = Self::cand_references_to_uuid_filter(qs, cand)?;
let all_exist_fast = Self::check_uuids_exist_fast(qs, uuids.as_slice())?;
if all_exist_fast {
// All good!
return Ok(());
}
// Okay taking the slow path now ...
let missing_uuids = Self::check_uuids_exist_slow(qs, uuids.as_slice())?;
error!("some uuids that were referenced in this operation do not exist.");
for missing in missing_uuids {
error!(?missing);
}
Err(OperationError::Plugin(
kanidm_proto::v1::PluginError::ReferentialIntegrity(
"Uuid referenced not found in database".to_string(),
),
))
}
}

View file

@ -77,6 +77,7 @@ impl Plugin for Spn {
qs: &mut QueryServerWriteTransaction,
pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted],
_conflict_uuids: &[Uuid],
) -> Result<(), OperationError> {
Self::post_modify_inner(qs, pre_cand, cand)
}

View file

@ -63,6 +63,8 @@ impl<'a> QueryServerWriteTransaction<'a> {
.zip(db_entries)
.partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
// Now we have a set of conflicts and a set of entries to proceed.
//
// /- entries that need to be created as conflicts.
@ -82,6 +84,15 @@ impl<'a> QueryServerWriteTransaction<'a> {
)
.unzip();
// ⚠️ If we end up with pre-repl returning a list of conflict uuids, we DON'T need to
// add them to this list. This is just for uuid conflicts, not higher level ones!
//
// ⚠️ We need to collect this from conflict_update since we may NOT be the originator
// server for some conflicts, but we still need to know the UUID is IN the conflict
// state for plugins. We also need to do this here before the conflict_update
// set is consumed by later steps.
let conflict_uuids: Vec<_> = conflict_update.iter().map(|(_, e)| e.get_uuid()).collect();
// Filter out None from conflict_create
let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
@ -139,11 +150,6 @@ impl<'a> QueryServerWriteTransaction<'a> {
// anything hits one of these states we need to have a way to handle this too in a consistent
// manner.
//
// Then similar to modify, we need the pre and post candidates.
// We need to unzip the schema_valid and invalid entries.
self.be_txn
.incremental_apply(&all_updates_valid, conflict_create)
.map_err(|e| {
@ -157,15 +163,19 @@ impl<'a> QueryServerWriteTransaction<'a> {
// We don't need to process conflict_creates here, since they are all conflicting
// uuids which means that the uuids are all *here* so they will trigger anything
// that requires processing anyway.
Plugins::run_post_repl_incremental(self, pre_cand.as_slice(), cand.as_slice()).map_err(
|e| {
admin_error!(
"Refresh operation failed (post_repl_incremental plugin), {:?}",
e
);
Plugins::run_post_repl_incremental(
self,
pre_cand.as_slice(),
cand.as_slice(),
conflict_uuids.as_slice(),
)
.map_err(|e| {
admin_error!(
"Refresh operation failed (post_repl_incremental plugin), {:?}",
e
},
)?;
);
e
})?;
self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));

View file

@ -2080,12 +2080,578 @@ async fn test_repl_increment_schema_dynamic(server_a: &QueryServer, server_b: &Q
drop(server_a_txn);
}
// Test memberof over replication boundaries.
#[qs_pair_test]
async fn test_repl_increment_memberof_basic(server_a: &QueryServer, server_b: &QueryServer) {
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Since memberof isn't replicated, we have to check that when a group with
// a member is sent over, it's re-calced on the other side.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Account.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
let g_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Group.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testgroup1")),
(Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)),
(Attribute::Member.as_ref(), Value::Refer(t_uuid))
),])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// Now replicated A -> B
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
let e1 = server_a_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
assert!(e1.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_uuid)));
// We should also check dyngroups too here :)
assert!(e1.attribute_equality(
Attribute::MemberOf.as_ref(),
&PartialValue::Refer(UUID_IDM_ALL_ACCOUNTS)
));
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
}
// Test when a group has a member A, and then the group is conflicted, that when
// group is moved to conflict the memberShip of A is removed.
// group is moved to conflict the membership of A is removed. The conflict must be
// a non group, or a group that doesn't have the member A.
#[qs_pair_test]
async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: &QueryServer) {
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// First, we need to create a group on b that will conflict
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
let g_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Group.to_value()),
(
Attribute::Name.as_ref(),
Value::new_iname("testgroup_conflict")
),
(Attribute::Uuid.as_ref(), Value::Uuid(g_uuid))
),])
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Now on a, use the same uuid, make the user and a group as its member.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Account.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Group.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testgroup1")),
(Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)),
(Attribute::Member.as_ref(), Value::Refer(t_uuid))
),])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// Now do A -> B. B should show that the second group was a conflict and
// the membership drops.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
let e = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
assert!(e.attribute_equality(
Attribute::Name.as_ref(),
&PartialValue::new_iname("testgroup_conflict")
));
let e = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(!e.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_uuid)));
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
// Now B -> A. A will now reflect the conflict as well.
let mut server_b_txn = server_b.read().await;
let mut server_a_txn = server_a.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e2 = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
assert!(e1.attribute_equality(
Attribute::Name.as_ref(),
&PartialValue::new_iname("testgroup_conflict")
));
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
assert!(!e1.attribute_equality(Attribute::MemberOf.as_ref(), &PartialValue::Refer(g_uuid)));
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
// Ref int deletes references when tombstone is replicated over. May need consumer
// to have some extra groups that need cleanup
#[qs_pair_test]
async fn test_repl_increment_refint_tombstone(server_a: &QueryServer, server_b: &QueryServer) {
let ct = duration_from_epoch_now();
// Test memberof over replication boundaries.
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Create a person / group on a. Don't add membership yet.
let mut server_a_txn = server_a.write(ct).await;
let t_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Account.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
let g_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Group.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testgroup1")),
(Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)) // Don't add the membership yet!
// (Attribute::Member.as_ref(), Value::Refer(t_uuid))
),])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// A -> B repl.
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
// On B, delete the person.
let ct = duration_from_epoch_now();
let mut server_b_txn = server_b.write(ct).await;
assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
server_b_txn.commit().expect("Failed to commit");
// On A, add person to group.
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
assert!(server_a_txn
.internal_modify_uuid(
g_uuid,
&ModifyList::new_purge_and_set(Attribute::Member.as_ref(), Value::Refer(t_uuid))
)
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// A -> B - B should remove the reference.
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
// Assert on B that Member is now gone.
let e = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
// B -> A - A should remove the reference, everything is consistent again.
let ct = duration_from_epoch_now();
let mut server_b_txn = server_b.read().await;
let mut server_a_txn = server_a.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e2 = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e1_cs = e1.get_changestate();
let e2_cs = e2.get_changestate();
assert!(e1_cs == e2_cs);
assert!(e1 == e2);
assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
#[qs_pair_test]
async fn test_repl_increment_refint_conflict(server_a: &QueryServer, server_b: &QueryServer) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// On B, create a conflicting person.
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Account.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(
Attribute::Name.as_ref(),
Value::new_iname("testperson_conflict")
),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Create a person / group on a. Add person to group.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Account.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
let g_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Group.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testgroup1")),
(Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)),
(Attribute::Member.as_ref(), Value::Refer(t_uuid))
),])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// A -> B - B should remove the reference.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
// Note that in the case an entry conflicts we remove references to the entry that
// had the collision. This is because we don't know if our references are reflecting
// the true intent of the situation now.
//
// In this example, the users created on server A was intended to be a member of
// the group, but the user on server B *was not* intended to be a member. Therefore
// it's wrong that we retain the user from Server B *while* also the membership
// that was intended for the user on A.
let e = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
// B -> A - A should remove the reference.
let mut server_b_txn = server_b.read().await;
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
trace!("========================================");
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e2 = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e1_cs = e1.get_changestate();
let e2_cs = e2.get_changestate();
assert!(e1_cs == e2_cs);
assert!(e1 == e2);
assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
// Ref int when we transmit a delete over the boundary. This is the opposite order to
// a previous test, where the delete is sent to the member holder first.
#[qs_pair_test]
async fn test_repl_increment_refint_delete_to_member_holder(
server_a: &QueryServer,
server_b: &QueryServer,
) {
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Create a person / group on a. Don't add membership yet.
let mut server_a_txn = server_a.write(ct).await;
let t_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Account.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
let g_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Group.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testgroup1")),
(Attribute::Uuid.as_ref(), Value::Uuid(g_uuid)) // Don't add the membership yet!
// (Attribute::Member.as_ref(), Value::Refer(t_uuid))
),])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// A -> B repl.
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
// On A, add person to group.
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
assert!(server_a_txn
.internal_modify_uuid(
g_uuid,
&ModifyList::new_purge_and_set(Attribute::Member.as_ref(), Value::Refer(t_uuid))
)
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// On B, delete the person.
let ct = duration_from_epoch_now();
let mut server_b_txn = server_b.write(ct).await;
assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
server_b_txn.commit().expect("Failed to commit");
// B -> A - A should remove the reference, everything is consistent again.
let ct = duration_from_epoch_now();
let mut server_b_txn = server_b.read().await;
let mut server_a_txn = server_a.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e = server_a_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
assert!(!e.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
// A -> B - Should just reflect what happened on A.
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
let e1 = server_a_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e2 = server_b_txn
.internal_search_all_uuid(g_uuid)
.expect("Unable to access entry.");
let e1_cs = e1.get_changestate();
let e2_cs = e2.get_changestate();
assert!(e1_cs == e2_cs);
assert!(e1 == e2);
assert!(!e1.attribute_equality(Attribute::Member.as_ref(), &PartialValue::Refer(t_uuid)));
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
}
// Test attrunique conflicts
// Test ref-int when attrunique makes a conflict
// Test change of domain version over incremental.
//
// todo when I have domain version migrations working.