mirror of
https://github.com/kanidm/kanidm.git
synced 2025-02-23 12:37:00 +01:00
68 20230907 replication (#2081)
* Test replication when nodes are valid beyond cl trim
This commit is contained in:
parent
98884931c5
commit
61c59d5a5a
|
@ -99,13 +99,15 @@ impl ReplicationUpdateVector {
|
|||
// max > consumer max, then the range between consumer max
|
||||
// and supplier max must be supplied.
|
||||
//
|
||||
// consumer min consumer max
|
||||
// <-- supplier min supplier max -->
|
||||
// [ consumer min ... consumer max ]
|
||||
// <-- [ supplier min .. supplier max ] -->
|
||||
//
|
||||
// In other words if we have:
|
||||
//
|
||||
// consumer min consumer max
|
||||
// supplier min supplier max
|
||||
// [ consumer min ... consumer max ]
|
||||
// [ supplier min ... supplier max ]
|
||||
// ^
|
||||
// \-- no overlap of the range windows!
|
||||
//
|
||||
// then because there has been too much lag between consumer and
|
||||
// the supplier then there is a risk of changes being dropped or
|
||||
|
@ -113,6 +115,12 @@ impl ReplicationUpdateVector {
|
|||
// of zero -> supplier max, but I think thought is needed to
|
||||
// ensure no corruption in this case.
|
||||
if consumer_cid_range.ts_max < supplier_cid_range.ts_min {
|
||||
//
|
||||
// [ consumer min ... consumer max ]
|
||||
// [ supplier min ... supplier max ]
|
||||
// ^
|
||||
// \-- no overlap of the range windows!
|
||||
//
|
||||
consumer_lagging = true;
|
||||
lag_range.insert(
|
||||
*supplier_s_uuid,
|
||||
|
@ -122,8 +130,14 @@ impl ReplicationUpdateVector {
|
|||
},
|
||||
);
|
||||
} else if supplier_cid_range.ts_max < consumer_cid_range.ts_min {
|
||||
// It could be valid in this case to ignore this instead
|
||||
// of erroring as changelog trim has occurred? Thought needed.
|
||||
//
|
||||
// [ consumer min ... consumer max ]
|
||||
// [ supplier min ... supplier max ]
|
||||
// ^
|
||||
// \-- no overlap of the range windows!
|
||||
//
|
||||
// This means we can't supply because we are missing changes that the consumer
|
||||
// has. *we* are lagging.
|
||||
supplier_lagging = true;
|
||||
adv_range.insert(
|
||||
*supplier_s_uuid,
|
||||
|
@ -133,6 +147,14 @@ impl ReplicationUpdateVector {
|
|||
},
|
||||
);
|
||||
} else if consumer_cid_range.ts_max < supplier_cid_range.ts_max {
|
||||
//
|
||||
// /-- consumer needs these changes
|
||||
// v
|
||||
// [ consumer min ... consumer max ] --> ]
|
||||
// [ supplier min ... supplier max ]
|
||||
// ^
|
||||
// \-- overlap of the range windows
|
||||
//
|
||||
// We require the changes from consumer max -> supplier max.
|
||||
diff_range.insert(
|
||||
*supplier_s_uuid,
|
||||
|
@ -143,6 +165,23 @@ impl ReplicationUpdateVector {
|
|||
);
|
||||
}
|
||||
// else ...
|
||||
//
|
||||
// /-- The consumer has changes we don't have.
|
||||
// | So we don't need to supply
|
||||
// v
|
||||
// [ consumer min ... consumer max ]
|
||||
// [ supplier min ... supplier max ]
|
||||
// ^
|
||||
// \-- overlap of the range windows
|
||||
//
|
||||
// OR
|
||||
//
|
||||
// [ consumer min ... consumer max ]
|
||||
// [ supplier min ... supplier max ]
|
||||
// ^
|
||||
// \-- the windows max is identical
|
||||
// no actions needed
|
||||
//
|
||||
// In this case there is no action required since consumer_cid_range.ts_max
|
||||
// must be greater than or equal to supplier max.
|
||||
}
|
||||
|
@ -220,8 +259,7 @@ pub trait ReplicationUpdateVectorTransaction {
|
|||
|
||||
fn range_to_idl(&self, ctx_ranges: &BTreeMap<Uuid, ReplCidRange>) -> IDLBitRange {
|
||||
let mut idl = IDLBitRange::new();
|
||||
// Force the set to be compressed, saves on seeks during
|
||||
// inserts.
|
||||
// Force the set to be compressed, saves on seeks during inserts.
|
||||
idl.compress();
|
||||
let range = self.range_snapshot();
|
||||
let ruv = self.ruv_snapshot();
|
||||
|
@ -246,7 +284,6 @@ pub trait ReplicationUpdateVectorTransaction {
|
|||
// Get from the min to the max. Unbounded and
|
||||
// Included(ctx_range.ts_max) are the same in
|
||||
// this context.
|
||||
|
||||
for ts in ruv_range.range((Excluded(ctx_range.ts_min), Unbounded)) {
|
||||
let cid = Cid {
|
||||
ts: *ts,
|
||||
|
@ -256,7 +293,9 @@ pub trait ReplicationUpdateVectorTransaction {
|
|||
if let Some(ruv_idl) = ruv.get(&cid) {
|
||||
ruv_idl.into_iter().for_each(|id| idl.insert_id(id))
|
||||
}
|
||||
// If the cid isn't found, it may have been trimmed, but that's okay.
|
||||
// If the cid isn't found, it may have been trimmed, but that's okay. A cid in
|
||||
// a range can be trimmed if all entries of that cid have since tombstoned so
|
||||
// no longer need to be applied in change ranges.
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -634,6 +673,9 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
|
||||
Mostly longer ruv/cid ranges aren't an issue for us, so could we just maek these ranges
|
||||
really large?
|
||||
|
||||
NOTE: For now we do NOT trim out max CID's of any s_uuid so that we don't have to confront
|
||||
this edge case yet.
|
||||
*/
|
||||
|
||||
// Problem Cases
|
||||
|
@ -661,6 +703,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
*/
|
||||
|
||||
pub fn trim_up_to(&mut self, cid: &Cid) -> Result<IDLBitRange, OperationError> {
|
||||
trace!(trim_up_to_cid = ?cid);
|
||||
let mut idl = IDLBitRange::new();
|
||||
// let mut remove_suuid = Vec::default();
|
||||
|
||||
|
@ -690,7 +733,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
return Err(OperationError::InvalidState);
|
||||
}
|
||||
} else {
|
||||
trace!("skipping maximum cid for s_uuid");
|
||||
trace!("skip trimming maximum cid for s_uuid {}", cid.s_uuid);
|
||||
}
|
||||
if server_range.is_empty() {
|
||||
// remove_suuid.push(cid.s_uuid);
|
||||
|
|
|
@ -1846,6 +1846,142 @@ async fn test_repl_increment_consumer_lagging_attributes(
|
|||
drop(server_b_txn);
|
||||
}
|
||||
|
||||
// Test two synchronised nodes where no changes occured in a TS/RUV window.
|
||||
#[qs_pair_test]
|
||||
async fn test_repl_increment_consumer_ruv_trim_past_valid(
|
||||
server_a: &QueryServer,
|
||||
server_b: &QueryServer,
|
||||
) {
|
||||
let ct = duration_from_epoch_now();
|
||||
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
|
||||
.and_then(|_| server_a_txn.commit())
|
||||
.is_ok());
|
||||
drop(server_b_txn);
|
||||
|
||||
// Add an entry. We need at least one change on B, else it won't have anything
|
||||
// to ship in it's RUV to A.
|
||||
let ct = duration_from_epoch_now();
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
let t_uuid = Uuid::new_v4();
|
||||
assert!(server_b_txn
|
||||
.internal_create(vec![entry_init!(
|
||||
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
|
||||
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
|
||||
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
|
||||
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
|
||||
(
|
||||
Attribute::Description.as_ref(),
|
||||
Value::new_utf8s("testperson1")
|
||||
),
|
||||
(
|
||||
Attribute::DisplayName.as_ref(),
|
||||
Value::new_utf8s("testperson1")
|
||||
)
|
||||
),])
|
||||
.is_ok());
|
||||
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
// Now setup bidirectional replication. We only need to trigger B -> A
|
||||
// here because that's all that has changes.
|
||||
let ct = duration_from_epoch_now();
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
trace!("========================================");
|
||||
repl_incremental(&mut server_b_txn, &mut server_a_txn);
|
||||
|
||||
let e1 = server_a_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access entry.");
|
||||
let e2 = server_b_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access entry.");
|
||||
|
||||
assert!(e1 == e2);
|
||||
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
drop(server_b_txn);
|
||||
|
||||
// Everything is consistent!
|
||||
|
||||
// Compare RUV's
|
||||
|
||||
// Push time ahead past a changelog max age.
|
||||
let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE * 4);
|
||||
|
||||
// And setup the ruv trim. This is triggered by purge/reap tombstones.
|
||||
// Apply this to both nodes so that they shift their RUV states.
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
assert!(server_a_txn.purge_tombstones().is_ok());
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
assert!(server_b_txn.purge_tombstones().is_ok());
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
// Now check incremental in both directions. Should show *no* changes
|
||||
// needed (rather than an error/lagging).
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
let a_ruv_range = server_a_txn
|
||||
.consumer_get_state()
|
||||
.expect("Unable to access RUV range");
|
||||
|
||||
trace!(?a_ruv_range);
|
||||
|
||||
let changes = server_b_txn
|
||||
.supplier_provide_changes(a_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
assert!(matches!(
|
||||
changes,
|
||||
ReplIncrementalContext::NoChangesAvailable
|
||||
));
|
||||
|
||||
let result = server_a_txn
|
||||
.consumer_apply_changes(&changes)
|
||||
.expect("Unable to apply changes to consumer.");
|
||||
|
||||
assert!(matches!(result, ConsumerState::Ok));
|
||||
|
||||
drop(server_a_txn);
|
||||
drop(server_b_txn);
|
||||
|
||||
// Reverse it!
|
||||
let mut server_a_txn = server_a.read().await;
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
|
||||
let b_ruv_range = server_b_txn
|
||||
.consumer_get_state()
|
||||
.expect("Unable to access RUV range");
|
||||
|
||||
trace!(?b_ruv_range);
|
||||
|
||||
let changes = server_a_txn
|
||||
.supplier_provide_changes(b_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
assert!(matches!(
|
||||
changes,
|
||||
ReplIncrementalContext::NoChangesAvailable
|
||||
));
|
||||
|
||||
let result = server_b_txn
|
||||
.consumer_apply_changes(&changes)
|
||||
.expect("Unable to apply changes to consumer.");
|
||||
|
||||
assert!(matches!(result, ConsumerState::Ok));
|
||||
|
||||
drop(server_a_txn);
|
||||
drop(server_b_txn);
|
||||
}
|
||||
|
||||
// Test change of a domain name over incremental.
|
||||
#[qs_pair_test]
|
||||
async fn test_repl_increment_domain_rename(server_a: &QueryServer, server_b: &QueryServer) {
|
||||
|
|
Loading…
Reference in a new issue