Add further incremental replication tests (#1707)

Firstyear 2023-06-07 14:14:43 +10:00 committed by GitHub
parent 4f3bfd1025
commit 152bf95e71
8 changed files with 459 additions and 105 deletions

View file

@@ -665,9 +665,20 @@ impl Entry<EntryIncremental, EntryNew> {
     }
 
     pub(crate) fn is_add_conflict(&self, db_entry: &EntrySealedCommitted) -> bool {
+        use crate::repl::entry::State;
         debug_assert!(self.valid.uuid == db_entry.valid.uuid);
         // This is a conflict if the state 'at' is not identical
-        self.valid.ecstate.at() != db_entry.valid.ecstate.at()
+        let self_cs = &self.valid.ecstate;
+        let db_cs = db_entry.get_changestate();
+
+        // Can only add conflict on live entries.
+        match (self_cs.current(), db_cs.current()) {
+            (State::Live { at: at_left, .. }, State::Live { at: at_right, .. }) => {
+                at_left != at_right
+            }
+            // Tombstone will always overwrite.
+            _ => false,
+        }
     }
 
     pub(crate) fn merge_state(
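The intent of the new check is easier to see in isolation. The following is a minimal sketch using simplified stand-ins for `State` and `Cid` (the real types live in crate::repl::entry and carry more data): an add conflict only exists when both replicas hold a live entry created "at" different points; a tombstone on either side is never an add conflict, because the tombstone simply wins in merge_state.

// Minimal sketch of the add-conflict rule, with simplified stand-in types.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
struct Cid(u64); // hypothetical: the real Cid also carries a server id

enum State {
    Live { at: Cid },
    Tombstone { at: Cid },
}

/// Two entries with the same UUID conflict only when both are live and
/// were created at different change ids.
fn is_add_conflict(left: &State, right: &State) -> bool {
    match (left, right) {
        (State::Live { at: l }, State::Live { at: r }) => l != r,
        // A tombstone on either side never raises an add conflict.
        _ => false,
    }
}

fn main() {
    assert!(is_add_conflict(
        &State::Live { at: Cid(1) },
        &State::Live { at: Cid(2) }
    ));
    assert!(!is_add_conflict(
        &State::Tombstone { at: Cid(1) },
        &State::Live { at: Cid(2) }
    ));
}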
@@ -805,10 +816,7 @@ impl Entry<EntryIncremental, EntryNew> {
                     attrs: eattrs,
                 }
             }
-            (State::Tombstone { at: left_at }, State::Tombstone { at: right_at }) => {
-                // Due to previous checks, this must be equal!
-                debug_assert!(left_at == right_at);
-                debug_assert!(self.attrs == db_ent.attrs);
+            (State::Tombstone { at: left_at }, State::Live { .. }) => {
                 // We have to generate the attrs here, since on replication
                 // we just send the tombstone ecstate rather than attrs. Our
                 // db stub also lacks these attributes too.
@@ -831,23 +839,14 @@ impl Entry<EntryIncremental, EntryNew> {
                     attrs: attrs_new,
                 }
             }
-            (State::Tombstone { .. }, State::Live { .. }) => {
-                debug_assert!(false);
-                // Keep the left side.
-                Entry {
-                    valid: EntryIncremental {
-                        uuid: self.valid.uuid,
-                        ecstate: self.valid.ecstate.clone(),
-                    },
-                    state: EntryCommitted {
-                        id: db_ent.state.id,
-                    },
-                    attrs: self.attrs.clone(),
-                }
-            }
             (State::Live { .. }, State::Tombstone { .. }) => {
-                debug_assert!(false);
-                // Keep the right side
+                // Our current DB entry is a tombstone - ignore the incoming live
+                // entry and just retain our DB tombstone.
+                //
+                // Note we don't need to gen the attrs here since if a stub was made then
+                // we'd be live:live. To be in live:ts, then our db entry MUST exist and
+                // must be a ts.
                 Entry {
                     valid: EntryIncremental {
                         uuid: db_ent.valid.uuid,
@@ -859,6 +858,36 @@ impl Entry<EntryIncremental, EntryNew> {
                     attrs: db_ent.attrs.clone(),
                 }
             }
+            (State::Tombstone { at: left_at }, State::Tombstone { at: right_at }) => {
+                // WARNING - this differs from the other tombstone check cases
+                // lower of the two AT values. This way replicas always have the
+                // earliest TS value. It's a rare case but needs handling.
+
+                let (at, ecstate) = if left_at < right_at {
+                    (left_at, self.valid.ecstate.clone())
+                } else {
+                    (right_at, db_ent.valid.ecstate.clone())
+                };
+
+                let mut attrs_new: Eattrs = Map::new();
+                let class_ava = vs_iutf8!["object", "tombstone"];
+                let last_mod_ava = vs_cid![at.clone()];
+
+                attrs_new.insert(AttrString::from("uuid"), vs_uuid![db_ent.valid.uuid]);
+                attrs_new.insert(AttrString::from("class"), class_ava);
+                attrs_new.insert(AttrString::from("last_modified_cid"), last_mod_ava);
+
+                Entry {
+                    valid: EntryIncremental {
+                        uuid: db_ent.valid.uuid,
+                        ecstate,
+                    },
+                    state: EntryCommitted {
+                        id: db_ent.state.id,
+                    },
+                    attrs: attrs_new,
+                }
+            }
         }
     }
 }
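The rule in the new (Tombstone, Tombstone) arm, keep the lower of the two `at` values so every replica converges on the earliest tombstone time, can be illustrated with a small standalone sketch. The `Cid` here is a simplified stand-in, not the real kanidm type:

// Hypothetical, simplified Cid: ordered by (timestamp, server id).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Cid {
    ts: u64,
    server: u64,
}

/// When both replicas have tombstoned the same entry, the merged tombstone
/// keeps the earliest "at" value, so merges in either direction converge.
fn merge_tombstone_at(left_at: Cid, right_at: Cid) -> Cid {
    if left_at < right_at {
        left_at
    } else {
        right_at
    }
}

fn main() {
    let a = Cid { ts: 100, server: 1 };
    let b = Cid { ts: 250, server: 2 };
    // Order of merging doesn't matter - both nodes end up with the earlier at.
    assert_eq!(merge_tombstone_at(a, b), merge_tombstone_at(b, a));
    assert_eq!(merge_tombstone_at(a, b), a);
}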
@@ -2158,6 +2187,14 @@ impl<VALID, STATE> Entry<VALID, STATE> {
         let _ = self.attrs.insert(AttrString::from("last_modified_cid"), cv);
     }
 
+    #[cfg(test)]
+    pub(crate) fn get_last_changed(&self) -> Cid {
+        self.attrs
+            .get("last_modified_cid")
+            .and_then(|vs| vs.to_cid_single())
+            .unwrap()
+    }
+
     #[inline(always)]
     /// Get an iterator over the current set of attribute names that this entry contains.
     pub fn get_ava_names(&self) -> impl Iterator<Item = &str> {

View file

@@ -818,4 +818,12 @@ impl ReviveRecycledEvent {
             filter: filter.into_valid(),
         }
     }
+
+    #[cfg(test)]
+    pub(crate) fn new_internal(filter: Filter<FilterValid>) -> Self {
+        ReviveRecycledEvent {
+            ident: Identity::from_internal(),
+            filter,
+        }
+    }
 }

View file

@@ -51,6 +51,8 @@ impl<'a> QueryServerWriteTransaction<'a> {
                 e
             })?;
 
+        trace!(?db_entries);
+
         // Need to probably handle conflicts here in this phase. I think they
         // need to be pushed to a separate list where they are then "created"
         // as a conflict.

View file

@@ -172,16 +172,20 @@ impl<'a> QueryServerReadTransaction<'a> {
             f_eq("uuid", PVUUID_SYSTEM_CONFIG.clone()),
         ]));
 
-        let entry_filter = filter!(f_and!([
-            f_pres("class"),
-            f_andnot(f_or(vec![
-                // These are from above!
-                f_eq("class", PVCLASS_ATTRIBUTETYPE.clone()),
-                f_eq("class", PVCLASS_CLASSTYPE.clone()),
-                f_eq("uuid", PVUUID_DOMAIN_INFO.clone()),
-                f_eq("uuid", PVUUID_SYSTEM_INFO.clone()),
-                f_eq("uuid", PVUUID_SYSTEM_CONFIG.clone()),
-            ])),
+        let entry_filter = filter_all!(f_or!([
+            f_and!([
+                f_pres("class"),
+                f_andnot(f_or(vec![
+                    // These are from above!
+                    f_eq("class", PVCLASS_ATTRIBUTETYPE.clone()),
+                    f_eq("class", PVCLASS_CLASSTYPE.clone()),
+                    f_eq("uuid", PVUUID_DOMAIN_INFO.clone()),
+                    f_eq("uuid", PVUUID_SYSTEM_INFO.clone()),
+                    f_eq("uuid", PVUUID_SYSTEM_CONFIG.clone()),
+                ])),
+            ]),
+            f_eq("class", PVCLASS_TOMBSTONE.clone()),
+            f_eq("class", PVCLASS_RECYCLED.clone()),
         ]));
 
         let schema_entries = self
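In effect this filter now also returns tombstoned and recycled entries, not only live non-schema/non-system entries, which is presumably why it switches from filter! to filter_all! (the plain filter! macro normally masks hidden entry classes). A rough boolean equivalent, written over a hypothetical flattened entry view rather than the real filter machinery, would be:

// Hypothetical flattened view of an entry; only the boolean shape of the
// new entry_filter is mirrored here.
struct EntryView {
    has_class: bool,    // f_pres("class")
    is_schema: bool,    // attributetype / classtype
    is_system: bool,    // domain_info / system_info / system_config
    is_tombstone: bool,
    is_recycled: bool,
}

fn is_refresh_candidate(e: &EntryView) -> bool {
    // Live entries that aren't schema or system entries ...
    (e.has_class && !(e.is_schema || e.is_system))
        // ... plus tombstones and recycled entries, which the plain
        // filter! macro would normally mask from results.
        || e.is_tombstone
        || e.is_recycled
}

fn main() {
    let tombstone = EntryView {
        has_class: true,
        is_schema: false,
        is_system: false,
        is_tombstone: true,
        is_recycled: false,
    };
    assert!(is_refresh_candidate(&tombstone));
}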

View file

@@ -1,6 +1,7 @@
 use crate::be::BackendTransaction;
 use crate::prelude::*;
 use crate::repl::consumer::ConsumerState;
+use crate::repl::entry::State;
 use crate::repl::proto::ReplIncrementalContext;
 use crate::repl::ruv::ReplicationUpdateVectorTransaction;
 use crate::repl::ruv::{RangeDiffStatus, ReplicationUpdateVector};
@@ -750,16 +751,374 @@ async fn test_repl_increment_simultaneous_bidirectional_write(
 // TS on B
 // B -> A TS
+#[qs_pair_test]
+async fn test_repl_increment_basic_bidirectional_lifecycle(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
+        .and_then(|_| server_a_txn.commit())
+        .is_ok());
+    drop(server_b_txn);
+
+    // Add an entry.
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Assert the entry is not on A.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert_eq!(
+        server_a_txn.internal_search_uuid(t_uuid),
+        Err(OperationError::NoMatchingEntries)
+    );
+
+    //               from               to
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Delete on A
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Repl A -> B
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(ct).await;
+
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    // They are consistent again.
+    assert!(e1 == e2);
+    assert!(e1.attribute_equality("class", &PVCLASS_RECYCLED));
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+
+    // At an earlier time make a change on A.
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.internal_revive_uuid(t_uuid).is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    // Now move past the recyclebin time.
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
+
+    // Now TS on B.
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.purge_recycled().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Repl A -> B - B will silently reject the update due to the TS state on B.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(ct).await;
+
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    // They are NOT consistent.
+    assert!(e1 != e2);
+    // E1 from A is NOT a tombstone ... yet.
+    assert!(!e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    // E2 from B is a tombstone!
+    assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE));
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+
+    // Repl B -> A - will have a TS at the end.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    // Ts on both.
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+}
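Each of the new tests repeats the same verification step: run an incremental replication in one direction, then load the entry from both nodes (including recycled and tombstoned states) and compare. A hypothetical helper capturing that pattern - not part of this commit, the tests deliberately inline it so each step stays visible - would look roughly like:

// Hypothetical helper for the test module; uses the same calls the tests
// above make inline (repl_incremental, internal_search_all_uuid).
async fn assert_entry_converges(
    from: &QueryServer,
    to: &QueryServer,
    ct: Duration,
    t_uuid: Uuid,
) {
    let mut from_txn = from.read().await;
    let mut to_txn = to.write(ct).await;

    // Supplier -> consumer incremental replication.
    repl_incremental(&mut from_txn, &mut to_txn);

    let e_from = from_txn
        .internal_search_all_uuid(t_uuid)
        .expect("Unable to access entry on supplier.");
    let e_to = to_txn
        .internal_search_all_uuid(t_uuid)
        .expect("Unable to access entry on consumer.");

    // Both nodes now hold the same entry state.
    assert!(e_from == e_to);

    to_txn.commit().expect("Failed to commit");
    drop(from_txn);
}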
 
 // Create entry on A -> B
 // Recycle on Both A/B
-// Recycle propagates from A -> B, B -> A, keep earliest.
-// TS on A
-// A -> B TS
-
-// Create + recycle entry on A -> B
+// Recycle propagates from A -> B, B -> A, keep latest.
+// We already know the recycle -> ts state is good from other tests.
+#[qs_pair_test]
+async fn test_repl_increment_basic_bidirectional_recycle(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
+        .and_then(|_| server_a_txn.commit())
+        .is_ok());
+    drop(server_b_txn);
+
+    // Add an entry.
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Assert the entry is not on A.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    //               from               to
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // On both servers, at separate timestamps, run the recycle.
+    let ct = ct + Duration::from_secs(1);
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    let ct = ct + Duration::from_secs(2);
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Send server a -> b - ignored.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(ct).await;
+
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+
+    // They are equal, but their CL states are not. e2 should have been
+    // retained due to being the latest!
+    assert!(e1 == e2);
+    assert!(e1.attribute_equality("class", &PVCLASS_RECYCLED));
+
+    // Remember entry comparison doesn't compare last_mod_cid.
+    assert!(e1.get_last_changed() < e2.get_last_changed());
+
+    let e1_cs = e1.get_changestate();
+    let e2_cs = e2.get_changestate();
+
+    let valid = match (e1_cs.current(), e2_cs.current()) {
+        (
+            State::Live {
+                at: _,
+                changes: changes_left,
+            },
+            State::Live {
+                at: _,
+                changes: changes_right,
+            },
+        ) => match (changes_left.get("class"), changes_right.get("class")) {
+            (Some(cid_left), Some(cid_right)) => cid_left < cid_right,
+            _ => false,
+        },
+        _ => false,
+    };
+    assert!(valid);
+
+    // Now go the other way. They'll be equal again.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+    let e1_cs = e1.get_changestate();
+    let e2_cs = e2.get_changestate();
+    assert!(e1_cs == e2_cs);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+}
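Note the asymmetry with the tombstone case below: a recycle is an ordinary modification on a live entry, so the later change wins per attribute, while merged tombstones converge on the earliest "at". A simplified illustration of the last-writer-wins comparison the test asserts on (hypothetical types, not kanidm's):

// Simplified per-attribute merge: the change with the higher Cid is kept.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Cid(u64);

fn merge_attr<'a>(left: (Cid, &'a str), right: (Cid, &'a str)) -> (Cid, &'a str) {
    // Last-writer-wins per attribute: keep the change with the higher Cid.
    if left.0 >= right.0 {
        left
    } else {
        right
    }
}

fn main() {
    let a = (Cid(10), "recycled on A first");
    let b = (Cid(12), "recycled on B later");
    // Both merge orders converge on B's later change, matching the
    // cid_left < cid_right assertion in the test above.
    assert_eq!(merge_attr(a, b), merge_attr(b, a));
    assert_eq!(merge_attr(a, b).0, Cid(12));
}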
 
 // Create + recycle entry on B -> A
 // TS on Both,
 // TS resolves to lowest AT.
+#[qs_pair_test]
+async fn test_repl_increment_basic_bidirectional_tombstone(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+    // And then recycle it.
+    assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now setup repl
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok());
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // Now on both servers, perform a recycle -> ts at different times.
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
+
+    let mut server_a_txn = server_a.write(ct).await;
+    assert!(server_a_txn.purge_recycled().is_ok());
+    server_a_txn.commit().expect("Failed to commit");
+
+    let ct = ct + Duration::from_secs(1);
+    let mut server_b_txn = server_b.write(ct).await;
+    assert!(server_b_txn.purge_recycled().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now do B -> A - no change on A as its TS was earlier.
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    trace!("{:?}", e1.get_last_changed());
+    trace!("{:?}", e2.get_last_changed());
+    assert!(e1.get_last_changed() < e2.get_last_changed());
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+
+    // A -> B - B should now have the A TS time.
+    let mut server_a_txn = server_a.read().await;
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+
+    repl_incremental(&mut server_a_txn, &mut server_b_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE));
+    assert!(e1.get_last_changed() == e2.get_last_changed());
+
+    server_b_txn.commit().expect("Failed to commit");
+    drop(server_a_txn);
+}
 
 // conflict cases.
 // both add entry with same uuid - only one can win!

View file

@@ -220,75 +220,16 @@ impl<'a> QueryServerWriteTransaction<'a> {
         Ok(())
     }
 
-    /*
-    #[instrument(level = "debug", skip_all)]
-    pub(crate) fn revive_recycled_legacy(
-        &mut self,
-        re: &ReviveRecycledEvent,
-    ) -> Result<(), OperationError> {
-        // Revive an entry to live. This is a specialised function, and draws a lot of
-        // inspiration from modify.
-        //
-        //
-        // Access is granted by the ability to ability to search the class=recycled
-        // and the ability modify + remove that class from the object.
-
-        // create the modify for access testing.
-        // tl;dr, remove the class=recycled
-        let modlist = ModifyList::new_list(vec![Modify::Removed(
-            AttrString::from("class"),
-            PVCLASS_RECYCLED.clone(),
-        )]);
-
-        let m_valid = modlist.validate(self.get_schema()).map_err(|e| {
-            admin_error!(
-                "Schema Violation in revive recycled modlist validate: {:?}",
-                e
-            );
-            OperationError::SchemaViolation(e)
-        })?;
-
-        // Get the entries we are about to revive.
-        // we make a set of per-entry mod lists. A list of lists even ...
-        let revive_cands =
-            self.impersonate_search_valid(re.filter.clone(), re.filter.clone(), &re.ident)?;
-
-        let mut dm_mods: HashMap<Uuid, ModifyList<ModifyInvalid>> =
-            HashMap::with_capacity(revive_cands.len());
-
-        for e in revive_cands {
-            // Get this entries uuid.
-            let u: Uuid = e.get_uuid();
-
-            if let Some(riter) = e.get_ava_as_refuuid("directmemberof") {
-                for g_uuid in riter {
-                    dm_mods
-                        .entry(g_uuid)
-                        .and_modify(|mlist| {
-                            let m = Modify::Present(AttrString::from("member"), Value::Refer(u));
-                            mlist.push_mod(m);
-                        })
-                        .or_insert({
-                            let m = Modify::Present(AttrString::from("member"), Value::Refer(u));
-                            ModifyList::new_list(vec![m])
-                        });
-                }
-            }
-        }
-
-        // Now impersonate the modify
-        self.impersonate_modify_valid(re.filter.clone(), re.filter.clone(), m_valid, &re.ident)?;
-
-        // If and only if that succeeds, apply the direct membership modifications
-        // if possible.
-        for (g, mods) in dm_mods {
-            // I think the filter/filter_all shouldn't matter here because the only
-            // valid direct memberships should be still valid/live references.
-            let f = filter_all!(f_eq("uuid", PartialValue::Uuid(g)));
-            self.internal_modify(&f, &mods)?;
-        }
-        Ok(())
-    }
-    */
+    #[cfg(test)]
+    pub(crate) fn internal_revive_uuid(&mut self, target_uuid: Uuid) -> Result<(), OperationError> {
+        // Note the use of filter_rec here for only recycled targets.
+        let filter = filter_rec!(f_eq("uuid", PartialValue::Uuid(target_uuid)));
+        let f_valid = filter
+            .validate(self.get_schema())
+            .map_err(OperationError::SchemaViolation)?;
+        let re = ReviveRecycledEvent::new_internal(f_valid);
+        self.revive_recycled(&re)
+    }
 }
 
 #[cfg(test)]

View file

@@ -157,7 +157,6 @@ impl ValueSetT for ValueSetCid {
         }
     }
 
-    /*
     fn to_cid_single(&self) -> Option<Cid> {
         if self.set.len() == 1 {
             self.set.iter().cloned().take(1).next()
@@ -165,7 +164,6 @@ impl ValueSetT for ValueSetCid {
             None
         }
     }
-    */
 
     fn as_cid_set(&self) -> Option<&SmolSet<[Cid; 1]>> {
         Some(&self.set)

View file

@@ -357,6 +357,11 @@ pub trait ValueSetT: std::fmt::Debug + DynClone {
         None
     }
 
+    fn to_cid_single(&self) -> Option<Cid> {
+        error!("to_cid_single should not be called on {:?}", self.syntax());
+        None
+    }
+
     fn to_refer_single(&self) -> Option<Uuid> {
         error!(
             "to_refer_single should not be called on {:?}",