diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs index 882c63068..4e64bd002 100644 --- a/server/lib/src/entry.rs +++ b/server/lib/src/entry.rs @@ -665,9 +665,20 @@ impl Entry { } pub(crate) fn is_add_conflict(&self, db_entry: &EntrySealedCommitted) -> bool { + use crate::repl::entry::State; debug_assert!(self.valid.uuid == db_entry.valid.uuid); // This is a conflict if the state 'at' is not identical - self.valid.ecstate.at() != db_entry.valid.ecstate.at() + let self_cs = &self.valid.ecstate; + let db_cs = db_entry.get_changestate(); + + // Can only add conflict on live entries. + match (self_cs.current(), db_cs.current()) { + (State::Live { at: at_left, .. }, State::Live { at: at_right, .. }) => { + at_left != at_right + } + // Tombstone will always overwrite. + _ => false, + } } pub(crate) fn merge_state( @@ -805,10 +816,7 @@ impl Entry { attrs: eattrs, } } - (State::Tombstone { at: left_at }, State::Tombstone { at: right_at }) => { - // Due to previous checks, this must be equal! - debug_assert!(left_at == right_at); - debug_assert!(self.attrs == db_ent.attrs); + (State::Tombstone { at: left_at }, State::Live { .. }) => { // We have to generate the attrs here, since on replication // we just send the tombstone ecstate rather than attrs. Our // db stub also lacks these attributes too. @@ -831,23 +839,14 @@ impl Entry { attrs: attrs_new, } } - (State::Tombstone { .. }, State::Live { .. }) => { - debug_assert!(false); - // Keep the left side. - Entry { - valid: EntryIncremental { - uuid: self.valid.uuid, - ecstate: self.valid.ecstate.clone(), - }, - state: EntryCommitted { - id: db_ent.state.id, - }, - attrs: self.attrs.clone(), - } - } (State::Live { .. }, State::Tombstone { .. }) => { - debug_assert!(false); - // Keep the right side + // Our current DB entry is a tombstone - ignore the incoming live + // entry and just retain our DB tombstone. + // + // Note we don't need to gen the attrs here since if a stub was made then + // we'd be live:live. To be in live:ts, then our db entry MUST exist and + // must be a ts. + Entry { valid: EntryIncremental { uuid: db_ent.valid.uuid, @@ -859,6 +858,36 @@ impl Entry { attrs: db_ent.attrs.clone(), } } + (State::Tombstone { at: left_at }, State::Tombstone { at: right_at }) => { + // WARNING - this differs from the other tombstone check cases + // lower of the two AT values. This way replicas always have the + // earliest TS value. It's a rare case but needs handling. + + let (at, ecstate) = if left_at < right_at { + (left_at, self.valid.ecstate.clone()) + } else { + (right_at, db_ent.valid.ecstate.clone()) + }; + + let mut attrs_new: Eattrs = Map::new(); + let class_ava = vs_iutf8!["object", "tombstone"]; + let last_mod_ava = vs_cid![at.clone()]; + + attrs_new.insert(AttrString::from("uuid"), vs_uuid![db_ent.valid.uuid]); + attrs_new.insert(AttrString::from("class"), class_ava); + attrs_new.insert(AttrString::from("last_modified_cid"), last_mod_ava); + + Entry { + valid: EntryIncremental { + uuid: db_ent.valid.uuid, + ecstate, + }, + state: EntryCommitted { + id: db_ent.state.id, + }, + attrs: attrs_new, + } + } } } } @@ -2158,6 +2187,14 @@ impl Entry { let _ = self.attrs.insert(AttrString::from("last_modified_cid"), cv); } + #[cfg(test)] + pub(crate) fn get_last_changed(&self) -> Cid { + self.attrs + .get("last_modified_cid") + .and_then(|vs| vs.to_cid_single()) + .unwrap() + } + #[inline(always)] /// Get an iterator over the current set of attribute names that this entry contains. pub fn get_ava_names(&self) -> impl Iterator { diff --git a/server/lib/src/event.rs b/server/lib/src/event.rs index 7d6389536..c43e97386 100644 --- a/server/lib/src/event.rs +++ b/server/lib/src/event.rs @@ -818,4 +818,12 @@ impl ReviveRecycledEvent { filter: filter.into_valid(), } } + + #[cfg(test)] + pub(crate) fn new_internal(filter: Filter) -> Self { + ReviveRecycledEvent { + ident: Identity::from_internal(), + filter, + } + } } diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index 16431561a..00f7c2cdb 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -51,6 +51,8 @@ impl<'a> QueryServerWriteTransaction<'a> { e })?; + trace!(?db_entries); + // Need to probably handle conflicts here in this phase. I think they // need to be pushed to a separate list where they are then "created" // as a conflict. diff --git a/server/lib/src/repl/supplier.rs b/server/lib/src/repl/supplier.rs index cabf245f2..c79d55f4e 100644 --- a/server/lib/src/repl/supplier.rs +++ b/server/lib/src/repl/supplier.rs @@ -172,16 +172,20 @@ impl<'a> QueryServerReadTransaction<'a> { f_eq("uuid", PVUUID_SYSTEM_CONFIG.clone()), ])); - let entry_filter = filter!(f_and!([ - f_pres("class"), - f_andnot(f_or(vec![ - // These are from above! - f_eq("class", PVCLASS_ATTRIBUTETYPE.clone()), - f_eq("class", PVCLASS_CLASSTYPE.clone()), - f_eq("uuid", PVUUID_DOMAIN_INFO.clone()), - f_eq("uuid", PVUUID_SYSTEM_INFO.clone()), - f_eq("uuid", PVUUID_SYSTEM_CONFIG.clone()), - ])), + let entry_filter = filter_all!(f_or!([ + f_and!([ + f_pres("class"), + f_andnot(f_or(vec![ + // These are from above! + f_eq("class", PVCLASS_ATTRIBUTETYPE.clone()), + f_eq("class", PVCLASS_CLASSTYPE.clone()), + f_eq("uuid", PVUUID_DOMAIN_INFO.clone()), + f_eq("uuid", PVUUID_SYSTEM_INFO.clone()), + f_eq("uuid", PVUUID_SYSTEM_CONFIG.clone()), + ])), + ]), + f_eq("class", PVCLASS_TOMBSTONE.clone()), + f_eq("class", PVCLASS_RECYCLED.clone()), ])); let schema_entries = self diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 61b7bc27a..8077ea739 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -1,6 +1,7 @@ use crate::be::BackendTransaction; use crate::prelude::*; use crate::repl::consumer::ConsumerState; +use crate::repl::entry::State; use crate::repl::proto::ReplIncrementalContext; use crate::repl::ruv::ReplicationUpdateVectorTransaction; use crate::repl::ruv::{RangeDiffStatus, ReplicationUpdateVector}; @@ -750,16 +751,374 @@ async fn test_repl_increment_simultaneous_bidirectional_write( // TS on B // B -> A TS +#[qs_pair_test] +async fn test_repl_increment_basic_bidirectional_lifecycle( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Add an entry. + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ),]) + .is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Assert the entry is not on A. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert_eq!( + server_a_txn.internal_search_uuid(t_uuid), + Err(OperationError::NoMatchingEntries) + ); + + // from to + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Delete on A + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Repl A -> B + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + // They are consistent again. + assert!(e1 == e2); + assert!(e1.attribute_equality("class", &PVCLASS_RECYCLED)); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // At an earlier time make a change on A. + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.internal_revive_uuid(t_uuid).is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Now move past the recyclebin time. + let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1); + + // Now TS on B. + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.purge_recycled().is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Repl A -> B - B will silently reject the update due to the TS state on B. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + // They are NOT consistent. + assert!(e1 != e2); + // E1 from A is NOT a tombstone ... yet. + assert!(!e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + // E2 from B is a tombstone! + assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE)); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // Repl B -> A - will have a TS at the end. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + // Ts on both. + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); +} + // Create entry on A -> B // Recycle on Both A/B -// Recycle propagates from A -> B, B -> A, keep earliest. -// TS on A -// A -> B TS +// Recycle propagates from A -> B, B -> A, keep latest. +// We already know the recycle -> ts state is good from other tests. -// Create + recycle entry on A -> B +#[qs_pair_test] +async fn test_repl_increment_basic_bidirectional_recycle( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Add an entry. + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ),]) + .is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Assert the entry is not on A. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + // from to + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // On both servers, at seperate timestamps, run the recycle. + let ct = ct + Duration::from_secs(1); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.internal_delete_uuid(t_uuid).is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + let ct = ct + Duration::from_secs(2); + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Send server a -> b - ignored. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); + + // They are equal, but their CL states are not. e2 should have been + // retained due to being the latest! + assert!(e1 == e2); + assert!(e1.attribute_equality("class", &PVCLASS_RECYCLED)); + + // Remember entry comparison doesn't compare last_mod_cid. + assert!(e1.get_last_changed() < e2.get_last_changed()); + + let e1_cs = e1.get_changestate(); + let e2_cs = e2.get_changestate(); + + let valid = match (e1_cs.current(), e2_cs.current()) { + ( + State::Live { + at: _, + changes: changes_left, + }, + State::Live { + at: _, + changes: changes_right, + }, + ) => match (changes_left.get("class"), changes_right.get("class")) { + (Some(cid_left), Some(cid_right)) => cid_left < cid_right, + _ => false, + }, + _ => false, + }; + assert!(valid); + + // Now go the other way. They'll be equal again. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + let e1_cs = e1.get_changestate(); + let e2_cs = e2.get_changestate(); + assert!(e1_cs == e2_cs); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); +} + +// Create + recycle entry on B -> A // TS on Both, // TS resolves to lowest AT. +#[qs_pair_test] +async fn test_repl_increment_basic_bidirectional_tombstone( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + ("class", Value::new_class("object")), + ("class", Value::new_class("person")), + ("name", Value::new_iname("testperson1")), + ("uuid", Value::Uuid(t_uuid)), + ("description", Value::new_utf8s("testperson1")), + ("displayname", Value::new_utf8s("testperson1")) + ),]) + .is_ok()); + // And then recycle it. + assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Now setup repl + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn).is_ok()); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Now on both servers, perform a recycle -> ts at different times. + let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1); + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.purge_recycled().is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + let ct = ct + Duration::from_secs(1); + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.purge_recycled().is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Now do B -> A - no change on A as it's TS was earlier. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE)); + trace!("{:?}", e1.get_last_changed()); + trace!("{:?}", e2.get_last_changed()); + assert!(e1.get_last_changed() < e2.get_last_changed()); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // A -> B - B should now have the A TS time. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE)); + assert!(e2.attribute_equality("class", &PVCLASS_TOMBSTONE)); + assert!(e1.get_last_changed() == e2.get_last_changed()); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} + // conflict cases. // both add entry with same uuid - only one can win! diff --git a/server/lib/src/server/recycle.rs b/server/lib/src/server/recycle.rs index 08b2420c4..bcc14e1b9 100644 --- a/server/lib/src/server/recycle.rs +++ b/server/lib/src/server/recycle.rs @@ -220,75 +220,16 @@ impl<'a> QueryServerWriteTransaction<'a> { Ok(()) } - /* - #[instrument(level = "debug", skip_all)] - pub(crate) fn revive_recycled_legacy( - &mut self, - re: &ReviveRecycledEvent, - ) -> Result<(), OperationError> { - // Revive an entry to live. This is a specialised function, and draws a lot of - // inspiration from modify. - // - // - // Access is granted by the ability to ability to search the class=recycled - // and the ability modify + remove that class from the object. - - // create the modify for access testing. - // tl;dr, remove the class=recycled - let modlist = ModifyList::new_list(vec![Modify::Removed( - AttrString::from("class"), - PVCLASS_RECYCLED.clone(), - )]); - - let m_valid = modlist.validate(self.get_schema()).map_err(|e| { - admin_error!( - "Schema Violation in revive recycled modlist validate: {:?}", - e - ); - OperationError::SchemaViolation(e) - })?; - - // Get the entries we are about to revive. - // we make a set of per-entry mod lists. A list of lists even ... - let revive_cands = - self.impersonate_search_valid(re.filter.clone(), re.filter.clone(), &re.ident)?; - - let mut dm_mods: HashMap> = - HashMap::with_capacity(revive_cands.len()); - - for e in revive_cands { - // Get this entries uuid. - let u: Uuid = e.get_uuid(); - - if let Some(riter) = e.get_ava_as_refuuid("directmemberof") { - for g_uuid in riter { - dm_mods - .entry(g_uuid) - .and_modify(|mlist| { - let m = Modify::Present(AttrString::from("member"), Value::Refer(u)); - mlist.push_mod(m); - }) - .or_insert({ - let m = Modify::Present(AttrString::from("member"), Value::Refer(u)); - ModifyList::new_list(vec![m]) - }); - } - } - } - - // Now impersonate the modify - self.impersonate_modify_valid(re.filter.clone(), re.filter.clone(), m_valid, &re.ident)?; - // If and only if that succeeds, apply the direct membership modifications - // if possible. - for (g, mods) in dm_mods { - // I think the filter/filter_all shouldn't matter here because the only - // valid direct memberships should be still valid/live references. - let f = filter_all!(f_eq("uuid", PartialValue::Uuid(g))); - self.internal_modify(&f, &mods)?; - } - Ok(()) + #[cfg(test)] + pub(crate) fn internal_revive_uuid(&mut self, target_uuid: Uuid) -> Result<(), OperationError> { + // Note the use of filter_rec here for only recycled targets. + let filter = filter_rec!(f_eq("uuid", PartialValue::Uuid(target_uuid))); + let f_valid = filter + .validate(self.get_schema()) + .map_err(OperationError::SchemaViolation)?; + let re = ReviveRecycledEvent::new_internal(f_valid); + self.revive_recycled(&re) } - */ } #[cfg(test)] diff --git a/server/lib/src/valueset/cid.rs b/server/lib/src/valueset/cid.rs index 40eac7e3a..58eba4fa6 100644 --- a/server/lib/src/valueset/cid.rs +++ b/server/lib/src/valueset/cid.rs @@ -157,7 +157,6 @@ impl ValueSetT for ValueSetCid { } } - /* fn to_cid_single(&self) -> Option { if self.set.len() == 1 { self.set.iter().cloned().take(1).next() @@ -165,7 +164,6 @@ impl ValueSetT for ValueSetCid { None } } - */ fn as_cid_set(&self) -> Option<&SmolSet<[Cid; 1]>> { Some(&self.set) diff --git a/server/lib/src/valueset/mod.rs b/server/lib/src/valueset/mod.rs index 4f143cb0a..57f6b91ff 100644 --- a/server/lib/src/valueset/mod.rs +++ b/server/lib/src/valueset/mod.rs @@ -357,6 +357,11 @@ pub trait ValueSetT: std::fmt::Debug + DynClone { None } + fn to_cid_single(&self) -> Option { + error!("to_cid_single should not be called on {:?}", self.syntax()); + None + } + fn to_refer_single(&self) -> Option { error!( "to_refer_single should not be called on {:?}",