diff --git a/server/lib/src/be/mod.rs b/server/lib/src/be/mod.rs index 69ded77e0..189dfbcf8 100644 --- a/server/lib/src/be/mod.rs +++ b/server/lib/src/be/mod.rs @@ -740,6 +740,11 @@ pub trait BackendTransaction { // If it was possible, we could just & with allids to remove the extraneous // values. + if idl.is_empty() { + // return no entries. + return Ok(Vec::with_capacity(0)); + } + // Make it an id list fr the backend. let id_list = IdList::Indexed(idl); @@ -1257,11 +1262,20 @@ impl<'a> BackendWriteTransaction<'a> { } #[instrument(level = "debug", name = "be::reap_tombstones", skip_all)] - pub fn reap_tombstones(&mut self, cid: &Cid) -> Result { + pub fn reap_tombstones(&mut self, cid: &Cid, trim_cid: &Cid) -> Result { + debug_assert!(cid > trim_cid); + // Mark a new maximum for the RUV by inserting an empty change. This + // is important to keep the changestate always advancing. + self.get_ruv().insert_change(cid, IDLBitRange::default())?; + // We plan to clear the RUV up to this cid. So we need to build an IDL // of all the entries we need to examine. - let idl = self.get_ruv().trim_up_to(cid).map_err(|e| { - admin_error!(?e, "failed to trim RUV to {:?}", cid); + let idl = self.get_ruv().trim_up_to(trim_cid).map_err(|e| { + admin_error!( + ?e, + "During tombstone cleanup, failed to trim RUV to {:?}", + trim_cid + ); e })?; @@ -1293,7 +1307,7 @@ impl<'a> BackendWriteTransaction<'a> { let (tombstones, leftover): (Vec<_>, Vec<_>) = entries .into_iter() - .partition(|e| e.get_changestate().can_delete(cid)); + .partition(|e| e.get_changestate().can_delete(trim_cid)); let ruv_idls = self.get_ruv().ruv_idls(); @@ -2015,9 +2029,8 @@ impl Backend { }) .collect(); - // RUV-TODO - // Load the replication update vector here. For now we rebuild every startup - // from the database. + // Load the replication update vector here. Initially we build an in memory + // RUV, and then we load it from the DB. 
let ruv = Arc::new(ReplicationUpdateVector::default()); // this has a ::memory() type, but will path == "" work? @@ -2107,6 +2120,7 @@ mod tests { static ref CID_ONE: Cid = Cid::new_count(1); static ref CID_TWO: Cid = Cid::new_count(2); static ref CID_THREE: Cid = Cid::new_count(3); + static ref CID_ADV: Cid = Cid::new_count(10); } macro_rules! run_test { @@ -2382,7 +2396,7 @@ mod tests { let r3 = results.remove(0); // Deletes nothing, all entries are live. - assert!(matches!(be.reap_tombstones(&CID_ZERO), Ok(0))); + assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0))); // Put them into the tombstone state, and write that down. // This sets up the RUV with the changes. @@ -2399,32 +2413,32 @@ mod tests { // The entry are now tombstones, but is still in the ruv. This is because we // targeted CID_ZERO, not ONE. - assert!(matches!(be.reap_tombstones(&CID_ZERO), Ok(0))); + assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0))); assert!(entry_exists!(be, r1_ts)); assert!(entry_exists!(be, r2_ts)); assert!(entry_exists!(be, r3_ts)); - assert!(matches!(be.reap_tombstones(&CID_ONE), Ok(0))); + assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ONE), Ok(0))); assert!(entry_exists!(be, r1_ts)); assert!(entry_exists!(be, r2_ts)); assert!(entry_exists!(be, r3_ts)); - assert!(matches!(be.reap_tombstones(&CID_TWO), Ok(1))); + assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_TWO), Ok(1))); assert!(!entry_exists!(be, r1_ts)); assert!(entry_exists!(be, r2_ts)); assert!(entry_exists!(be, r3_ts)); - assert!(matches!(be.reap_tombstones(&CID_THREE), Ok(2))); + assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(2))); assert!(!entry_exists!(be, r1_ts)); assert!(!entry_exists!(be, r2_ts)); assert!(!entry_exists!(be, r3_ts)); // Nothing left - assert!(matches!(be.reap_tombstones(&CID_THREE), Ok(0))); + assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(0))); assert!(!entry_exists!(be, r1_ts)); assert!(!entry_exists!(be, r2_ts)); @@ -2812,7 
+2826,7 @@ mod tests { // == Now we reap_tombstones, and assert we removed the items. let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed(); assert!(be.modify(&CID_ONE, &[e1], &[e1_ts]).is_ok()); - be.reap_tombstones(&CID_TWO).unwrap(); + be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap(); idl_state!( be, @@ -2894,7 +2908,7 @@ mod tests { let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed(); let e3_ts = e3.to_tombstone(CID_ONE.clone()).into_sealed_committed(); assert!(be.modify(&CID_ONE, &[e1, e3], &[e1_ts, e3_ts]).is_ok()); - be.reap_tombstones(&CID_TWO).unwrap(); + be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap(); idl_state!( be, diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index e1b04aabe..2ded1d16a 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -292,7 +292,7 @@ impl<'a> QueryServerWriteTransaction<'a> { &mut self, ctx_domain_version: DomainVersion, ctx_domain_uuid: Uuid, - ctx_ranges: &BTreeMap, + ctx_ranges: &BTreeMap, ctx_schema_entries: &[ReplIncrementalEntryV1], ctx_meta_entries: &[ReplIncrementalEntryV1], ctx_entries: &[ReplIncrementalEntryV1], @@ -482,7 +482,7 @@ impl<'a> QueryServerWriteTransaction<'a> { &mut self, ctx_domain_version: DomainVersion, ctx_domain_uuid: Uuid, - ctx_ranges: &BTreeMap, + ctx_ranges: &BTreeMap, ctx_schema_entries: &[ReplEntryV1], ctx_meta_entries: &[ReplEntryV1], ctx_entries: &[ReplEntryV1], diff --git a/server/lib/src/repl/proto.rs b/server/lib/src/repl/proto.rs index 1276623ec..6927f65ae 100644 --- a/server/lib/src/repl/proto.rs +++ b/server/lib/src/repl/proto.rs @@ -59,6 +59,22 @@ impl From<&ReplCidV1> for Cid { } } +/// An anchored CID range. This contains a minimum and maximum range of CID times for a server, +/// and also includes the list of all CIDs that occur between those two points. This allows these +/// extra change "anchors" to be injected into the consumer RUV during an incremental. 
Once +/// inserted, these anchors prevent RUV trimming from creating "jumps" due to idle servers. +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct ReplAnchoredCidRange { + #[serde(rename = "m")] + pub ts_min: Duration, + #[serde(rename = "a", default)] + pub anchors: Vec, + #[serde(rename = "x")] + pub ts_max: Duration, +} + +/// A CID range. This contains the minimum and maximum values of a range. This is used for +/// querying the RUV to select all elements in this range. #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct ReplCidRange { #[serde(rename = "m")] @@ -700,7 +716,7 @@ pub enum ReplRefreshContext { domain_uuid: Uuid, // We need to send the current state of the ranges to populate into // the ranges so that lookups and ranges work properly. - ranges: BTreeMap, + ranges: BTreeMap, schema_entries: Vec, meta_entries: Vec, entries: Vec, @@ -721,7 +737,7 @@ pub enum ReplIncrementalContext { // the ranges so that lookups and ranges work properly, and the // consumer ends with the same state as we have (or at least merges) // it with this. - ranges: BTreeMap, + ranges: BTreeMap, schema_entries: Vec, meta_entries: Vec, entries: Vec, diff --git a/server/lib/src/repl/ruv.rs b/server/lib/src/repl/ruv.rs index 3d69f0955..3fcea4f04 100644 --- a/server/lib/src/repl/ruv.rs +++ b/server/lib/src/repl/ruv.rs @@ -12,7 +12,7 @@ use kanidm_proto::v1::ConsistencyError; use crate::prelude::*; use crate::repl::cid::Cid; -use crate::repl::proto::ReplCidRange; +use crate::repl::proto::{ReplAnchoredCidRange, ReplCidRange}; use std::fmt; #[derive(Default)] @@ -451,6 +451,46 @@ pub trait ReplicationUpdateVectorTransaction { // Done! 
} + + fn get_anchored_ranges( + &self, + ranges: BTreeMap, + ) -> Result, OperationError> { + let self_range_snapshot = self.range_snapshot(); + + ranges + .into_iter() + .map(|(s_uuid, ReplCidRange { ts_min, ts_max })| { + let ts_range = self_range_snapshot.get(&s_uuid).ok_or_else(|| { + error!( + ?s_uuid, + "expected cid range for server in ruv, was not present" + ); + OperationError::InvalidState + })?; + + // If these are equal and excluded, btreeset panics + let anchors = if ts_max > ts_min { + // We exclude the ends because these are already in the ts_min/max + ts_range + .range((Excluded(ts_min), Excluded(ts_max))) + .copied() + .collect::>() + } else { + Vec::with_capacity(0) + }; + + Ok(( + s_uuid, + ReplAnchoredCidRange { + ts_min, + anchors, + ts_max, + }, + )) + }) + .collect() + } } impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorWriteTransaction<'a> { @@ -482,7 +522,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { pub(crate) fn incremental_preflight_validate_ruv( &self, - ctx_ranges: &BTreeMap, + ctx_ranges: &BTreeMap, txn_cid: &Cid, ) -> Result<(), OperationError> { // Check that the incoming ranges, for our servers id, do not exceed @@ -521,7 +561,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { pub(crate) fn refresh_validate_ruv( &self, - ctx_ranges: &BTreeMap, + ctx_ranges: &BTreeMap, ) -> Result<(), OperationError> { // Assert that the ruv that currently exists, is a valid data set of // the supplied consumer range - especially check that when a uuid exists in @@ -533,7 +573,6 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { // exist especially in three way replication scenarioes where S1:A was the S1 // maximum but is replaced by S2:B. This would make S1:A still it's valid // maximum but no entry reflects that in it's change state. 
- let mut valid = true; for (ctx_server_uuid, ctx_server_range) in ctx_ranges.iter() { @@ -576,17 +615,22 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { } } + #[instrument(level = "trace", name = "ruv::refresh_update_ruv", skip_all)] pub(crate) fn refresh_update_ruv( &mut self, - ctx_ranges: &BTreeMap, + ctx_ranges: &BTreeMap, ) -> Result<(), OperationError> { + // Previously this would just add in the ranges, and then the actual entries + // from the changestate would populate the data/ranges. Now we add empty idls + // to each of these so that they are db persisted allowing ruv reload. for (ctx_s_uuid, ctx_range) in ctx_ranges.iter() { - if let Some(s_range) = self.ranged.get_mut(ctx_s_uuid) { - // Just assert the max is what we have. - s_range.insert(ctx_range.ts_max); - } else { - let s_range = btreeset!(ctx_range.ts_max); - self.ranged.insert(*ctx_s_uuid, s_range); + let cid_iter = std::iter::once(&ctx_range.ts_min) + .chain(ctx_range.anchors.iter()) + .chain(std::iter::once(&ctx_range.ts_max)) + .map(|ts| Cid::new(*ctx_s_uuid, *ts)); + + for cid in cid_iter { + self.insert_change(&cid, IDLBitRange::default())?; } } Ok(()) @@ -824,6 +868,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { // try to do. for (cid, ex_idl) in self.data.range((Unbounded, Excluded(cid))) { + trace!(?cid, "examining for RUV removal"); idl = ex_idl as &_ | &idl; // Remove the reverse version of the cid from the ranged index. diff --git a/server/lib/src/repl/supplier.rs b/server/lib/src/repl/supplier.rs index 539027bf3..95ea8ea90 100644 --- a/server/lib/src/repl/supplier.rs +++ b/server/lib/src/repl/supplier.rs @@ -208,8 +208,11 @@ impl<'a> QueryServerReadTransaction<'a> { .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges)) .collect(); - // Build the incremental context. 
+ // Finally, populate the ranges with anchors from the RUV + let supplier_ruv = self.get_be_txn().get_ruv(); + let ranges = supplier_ruv.get_anchored_ranges(ranges)?; + // Build the incremental context. Ok(ReplIncrementalContext::V1 { domain_version, domain_uuid, @@ -310,6 +313,10 @@ impl<'a> QueryServerReadTransaction<'a> { e })?; + // Finally, populate the ranges with anchors from the RUV + let supplier_ruv = self.get_be_txn().get_ruv(); + let ranges = supplier_ruv.get_anchored_ranges(ranges)?; + Ok(ReplRefreshContext::V1 { domain_version, domain_uuid, diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 044ef408f..3a6f5ad0e 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -1823,6 +1823,9 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid( assert!(server_b_txn.purge_tombstones().is_ok()); server_b_txn.commit().expect("Failed to commit"); + // At this point, purge_tombstones now writes an anchor cid to the RUV, which means + // both servers will detect the deception and error. + // Now check incremental in both directions. Should show *no* changes // needed (rather than an error/lagging). 
let mut server_a_txn = server_a.write(ct).await; let mut server_b_txn = server_b.read().await; let a_ruv_range = server_a_txn .consumer_get_state() .expect("Unable to access RUV range"); trace!(?a_ruv_range); let changes = server_b_txn .supplier_provide_changes(a_ruv_range) .expect("Unable to generate supplier changes"); - assert!(matches!( - changes, - ReplIncrementalContext::NoChangesAvailable - )); + assert!(matches!(changes, ReplIncrementalContext::RefreshRequired)); let result = server_a_txn .consumer_apply_changes(&changes) .expect("Unable to apply changes to consumer."); - assert!(matches!(result, ConsumerState::Ok)); + assert!(matches!(result, ConsumerState::RefreshRequired)); drop(server_a_txn); drop(server_b_txn); @@ -1866,10 +1866,7 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid( .supplier_provide_changes(b_ruv_range) .expect("Unable to generate supplier changes"); - assert!(matches!( - changes, - ReplIncrementalContext::NoChangesAvailable - )); + assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply)); let result = server_b_txn .consumer_apply_changes(&changes) @@ -1881,6 +1878,143 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid( drop(server_b_txn); } +// Test two synchronised nodes where changes are not occurring - this situation would previously +// cause issues because when a change did occur, the ruv would "jump" ahead and cause desyncs. +#[qs_pair_test] +async fn test_repl_increment_consumer_ruv_trim_idle_servers( + server_a: &QueryServer, + server_b: &QueryServer, +) { + let ct = duration_from_epoch_now(); + let changelog_quarter_life = Duration::from_secs(CHANGELOG_MAX_AGE / 4); + let one_second = Duration::from_secs(1); + + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Add an entry. We need at least one change on B, else it won't have anything + // to ship in its RUV to A.
+ let ct = ct + one_second; + let mut server_b_txn = server_b.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_b_txn + .internal_create(vec![entry_init!( + (Attribute::Class, EntryClass::Object.to_value()), + (Attribute::Class, EntryClass::Person.to_value()), + (Attribute::Name, Value::new_iname("testperson1")), + (Attribute::Uuid, Value::Uuid(t_uuid)), + (Attribute::Description, Value::new_utf8s("testperson1")), + (Attribute::DisplayName, Value::new_utf8s("testperson1")) + ),]) + .is_ok()); + + server_b_txn.commit().expect("Failed to commit"); + + // Now setup bidirectional replication. We only need to trigger B -> A + // here because that's all that has changes. + let ct = ct + one_second; + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + trace!("========================================"); + repl_incremental(&mut server_b_txn, &mut server_a_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + // Everything is consistent! + let mut ct = ct; + + // We now loop periodically, and everything should stay in sync. + for i in 0..8 { + trace!("========================================"); + trace!("repl iteration {}", i); + // Purge tombstones. + let mut server_a_txn = server_a.write(ct).await; + assert!(server_a_txn.purge_tombstones().is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + ct += one_second; + + let mut server_b_txn = server_b.write(ct).await; + assert!(server_b_txn.purge_tombstones().is_ok()); + server_b_txn.commit().expect("Failed to commit"); + + ct += one_second; + + // Now check incremental in both directions. Should show *no* changes + // needed (rather than an error/lagging). 
+ let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + let a_ruv_range = server_a_txn + .consumer_get_state() + .expect("Unable to access RUV range"); + + trace!(?a_ruv_range); + + let changes = server_b_txn + .supplier_provide_changes(a_ruv_range) + .expect("Unable to generate supplier changes"); + + assert!(matches!(changes, ReplIncrementalContext::V1 { .. })); + + let result = server_a_txn + .consumer_apply_changes(&changes) + .expect("Unable to apply changes to consumer."); + + assert!(matches!(result, ConsumerState::Ok)); + + server_a_txn.commit().expect("Failed to commit"); + drop(server_b_txn); + + ct += one_second; + + // Reverse it! + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + let b_ruv_range = server_b_txn + .consumer_get_state() + .expect("Unable to access RUV range"); + + trace!(?b_ruv_range); + + let changes = server_a_txn + .supplier_provide_changes(b_ruv_range) + .expect("Unable to generate supplier changes"); + + assert!(matches!(changes, ReplIncrementalContext::V1 { .. })); + + let result = server_b_txn + .consumer_apply_changes(&changes) + .expect("Unable to apply changes to consumer."); + + assert!(matches!(result, ConsumerState::Ok)); + + drop(server_a_txn); + server_b_txn.commit().expect("Failed to commit"); + + ct += changelog_quarter_life; + } + + // Done! +} + // Test change of a domain name over incremental. 
#[qs_pair_test] async fn test_repl_increment_domain_rename(server_a: &QueryServer, server_b: &QueryServer) { diff --git a/server/lib/src/server/mod.rs b/server/lib/src/server/mod.rs index fbd3aa0b9..1c409ebd2 100644 --- a/server/lib/src/server/mod.rs +++ b/server/lib/src/server/mod.rs @@ -1827,6 +1827,7 @@ impl<'a> QueryServerWriteTransaction<'a> { .and_then(|_| accesscontrols.commit()) .and_then(|_| be_txn.commit()) } + pub(crate) fn get_txn_cid(&self) -> &Cid { &self.cid } diff --git a/server/lib/src/server/recycle.rs b/server/lib/src/server/recycle.rs index bcd843463..256c1325a 100644 --- a/server/lib/src/server/recycle.rs +++ b/server/lib/src/server/recycle.rs @@ -9,10 +9,11 @@ impl<'a> QueryServerWriteTransaction<'a> { pub fn purge_tombstones(&mut self) -> Result { // purge everything that is a tombstone. let trim_cid = self.trim_cid().clone(); + let anchor_cid = self.get_txn_cid().clone(); // Delete them - this is a TRUE delete, no going back now! self.be_txn - .reap_tombstones(&trim_cid) + .reap_tombstones(&anchor_cid, &trim_cid) .map_err(|e| { error!(err = ?e, "Tombstone purge operation failed (backend)"); e