diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs
index 9167fcc7e..12580e1f4 100644
--- a/server/lib/src/entry.rs
+++ b/server/lib/src/entry.rs
@@ -758,7 +758,17 @@ impl Entry {
                 // Due to previous checks, this must be equal!
                 debug_assert!(left_at == right_at);
                 debug_assert!(self.attrs == db_ent.attrs);
-                // Doesn't matter which side we take.
+                // We have to generate the attrs here, since on replication
+                // we just send the tombstone ecstate rather than attrs. Our
+                // db stub also lacks these attributes.
+                let mut attrs_new: Eattrs = Map::new();
+                let class_ava = vs_iutf8!["object", "tombstone"];
+                let last_mod_ava = vs_cid![left_at.clone()];
+
+                attrs_new.insert(AttrString::from("uuid"), vs_uuid![self.valid.uuid]);
+                attrs_new.insert(AttrString::from("class"), class_ava);
+                attrs_new.insert(AttrString::from("last_modified_cid"), last_mod_ava);
+
                 Entry {
                     valid: EntryIncremental {
                         uuid: self.valid.uuid,
@@ -767,10 +777,11 @@ impl Entry {
                     state: EntryCommitted {
                         id: db_ent.state.id,
                     },
-                    attrs: self.attrs.clone(),
+                    attrs: attrs_new,
                 }
             }
             (State::Tombstone { .. }, State::Live { .. }) => {
+                debug_assert!(false);
                 // Keep the left side.
                 Entry {
                     valid: EntryIncremental {
@@ -784,6 +795,7 @@ impl Entry {
                 }
             }
             (State::Live { .. }, State::Tombstone { .. }) => {
+                debug_assert!(false);
                 // Keep the right side
                 Entry {
                     valid: EntryIncremental {
diff --git a/server/lib/src/plugins/mod.rs b/server/lib/src/plugins/mod.rs
index 72e9e108c..58845e2d8 100644
--- a/server/lib/src/plugins/mod.rs
+++ b/server/lib/src/plugins/mod.rs
@@ -154,6 +154,7 @@ trait Plugin {
            "plugin {} has an unimplemented pre_repl_incremental!",
            Self::id()
        );
+        // debug_assert!(false);
        // Err(OperationError::InvalidState)
        Ok(())
    }
@@ -167,6 +168,7 @@ trait Plugin {
            "plugin {} has an unimplemented post_repl_incremental!",
            Self::id()
        );
+        // debug_assert!(false);
        // Err(OperationError::InvalidState)
        Ok(())
    }
@@ -327,14 +329,11 @@ impl Plugins {
        qs: &mut QueryServerWriteTransaction,
        cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
    ) -> Result<(), OperationError> {
-        base::Base::pre_repl_incremental(qs, cand)
-            // .and_then(|_| jwskeygen::JwsKeygen::pre_repl_incremental(qs, cand, me))
-            // .and_then(|_| gidnumber::GidNumber::pre_repl_incremental(qs, cand, me))
-            .and_then(|_| domain::Domain::pre_repl_incremental(qs, cand))
-            .and_then(|_| spn::Spn::pre_repl_incremental(qs, cand))
-            .and_then(|_| session::SessionConsistency::pre_repl_incremental(qs, cand))
-            // attr unique should always be last
-            .and_then(|_| attrunique::AttrUnique::pre_repl_incremental(qs, cand))
+        // Cleanup sessions on incoming replication? May not actually
+        // be needed ...
+        // session::SessionConsistency::pre_repl_incremental(qs, cand)?;
+        // attr unique should always be last
+        attrunique::AttrUnique::pre_repl_incremental(qs, cand)
    }

    #[instrument(level = "debug", name = "plugins::run_post_repl_incremental", skip_all)]
@@ -343,9 +342,10 @@ impl Plugins {
        pre_cand: &[Arc<EntrySealedCommitted>],
        cand: &[EntrySealedCommitted],
    ) -> Result<(), OperationError> {
-        refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)
-            .and_then(|_| spn::Spn::post_repl_incremental(qs, pre_cand, cand))
-            .and_then(|_| memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand))
+        domain::Domain::post_repl_incremental(qs, pre_cand, cand)?;
+        spn::Spn::post_repl_incremental(qs, pre_cand, cand)?;
+        refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)?;
+        memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand)
    }

    #[instrument(level = "debug", name = "plugins::run_verify", skip_all)]
diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs
index 28427e4c2..16431561a 100644
--- a/server/lib/src/repl/consumer.rs
+++ b/server/lib/src/repl/consumer.rs
@@ -1,48 +1,12 @@
 use super::proto::*;
-use crate::be::BackendTransaction;
 use crate::plugins::Plugins;
 use crate::prelude::*;
-use crate::repl::proto::ReplRuvRange;
-use crate::repl::ruv::ReplicationUpdateVectorTransaction;
 use std::collections::BTreeMap;
 use std::sync::Arc;

-impl<'a> QueryServerReadTransaction<'a> {
-    // Get the current state of "where we are up to"
-    //
-    // There are two approaches we can use here. We can either store a cookie
-    // related to the supplier we are fetching from, or we can use our RUV state.
-    //
-    // Initially I'm using RUV state, because it lets us select exactly what has
-    // changed, where the cookie approach is more coarse grained. The cookie also
-    // requires some more knowledge about what supplier we are communicating too
-    // where the RUV approach doesn't since the supplier calcs the diff.
-
-    #[instrument(level = "debug", skip_all)]
-    pub fn consumer_get_state(&mut self) -> Result<ReplRuvRange, OperationError> {
-        // We need the RUV as a state of
-        //
-        // [ s_uuid, cid_min, cid_max ]
-        // [ s_uuid, cid_min, cid_max ]
-        // [ s_uuid, cid_min, cid_max ]
-        // ...
-        //
-        // This way the remote can diff against it's knowledge and work out:
-        //
-        // [ s_uuid, from_cid, to_cid ]
-        // [ s_uuid, from_cid, to_cid ]
-        //
-        // ...
-
-        // Which then the supplier will use to actually retrieve the set of entries.
-        // and the needed attributes we need.
-        let ruv_snapshot = self.get_be_txn().get_ruv();
-
-        // What's the current set of ranges?
-        ruv_snapshot
-            .current_ruv_range()
-            .map(|ranges| ReplRuvRange::V1 { ranges })
-    }
+pub enum ConsumerState {
+    Ok,
+    RefreshRequired,
 }

 impl<'a> QueryServerWriteTransaction<'a> {
@@ -79,6 +43,9 @@ impl<'a> QueryServerWriteTransaction<'a> {
            e
        })?;
+        trace!("===========================================");
+        trace!(?ctx_entries);
+
        let db_entries = self.be_txn.incremental_prepare(&ctx_entries).map_err(|e| {
            error!("Failed to access entries from db");
            e
        })?;
@@ -237,14 +204,16 @@ impl<'a> QueryServerWriteTransaction<'a> {
    pub fn consumer_apply_changes(
        &mut self,
        ctx: &ReplIncrementalContext,
-    ) -> Result<(), OperationError> {
+    ) -> Result<ConsumerState, OperationError> {
        match ctx {
            ReplIncrementalContext::NoChangesAvailable => {
                info!("no changes are available");
-                Ok(())
+                Ok(ConsumerState::Ok)
            }
            ReplIncrementalContext::RefreshRequired => {
-                todo!();
+                error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
+                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
+                Ok(ConsumerState::RefreshRequired)
            }
            ReplIncrementalContext::UnwillingToSupply => {
                todo!();
            }
@@ -276,7 +245,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
        ctx_schema_entries: &[ReplIncrementalEntryV1],
        ctx_meta_entries: &[ReplIncrementalEntryV1],
        ctx_entries: &[ReplIncrementalEntryV1],
-    ) -> Result<(), OperationError> {
+    ) -> Result<ConsumerState, OperationError> {
        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
@@ -352,7 +321,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
            e
        })?;

-        Ok(())
+        Ok(ConsumerState::Ok)
    }

    pub fn consumer_apply_refresh(
diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs
index 928c0bc66..180fff6f8 100644
--- a/server/lib/src/repl/tests.rs
+++ b/server/lib/src/repl/tests.rs
@@ -1,5 +1,7 @@
 use crate::be::BackendTransaction;
 use crate::prelude::*;
+use crate::repl::consumer::ConsumerState;
+use crate::repl::proto::ReplIncrementalContext;
 use crate::repl::ruv::ReplicationUpdateVectorTransaction;
 use std::collections::BTreeMap;

@@ -38,6 +40,59 @@ fn repl_initialise(
    Ok(())
 }

+fn repl_incremental(
+    from: &mut QueryServerReadTransaction<'_>,
+    to: &mut QueryServerWriteTransaction<'_>,
+) {
+    let a_ruv_range = to
+        .get_be_txn()
+        .get_ruv()
+        .current_ruv_range()
+        .expect("Failed to get RUV range from");
+    let b_ruv_range = from
+        .get_be_txn()
+        .get_ruv()
+        .current_ruv_range()
+        .expect("Failed to get RUV range to");
+
+    trace!(?a_ruv_range);
+    trace!(?b_ruv_range);
+    assert!(a_ruv_range != b_ruv_range);
+
+    // Now setup the consumer state for the next incremental replication.
+    let a_ruv_range = to.consumer_get_state().expect("Unable to access RUV range");
+
+    // Incremental.
+    // Should now be on the other partner.
+
+    // Get the changes.
+    let changes = from
+        .supplier_provide_changes(a_ruv_range)
+        .expect("Unable to generate supplier changes");
+
+    // Apply the changes.
+    to.consumer_apply_changes(&changes)
+        .expect("Unable to apply changes to consumer.");
+
+    // RUV should be consistent again.
+    let a_ruv_range = to
+        .get_be_txn()
+        .get_ruv()
+        .current_ruv_range()
+        .expect("Failed to get RUV range A");
+    let b_ruv_range = from
+        .get_be_txn()
+        .get_ruv()
+        .current_ruv_range()
+        .expect("Failed to get RUV range B");
+
+    trace!(?a_ruv_range);
+    trace!(?b_ruv_range);
+    // May need to be "is subset" for future when we are testing
+    // some more complex scenarios.
+    assert!(a_ruv_range == b_ruv_range);
+}
+
 #[qs_pair_test]
 async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) {
    // Rebuild / refresh the content of server a with the content from b.
@@ -125,8 +180,9 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer)
    // Both servers will be post-test validated.
 }

+// Test that adding an entry to one side replicates correctly.
 #[qs_pair_test]
-async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServer) {
+async fn test_repl_increment_basic_entry_add(server_a: &QueryServer, server_b: &QueryServer) {
    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
    let mut server_b_txn = server_b.read().await;

@@ -253,13 +309,245 @@ async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServe
    trace!(?b_ruv_range);
    assert!(a_ruv_range == b_ruv_range);

-    server_a_txn.commit().expect("Failed to commit");
+    // Assert the entry is now present, and the same on both sides
+    let e1 = server_a_txn
+        .internal_search_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_uuid(t_uuid)
+        .expect("Unable to access entry.");
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+}
+
+// Test that adding an entry to one side, then recycling it replicates correctly.
+#[qs_pair_test]
+async fn test_repl_increment_basic_entry_recycle(server_a: &QueryServer, server_b: &QueryServer) {
+    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
+        .and_then(|_| server_a_txn.commit())
+        .is_ok());
+    drop(server_b_txn);
+
+    // Add an entry.
+    let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+
+    // Now recycle it.
+    assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
+
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Assert the entry is not on A.
+
+    let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert_eq!(
+        server_a_txn.internal_search_uuid(t_uuid),
+        Err(OperationError::NoMatchingEntries)
+    );
+
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+}
+
+// Test that adding an entry to one side, then recycling it, and tombstoning it
+// replicates correctly.
+#[qs_pair_test]
+async fn test_repl_increment_basic_entry_tombstone(server_a: &QueryServer, server_b: &QueryServer) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
+        .and_then(|_| server_a_txn.commit())
+        .is_ok());
+    drop(server_b_txn);
+
+    // Add an entry.
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+
+    // Now recycle it.
+    assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now move past the recyclebin time.
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
+
+    let mut server_b_txn = server_b.write(ct).await;
+    // Clean out the recycle bin.
+    assert!(server_b_txn.purge_recycled().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Assert the entry is not on A.
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert_eq!(
+        server_a_txn.internal_search_uuid(t_uuid),
+        Err(OperationError::NoMatchingEntries)
+    );
+
+    repl_incremental(&mut server_b_txn, &mut server_a_txn);
+
+    let e1 = server_a_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access new entry.");
+    let e2 = server_b_txn
+        .internal_search_all_uuid(t_uuid)
+        .expect("Unable to access entry.");
+
+    assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
+
+    assert!(e1 == e2);
+
+    server_a_txn.commit().expect("Failed to commit");
+    drop(server_b_txn);
+}
+
+// Test that adding an entry -> tombstone then the tombstone is trimmed raises
+// a replication error.
+#[qs_pair_test]
+async fn test_repl_increment_consumer_lagging_tombstone(
+    server_a: &QueryServer,
+    server_b: &QueryServer,
+) {
+    let ct = duration_from_epoch_now();
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
+        .and_then(|_| server_a_txn.commit())
+        .is_ok());
+    drop(server_b_txn);
+
+    // Add an entry.
+    let mut server_b_txn = server_b.write(ct).await;
+    let t_uuid = Uuid::new_v4();
+    assert!(server_b_txn
+        .internal_create(vec![entry_init!(
+            ("class", Value::new_class("object")),
+            ("class", Value::new_class("person")),
+            ("name", Value::new_iname("testperson1")),
+            ("uuid", Value::Uuid(t_uuid)),
+            ("description", Value::new_utf8s("testperson1")),
+            ("displayname", Value::new_utf8s("testperson1"))
+        ),])
+        .is_ok());
+
+    // Now recycle it.
+    assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now move past the recyclebin time.
+    let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
+
+    let mut server_b_txn = server_b.write(ct).await;
+    // Clean out the recycle bin.
+    assert!(server_b_txn.purge_recycled().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Now move past the tombstone trim time.
+    let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE + 1);
+
+    let mut server_b_txn = server_b.write(ct).await;
+    // Clean out the tombstones.
+    assert!(server_b_txn.purge_tombstones().is_ok());
+    server_b_txn.commit().expect("Failed to commit");
+
+    // Assert the entry is not on A *or* B.
+
+    let mut server_a_txn = server_a.write(ct).await;
+    let mut server_b_txn = server_b.read().await;
+
+    assert_eq!(
+        server_a_txn.internal_search_uuid(t_uuid),
+        Err(OperationError::NoMatchingEntries)
+    );
+    assert_eq!(
+        server_b_txn.internal_search_uuid(t_uuid),
+        Err(OperationError::NoMatchingEntries)
+    );
+
+    // The RUVs must be different.
+    let a_ruv_range = server_a_txn
+        .get_be_txn()
+        .get_ruv()
+        .current_ruv_range()
+        .expect("Failed to get RUV range A");
+    let b_ruv_range = server_b_txn
+        .get_be_txn()
+        .get_ruv()
+        .current_ruv_range()
+        .expect("Failed to get RUV range B");
+
+    trace!(?a_ruv_range);
+    trace!(?b_ruv_range);
+    assert!(a_ruv_range != b_ruv_range);
+
+    let a_ruv_range = server_a_txn
+        .consumer_get_state()
+        .expect("Unable to access RUV range");
+
+    let changes = server_b_txn
+        .supplier_provide_changes(a_ruv_range)
+        .expect("Unable to generate supplier changes");
+
+    assert!(matches!(changes, ReplIncrementalContext::RefreshRequired));
+
+    let result = server_a_txn
+        .consumer_apply_changes(&changes)
+        .expect("Unable to apply changes to consumer.");
+
+    assert!(matches!(result, ConsumerState::RefreshRequired));
+
+    drop(server_a_txn);
    drop(server_b_txn);
 }

 // Test RUV content when a server's changes have been trimmed out and are not present
-// in a refresh.
+// in a refresh. This is not about tombstones, this is about attribute state.

 // Test change of a domain name over incremental.
diff --git a/server/lib/src/server/mod.rs b/server/lib/src/server/mod.rs
index 9526a15fe..852877017 100644
--- a/server/lib/src/server/mod.rs
+++ b/server/lib/src/server/mod.rs
@@ -27,6 +27,8 @@ use crate::filter::{Filter, FilterInvalid, FilterValid, FilterValidResolved};
 use crate::plugins::dyngroup::{DynGroup, DynGroupCache};
 use crate::plugins::Plugins;
 use crate::repl::cid::Cid;
+use crate::repl::proto::ReplRuvRange;
+use crate::repl::ruv::ReplicationUpdateVectorTransaction;
 use crate::schema::{
    Schema, SchemaAttribute, SchemaClass, SchemaReadTransaction, SchemaTransaction,
    SchemaWriteTransaction,
@@ -420,6 +422,27 @@ pub trait QueryServerTransaction<'a> {
        }
    }

+    /// Get a single entry by its UUID, even if the entry in question
+    /// is in a masked state (recycled, tombstoned).
+    #[instrument(level = "debug", skip_all)]
+    fn internal_search_all_uuid(
+        &mut self,
+        uuid: Uuid,
+    ) -> Result<Arc<EntrySealedCommitted>, OperationError> {
+        let filter = filter_all!(f_eq("uuid", PartialValue::Uuid(uuid)));
+        let f_valid = filter.validate(self.get_schema()).map_err(|e| {
+            error!(?e, "Filter Validate - SchemaViolation");
+            OperationError::SchemaViolation(e)
+        })?;
+        let se = SearchEvent::new_internal(f_valid);
+
+        let mut vs = self.search(&se)?;
+        match vs.pop() {
+            Some(entry) if vs.is_empty() => Ok(entry),
+            _ => Err(OperationError::NoMatchingEntries),
+        }
+    }
+
    #[instrument(level = "debug", skip_all)]
    fn impersonate_search_ext_uuid(
        &mut self,
@@ -775,6 +798,42 @@ pub trait QueryServerTransaction<'a> {
    fn get_oauth2rs_set(&mut self) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        self.internal_search(filter!(f_eq("class", PVCLASS_OAUTH2_RS.clone(),)))
    }
+
+    #[instrument(level = "debug", skip_all)]
+    fn consumer_get_state(&mut self) -> Result<ReplRuvRange, OperationError> {
+        // Get the current state of "where we are up to"
+        //
+        // There are two approaches we can use here. We can either store a cookie
+        // related to the supplier we are fetching from, or we can use our RUV state.
+        //
+        // Initially I'm using RUV state, because it lets us select exactly what has
+        // changed, where the cookie approach is more coarse grained. The cookie also
+        // requires some more knowledge about which supplier we are communicating to,
+        // where the RUV approach doesn't, since the supplier calcs the diff.
+        //
+        // We need the RUV as a state of
+        //
+        // [ s_uuid, cid_min, cid_max ]
+        // [ s_uuid, cid_min, cid_max ]
+        // [ s_uuid, cid_min, cid_max ]
+        // ...
+        //
+        // This way the remote can diff against its knowledge and work out:
+        //
+        // [ s_uuid, from_cid, to_cid ]
+        // [ s_uuid, from_cid, to_cid ]
+        //
+        // ...
+
+        // Which then the supplier will use to actually retrieve the set of entries
+        // and the attributes we need.
+        let ruv_snapshot = self.get_be_txn().get_ruv();
+
+        // What's the current set of ranges?
+        ruv_snapshot
+            .current_ruv_range()
+            .map(|ranges| ReplRuvRange::V1 { ranges })
+    }
 }

 // Actually conduct a search request
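
Note, not part of the patch: the incremental flow above is consumer-driven. A minimal sketch of how a caller might wire the three calls together and react to ConsumerState::RefreshRequired follows; only consumer_get_state, supplier_provide_changes, consumer_apply_changes, and consumer_apply_refresh appear in this diff, while replicate_once and the refresh handling are assumptions for illustration.

    // Sketch only, assuming the kanidmd server lib context. replicate_once is hypothetical.
    fn replicate_once(
        supplier: &mut QueryServerReadTransaction<'_>,
        consumer: &mut QueryServerWriteTransaction<'_>,
    ) -> Result<ConsumerState, OperationError> {
        // The consumer advertises its RUV so the supplier can compute the delta.
        let ruv_range = consumer.consumer_get_state()?;
        // The supplier answers with an incremental context, which may be RefreshRequired.
        let changes = supplier.supplier_provide_changes(ruv_range)?;
        // The consumer applies the context and reports whether a full refresh is needed.
        let state = consumer.consumer_apply_changes(&changes)?;
        if matches!(state, ConsumerState::RefreshRequired) {
            // Hypothetical follow-up: fetch a full snapshot from the supplier and
            // apply it with consumer_apply_refresh before resuming increments.
        }
        Ok(state)
    }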