20230720 replication improvements (#1905)

Firstyear 2023-07-27 12:30:22 +10:00 committed by GitHub
parent 9bcd8d4737
commit 8f282e3a30
10 changed files with 821 additions and 65 deletions

View file

@@ -1379,7 +1379,6 @@ impl<'a> BackendWriteTransaction<'a> {
         // Update the names/uuid maps. These have to mask out entries
         // that are recycled or tombstones, so these pretend as "deleted"
         // and can trigger correct actions.
-        //
         let mask_pre = pre.and_then(|e| e.mask_recycled_ts());
         let mask_pre = if !uuid_same {

View file

@@ -234,6 +234,8 @@ pub const UUID_SCHEMA_ATTR_SYNC_CREDENTIAL_PORTAL: Uuid =
 pub const UUID_SCHEMA_CLASS_OAUTH2_RS_PUBLIC: Uuid = uuid!("00000000-0000-0000-0000-ffff00000137");
 pub const UUID_SCHEMA_ATTR_SYNC_YIELD_AUTHORITY: Uuid =
     uuid!("00000000-0000-0000-0000-ffff00000138");
+pub const UUID_SCHEMA_CLASS_CONFLICT: Uuid = uuid!("00000000-0000-0000-0000-ffff00000139");
+pub const UUID_SCHEMA_ATTR_SOURCE_UUID: Uuid = uuid!("00000000-0000-0000-0000-ffff00000140");

 // System and domain infos
 // I'd like to strongly criticise william of the past for making poor choices about these allocations.

View file

@@ -16,6 +16,7 @@ lazy_static! {
     pub static ref PVCLASS_ACP: PartialValue = PartialValue::new_class("access_control_profile");
     pub static ref PVCLASS_ATTRIBUTETYPE: PartialValue = PartialValue::new_class("attributetype");
     pub static ref PVCLASS_CLASSTYPE: PartialValue = PartialValue::new_class("classtype");
+    pub static ref PVCLASS_CONFLICT: PartialValue = PartialValue::new_class("conflict");
     pub static ref PVCLASS_DOMAIN_INFO: PartialValue = PartialValue::new_class("domain_info");
     pub static ref PVCLASS_DYNGROUP: PartialValue = PartialValue::new_class("dyngroup");
     pub static ref PVCLASS_EXTENSIBLE: PartialValue = PartialValue::new_class("extensibleobject");

View file

@@ -106,7 +106,6 @@ pub struct EntryInit;
 #[derive(Clone, Debug)]
 pub struct EntryInvalid {
     cid: Cid,
-    // eclog: EntryChangelog,
     ecstate: EntryChangeState,
 }

@@ -133,7 +132,6 @@ pub struct EntryIncremental {
 pub struct EntryValid {
     // Asserted with schema, so we know it has a UUID now ...
     uuid: Uuid,
-    // eclog: EntryChangelog,
     ecstate: EntryChangeState,
 }

@@ -146,7 +144,6 @@ pub struct EntryValid {
 #[derive(Clone, Debug)]
 pub struct EntrySealed {
     uuid: Uuid,
-    // eclog: EntryChangelog,
     ecstate: EntryChangeState,
 }
@@ -232,19 +229,28 @@ where
     }
 }

-impl<STATE> std::fmt::Display for Entry<EntrySealed, STATE> {
+impl<STATE> std::fmt::Display for Entry<EntrySealed, STATE>
+where
+    STATE: Clone,
+{
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{}", self.get_uuid())
     }
 }

-impl<STATE> std::fmt::Display for Entry<EntryInit, STATE> {
+impl<STATE> std::fmt::Display for Entry<EntryInit, STATE>
+where
+    STATE: Clone,
+{
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "Entry in initial state")
     }
 }

-impl<STATE> Entry<EntryInit, STATE> {
+impl<STATE> Entry<EntryInit, STATE>
+where
+    STATE: Clone,
+{
     /// Get the uuid of this entry.
     pub(crate) fn get_uuid(&self) -> Option<Uuid> {
         self.attrs.get("uuid").and_then(|vs| vs.to_uuid_single())
@@ -490,26 +496,25 @@ impl Entry<EntryInit, EntryNew> {
     /// Assign the Change Identifier to this Entry, allowing it to be modified and then
     /// written to the `Backend`
     pub fn assign_cid(
-        mut self,
+        self,
         cid: Cid,
         schema: &dyn SchemaTransaction,
     ) -> Entry<EntryInvalid, EntryNew> {
-        /* setup our last changed time */
-        self.set_last_changed(cid.clone());
-
         /*
          * Create the change log. This must be the last thing BEFORE we return!
          * This is because we need to capture the set_last_changed attribute in
          * the create transition.
          */
-        // let eclog = EntryChangelog::new(cid.clone(), self.attrs.clone(), schema);
         let ecstate = EntryChangeState::new(&cid, &self.attrs, schema);

-        Entry {
+        let mut ent = Entry {
             valid: EntryInvalid { cid, ecstate },
             state: EntryNew,
             attrs: self.attrs,
-        }
+        };

+        // trace!("trigger_last_changed - assign_cid");
+        ent.trigger_last_changed();
+        ent
     }

     /// Compare this entry to another.
@@ -681,6 +686,131 @@ impl Entry<EntryIncremental, EntryNew> {
         }
     }

+    pub(crate) fn resolve_add_conflict(
+        &self,
+        cid: &Cid,
+        db_ent: &EntrySealedCommitted,
+    ) -> (Option<EntrySealedNew>, EntryIncrementalCommitted) {
+        use crate::repl::entry::State;
+        debug_assert!(self.valid.uuid == db_ent.valid.uuid);
+        let self_cs = &self.valid.ecstate;
+        let db_cs = db_ent.get_changestate();
+
+        match (self_cs.current(), db_cs.current()) {
+            (
+                State::Live {
+                    at: at_left,
+                    changes: _changes_left,
+                },
+                State::Live {
+                    at: at_right,
+                    changes: _changes_right,
+                },
+            ) => {
+                debug_assert!(at_left != at_right);
+                // Determine which of the entries must become the conflict
+                // and which will now persist. There are three possible cases.
+                //
+                // 1. The incoming ReplIncremental is after DBentry. This means RI is the
+                //    conflicting node. We take no action and just return the db_ent
+                //    as the valid state.
+                if at_left > at_right {
+                    trace!("RI > DE, return DE");
+                    (
+                        None,
+                        Entry {
+                            valid: EntryIncremental {
+                                uuid: db_ent.valid.uuid,
+                                ecstate: db_cs.clone(),
+                            },
+                            state: EntryCommitted {
+                                id: db_ent.state.id,
+                            },
+                            attrs: db_ent.attrs.clone(),
+                        },
+                    )
+                }
+                //
+                // 2. The incoming ReplIncremental is before DBentry. This means our
+                //    DE is the conflicting node. There are now two choices:
+                //    a. We are the origin of the DE, and thus must create the conflict
+                //       entry for replication (to guarantee single create)
+                //    b. We are not the origin of the DE and so do not create a conflict
+                //       entry.
+                //    In both cases we update the DE with the state of RI after we have
+                //    followed the above logic.
+                else {
+                    trace!("RI < DE, return RI");
+                    // Are we the origin?
+                    let conflict = if at_right.s_uuid == cid.s_uuid {
+                        trace!("Origin process conflict entry");
+                        // We are making a new entry!
+                        let mut cnf_ent = Entry {
+                            valid: EntryInvalid {
+                                cid: cid.clone(),
+                                ecstate: db_cs.clone(),
+                            },
+                            state: EntryNew,
+                            attrs: db_ent.attrs.clone(),
+                        };
+
+                        // Setup the last changed to now.
+                        cnf_ent.trigger_last_changed();
+                        // Move the current uuid to source_uuid
+                        cnf_ent.add_ava("source_uuid", Value::Uuid(db_ent.valid.uuid));
+                        // We need to make a random uuid in the conflict gen process.
+                        let new_uuid = Uuid::new_v4();
+                        cnf_ent.purge_ava("uuid");
+                        cnf_ent.add_ava("uuid", Value::Uuid(new_uuid));
+                        cnf_ent.add_ava("class", Value::new_class("recycled"));
+                        cnf_ent.add_ava("class", Value::new_class("conflict"));
+
+                        // Now we have to internally bypass some states.
+                        // This is okay because conflict entries aren't subject
+                        // to schema anyway.
+                        let Entry {
+                            valid: EntryInvalid { cid: _, ecstate },
+                            state,
+                            attrs,
+                        } = cnf_ent;
+
+                        let cnf_ent = Entry {
+                            valid: EntrySealed {
+                                uuid: new_uuid,
+                                ecstate,
+                            },
+                            state,
+                            attrs,
+                        };
+
+                        Some(cnf_ent)
+                    } else {
+                        None
+                    };
+
+                    (
+                        conflict,
+                        Entry {
+                            valid: EntryIncremental {
+                                uuid: self.valid.uuid,
+                                ecstate: self_cs.clone(),
+                            },
+                            state: EntryCommitted {
+                                id: db_ent.state.id,
+                            },
+                            attrs: self.attrs.clone(),
+                        },
+                    )
+                }
+            }
+            // Can never get here due to is_add_conflict above.
+            _ => unreachable!(),
+        }
+    }
+
     pub(crate) fn merge_state(
         &self,
         db_ent: &EntrySealedCommitted,
@@ -738,6 +868,8 @@ impl Entry<EntryIncremental, EntryNew> {
                             #[allow(clippy::todo)]
                             if let Some(_attr_state) = vs_left.repl_merge_valueset(vs_right)
                             {
+                                // TODO note: This is for special attr types that need to merge
+                                // rather than choose content.
                                 todo!();
                             } else {
                                 changes.insert(attr_name.clone(), cid_left.clone());
@@ -748,6 +880,8 @@ impl Entry<EntryIncremental, EntryNew> {
                             #[allow(clippy::todo)]
                             if let Some(_attr_state) = vs_right.repl_merge_valueset(vs_left)
                             {
+                                // TODO note: This is for special attr types that need to merge
+                                // rather than choose content.
                                 todo!();
                             } else {
                                 changes.insert(attr_name.clone(), cid_right.clone());
@@ -909,11 +1043,11 @@ impl Entry<EntryIncremental, EntryCommitted> {
             attrs: self.attrs,
         };

-        #[allow(clippy::todo)]
         if let Err(e) = ne.validate(schema) {
             warn!(uuid = ?self.valid.uuid, err = ?e, "Entry failed schema check, moving to a conflict state");
+            ne.add_ava_int("class", Value::new_class("recycled"));
             ne.add_ava_int("class", Value::new_class("conflict"));
-            todo!();
+            ne.add_ava_int("source_uuid", Value::Uuid(self.valid.uuid));
         }
         ne
     }
@@ -1006,6 +1140,8 @@ impl Entry<EntryInvalid, EntryCommitted> {
     pub fn to_revived(mut self) -> Self {
         // This will put the modify ahead of the revive transition.
         self.remove_ava("class", &PVCLASS_RECYCLED);
+        self.remove_ava("class", &PVCLASS_CONFLICT);
+        self.purge_ava("source_uuid");

         // Change state repl doesn't need this flag
         // self.valid.ecstate.revive(&self.valid.cid);
@@ -1139,6 +1275,11 @@ impl<STATE> Entry<EntrySealed, STATE> {
 }

 impl Entry<EntrySealed, EntryCommitted> {
+    #[cfg(test)]
+    pub(crate) fn get_last_changed(&self) -> Cid {
+        self.valid.ecstate.get_tail_cid()
+    }
+
     #[cfg(test)]
     pub unsafe fn into_sealed_committed(self) -> Entry<EntrySealed, EntryCommitted> {
         // NO-OP to satisfy macros.
@@ -1748,7 +1889,14 @@ impl<STATE> Entry<EntryValid, STATE> {
             return Err(SchemaError::NoClassFound);
         }

+        if self.attribute_equality("class", &PVCLASS_CONFLICT) {
+            // Conflict entries are exempt from schema enforcement. Return true.
+            trace!("Skipping schema validation on conflict entry");
+            return Ok(());
+        };
+
-        // Do we have extensible?
+        // Do we have extensible? We still validate syntax of attrs but don't
+        // check for valid object structures.
         let extensible = self.attribute_equality("class", &PVCLASS_EXTENSIBLE);

         let entry_classes = self.get_ava_set("class").ok_or_else(|| {
@@ -1988,11 +2136,11 @@ impl<STATE> Entry<EntryValid, STATE> {
     }
 }

-impl<STATE> Entry<EntrySealed, STATE> {
-    pub fn invalidate(mut self, cid: Cid) -> Entry<EntryInvalid, STATE> {
-        /* Setup our last changed time. */
-        self.set_last_changed(cid.clone());
-
+impl<STATE> Entry<EntrySealed, STATE>
+where
+    STATE: Clone,
+{
+    pub fn invalidate(self, cid: Cid) -> Entry<EntryInvalid, STATE> {
         Entry {
             valid: EntryInvalid {
                 cid,
@@ -2001,18 +2149,22 @@ impl<STATE> Entry<EntrySealed, STATE> {
             state: self.state,
             attrs: self.attrs,
         }
+        /* Setup our last changed time. */
+        // We can't actually trigger last changed here. This creates a replication loop
+        // inside of memberof plugin which invalidates. For now we treat last_changed
+        // more as "create" so we only trigger it via assign_cid in the create path
+        // and in conflict entry creation.
+        /*
+        trace!("trigger_last_changed - invalidate");
+        ent.trigger_last_changed();
+        ent
+        */
     }

     pub fn get_uuid(&self) -> Uuid {
         self.valid.uuid
     }

-    /*
-    pub fn get_changelog(&self) -> &EntryChangelog {
-        &self.valid.eclog
-    }
-    */
-
     pub fn get_changestate(&self) -> &EntryChangeState {
         &self.valid.ecstate
     }
@@ -2185,19 +2337,12 @@ impl<VALID, STATE> Entry<VALID, STATE> {
     }

     /// Update the last_changed flag of this entry to the given change identifier.
+    #[cfg(test)]
     fn set_last_changed(&mut self, cid: Cid) {
         let cv = vs_cid![cid];
         let _ = self.attrs.insert(AttrString::from("last_modified_cid"), cv);
     }

-    #[cfg(test)]
-    pub(crate) fn get_last_changed(&self) -> Cid {
-        self.attrs
-            .get("last_modified_cid")
-            .and_then(|vs| vs.to_cid_single())
-            .unwrap()
-    }
-
     pub(crate) fn get_display_id(&self) -> String {
         self.attrs
             .get("spn")
@@ -2704,17 +2849,20 @@ impl<STATE> Entry<EntryInvalid, STATE>
 where
     STATE: Clone,
 {
+    fn trigger_last_changed(&mut self) {
+        self.valid
+            .ecstate
+            .change_ava(&self.valid.cid, "last_modified_cid");
+        let cv = vs_cid![self.valid.cid.clone()];
+        let _ = self.attrs.insert(AttrString::from("last_modified_cid"), cv);
+    }
+
     // This should always work? It's only on validate that we'll build
     // a list of syntax violations ...
     // If this already exists, we silently drop the event. This is because
     // we need this to be *state* based where we assert presence.
     pub fn add_ava(&mut self, attr: &str, value: Value) {
         self.valid.ecstate.change_ava(&self.valid.cid, attr);
-        /*
-        self.valid
-            .eclog
-            .add_ava_iter(&self.valid.cid, attr, std::iter::once(value.clone()));
-        */
        self.add_ava_int(attr, value);
     }
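
Aside: the add/add conflict resolution in resolve_add_conflict above leans entirely on the total ordering of CIDs - whichever entry was created (its "at" CID) later becomes the conflict, so every replica independently picks the same winner with no coordination. A minimal sketch of that idea (the field layout and derives here are assumptions for illustration, not taken from this commit):

    // Hypothetical sketch: ordering a Cid by (timestamp, server uuid) makes
    // comparisons like `at_left > at_right` deterministic on every replica.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
    struct Cid {
        ts: std::time::Duration, // when the change was made
        s_uuid: uuid::Uuid,      // originating server; breaks timestamp ties
    }

    // The later creation always becomes the conflict entry.
    fn incoming_becomes_conflict(at_incoming: &Cid, at_db: &Cid) -> bool {
        at_incoming > at_db
    }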

View file

@@ -16,7 +16,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
         &mut self,
         ctx_entries: &[ReplIncrementalEntryV1],
     ) -> Result<(), OperationError> {
-        trace!(?ctx_entries);
+        // trace!(?ctx_entries);

         // No action needed for this if the entries are empty.
         if ctx_entries.is_empty() {
@@ -43,7 +43,6 @@ impl<'a> QueryServerWriteTransaction<'a> {
                 e
             })?;

-        trace!("===========================================");
         trace!(?ctx_entries);

         let db_entries = self.be_txn.incremental_prepare(&ctx_entries).map_err(|e| {
@@ -69,28 +68,23 @@ impl<'a> QueryServerWriteTransaction<'a> {
         //     /- entries that need to be created as conflicts.
         //     |   /- entries that survive and need update to the db in place.
         //     v   v
-        #[allow(clippy::todo)]
         let (conflict_create, conflict_update): (
-            Vec<EntrySealedNew>,
+            Vec<Option<EntrySealedNew>>,
             Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
         ) = conflicts
             .into_iter()
-            .map(|(_ctx_ent, _db_ent)| {
-                // Determine which of the entries must become the conflict
-                // and which will now persist. There are two possible cases.
-                //
-                // 1. The ReplIncremental is after the DBEntry, and becomes the conflict.
-                //    This means we just update the db entry with itself.
-                //
-                // 2. The ReplIncremental is before the DBEntry, and becomes live.
-                //    This means we have to take the DBEntry as it exists, convert
-                //    it to a new entry. Then we have to take the repl incremental
-                //    entry and place it into the update queue.
-                todo!();
-            })
+            .map(
+                |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
+                    let (opt_create, ent) =
+                        ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
+                    (opt_create, (ent, db_ent))
+                },
+            )
             .unzip();

+        // Filter out None from conflict_create
+        let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
+
         let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
             .into_iter()
             .map(|(ctx_ent, db_ent)| {
@@ -219,9 +213,10 @@ impl<'a> QueryServerWriteTransaction<'a> {
                 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                 Ok(ConsumerState::RefreshRequired)
             }
-            #[allow(clippy::todo)]
             ReplIncrementalContext::UnwillingToSupply => {
-                todo!();
+                warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
+                error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
+                Ok(ConsumerState::Ok)
             }
             ReplIncrementalContext::V1 {
                 domain_version,
@@ -273,6 +268,7 @@ impl<'a> QueryServerWriteTransaction<'a> {

         // == ⚠️ Below this point we begin to make changes! ==

+        debug!("Applying schema entries");
         // Apply the schema entries first.
         self.consumer_incremental_apply_entries(ctx_schema_entries)
             .map_err(|e| {
@@ -286,6 +282,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
                 e
             })?;

+        debug!("Applying meta entries");
         // Apply meta entries now.
         self.consumer_incremental_apply_entries(ctx_meta_entries)
             .map_err(|e| {
@@ -305,6 +302,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
         self.changed_schema = true;
         self.changed_domain = true;

+        debug!("Applying all context entries");
         // Update all other entries now.
         self.consumer_incremental_apply_entries(ctx_entries)
             .map_err(|e| {
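
Aside: the rewritten conflict handling above relies on a compact iterator shape - map each (context entry, db entry) pair to (Option<created conflict>, update), unzip into two collections, then flatten away the Nones. A standalone sketch of the same pattern with toy types (the names here are illustrative, not the server's):

    // Toy illustration of the map -> unzip -> flatten shape used above.
    fn split_conflicts(pairs: Vec<(u32, u32)>) -> (Vec<u32>, Vec<(u32, u32)>) {
        let (maybe_created, updates): (Vec<Option<u32>>, Vec<(u32, u32)>) = pairs
            .into_iter()
            .map(|(ctx, db)| {
                // Only some pairs yield a newly created conflict record.
                let opt_create = if ctx > db { Some(ctx) } else { None };
                (opt_create, (ctx, db))
            })
            .unzip();
        // Drop the Nones, keeping only the records that must be created.
        let created: Vec<u32> = maybe_created.into_iter().flatten().collect();
        (created, updates)
    }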

View file

@@ -137,6 +137,11 @@ impl EntryChangeState {
         }
     }

+    #[cfg(test)]
+    pub(crate) fn get_tail_cid(&self) -> Cid {
+        self.cid_iter().pop().cloned().unwrap()
+    }
+
     pub fn cid_iter(&self) -> Vec<&Cid> {
         match &self.st {
             State::Live { at: _, changes } => {

View file

@@ -72,6 +72,8 @@ fn repl_incremental(
         .supplier_provide_changes(a_ruv_range)
         .expect("Unable to generate supplier changes");

+    trace!(?changes, "supplying changes");
+
     // Check the changes = should be empty.
     to.consumer_apply_changes(&changes)
         .expect("Unable to apply changes to consumer.");
@@ -1122,13 +1124,560 @@ async fn test_repl_increment_basic_bidirectional_tombstone(
 // conflict cases.

 // both add entry with same uuid - only one can win!
+#[qs_pair_test]
+async fn test_repl_increment_creation_uuid_conflict(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok());
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Now create the same entry on both servers.
+    let t_uuid = Uuid::new_v4();
+    let e_init = entry_init!(
+        ("class", Value::new_class("object")),
+        ("class", Value::new_class("person")),
+        ("name", Value::new_iname("testperson1")),
+        ("uuid", Value::Uuid(t_uuid)),
+        ("description", Value::new_utf8s("testperson1")),
+        ("displayname", Value::new_utf8s("testperson1"))
+    );
+
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.internal_create(vec![e_init.clone(),]).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Get a new time.
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Replicate A to B. B should ignore.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    trace!("{:?}", e1.get_last_changed());
+    trace!("{:?}", e2.get_last_changed());
+    // e2 from b will be smaller as it's the older entry.
+    assert!(e1.get_last_changed() > e2.get_last_changed());
+
+    // Check that no conflict entries exist yet.
+    let cnf_a = server_a_txn
+        .internal_search_conflict_uuid(t_uuid)
+        .expect("Unable to conflict entries.");
+    assert!(cnf_a.is_empty());
+    let cnf_b = server_b_txn
+        .internal_search_conflict_uuid(t_uuid)
+        .expect("Unable to conflict entries.");
+    assert!(cnf_b.is_empty());
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+
+    // Replicate B to A. A should replace with B, and create the
+    // conflict entry as it's the origin of the conflict.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1.get_last_changed() == e2.get_last_changed());
+
+    let cnf_a = server_a_txn
+        .internal_search_conflict_uuid(t_uuid)
+        .expect("Unable to conflict entries.")
+        // Should be a vec.
+        .pop()
+        .expect("No conflict entries present");
+    assert!(cnf_a.get_ava_single_iname("name") == Some("testperson1"));
+
+    let cnf_b = server_b_txn
+        .internal_search_conflict_uuid(t_uuid)
+        .expect("Unable to conflict entries.");
+    assert!(cnf_b.is_empty());
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // At this point server a now has the conflict entry, and we have to confirm
+    // it can be sent to b.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    // Now the repl should have caused the conflict to be on both sides.
+    let cnf_a = server_a_txn
+        .internal_search_conflict_uuid(t_uuid)
+        .expect("Unable to conflict entries.")
+        // Should be a vec.
+        .pop()
+        .expect("No conflict entries present");
+
+    let cnf_b = server_b_txn
+        .internal_search_conflict_uuid(t_uuid)
+        .expect("Unable to conflict entries.")
+        // Should be a vec.
+        .pop()
+        .expect("No conflict entries present");
+
+    assert!(cnf_a.get_last_changed() == cnf_b.get_last_changed());
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+}
 // both add entry with same uuid, but one becomes ts - ts always wins.
+#[qs_pair_test]
+async fn test_repl_increment_create_tombstone_uuid_conflict(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok());
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Now create the same entry on both servers.
+    let t_uuid = Uuid::new_v4();
+    let e_init = entry_init!(
+        ("class", Value::new_class("object")),
+        ("class", Value::new_class("person")),
+        ("name", Value::new_iname("testperson1")),
+        ("uuid", Value::Uuid(t_uuid)),
+        ("description", Value::new_utf8s("testperson1")),
+        ("displayname", Value::new_utf8s("testperson1"))
+    );
+
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.internal_create(vec![e_init.clone(),]).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Since A was added second, this should normally be the entry that loses in the
+    // conflict resolution case, but here because it's tombstoned, we actually see it
+    // persist.
+
+    // Get a new time.
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok());
+    // Immediately send it to the shadow realm
+    assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Tombstone the entry.
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.purge_recycled().is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Do B -> A - no change on A. Normally this would create the conflict
+    // on A since it's the origin, but here since it's a TS it now takes
+    // precedence.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 != e2);
+    // E1 from A is a ts
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    // E2 from B is not a TS
+    assert!(!e2.attribute_equality("class", &PVCLASS_TOMBSTONE));
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Now A -> B - this should cause B to become a TS even though its AT is
+    // earlier.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+}
 // both add entry with same uuid, both become ts - merge, take lowest AT.
+#[qs_pair_test]
+async fn test_repl_increment_create_tombstone_conflict(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok());
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Now create the same entry on both servers.
+    let t_uuid = Uuid::new_v4();
+    let e_init = entry_init!(
+        ("class", Value::new_class("object")),
+        ("class", Value::new_class("person")),
+        ("name", Value::new_iname("testperson1")),
+        ("uuid", Value::Uuid(t_uuid)),
+        ("description", Value::new_utf8s("testperson1")),
+        ("displayname", Value::new_utf8s("testperson1"))
+    );
+
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.internal_create(vec![e_init.clone(),]).is_ok());
+    // Immediately send it to the shadow realm
+    assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Get a new time.
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok());
+    // Immediately send it to the shadow realm
+    assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Tombstone on both sides.
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.purge_recycled().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 2);
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.purge_recycled().is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Since B was tombstoned first, it is the tombstone that should persist.
+    // This means A -> B - no change on B, it's the persisting tombstone.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1.get_last_changed() > e2.get_last_changed());
+    // Yet, they are both TS. Curious.
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE));
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+
+    // B -> A - A should now have the lower AT reflected.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+}
+// Test schema conflict state - add attr A on one side, and then remove the supporting
+// class on the other. On repl both sides move to conflict.
+#[qs_pair_test]
+async fn test_repl_increment_schema_conflict(server_a: &QueryServer, server_b: &QueryServer) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok());
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Setup the entry we plan to break.
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Now at this point we need to write to both sides. The order *does* matter
+    // here because we need the displayname write to happen *after* the purge
+    // on the B node.
+
+    // This is a really rare/wild change to swap an object out to a group but it
+    // works well for our test here.
+    let ct = ct + Duration::from_secs(1);
+    let mut server_b_txn = server_b.write(ct).await;
+    let modlist = ModifyList::new_list(vec![
+        Modify::Removed("class".into(), PVCLASS_PERSON.clone()),
+        Modify::Present("class".into(), CLASS_GROUP.clone()),
+        Modify::Purged("displayname".into()),
+    ]);
+    assert!(server_b_txn.internal_modify_uuid(t_uuid, &modlist).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // On A we'll change the displayname which is predicated on being a person still
+    let ct = ct + Duration::from_secs(1);
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn
+        .internal_modify_uuid(
+            t_uuid,
+            &ModifyList::new_purge_and_set(
+                "displayname",
+                Value::Utf8("Updated displayname".to_string())
+            )
+        )
+        .is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Now we have to replicate again. It shouldn't matter *which* direction we go first
+    // because *both* should end in the conflict state.
+
+    // B -> A
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+
+    assert!(e1.attribute_equality("class", &PVCLASS_CONFLICT));
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // A -> B
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e2.attribute_equality("class", &PVCLASS_CONFLICT));
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+}
 // Test RUV content when a server's changes have been trimmed out and are not present
 // in a refresh. This is not about tombstones, this is about attribute state.
+#[qs_pair_test]
+async fn test_repl_increment_consumer_lagging_attributes(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
+        .and_then(|_| server_a_txn.commit())
+        .is_ok());
+    drop(server_b_txn);
+
+    // Add an entry.
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now setup bidirectional replication. We only need to trigger B -> A
+    // here because that's all that has changes.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    trace!("========================================");
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Okay, now we do a change on B and then we'll push time ahead of changelog
+    // ruv trim. This should mean that the indexes to find those changes are lost.
+    let ct = ct + Duration::from_secs(1);
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn
+        .internal_modify_uuid(
+            t_uuid,
+            &ModifyList::new_purge_and_set(
+                "displayname",
+                Value::Utf8("Updated displayname".to_string())
+            )
+        )
+        .is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now we advance the time.
+    let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE + 1);
+
+    // And setup the ruv trim. This is triggered by purge/reap tombstones.
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.purge_tombstones().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Okay, ready to go. When we do A -> B or B -> A we should get appropriate
+    // errors regarding the delay state.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    let a_ruv_range = server_a_txn
+        .consumer_get_state()
+        .expect("Unable to access RUV range");
+
+    let changes = server_b_txn
+        .supplier_provide_changes(a_ruv_range)
+        .expect("Unable to generate supplier changes");
+
+    assert!(matches!(changes, ReplIncrementalContext::RefreshRequired));
+
+    let result = server_a_txn
+        .consumer_apply_changes(&changes)
+        .expect("Unable to apply changes to consumer.");
+
+    assert!(matches!(result, ConsumerState::RefreshRequired));
+
+    drop(server_a_txn);
+    drop(server_b_txn);
+
+    // Reverse it!
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(ct).await;
+
+    let b_ruv_range = server_b_txn
+        .consumer_get_state()
+        .expect("Unable to access RUV range");
+
+    let changes = server_a_txn
+        .supplier_provide_changes(b_ruv_range)
+        .expect("Unable to generate supplier changes");
+
+    assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply));
+
+    let result = server_b_txn
+        .consumer_apply_changes(&changes)
+        .expect("Unable to apply changes to consumer.");
+
+    assert!(matches!(result, ConsumerState::Ok));
+
+    drop(server_a_txn);
+    drop(server_b_txn);
+}
+
 // Test change of a domain name over incremental.

View file

@@ -98,7 +98,6 @@ pub struct SchemaAttribute {
 impl SchemaAttribute {
     pub fn try_from(value: &Entry<EntrySealed, EntryCommitted>) -> Result<Self, OperationError> {
         // Convert entry to a schema attribute.
-        trace!("Converting -> {}", value);

         // uuid
         let uuid = value.get_uuid();
@@ -722,6 +721,25 @@ impl<'a> SchemaWriteTransaction<'a> {
                 syntax: SyntaxType::Uuid,
             },
         );
+        self.attributes.insert(
+            AttrString::from("source_uuid"),
+            SchemaAttribute {
+                name: AttrString::from("source_uuid"),
+                uuid: UUID_SCHEMA_ATTR_SOURCE_UUID,
+                description: String::from(
+                    "The universal unique id of the source object where this conflict came from",
+                ),
+                multivalue: false,
+                // Uniqueness is handled by base.rs, not attrunique here due to
+                // needing to check recycled objects too.
+                unique: false,
+                phantom: false,
+                sync_allowed: false,
+                replicated: true,
+                index: vec![IndexType::Equality, IndexType::Presence],
+                syntax: SyntaxType::Uuid,
+            },
+        );
         self.attributes.insert(
             AttrString::from("last_modified_cid"),
             SchemaAttribute {
@@ -1731,6 +1749,19 @@ impl<'a> SchemaWriteTransaction<'a> {
                 ..Default::default()
             },
         );
+        self.classes.insert(
+            AttrString::from("conflict"),
+            SchemaClass {
+                name: AttrString::from("conflict"),
+                uuid: UUID_SCHEMA_CLASS_CONFLICT,
+                description: String::from(
+                    "An entry representing conflicts that occurred during replication",
+                ),
+                systemmust: vec![AttrString::from("source_uuid")],
+                systemsupplements: vec![AttrString::from("recycled")],
+                ..Default::default()
+            },
+        );
         // sysinfo
         self.classes.insert(
             AttrString::from("system_info"),

View file

@@ -77,12 +77,12 @@ fn search_filter_entry<'a>(
     // If this is an internal search, return our working set.
     match &ident.origin {
         IdentType::Internal => {
-            trace!("Internal operation, bypassing access check");
+            trace!(uuid = ?entry.get_display_id(), "Internal operation, bypassing access check");
             // No need to check ACS
             return AccessResult::Grant;
         }
         IdentType::Synch(_) => {
-            security_critical!("Blocking sync check");
+            security_critical!(uuid = ?entry.get_display_id(), "Blocking sync check");
             return AccessResult::Denied;
         }
         IdentType::User(_) => {}
@@ -109,7 +109,7 @@ fn search_filter_entry<'a>(
                 Some(acs.attrs.iter().map(|s| s.as_str()))
             } else {
                 // should this be `security_access`?
-                security_debug!(entry = ?entry.get_uuid(), acs = %acs.acp.name, "entry DOES NOT match acs");
+                security_debug!(entry = ?entry.get_display_id(), acs = %acs.acp.name, "entry DOES NOT match acs");
                 None
             }
         })

View file

@@ -445,6 +445,25 @@ pub trait QueryServerTransaction<'a> {
         }
     }

+    /// Get all conflict entries that originated from a source uuid.
+    #[instrument(level = "debug", skip_all)]
+    fn internal_search_conflict_uuid(
+        &mut self,
+        uuid: Uuid,
+    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
+        let filter = filter_all!(f_and(vec![
+            f_eq("source_uuid", PartialValue::Uuid(uuid)),
+            f_eq("class", PVCLASS_CONFLICT.clone())
+        ]));
+        let f_valid = filter.validate(self.get_schema()).map_err(|e| {
+            error!(?e, "Filter Validate - SchemaViolation");
+            OperationError::SchemaViolation(e)
+        })?;
+        let se = SearchEvent::new_internal(f_valid);
+
+        self.search(&se)
+    }
+
     #[instrument(level = "debug", skip_all)]
     fn impersonate_search_ext_uuid(
         &mut self,
@@ -1171,6 +1190,10 @@ impl<'a> QueryServerWriteTransaction<'a> {
         self.curtime
     }

+    pub(crate) fn get_cid(&self) -> &Cid {
+        &self.cid
+    }
+
     pub(crate) fn get_dyngroup_cache(&mut self) -> &mut DynGroupCache {
         &mut self.dyngroup_cache
     }
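
Aside: internal_search_conflict_uuid above builds the internal filter and(eq(source_uuid, uuid), eq(class, conflict)), so a caller can locate every conflict entry spawned by a contested entry. Condensed from the tests earlier in this commit, typical usage looks like:

    // After replication, the origin server should hold a conflict entry
    // whose source_uuid points back at the contested uuid.
    let cnf = server_a_txn
        .internal_search_conflict_uuid(t_uuid)
        .expect("Unable to search conflict entries.")
        .pop()
        .expect("No conflict entries present");
    assert!(cnf.get_ava_single_iname("name") == Some("testperson1"));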