20230526 incremental replication improvements (#1659)

* Improve refresh ruv checking
* Expand comments for tests, add basic attribute merge statemachine
This commit is contained in:
Firstyear 2023-05-29 08:53:27 +10:00 committed by GitHub
parent 8fb1763635
commit 8a548fe13e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 372 additions and 53 deletions

68
Cargo.lock generated
View file

@ -399,7 +399,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -479,9 +479,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.0"
version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]]
name = "base64urlsafedata"
@ -489,7 +489,7 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18b3d30abb74120a9d5267463b9e0045fdccc4dd152e7249d966612dc1721384"
dependencies = [
"base64 0.21.0",
"base64 0.21.2",
"serde",
"serde_json",
]
@ -601,9 +601,9 @@ checksum = "cfa8873f51c92e232f9bac4065cddef41b714152812bfc5f7672ba16d6ef8cd9"
[[package]]
name = "bumpalo"
version = "3.12.2"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c6ed94e98ecff0c12dd1b04c15ec0d7d9458ca8fe806cea6f12954efe74c63b"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "byte-tools"
@ -1305,7 +1305,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -1430,7 +1430,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3364d69f691f3903b1a71605fa04f40a7c2d259f0f0512347e36d19a63debf1f"
dependencies = [
"base64 0.21.0",
"base64 0.21.2",
"byteorder",
"getrandom 0.2.9",
"openssl",
@ -1581,7 +1581,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -2185,9 +2185,9 @@ dependencies = [
[[package]]
name = "io-lifetimes"
version = "1.0.10"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi 0.3.1",
"libc",
@ -2278,7 +2278,7 @@ dependencies = [
name = "kanidm_lib_crypto"
version = "0.1.0"
dependencies = [
"base64 0.21.0",
"base64 0.21.2",
"base64urlsafedata",
"hex",
"kanidm_proto",
@ -2415,7 +2415,7 @@ name = "kanidmd_lib"
version = "1.1.0-beta.13-dev"
dependencies = [
"async-trait",
"base64 0.21.0",
"base64 0.21.2",
"base64urlsafedata",
"compact_jwt",
"concread",
@ -2475,7 +2475,7 @@ version = "0.1.0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -3033,7 +3033,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -3207,7 +3207,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -3358,9 +3358,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.58"
version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8"
checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b"
dependencies = [
"unicode-ident",
]
@ -3369,7 +3369,7 @@ dependencies = [
name = "profiles"
version = "1.1.0-beta.13-dev"
dependencies = [
"base64 0.21.0",
"base64 0.21.2",
"serde",
"toml",
]
@ -3614,7 +3614,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55"
dependencies = [
"async-compression 0.4.0",
"base64 0.21.0",
"base64 0.21.2",
"bytes",
"cookie 0.16.2",
"cookie_store",
@ -3906,7 +3906,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -4240,9 +4240,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.16"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01"
checksum = "45b6ddbb36c5b969c182aec3c4a0bce7df3fbad4b77114706a49aacc80567388"
dependencies = [
"proc-macro2",
"quote",
@ -4289,7 +4289,7 @@ version = "0.1.0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -4315,7 +4315,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -4529,7 +4529,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -4596,9 +4596,9 @@ checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f"
[[package]]
name = "toml_edit"
version = "0.19.9"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92d964908cec0d030b812013af25a0e57fddfadb1e066ecc6681d86253129d4f"
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
dependencies = [
"indexmap",
"toml_datetime",
@ -4640,7 +4640,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]
@ -4715,9 +4715,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
version = "1.0.8"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
[[package]]
name = "unicode-normalization"
@ -4887,7 +4887,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
"wasm-bindgen-shared",
]
@ -4921,7 +4921,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -5417,7 +5417,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.17",
]
[[package]]

View file

@ -713,11 +713,60 @@ impl Entry<EntryIncremental, EntryNew> {
// Now we have the set of attrs from both sides. Lets see what state they are in!
for attr_name in attr_set.into_iter() {
match (changes_left.get(attr_name), changes_right.get(attr_name)) {
(Some(_cid_left), Some(_cid_right)) => {
(Some(cid_left), Some(cid_right)) => {
// This is the normal / usual and most "fun" case. Here we need to determine
// which side is latest and then do a valueset merge. This is also
// needing schema awareness depending on the attribute!
todo!();
//
// The behaviour is very dependent on the state of the attributes and
// if they exist.
let take_left = cid_left > cid_right;
match (self.attrs.get(attr_name), db_ent.attrs.get(attr_name)) {
(Some(vs_left), Some(vs_right)) if take_left => {
if let Some(_attr_state) = vs_left.repl_merge_valueset(vs_right)
{
todo!();
} else {
changes.insert(attr_name.clone(), cid_left.clone());
eattrs.insert(attr_name.clone(), vs_left.clone());
}
}
(Some(vs_left), Some(vs_right)) => {
if let Some(_attr_state) = vs_right.repl_merge_valueset(vs_left)
{
todo!();
} else {
changes.insert(attr_name.clone(), cid_right.clone());
eattrs.insert(attr_name.clone(), vs_right.clone());
}
}
(Some(vs_left), None) if take_left => {
changes.insert(attr_name.clone(), cid_left.clone());
eattrs.insert(attr_name.clone(), vs_left.clone());
}
(Some(_vs_left), None) => {
changes.insert(attr_name.clone(), cid_right.clone());
// Taking right, nothing to do due to no attr.
}
(None, Some(_vs_right)) if take_left => {
changes.insert(attr_name.clone(), cid_left.clone());
// Taking left, nothing to do due to no attr.
}
(None, Some(vs_right)) => {
changes.insert(attr_name.clone(), cid_right.clone());
eattrs.insert(attr_name.clone(), vs_right.clone());
}
(None, None) if take_left => {
changes.insert(attr_name.clone(), cid_left.clone());
// Taking left, nothing to do due to no attr.
}
(None, None) => {
changes.insert(attr_name.clone(), cid_right.clone());
// Taking right, nothing to do due to no attr.
}
}
// End attr merging
}
(Some(cid_left), None) => {
// Keep the value on the left.
@ -726,10 +775,12 @@ impl Entry<EntryIncremental, EntryNew> {
eattrs.insert(attr_name.clone(), valueset.clone());
}
}
(None, Some(_cid_right)) => {
(None, Some(cid_right)) => {
// Keep the value on the right.
todo!();
changes.insert(attr_name.clone(), cid_right.clone());
if let Some(valueset) = db_ent.attrs.get(attr_name) {
eattrs.insert(attr_name.clone(), valueset.clone());
}
}
(None, None) => {
// Should be impossible! At least one side or the other must have a change.

View file

@ -424,31 +424,43 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
// our ruv, that it's maximum matches the ctx ruv.
//
// Since the ctx range comes from the supplier, when we rebuild due to the
// state machine then some values may not exist since they were replaced. But
// the server uuid maximums must exist.
// state machine then some values may not exist since they were replaced
// or updated. It's also possible that the imported range maximums *may not*
exist especially in three-way replication scenarios where S1:A was the S1
maximum but is replaced by S2:B. This would make S1:A still its valid
maximum but no entry reflects that in its change state.
let mut valid = true;
for (server_uuid, server_range) in self.ranged.iter() {
match ctx_ranges.get(server_uuid) {
Some(ctx_range) => {
let ctx_ts = &ctx_range.ts_max;
for (ctx_server_uuid, ctx_server_range) in ctx_ranges.iter() {
match self.ranged.get(ctx_server_uuid) {
Some(server_range) => {
let ctx_ts = &ctx_server_range.ts_max;
match server_range.last() {
Some(s_ts) if ctx_ts == s_ts => {
// Ok
trace!(?server_uuid, ?ctx_ts, ?s_ts, "valid");
Some(s_ts) if s_ts <= ctx_ts => {
// Ok - our entries reflect maximum or earlier.
trace!(?ctx_server_uuid, ?ctx_ts, ?s_ts, "valid");
}
Some(s_ts) => {
valid = false;
warn!(?server_uuid, ?ctx_ts, ?s_ts, "inconsistent s_uuid in ruv");
warn!(?ctx_server_uuid, ?ctx_ts, ?s_ts, "inconsistent s_uuid in ruv, consumer ruv is advanced past supplier");
}
None => {
valid = false;
warn!(?server_uuid, ?ctx_ts, "inconsistent server range in ruv");
warn!(
?ctx_server_uuid,
?ctx_ts,
"inconsistent server range in ruv, no maximum ts found for s_uuid"
);
}
}
}
None => {
valid = false;
error!(?server_uuid, "s_uuid absent from in ruv");
// valid = false;
trace!(
?ctx_server_uuid,
"s_uuid absent from ranged ruv, possible that changes have been expired"
);
}
}
}
@ -550,6 +562,9 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
let eid = entry.get_id();
let ecstate = entry.get_changestate();
trace!("Updating ruv state from entry {}", eid);
trace!(?ecstate);
for cid in ecstate.cid_iter() {
if let Some(idl) = self.data.get_mut(cid) {
// We can't guarantee id order, so we have to do this properly.

View file

@ -3,6 +3,7 @@ use crate::prelude::*;
use crate::repl::consumer::ConsumerState;
use crate::repl::proto::ReplIncrementalContext;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
use crate::repl::ruv::{RangeDiffStatus, ReplicationUpdateVector};
use std::collections::BTreeMap;
fn repl_initialise(
@ -88,9 +89,14 @@ fn repl_incremental(
trace!(?a_ruv_range);
trace!(?b_ruv_range);
// May need to be "is subset" for future when we are testing
// some more complex scenarios.
assert!(a_ruv_range == b_ruv_range);
let valid = match ReplicationUpdateVector::range_diff(&a_ruv_range, &b_ruv_range) {
RangeDiffStatus::Ok(require) => require.is_empty(),
_ => false,
};
assert!(valid);
}
#[qs_pair_test]
@ -546,6 +552,222 @@ async fn test_repl_increment_consumer_lagging_tombstone(
drop(server_b_txn);
}
// Write state cases.
// Create Entry on B -> A
// Write to A
// A -> B becomes consistent.
#[qs_pair_test]
async fn test_repl_increment_basic_bidirectional_write(
server_a: &QueryServer,
server_b: &QueryServer,
) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry.
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
("class", Value::new_class("object")),
("class", Value::new_class("person")),
("name", Value::new_iname("testperson1")),
("uuid", Value::Uuid(t_uuid)),
("description", Value::new_utf8s("testperson1")),
("displayname", Value::new_utf8s("testperson1"))
),])
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Assert the entry is not on A.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert_eq!(
server_a_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
// from to
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
// Now perform a write on A
assert!(server_a_txn
.internal_modify_uuid(t_uuid, &ModifyList::new_purge("description"))
.is_ok());
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
// Incremental repl in the reverse direction.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
// from to
repl_incremental(&mut server_a_txn, &mut server_b_txn);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
// They are consistent again.
assert!(e1 == e2);
assert!(e1.get_ava_set("description").is_none());
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
}
// Create Entry on A -> B
// Write to both
// B -> A and A -> B become consistent.
#[qs_pair_test]
async fn test_repl_increment_simultaneous_bidirectional_write(
server_a: &QueryServer,
server_b: &QueryServer,
) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry.
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
("class", Value::new_class("object")),
("class", Value::new_class("person")),
("name", Value::new_iname("testperson1")),
("uuid", Value::Uuid(t_uuid)),
("description", Value::new_utf8s("testperson1")),
("displayname", Value::new_utf8s("testperson1"))
),])
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Assert the entry is not on A.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert_eq!(
server_a_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
// from to
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
// Now perform a write on A
assert!(server_a_txn
.internal_modify_uuid(
t_uuid,
&ModifyList::new_purge_and_set("description", Value::new_utf8s("repl_test"))
)
.is_ok());
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
// Also write to B.
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
assert!(server_b_txn
.internal_modify_uuid(
t_uuid,
&ModifyList::new_purge_and_set("displayname", Value::new_utf8s("repl_test"))
)
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Incremental repl in the both directions.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
// from to
repl_incremental(&mut server_a_txn, &mut server_b_txn);
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
// from to
repl_incremental(&mut server_b_txn, &mut server_a_txn);
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
// Validate they are the same again.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.read().await;
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
// They are consistent again.
assert!(e1 == e2);
assert!(e1.get_ava_single_utf8("description") == Some("repl_test"));
assert!(e1.get_ava_single_utf8("displayname") == Some("repl_test"));
}
// Create entry on A -> B
// Recycle
// Recycle propagates from A -> B
// TS on B
// B -> A TS
// Create entry on A -> B
// Recycle on Both A/B
// Recycle propagates from A -> B, B -> A, keep earliest.
// TS on A
// A -> B TS
// Create + recycle entry on A -> B
// TS on Both,
// TS resolves to lowest AT.
// conflict cases.
// both add entry with same uuid - only one can win!
// both add entry with same uuid, but one becomes ts - ts always wins.
// both add entry with same uuid, both become ts - merge, take lowest AT.
// Test RUV content when a server's changes have been trimmed out and are not present
// in a refresh. This is not about tombstones, this is about attribute state.

View file

@ -527,6 +527,29 @@ pub trait ValueSetT: std::fmt::Debug + DynClone {
debug_assert!(false);
None
}
fn repl_merge_valueset(
&self,
_older: &ValueSet,
// schema_attr: &SchemaAttribute
) -> Option<ValueSet> {
// Self is the "latest" content. Older contains the earlier
// state of the attribute.
//
// In most cases we don't actually need a merge strategy. We just need the
// newer state of the attribute.
//
// However when we have a merge strategy that is required we return
// Some(new_state) if and only if merges were applied that need to be added
// to the change state.
//
// If no merge was required, we just return None.
//
// Examples where we need merging is session states. This has an internal
// attribute state machine that works similarly to tombstones to ensure that
// after a certain period that attributes are cleaned up.
None
}
}
impl PartialEq for ValueSet {

View file

@ -497,6 +497,10 @@ impl ValueSetT for ValueSetSession {
.collect();
Ok(Box::new(ValueSetApiToken { map }))
}
fn repl_merge_valueset(&self, _older: &ValueSet) -> Option<ValueSet> {
todo!();
}
}
// == oauth2 session ==
@ -867,6 +871,10 @@ impl ValueSetT for ValueSetOauth2Session {
// bind to our resource servers, not our ids!
Some(Box::new(self.map.values().map(|m| &m.rs_uuid).copied()))
}
fn repl_merge_valueset(&self, _older: &ValueSet) -> Option<ValueSet> {
todo!();
}
}
#[derive(Debug, Clone)]