Upgrade replication to use anchors (#2423)

* Upgrade replication to use anchors
This commit is contained in:
Firstyear 2024-01-10 14:46:08 +10:00 committed by GitHub
parent 0e44cc1dcb
commit 666448f787
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 259 additions and 41 deletions

View file

@ -740,6 +740,11 @@ pub trait BackendTransaction {
// If it was possible, we could just & with allids to remove the extraneous
// values.
if idl.is_empty() {
// return no entries.
return Ok(Vec::with_capacity(0));
}
// Make it an id list for the backend.
let id_list = IdList::Indexed(idl);
@ -1257,11 +1262,20 @@ impl<'a> BackendWriteTransaction<'a> {
}
#[instrument(level = "debug", name = "be::reap_tombstones", skip_all)]
pub fn reap_tombstones(&mut self, cid: &Cid) -> Result<usize, OperationError> {
pub fn reap_tombstones(&mut self, cid: &Cid, trim_cid: &Cid) -> Result<usize, OperationError> {
debug_assert!(cid > trim_cid);
// Mark a new maximum for the RUV by inserting an empty change. This
// is important to keep the changestate always advancing.
self.get_ruv().insert_change(cid, IDLBitRange::default())?;
// We plan to clear the RUV up to this cid. So we need to build an IDL
// of all the entries we need to examine.
let idl = self.get_ruv().trim_up_to(cid).map_err(|e| {
admin_error!(?e, "failed to trim RUV to {:?}", cid);
let idl = self.get_ruv().trim_up_to(trim_cid).map_err(|e| {
admin_error!(
?e,
"During tombstone cleanup, failed to trim RUV to {:?}",
trim_cid
);
e
})?;
@ -1293,7 +1307,7 @@ impl<'a> BackendWriteTransaction<'a> {
let (tombstones, leftover): (Vec<_>, Vec<_>) = entries
.into_iter()
.partition(|e| e.get_changestate().can_delete(cid));
.partition(|e| e.get_changestate().can_delete(trim_cid));
let ruv_idls = self.get_ruv().ruv_idls();
@ -2015,9 +2029,8 @@ impl Backend {
})
.collect();
// RUV-TODO
// Load the replication update vector here. For now we rebuild every startup
// from the database.
// Load the replication update vector here. Initially we build an in memory
// RUV, and then we load it from the DB.
let ruv = Arc::new(ReplicationUpdateVector::default());
// this has a ::memory() type, but will path == "" work?
@ -2107,6 +2120,7 @@ mod tests {
static ref CID_ONE: Cid = Cid::new_count(1);
static ref CID_TWO: Cid = Cid::new_count(2);
static ref CID_THREE: Cid = Cid::new_count(3);
static ref CID_ADV: Cid = Cid::new_count(10);
}
macro_rules! run_test {
@ -2382,7 +2396,7 @@ mod tests {
let r3 = results.remove(0);
// Deletes nothing, all entries are live.
assert!(matches!(be.reap_tombstones(&CID_ZERO), Ok(0)));
assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));
// Put them into the tombstone state, and write that down.
// This sets up the RUV with the changes.
@ -2399,32 +2413,32 @@ mod tests {
// The entries are now tombstones, but are still in the ruv. This is because we
// targeted CID_ZERO, not ONE.
assert!(matches!(be.reap_tombstones(&CID_ZERO), Ok(0)));
assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));
assert!(entry_exists!(be, r1_ts));
assert!(entry_exists!(be, r2_ts));
assert!(entry_exists!(be, r3_ts));
assert!(matches!(be.reap_tombstones(&CID_ONE), Ok(0)));
assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ONE), Ok(0)));
assert!(entry_exists!(be, r1_ts));
assert!(entry_exists!(be, r2_ts));
assert!(entry_exists!(be, r3_ts));
assert!(matches!(be.reap_tombstones(&CID_TWO), Ok(1)));
assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_TWO), Ok(1)));
assert!(!entry_exists!(be, r1_ts));
assert!(entry_exists!(be, r2_ts));
assert!(entry_exists!(be, r3_ts));
assert!(matches!(be.reap_tombstones(&CID_THREE), Ok(2)));
assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(2)));
assert!(!entry_exists!(be, r1_ts));
assert!(!entry_exists!(be, r2_ts));
assert!(!entry_exists!(be, r3_ts));
// Nothing left
assert!(matches!(be.reap_tombstones(&CID_THREE), Ok(0)));
assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(0)));
assert!(!entry_exists!(be, r1_ts));
assert!(!entry_exists!(be, r2_ts));
@ -2812,7 +2826,7 @@ mod tests {
// == Now we reap_tombstones, and assert we removed the items.
let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
assert!(be.modify(&CID_ONE, &[e1], &[e1_ts]).is_ok());
be.reap_tombstones(&CID_TWO).unwrap();
be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
idl_state!(
be,
@ -2894,7 +2908,7 @@ mod tests {
let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
let e3_ts = e3.to_tombstone(CID_ONE.clone()).into_sealed_committed();
assert!(be.modify(&CID_ONE, &[e1, e3], &[e1_ts, e3_ts]).is_ok());
be.reap_tombstones(&CID_TWO).unwrap();
be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
idl_state!(
be,

View file

@ -292,7 +292,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
&mut self,
ctx_domain_version: DomainVersion,
ctx_domain_uuid: Uuid,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
ctx_schema_entries: &[ReplIncrementalEntryV1],
ctx_meta_entries: &[ReplIncrementalEntryV1],
ctx_entries: &[ReplIncrementalEntryV1],
@ -482,7 +482,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
&mut self,
ctx_domain_version: DomainVersion,
ctx_domain_uuid: Uuid,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
ctx_schema_entries: &[ReplEntryV1],
ctx_meta_entries: &[ReplEntryV1],
ctx_entries: &[ReplEntryV1],

View file

@ -59,6 +59,22 @@ impl From<&ReplCidV1> for Cid {
}
}
/// An anchored CID range. This contains a minimum and maximum range of CID times for a server,
/// and also includes the list of all CIDs that occur between those two points. This allows these
/// extra change "anchors" to be injected into the consumer RUV during an incremental. Once
/// inserted, these anchors prevent RUV trimming from creating "jumps" due to idle servers.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct ReplAnchoredCidRange {
/// The lowest CID time of the range (part of the range itself, so it is
/// excluded from `anchors`).
#[serde(rename = "m")]
pub ts_min: Duration,
/// Every CID time strictly between `ts_min` and `ts_max`. `default` lets
/// payloads from older peers that omit this field deserialise to an empty list.
#[serde(rename = "a", default)]
pub anchors: Vec<Duration>,
/// The highest CID time of the range (also excluded from `anchors`).
#[serde(rename = "x")]
pub ts_max: Duration,
}
/// A CID range. This contains the minimum and maximum values of a range. This is used for
/// querying the RUV to select all elements in this range.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct ReplCidRange {
#[serde(rename = "m")]
@ -700,7 +716,7 @@ pub enum ReplRefreshContext {
domain_uuid: Uuid,
// We need to send the current state of the ranges to populate into
// the ranges so that lookups and ranges work properly.
ranges: BTreeMap<Uuid, ReplCidRange>,
ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
schema_entries: Vec<ReplEntryV1>,
meta_entries: Vec<ReplEntryV1>,
entries: Vec<ReplEntryV1>,
@ -721,7 +737,7 @@ pub enum ReplIncrementalContext {
// the ranges so that lookups and ranges work properly, and the
// consumer ends with the same state as we have (or at least merges)
// it with this.
ranges: BTreeMap<Uuid, ReplCidRange>,
ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
schema_entries: Vec<ReplIncrementalEntryV1>,
meta_entries: Vec<ReplIncrementalEntryV1>,
entries: Vec<ReplIncrementalEntryV1>,

View file

@ -12,7 +12,7 @@ use kanidm_proto::v1::ConsistencyError;
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::ReplCidRange;
use crate::repl::proto::{ReplAnchoredCidRange, ReplCidRange};
use std::fmt;
#[derive(Default)]
@ -451,6 +451,46 @@ pub trait ReplicationUpdateVectorTransaction {
// Done!
}
/// Convert plain CID ranges into anchored ranges by attaching, per server,
/// every CID time our own RUV holds strictly between that server's min and
/// max. Shipping these anchors to the consumer stops RUV trimming from
/// creating gaps ("jumps") for servers that have been idle.
///
/// Errors with `OperationError::InvalidState` if a requested server uuid has
/// no range present in our RUV snapshot.
fn get_anchored_ranges(
    &self,
    ranges: BTreeMap<Uuid, ReplCidRange>,
) -> Result<BTreeMap<Uuid, ReplAnchoredCidRange>, OperationError> {
    let range_snapshot = self.range_snapshot();

    let mut anchored_ranges = BTreeMap::new();
    for (s_uuid, ReplCidRange { ts_min, ts_max }) in ranges {
        // Every server named in the input must be known to our RUV.
        let ts_range = match range_snapshot.get(&s_uuid) {
            Some(ts_range) => ts_range,
            None => {
                error!(
                    ?s_uuid,
                    "expected cid range for server in ruv, was not present"
                );
                return Err(OperationError::InvalidState);
            }
        };

        // BTreeSet::range panics when both bounds are Excluded and equal, so
        // only query when the range is non-degenerate.
        let anchors = if ts_max > ts_min {
            // Both endpoints are excluded: they already travel as ts_min/ts_max.
            ts_range
                .range((Excluded(ts_min), Excluded(ts_max)))
                .copied()
                .collect::<Vec<_>>()
        } else {
            Vec::with_capacity(0)
        };

        anchored_ranges.insert(
            s_uuid,
            ReplAnchoredCidRange {
                ts_min,
                anchors,
                ts_max,
            },
        );
    }

    Ok(anchored_ranges)
}
}
impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorWriteTransaction<'a> {
@ -482,7 +522,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
pub(crate) fn incremental_preflight_validate_ruv(
&self,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
txn_cid: &Cid,
) -> Result<(), OperationError> {
// Check that the incoming ranges, for our servers id, do not exceed
@ -521,7 +561,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
pub(crate) fn refresh_validate_ruv(
&self,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
) -> Result<(), OperationError> {
// Assert that the ruv that currently exists, is a valid data set of
// the supplied consumer range - especially check that when a uuid exists in
@ -533,7 +573,6 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
// exist especially in three way replication scenarios where S1:A was the S1
// maximum but is replaced by S2:B. This would make S1:A still its valid
// maximum but no entry reflects that in its change state.
let mut valid = true;
for (ctx_server_uuid, ctx_server_range) in ctx_ranges.iter() {
@ -576,17 +615,22 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
}
}
#[instrument(level = "trace", name = "ruv::refresh_update_ruv", skip_all)]
pub(crate) fn refresh_update_ruv(
&mut self,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
) -> Result<(), OperationError> {
// Previously this would just add in the ranges, and then the actual entries
// from the changestate would populate the data/ranges. Now we add empty idls
// to each of these so that they are db persisted allowing ruv reload.
for (ctx_s_uuid, ctx_range) in ctx_ranges.iter() {
if let Some(s_range) = self.ranged.get_mut(ctx_s_uuid) {
// Just assert the max is what we have.
s_range.insert(ctx_range.ts_max);
} else {
let s_range = btreeset!(ctx_range.ts_max);
self.ranged.insert(*ctx_s_uuid, s_range);
let cid_iter = std::iter::once(&ctx_range.ts_min)
.chain(ctx_range.anchors.iter())
.chain(std::iter::once(&ctx_range.ts_max))
.map(|ts| Cid::new(*ctx_s_uuid, *ts));
for cid in cid_iter {
self.insert_change(&cid, IDLBitRange::default())?;
}
}
Ok(())
@ -824,6 +868,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
// try to do.
for (cid, ex_idl) in self.data.range((Unbounded, Excluded(cid))) {
trace!(?cid, "examining for RUV removal");
idl = ex_idl as &_ | &idl;
// Remove the reverse version of the cid from the ranged index.

View file

@ -208,8 +208,11 @@ impl<'a> QueryServerReadTransaction<'a> {
.map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
.collect();
// Build the incremental context.
// Finally, populate the ranges with anchors from the RUV
let supplier_ruv = self.get_be_txn().get_ruv();
let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
// Build the incremental context.
Ok(ReplIncrementalContext::V1 {
domain_version,
domain_uuid,
@ -310,6 +313,10 @@ impl<'a> QueryServerReadTransaction<'a> {
e
})?;
// Finally, populate the ranges with anchors from the RUV
let supplier_ruv = self.get_be_txn().get_ruv();
let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
Ok(ReplRefreshContext::V1 {
domain_version,
domain_uuid,

View file

@ -1823,6 +1823,9 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid(
assert!(server_b_txn.purge_tombstones().is_ok());
server_b_txn.commit().expect("Failed to commit");
// At this point, purge_tombstones now writes an anchor cid to the RUV, which means
// both servers will detect the deception and error.
// Now check incremental in both directions. Should show *no* changes
// needed (rather than an error/lagging).
let mut server_a_txn = server_a.write(ct).await;
@ -1838,16 +1841,13 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid(
.supplier_provide_changes(a_ruv_range)
.expect("Unable to generate supplier changes");
assert!(matches!(
changes,
ReplIncrementalContext::NoChangesAvailable
));
assert!(matches!(changes, ReplIncrementalContext::RefreshRequired));
let result = server_a_txn
.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
assert!(matches!(result, ConsumerState::Ok));
assert!(matches!(result, ConsumerState::RefreshRequired));
drop(server_a_txn);
drop(server_b_txn);
@ -1866,10 +1866,7 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid(
.supplier_provide_changes(b_ruv_range)
.expect("Unable to generate supplier changes");
assert!(matches!(
changes,
ReplIncrementalContext::NoChangesAvailable
));
assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply));
let result = server_b_txn
.consumer_apply_changes(&changes)
@ -1881,6 +1878,143 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid(
drop(server_b_txn);
}
// Test two synchronised nodes where changes are not occurring - this situation would previously
// cause issues because when a change did occur, the ruv would "jump" ahead and cause desyncs.
// NOTE(review): runs 8 cycles of tombstone purge + bidirectional incremental
// replication while advancing time by a quarter of CHANGELOG_MAX_AGE per
// cycle, asserting both nodes stay in sync (ReplIncrementalContext::V1 and
// ConsumerState::Ok each time) even though no new entry changes occur.
#[qs_pair_test]
async fn test_repl_increment_consumer_ruv_trim_idle_servers(
server_a: &QueryServer,
server_b: &QueryServer,
) {
let ct = duration_from_epoch_now();
// Each loop iteration jumps time by this much, so the whole test spans
// well past CHANGELOG_MAX_AGE.
let changelog_quarter_life = Duration::from_secs(CHANGELOG_MAX_AGE / 4);
let one_second = Duration::from_secs(1);
// Refresh A from B so both start from identical state.
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry. We need at least one change on B, else it won't have anything
// to ship in its RUV to A.
let ct = ct + one_second;
let mut server_b_txn = server_b.write(ct).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
(Attribute::Class, EntryClass::Object.to_value()),
(Attribute::Class, EntryClass::Person.to_value()),
(Attribute::Name, Value::new_iname("testperson1")),
(Attribute::Uuid, Value::Uuid(t_uuid)),
(Attribute::Description, Value::new_utf8s("testperson1")),
(Attribute::DisplayName, Value::new_utf8s("testperson1"))
),])
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Now setup bidirectional replication. We only need to trigger B -> A
// here because that's all that has changes.
let ct = ct + one_second;
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
trace!("========================================");
repl_incremental(&mut server_b_txn, &mut server_a_txn);
// Both sides must now hold an identical copy of the test entry.
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
// Everything is consistent!
let mut ct = ct;
// We now loop periodically, and everything should stay in sync.
for i in 0..8 {
trace!("========================================");
trace!("repl iteration {}", i);
// Purge tombstones.
let mut server_a_txn = server_a.write(ct).await;
assert!(server_a_txn.purge_tombstones().is_ok());
server_a_txn.commit().expect("Failed to commit");
ct += one_second;
let mut server_b_txn = server_b.write(ct).await;
assert!(server_b_txn.purge_tombstones().is_ok());
server_b_txn.commit().expect("Failed to commit");
ct += one_second;
// Now check incremental in both directions. Should show *no* changes
// needed (rather than an error/lagging).
// B supplies to A first.
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
let a_ruv_range = server_a_txn
.consumer_get_state()
.expect("Unable to access RUV range");
trace!(?a_ruv_range);
let changes = server_b_txn
.supplier_provide_changes(a_ruv_range)
.expect("Unable to generate supplier changes");
// A normal V1 context, never RefreshRequired/UnwillingToSupply.
assert!(matches!(changes, ReplIncrementalContext::V1 { .. }));
let result = server_a_txn
.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
assert!(matches!(result, ConsumerState::Ok));
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
ct += one_second;
// Reverse it!
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
let b_ruv_range = server_b_txn
.consumer_get_state()
.expect("Unable to access RUV range");
trace!(?b_ruv_range);
let changes = server_a_txn
.supplier_provide_changes(b_ruv_range)
.expect("Unable to generate supplier changes");
assert!(matches!(changes, ReplIncrementalContext::V1 { .. }));
let result = server_b_txn
.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
assert!(matches!(result, ConsumerState::Ok));
drop(server_a_txn);
server_b_txn.commit().expect("Failed to commit");
// Jump time forward so RUV trimming pressure builds over the test.
ct += changelog_quarter_life;
}
// Done!
}
// Test change of a domain name over incremental.
#[qs_pair_test]
async fn test_repl_increment_domain_rename(server_a: &QueryServer, server_b: &QueryServer) {

View file

@ -1827,6 +1827,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
.and_then(|_| accesscontrols.commit())
.and_then(|_| be_txn.commit())
}
/// Return a reference to the CID (change identifier) of this write
/// transaction. Used e.g. by tombstone purging as the anchor cid written to
/// the RUV so the changestate keeps advancing.
pub(crate) fn get_txn_cid(&self) -> &Cid {
&self.cid
}

View file

@ -9,10 +9,11 @@ impl<'a> QueryServerWriteTransaction<'a> {
pub fn purge_tombstones(&mut self) -> Result<usize, OperationError> {
// purge everything that is a tombstone.
let trim_cid = self.trim_cid().clone();
let anchor_cid = self.get_txn_cid().clone();
// Delete them - this is a TRUE delete, no going back now!
self.be_txn
.reap_tombstones(&trim_cid)
.reap_tombstones(&anchor_cid, &trim_cid)
.map_err(|e| {
error!(err = ?e, "Tombstone purge operation failed (backend)");
e