From 61c59d5a5a56198051c100fe2b8b4257dccde392 Mon Sep 17 00:00:00 2001 From: Firstyear Date: Fri, 8 Sep 2023 08:59:06 +1000 Subject: [PATCH] 68 20230907 replication (#2081) * Test replication when nodes are valid beyond cl trim --- server/lib/src/repl/ruv.rs | 65 ++++++++++++++--- server/lib/src/repl/tests.rs | 136 +++++++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+), 11 deletions(-) diff --git a/server/lib/src/repl/ruv.rs b/server/lib/src/repl/ruv.rs index 328dc1ca7..cac2f202a 100644 --- a/server/lib/src/repl/ruv.rs +++ b/server/lib/src/repl/ruv.rs @@ -99,13 +99,15 @@ impl ReplicationUpdateVector { // max > consumer max, then the range between consumer max // and supplier max must be supplied. // - // consumer min consumer max - // <-- supplier min supplier max --> + // [ consumer min ... consumer max ] + // <-- [ supplier min .. supplier max ] --> // // In other words if we have: // - // consumer min consumer max - // supplier min supplier max + // [ consumer min ... consumer max ] + // [ supplier min ... supplier max ] + // ^ + // \-- no overlap of the range windows! // // then because there has been too much lag between consumer and // the supplier then there is a risk of changes being dropped or @@ -113,6 +115,12 @@ impl ReplicationUpdateVector { // of zero -> supplier max, but I think thought is needed to // ensure no corruption in this case. if consumer_cid_range.ts_max < supplier_cid_range.ts_min { + // + // [ consumer min ... consumer max ] + // [ supplier min ... supplier max ] + // ^ + // \-- no overlap of the range windows! + // consumer_lagging = true; lag_range.insert( *supplier_s_uuid, @@ -122,8 +130,14 @@ impl ReplicationUpdateVector { }, ); } else if supplier_cid_range.ts_max < consumer_cid_range.ts_min { - // It could be valid in this case to ignore this instead - // of erroring as changelog trim has occurred? Thought needed. + // + // [ consumer min ... consumer max ] + // [ supplier min ... 
supplier max ] + // ^ + // \-- no overlap of the range windows! + // + // This means we can't supply because we are missing changes that the consumer + // has. *we* are lagging. supplier_lagging = true; adv_range.insert( *supplier_s_uuid, @@ -133,6 +147,14 @@ impl ReplicationUpdateVector { }, ); } else if consumer_cid_range.ts_max < supplier_cid_range.ts_max { + // + // /-- consumer needs these changes + // v + // [ consumer min ... consumer max ] --> ] + // [ supplier min ... supplier max ] + // ^ + // \-- overlap of the range windows + // // We require the changes from consumer max -> supplier max. diff_range.insert( *supplier_s_uuid, @@ -143,6 +165,23 @@ impl ReplicationUpdateVector { ); } // else ... + // + // /-- The consumer has changes we don't have. + // | So we don't need to supply + // v + // [ consumer min ... consumer max ] + // [ supplier min ... supplier max ] + // ^ + // \-- overlap of the range windows + // + // OR + // + // [ consumer min ... consumer max ] + // [ supplier min ... supplier max ] + // ^ + // \-- the windows max is identical + // no actions needed + // // In this case there is no action required since consumer_cid_range.ts_max // must be greater than or equal to supplier max. } @@ -220,8 +259,7 @@ pub trait ReplicationUpdateVectorTransaction { fn range_to_idl(&self, ctx_ranges: &BTreeMap) -> IDLBitRange { let mut idl = IDLBitRange::new(); - // Force the set to be compressed, saves on seeks during - // inserts. + // Force the set to be compressed, saves on seeks during inserts. idl.compress(); let range = self.range_snapshot(); let ruv = self.ruv_snapshot(); @@ -246,7 +284,6 @@ pub trait ReplicationUpdateVectorTransaction { // Get from the min to the max. Unbounded and // Included(ctx_range.ts_max) are the same in // this context. 
- for ts in ruv_range.range((Excluded(ctx_range.ts_min), Unbounded)) { let cid = Cid { ts: *ts, @@ -256,7 +293,9 @@ pub trait ReplicationUpdateVectorTransaction { if let Some(ruv_idl) = ruv.get(&cid) { ruv_idl.into_iter().for_each(|id| idl.insert_id(id)) } - // If the cid isn't found, it may have been trimmed, but that's okay. + // If the cid isn't found, it may have been trimmed, but that's okay. A cid in + // a range can be trimmed if all entries of that cid have since tombstoned so + // no longer need to be applied in change ranges. } } @@ -634,6 +673,9 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { Mostly longer ruv/cid ranges aren't an issue for us, so could we just maek these ranges really large? + + NOTE: For now we do NOT trim out max CIDs of any s_uuid so that we don't have to confront + this edge case yet. */ // Problem Cases @@ -661,6 +703,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { */ pub fn trim_up_to(&mut self, cid: &Cid) -> Result { + trace!(trim_up_to_cid = ?cid); let mut idl = IDLBitRange::new(); // let mut remove_suuid = Vec::default(); @@ -690,7 +733,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { return Err(OperationError::InvalidState); } } else { - trace!("skipping maximum cid for s_uuid"); + trace!("skip trimming maximum cid for s_uuid {}", cid.s_uuid); } if server_range.is_empty() { // remove_suuid.push(cid.s_uuid); diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 39e2d21a7..d47c07813 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -1846,6 +1846,142 @@ async fn test_repl_increment_consumer_lagging_attributes( drop(server_b_txn); } +// Test two synchronised nodes where no changes occurred in a TS/RUV window. 
+#[qs_pair_test] +async fn test_repl_increment_consumer_ruv_trim_past_valid( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Add an entry. We need at least one change on B, else it won't have anything + // to ship in its RUV to A. + let ct = duration_from_epoch_now(); + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + + server_b_txn.commit().expect("Failed to commit"); + + // Now setup bidirectional replication. We only need to trigger B -> A + // here because that's all that has changes. + let ct = duration_from_epoch_now(); + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Everything is consistent! + + // Compare RUVs + + // Push time ahead past a changelog max age. 
+ let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE * 4); + + // And setup the ruv trim. This is triggered by purge/reap tombstones. + // Apply this to both nodes so that they shift their RUV states. + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.purge_tombstones().is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.purge_tombstones().is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + // Now check incremental in both directions. Should show *no* changes + // needed (rather than an error/lagging). + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + let a_ruv_range = server_a_txn + .consumer_get_state() + .expect("Unable to access RUV range"); + + trace!(?a_ruv_range); + + let changes = server_b_txn + .supplier_provide_changes(a_ruv_range) + .expect("Unable to generate supplier changes"); + + assert!(matches!( + changes, + ReplIncrementalContext::NoChangesAvailable + )); + + let result = server_a_txn + .consumer_apply_changes(&changes) + .expect("Unable to apply changes to consumer."); + + assert!(matches!(result, ConsumerState::Ok)); + + drop(server_a_txn); + drop(server_b_txn); + + // Reverse it! + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + let b_ruv_range = server_b_txn + .consumer_get_state() + .expect("Unable to access RUV range"); + + trace!(?b_ruv_range); + + let changes = server_a_txn + .supplier_provide_changes(b_ruv_range) + .expect("Unable to generate supplier changes"); + + assert!(matches!( + changes, + ReplIncrementalContext::NoChangesAvailable + )); + + let result = server_b_txn + .consumer_apply_changes(&changes) + .expect("Unable to apply changes to consumer."); + + assert!(matches!(result, ConsumerState::Ok)); + + drop(server_a_txn); + drop(server_b_txn); +} + // Test change of a domain name over incremental. 
#[qs_pair_test] async fn test_repl_increment_domain_rename(server_a: &QueryServer, server_b: &QueryServer) {