20230505 replication groundwork - ruv consistency improvements (#1606)

This commit is contained in:
Firstyear 2023-05-08 18:25:27 +10:00 committed by GitHub
parent 10abdd0b59
commit 6afb15ca92
13 changed files with 421 additions and 99 deletions

View file

@ -990,6 +990,7 @@ impl<'a> BackendWriteTransaction<'a> {
// This auto compresses.
let ruv_idl = IDLBitRange::from_iter(c_entries.iter().map(|e| e.get_id()));
// We don't need to skip this like in mod since creates always go to the ruv
self.get_ruv().insert_change(cid, ruv_idl)?;
self.idlayer.write_identries(c_entries.iter())?;
@ -1005,10 +1006,10 @@ impl<'a> BackendWriteTransaction<'a> {
}
#[instrument(level = "debug", name = "be::create", skip_all)]
/// This is similar to create, but used in the replication path as it skips the
/// modification of the RUV and the checking of CIDs since these actions are not
/// required during a replication refresh (else we'd create an infinite replication
/// loop.)
/// This is similar to create, but used in the replication path as it records all
/// the CIDs in the entry to the RUV, without applying the current CID as
/// a new value in the RUV. We *do not* want to apply the current CID to the RUV
/// for this entry, as that could cause an infinite replication loop!
pub fn refresh(
&mut self,
entries: Vec<Entry<EntrySealed, EntryNew>>,
@ -1032,6 +1033,11 @@ impl<'a> BackendWriteTransaction<'a> {
self.idlayer.set_id2entry_max_id(id_max);
// Update the RUV with all the changestates of the affected entries.
for e in c_entries.iter() {
self.get_ruv().update_entry_changestate(e)?;
}
// Now update the indexes as required.
for e in c_entries.iter() {
self.entry_index(None, Some(e))?
@ -1054,21 +1060,24 @@ impl<'a> BackendWriteTransaction<'a> {
assert!(post_entries.len() == pre_entries.len());
post_entries.iter().try_for_each(|e| {
if e.get_changestate().contains_tail_cid(cid) {
Ok(())
} else {
admin_error!(
"Entry changelog does not contain a change related to this transaction"
);
Err(OperationError::ReplEntryNotChanged)
}
})?;
let post_entries_iter = post_entries.iter().filter(|e| {
trace!(?cid);
trace!(changestate = ?e.get_changestate());
// If True - This means that at least one attribute that *is* replicated was changed
// on this entry, so we need to update and add this to the RUV!
//
// If False - This means that the entry in question was updated but the changes are all
// non-replicated so we DO NOT update the RUV here!
e.get_changestate().contains_tail_cid(cid)
});
// All good, let's update the RUV.
// This auto compresses.
let ruv_idl = IDLBitRange::from_iter(post_entries.iter().map(|e| e.get_id()));
self.get_ruv().insert_change(cid, ruv_idl)?;
let ruv_idl = IDLBitRange::from_iter(post_entries_iter.map(|e| e.get_id()));
if !ruv_idl.is_empty() {
self.get_ruv().insert_change(cid, ruv_idl)?;
}
// Now, given the list of id's, update them
self.get_idlayer().write_identries(post_entries.iter())?;
@ -1484,6 +1493,7 @@ impl<'a> BackendWriteTransaction<'a> {
}
pub(crate) fn danger_delete_all_db_content(&mut self) -> Result<(), OperationError> {
self.get_ruv().clear();
unsafe {
self.get_idlayer()
.purge_id2entry()
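
The two paths above differ in how they touch the RUV: create stamps the current transaction's CID into the RUV via insert_change, while refresh replays the CIDs already recorded in each entry's change state via update_entry_changestate, never minting a change under the current CID. A minimal sketch of that contrast, using simplified stand-in types rather than the real backend API:

use std::collections::BTreeMap;

// Stand-ins: the real Cid is a (timestamp, server uuid) pair and the id set is an IDLBitRange.
type Cid = (u64, u128);
type IdSet = Vec<u64>;

#[derive(Default)]
struct MiniRuv {
    data: BTreeMap<Cid, IdSet>,
}

impl MiniRuv {
    // create path: the current transaction's CID becomes a new change in the RUV.
    fn insert_change(&mut self, cid: Cid, ids: IdSet) {
        self.data.entry(cid).or_default().extend(ids);
    }

    // refresh path: replay the CIDs already recorded in the entry's change state,
    // never minting a change under the current CID, so refreshed entries do not
    // look like new local writes (avoiding the replication loop noted above).
    fn update_entry_changestate(&mut self, entry_id: u64, entry_cids: &[Cid]) {
        for cid in entry_cids {
            self.data.entry(*cid).or_default().push(entry_id);
        }
    }
}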

View file

@ -1535,6 +1535,9 @@ pub const JSON_SCHEMA_CLASS_DYNGROUP: &str = r#"
"systemmust": [
"dyngroup_filter"
],
"systemmay": [
"dynmember"
],
"systemsupplements": [
"group"
],

View file

@ -226,6 +226,7 @@ pub const UUID_SCHEMA_ATTR_REPLICATED: Uuid = uuid!("00000000-0000-0000-0000-fff
pub const UUID_SCHEMA_ATTR_PRIVATE_COOKIE_KEY: Uuid = uuid!("00000000-0000-0000-0000-ffff00000130");
pub const _UUID_SCHEMA_ATTR_DOMAIN_LDAP_BASEDN: Uuid =
uuid!("00000000-0000-0000-0000-ffff00000131");
pub const UUID_SCHEMA_ATTR_DYNMEMBER: Uuid = uuid!("00000000-0000-0000-0000-ffff00000132");
// System and domain infos
// I'd like to strongly criticise william of the past for making poor choices about these allocations.

View file

@ -576,7 +576,7 @@ impl Entry<EntryInit, EntryNew> {
// event for the attribute.
/// Add an attribute-value-assertion to this Entry.
pub fn add_ava(&mut self, attr: &str, value: Value) {
self.add_ava_int(attr, value)
self.add_ava_int(attr, value);
}
/// Replace the existing content of an attribute set of this Entry, with a new set of Values.
@ -1850,18 +1850,23 @@ impl Entry<EntryReduced, EntryCommitted> {
// impl<STATE> Entry<EntryValid, STATE> {
impl<VALID, STATE> Entry<VALID, STATE> {
/// This internally adds an AVA to the entry.
fn add_ava_int(&mut self, attr: &str, value: Value) {
// How do we make this turn into an ok / err?
/// This internally adds an AVA to the entry. If the value was newly added, true is returned.
/// If the value already existed, or was unable to be added, false is returned. Alternatively,
/// you can think of this boolean as "whether a write occurred to the structure", with true
/// indicating that a change occurred.
fn add_ava_int(&mut self, attr: &str, value: Value) -> bool {
if let Some(vs) = self.attrs.get_mut(attr) {
let r = vs.insert_checked(value);
debug_assert!(r.is_ok());
// Default to the value not being present if it was wrongly typed.
r.unwrap_or(false)
} else {
#[allow(clippy::expect_used)]
let vs = valueset::from_value_iter(std::iter::once(value))
.expect("Unable to fail - non-zero iter, and single value type!");
self.attrs.insert(AttrString::from(attr), vs);
// The attribute did not exist before.
false
}
// Doesn't matter if it already exists, equality will replace.
}
@ -2401,7 +2406,15 @@ where
.eclog
.add_ava_iter(&self.valid.cid, attr, std::iter::once(value.clone()));
*/
self.add_ava_int(attr, value)
self.add_ava_int(attr, value);
}
pub fn add_ava_if_not_exist(&mut self, attr: &str, value: Value) {
// This returns true if the value WAS changed! See add_ava_int.
if self.add_ava_int(attr, value) {
// In this case, we ONLY update the changestate if the value was newly added (a write occurred)!
self.valid.ecstate.change_ava(&self.valid.cid, attr);
}
}
fn assert_ava(&mut self, attr: &str, value: &PartialValue) -> Result<(), OperationError> {
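
The boolean contract of add_ava_int, and why add_ava_if_not_exist exists, can be modelled compactly. This is an illustration with simplified types, not the real Entry implementation: true means a write occurred, and the change state is touched only when the value is genuinely new, which keeps no-op re-assertions (such as memberof re-adding the memberof class on every run) out of the replication change state.

use std::collections::{BTreeMap, BTreeSet};

#[derive(Default)]
struct MiniEntry {
    attrs: BTreeMap<String, BTreeSet<String>>,
    changed_attrs: BTreeSet<String>, // stand-in for the entry change state
}

impl MiniEntry {
    // Returns true only when a write occurred (the value was newly added).
    fn add_ava_int(&mut self, attr: &str, value: &str) -> bool {
        self.attrs
            .entry(attr.to_string())
            .or_default()
            .insert(value.to_string())
    }

    // Record a change only when the value was not already present.
    fn add_ava_if_not_exist(&mut self, attr: &str, value: &str) {
        if self.add_ava_int(attr, value) {
            self.changed_attrs.insert(attr.to_string());
        }
    }
}

fn main() {
    let mut e = MiniEntry::default();
    e.add_ava_if_not_exist("class", "memberof");
    e.add_ava_if_not_exist("class", "memberof"); // second call is a no-op
    assert_eq!(e.changed_attrs.len(), 1);
}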

View file

@ -71,12 +71,17 @@ impl DynGroup {
affected_uuids.extend(uuid_iter);
}
// Mark the former members as being affected also.
if let Some(uuid_iter) = pre.get_ava_as_refuuid("dynmember") {
affected_uuids.extend(uuid_iter);
}
if let Some(members) = members {
// Only set something if there is actually something to do!
nd_group.set_ava_set("member", members);
nd_group.set_ava_set("dynmember", members);
// push the entries to pre/cand
} else {
nd_group.purge_ava("member");
nd_group.purge_ava("dynmember");
}
candidate_tuples.push((pre, nd_group));
@ -191,7 +196,7 @@ impl DynGroup {
matches
.iter()
.copied()
.for_each(|u| d_group.add_ava("member", Value::Refer(u)));
.for_each(|u| d_group.add_ava("dynmember", Value::Refer(u)));
affected_uuids.extend(matches.into_iter());
affected_uuids.push(*dg_uuid);
@ -315,8 +320,8 @@ impl DynGroup {
if let Some((pre, mut d_group)) = work_set.pop() {
matches.iter().copied().for_each(|choice| match choice {
Ok(u) => d_group.add_ava("member", Value::Refer(u)),
Err(u) => d_group.remove_ava("member", &PartialValue::Refer(u)),
Ok(u) => d_group.add_ava("dynmember", Value::Refer(u)),
Err(u) => d_group.remove_ava("dynmember", &PartialValue::Refer(u)),
});
affected_uuids.extend(matches.into_iter().map(|choice| match choice {
@ -395,7 +400,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
assert!(members.to_refer_single() == Some(UUID_TEST_GROUP));
@ -441,7 +446,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
assert!(members.to_refer_single() == Some(UUID_TEST_GROUP));
@ -489,7 +494,7 @@ mod tests {
.expect("Internal search failure");
let d_group = cands.get(0).expect("Unable to access group.");
assert!(d_group.get_ava_set("member").is_none());
assert!(d_group.get_ava_set("dynmember").is_none());
}
);
}
@ -532,7 +537,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
assert!(members.to_refer_single() == Some(UUID_TEST_GROUP));
@ -587,7 +592,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
assert!(members.to_refer_single() == Some(UUID_TEST_GROUP));
@ -641,7 +646,7 @@ mod tests {
.expect("Internal search failure");
let d_group = cands.get(0).expect("Unable to access group.");
assert!(d_group.get_ava_set("member").is_none());
assert!(d_group.get_ava_set("dynmember").is_none());
}
);
}
@ -672,7 +677,7 @@ mod tests {
preload,
filter!(f_eq("name", PartialValue::new_iname("test_dyngroup"))),
ModifyList::new_list(vec![Modify::Present(
AttrString::from("member"),
AttrString::from("dynmember"),
Value::Refer(UUID_ADMIN)
)]),
None,
@ -687,7 +692,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
// We assert to refer single here because uuid_admin should never have been
// added at all.
@ -721,7 +726,7 @@ mod tests {
Ok(()),
preload,
filter!(f_eq("name", PartialValue::new_iname("test_dyngroup"))),
ModifyList::new_list(vec![Modify::Purged(AttrString::from("member"),)]),
ModifyList::new_list(vec![Modify::Purged(AttrString::from("dynmember"),)]),
None,
|_| {},
|qs: &mut QueryServerWriteTransaction| {
@ -734,7 +739,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
// We assert to refer single here because we should have re-added the members
assert!(members.to_refer_single() == Some(UUID_TEST_GROUP));
@ -783,7 +788,7 @@ mod tests {
let d_group = cands.get(0).expect("Unable to access group.");
let members = d_group
.get_ava_set("member")
.get_ava_set("dynmember")
.expect("No members on dyn group");
assert!(members.to_refer_single() == Some(UUID_TEST_GROUP));
@ -831,7 +836,7 @@ mod tests {
.expect("Internal search failure");
let d_group = cands.get(0).expect("Unable to access group.");
assert!(d_group.get_ava_set("member").is_none());
assert!(d_group.get_ava_set("dynmember").is_none());
}
);
}
@ -871,7 +876,7 @@ mod tests {
.expect("Internal search failure");
let d_group = cands.get(0).expect("Unable to access group.");
assert!(d_group.get_ava_set("member").is_none());
assert!(d_group.get_ava_set("dynmember").is_none());
}
);
}

View file

@ -33,15 +33,18 @@ fn do_memberof(
let groups = qs
.internal_search(filter!(f_and!([
f_eq("class", PVCLASS_GROUP.clone()),
f_eq("member", PartialValue::Refer(uuid))
f_or!([
f_eq("member", PartialValue::Refer(uuid)),
f_eq("dynmember", PartialValue::Refer(uuid))
])
])))
.map_err(|e| {
admin_error!("internal search failure -> {:?}", e);
e
})?;
// Ensure we are MO capable.
tgte.add_ava("class", CLASS_MEMBEROF.clone());
// Ensure we are MO capable. We only add this if it's not already present.
tgte.add_ava_if_not_exist("class", CLASS_MEMBEROF.clone());
// Clear the dmo + mos, we will recreate them now.
// This is how we handle deletes/etc.
tgte.purge_ava("memberof");
@ -157,6 +160,9 @@ fn apply_memberof(
if let Some(miter) = tgte.get_ava_as_refuuid("member") {
group_affect.extend(miter.filter(|m| !other_cache.contains_key(m)));
};
if let Some(miter) = tgte.get_ava_as_refuuid("dynmember") {
group_affect.extend(miter.filter(|m| !other_cache.contains_key(m)));
};
// push the entries to pre/cand
changes.push((pre, tgte));
@ -261,6 +267,18 @@ impl Plugin for MemberOf {
}
})
.flatten()
.chain(
// Or a dyn group?
cand.iter()
.filter_map(|post| {
if post.attribute_equality("class", &PVCLASS_DYNGROUP) {
post.get_ava_as_refuuid("dynmember")
} else {
None
}
})
.flatten(),
)
.collect();
apply_memberof(qs, group_affect)
@ -282,9 +300,13 @@ impl Plugin for MemberOf {
// for each entry in the DB (live).
for e in all_cand {
let uuid = e.get_uuid();
let filt_in = filter!(f_and!([
f_eq("class", PVCLASS_GROUP.clone()),
f_eq("member", PartialValue::Refer(e.get_uuid()))
f_or!([
f_eq("member", PartialValue::Refer(uuid)),
f_eq("dynmember", PartialValue::Refer(uuid))
])
]));
let direct_memberof = match qs

View file

@ -225,7 +225,8 @@ impl ReferentialIntegrity {
let dyn_group = c.attribute_equality("class", &PVCLASS_DYNGROUP);
ref_types.values().filter_map(move |rtype| {
let skip_mb = dyn_group && rtype.name == "member";
// Skip dynamic members
let skip_mb = dyn_group && rtype.name == "dynmember";
// Skip memberOf.
let skip_mo = rtype.name == "memberof";
if skip_mb || skip_mo {
@ -923,7 +924,7 @@ mod tests {
("class", Value::new_class("dyngroup")),
("uuid", Value::Uuid(dyn_uuid)),
("name", Value::new_iname("test_dyngroup")),
("member", Value::Refer(inv_mb_uuid)),
("dynmember", Value::Refer(inv_mb_uuid)),
(
"dyngroup_filter",
Value::JsonFilt(ProtoFilter::Eq("name".to_string(), "testgroup".to_string()))
@ -946,7 +947,7 @@ mod tests {
.expect("Failed to access dyn group");
let dyn_member = dyna
.get_ava_refer("member")
.get_ava_refer("dynmember")
.expect("Failed to get member attribute");
assert!(dyn_member.len() == 1);
assert!(dyn_member.contains(&tgroup_uuid));

View file

@ -1,6 +1,9 @@
use super::proto::*;
use crate::be::BackendTransaction;
use crate::plugins::Plugins;
use crate::prelude::*;
use crate::repl::proto::ReplRuvRange;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
impl<'a> QueryServerReadTransaction<'a> {
// Get the current state of "where we are up to"
@ -14,8 +17,27 @@ impl<'a> QueryServerReadTransaction<'a> {
// where the RUV approach doesn't since the supplier calcs the diff.
#[instrument(level = "debug", skip_all)]
pub fn consumer_get_state(&mut self) -> Result<(), OperationError> {
Ok(())
pub fn consumer_get_state(&mut self) -> Result<ReplRuvRange, OperationError> {
// We need the RUV as a state of
//
// [ s_uuid, cid_min, cid_max ]
// [ s_uuid, cid_min, cid_max ]
// [ s_uuid, cid_min, cid_max ]
// ...
//
// This way the remote can diff against its knowledge and work out:
//
// [ s_uuid, from_cid, to_cid ]
// [ s_uuid, from_cid, to_cid ]
//
// ...
// The supplier then uses this to retrieve the set of entries
// and the attributes that we need.
let ruv_snapshot = self.get_be_txn().get_ruv();
// What's the current set of ranges?
ruv_snapshot.current_ruv_range()
}
}
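
To make the range comment above concrete: a hedged sketch of the diff the supplier side could perform once it receives the consumer's ranges. needed_ranges is a hypothetical helper for illustration only (it is not an API in this commit), and ReplCidRange is redefined locally for self-containment:

use std::collections::BTreeMap;
use std::time::Duration;
use uuid::Uuid;

struct ReplCidRange {
    ts_min: Duration,
    ts_max: Duration,
}

// For each server uuid the supplier knows, work out the (from, to] window of
// changes the consumer is missing, based on the consumer's reported ranges.
fn needed_ranges(
    supplier: &BTreeMap<Uuid, ReplCidRange>,
    consumer: &BTreeMap<Uuid, ReplCidRange>,
) -> BTreeMap<Uuid, (Duration, Duration)> {
    supplier
        .iter()
        .filter_map(|(s_uuid, sup)| {
            let from = match consumer.get(s_uuid) {
                // The consumer is already up to date for this server.
                Some(own) if own.ts_max >= sup.ts_max => return None,
                // The consumer has some changes: resume from its high-water mark.
                Some(own) => own.ts_max,
                // The consumer has never seen this server: send everything.
                None => Duration::ZERO,
            };
            Some((*s_uuid, (from, sup.ts_max)))
        })
        .collect()
}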

View file

@ -52,6 +52,37 @@ impl From<&ReplCidV1> for Cid {
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct ReplCidRange {
#[serde(rename = "m")]
pub ts_min: Duration,
#[serde(rename = "x")]
pub ts_max: Duration,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum ReplRuvRange {
V1 {
ranges: BTreeMap<Uuid, ReplCidRange>,
},
}
impl Default for ReplRuvRange {
fn default() -> Self {
ReplRuvRange::V1 {
ranges: BTreeMap::default(),
}
}
}
impl ReplRuvRange {
pub fn is_empty(&self) -> bool {
match self {
ReplRuvRange::V1 { ranges } => ranges.is_empty(),
}
}
}
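// Illustrative only: constructing the wire type above (server_uuid is an
// invented value for this sketch). The single-letter serde renames ("m"/"x")
// keep the serialized form compact on the wire.
//
//     let server_uuid = Uuid::new_v4();
//     let mut ranges = BTreeMap::new();
//     ranges.insert(server_uuid, ReplCidRange {
//         ts_min: Duration::from_secs(10),
//         ts_max: Duration::from_secs(500),
//     });
//     let ruv_range = ReplRuvRange::V1 { ranges };
//     assert!(!ruv_range.is_empty());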
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct ReplAddressV1 {
#[serde(rename = "f")]

View file

@ -1,46 +1,53 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Duration;
use concread::bptree::{BptreeMap, BptreeMapReadTxn, BptreeMapWriteTxn};
use concread::bptree::{BptreeMap, BptreeMapReadSnapshot, BptreeMapReadTxn, BptreeMapWriteTxn};
use idlset::v2::IDLBitRange;
use kanidm_proto::v1::ConsistencyError;
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::{ReplCidRange, ReplRuvRange};
use std::fmt;
#[derive(Default)]
pub struct ReplicationUpdateVector {
// This sorts by time. Should we look up by IDL or by UUID?
// I think IDL, because when we need to actually do the look ups we'll need
// to send this list to the BE to get the affected entries.
// This sorts by time. We store the set of entry ids that are affected in an operation.
// Due to how replication state works, it is possible that ids in these sets may no
// longer exist, so these bit ranges likely need intersection with allids before use.
data: BptreeMap<Cid, IDLBitRange>,
}
impl Default for ReplicationUpdateVector {
fn default() -> Self {
let data: BptreeMap<Cid, IDLBitRange> = BptreeMap::new();
ReplicationUpdateVector { data }
}
// This sorts by server ID. The RUV uses it to build ranges for, you guessed it,
// range queries. These are used to build the set of differences that need to be sent in
// a replication operation.
//
// We need a way to invert the cid, but without duplication? Maybe an inverted cid type?
// That way it would still order correctly by timestamp, just keyed by server uuid
// first.
ranged: BptreeMap<Uuid, BTreeSet<Duration>>,
}
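// Why two indexes: `data` is keyed by Cid, which sorts by timestamp first, so
// answering "what changed on server X between t1 and t2" would need a full scan.
// The `ranged` map inverts this: server uuid -> ordered set of timestamps.
// Schematically (illustrative only, not an API in this commit):
//
//     let times = ranged.get(&s_uuid)?;                       // this server's cid timestamps
//     for ts in times.range((Excluded(from), Included(to))) { // only the window we need
//         // reconstruct the cid from (ts, s_uuid) and fetch the affected ids
//         let affected = data.get(&cid_for(*ts, s_uuid));
//     }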
impl ReplicationUpdateVector {
pub fn write(&self) -> ReplicationUpdateVectorWriteTransaction<'_> {
ReplicationUpdateVectorWriteTransaction {
data: self.data.write(),
ranged: self.ranged.write(),
}
}
pub fn read(&self) -> ReplicationUpdateVectorReadTransaction<'_> {
ReplicationUpdateVectorReadTransaction {
data: self.data.read(),
ranged: self.ranged.read(),
}
}
}
pub struct ReplicationUpdateVectorWriteTransaction<'a> {
data: BptreeMapWriteTxn<'a, Cid, IDLBitRange>,
ranged: BptreeMapWriteTxn<'a, Uuid, BTreeSet<Duration>>,
}
impl<'a> fmt::Debug for ReplicationUpdateVectorWriteTransaction<'a> {
@ -54,10 +61,38 @@ impl<'a> fmt::Debug for ReplicationUpdateVectorWriteTransaction<'a> {
pub struct ReplicationUpdateVectorReadTransaction<'a> {
data: BptreeMapReadTxn<'a, Cid, IDLBitRange>,
ranged: BptreeMapReadTxn<'a, Uuid, BTreeSet<Duration>>,
}
pub trait ReplicationUpdateVectorTransaction {
fn ruv_snapshot(&self) -> BTreeMap<Cid, IDLBitRange>;
fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange>;
fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>>;
fn current_ruv_range(&self) -> Result<ReplRuvRange, OperationError> {
let ranges = self
.range_snapshot()
.iter()
.map(|(s_uuid, range)| match (range.first(), range.last()) {
(Some(first), Some(last)) => Ok((
*s_uuid,
ReplCidRange {
ts_min: *first,
ts_max: *last,
},
)),
_ => {
error!(
"invalid state for server uuid {:?}, no ranges present",
s_uuid
);
Err(OperationError::InvalidState)
}
})
.collect::<Result<BTreeMap<_, _>, _>>()?;
Ok(ReplRuvRange::V1 { ranges })
}
fn verify(
&self,
@ -65,7 +100,7 @@ pub trait ReplicationUpdateVectorTransaction {
results: &mut Vec<Result<(), ConsistencyError>>,
) {
// Okay rebuild the RUV in parallel.
let mut check_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
let mut check_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::default();
for entry in entries {
// The DB id we need.
let eid = entry.get_id();
@ -73,6 +108,7 @@ pub trait ReplicationUpdateVectorTransaction {
// We don't need the details of the change - only the cid of the
// change that this entry was involved in.
for cid in ecstate.cid_iter() {
// Add to the main ruv data.
if let Some(idl) = check_ruv.get_mut(cid) {
// We can't guarantee id order, so we have to do this properly.
idl.insert_id(eid);
@ -87,7 +123,6 @@ pub trait ReplicationUpdateVectorTransaction {
trace!(?check_ruv);
// Get the current state
let snapshot_ruv = self.ruv_snapshot();
trace!(?snapshot_ruv);
// Now compare. We want to do this checking for each CID in each, and then asserting
// the content is the same.
@ -150,34 +185,72 @@ pub trait ReplicationUpdateVectorTransaction {
snap_next = snap_iter.next();
}
// Assert that the content of the ranged set matches the data set and has the
// correct set of values.
let snapshot_range = self.range_snapshot();
for cid in snapshot_ruv.keys() {
if let Some(server_range) = snapshot_range.get(&cid.s_uuid) {
if !server_range.contains(&cid.ts) {
admin_warn!(
"{:?} is NOT consistent! server range is missing cid in index",
cid
);
debug_assert!(false);
results.push(Err(ConsistencyError::RuvInconsistent(
cid.s_uuid.to_string(),
)));
}
} else {
admin_warn!(
"{:?} is NOT consistent! server range is not present",
cid.s_uuid
);
debug_assert!(false);
results.push(Err(ConsistencyError::RuvInconsistent(
cid.s_uuid.to_string(),
)));
}
}
// Done!
}
}
impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorWriteTransaction<'a> {
fn ruv_snapshot(&self) -> BTreeMap<Cid, IDLBitRange> {
self.data
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> {
self.data.to_snapshot()
}
fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>> {
self.ranged.to_snapshot()
}
}
impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorReadTransaction<'a> {
fn ruv_snapshot(&self) -> BTreeMap<Cid, IDLBitRange> {
self.data
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> {
self.data.to_snapshot()
}
fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>> {
self.ranged.to_snapshot()
}
}
impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
pub fn clear(&mut self) {
self.data.clear();
self.ranged.clear();
}
pub fn rebuild(&mut self, entries: &[Arc<EntrySealedCommitted>]) -> Result<(), OperationError> {
// Drop everything.
self.clear();
// Entries and their internal changelogs are the "source of truth" for all changes
// that have ever occurred and are stored on this server. So we use them to rebuild our RUV
// here!
let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();
for entry in entries {
// The DB id we need.
@ -194,6 +267,14 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
idl.insert_id(eid);
rebuild_ruv.insert(cid.clone(), idl);
}
if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) {
server_range.insert(cid.ts);
} else {
let mut ts_range = BTreeSet::default();
ts_range.insert(cid.ts);
rebuild_range.insert(cid.s_uuid, ts_range);
}
}
}
@ -202,8 +283,8 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
idl.maybe_compress();
});
self.data.clear();
self.data.extend(rebuild_ruv.into_iter());
self.ranged.extend(rebuild_range.into_iter());
Ok(())
}
@ -217,6 +298,44 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
} else {
self.data.insert(cid.clone(), idl);
}
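// Mirror the same cid into the ranged (per-server) index, so that range
// queries stay consistent with the data map above.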
if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) {
server_range.insert(cid.ts);
} else {
let mut range = BTreeSet::default();
range.insert(cid.ts);
self.ranged.insert(cid.s_uuid, range);
}
Ok(())
}
pub fn update_entry_changestate(
&mut self,
entry: &EntrySealedCommitted,
) -> Result<(), OperationError> {
let eid = entry.get_id();
let ecstate = entry.get_changestate();
for cid in ecstate.cid_iter() {
if let Some(idl) = self.data.get_mut(cid) {
// We can't guarantee id order, so we have to do this properly.
idl.insert_id(eid);
} else {
let mut idl = IDLBitRange::new();
idl.insert_id(eid);
self.data.insert(cid.clone(), idl);
}
if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) {
server_range.insert(cid.ts);
} else {
let mut ts_range = BTreeSet::default();
ts_range.insert(cid.ts);
self.ranged.insert(cid.s_uuid, ts_range);
}
}
Ok(())
}
@ -245,12 +364,39 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
pub fn trim_up_to(&mut self, cid: &Cid) -> Result<IDLBitRange, OperationError> {
let mut idl = IDLBitRange::new();
let mut remove_suuid = Vec::default();
self.data
.range((Unbounded, Excluded(cid)))
.for_each(|(_, ex_idl)| {
idl = ex_idl as &_ | &idl;
});
// We iterate here rather than using for_each, because we also need to trim the
// ranged set as we go. Since that set is not ordered by time, we have to do
// fragmented searches over it no matter what we try to do.
for (cid, ex_idl) in self.data.range((Unbounded, Excluded(cid))) {
idl = ex_idl as &_ | &idl;
// Remove the reverse version of the cid from the ranged index.
match self.ranged.get_mut(&cid.s_uuid) {
Some(server_range) => {
// remove returns a bool: true if the element WAS present.
if !server_range.remove(&cid.ts) {
error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
return Err(OperationError::InvalidState);
}
if server_range.is_empty() {
remove_suuid.push(cid.s_uuid);
}
}
None => {
error!("Impossible State - The RUV is corrupted due to missing sid in ranged index");
return Err(OperationError::InvalidState);
}
}
}
for s_uuid in remove_suuid {
let x = self.ranged.remove(&s_uuid);
assert!(x.map(|y| y.is_empty()).unwrap_or(false))
}
// Trim all cids up to this value, and return the range of IDs
// that are affected.
@ -261,5 +407,6 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
pub fn commit(self) {
self.data.commit();
self.ranged.commit();
}
}

View file

@ -1,6 +1,27 @@
use crate::be::BackendTransaction;
use crate::prelude::*;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
use std::collections::BTreeMap;
fn repl_initialise(
from: &mut QueryServerReadTransaction<'_>,
to: &mut QueryServerWriteTransaction<'_>,
) -> Result<(), OperationError> {
// First, build the refresh context.
let refresh_context = from.supplier_provide_refresh()?;
// Verify content of the refresh
// eprintln!("{:#?}", refresh_context);
// Apply it to the server
to.consumer_apply_refresh(&refresh_context)?;
// Need same d_uuid
assert_eq!(from.get_domain_uuid(), to.get_domain_uuid());
Ok(())
}
#[qs_pair_test]
async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) {
// Rebuild / refresh the content of server a with the content from b.
@ -12,29 +33,13 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer)
let mut server_b_txn = server_b.read().await;
// First, build the refresh context.
let refresh_context = server_b_txn
.supplier_provide_refresh()
.expect("Failed to build refresh");
// Verify content of the refresh
// eprintln!("{:#?}", refresh_context);
// Apply it to the server
assert!(server_a_txn
.consumer_apply_refresh(&refresh_context)
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
// Verify the content of server_a and server_b are identical.
let mut server_a_txn = server_a.read().await;
// Need same d_uuid
assert_eq!(
server_a_txn.get_domain_uuid(),
server_b_txn.get_domain_uuid()
);
let domain_entry_a = server_a_txn
.internal_search_uuid(UUID_DOMAIN_INFO)
.expect("Failed to access domain info");
@ -46,8 +51,14 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer)
// Same d_vers / domain info.
assert_eq!(domain_entry_a, domain_entry_b);
trace!("a {:#?}", domain_entry_a.get_changestate());
trace!("b {:#?}", domain_entry_b.get_changestate());
trace!(
"domain_changestate a {:#?}",
domain_entry_a.get_changestate()
);
trace!(
"domain_changestate b {:#?}",
domain_entry_b.get_changestate()
);
// Compare that their change states are identical too.
assert_eq!(
@ -94,6 +105,46 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer)
// Done! The entry contents are identical, as is their replication metadata. We are good
// to go!
let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range();
let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range();
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
// Both servers will be post-test validated.
}
#[qs_pair_test]
async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServer) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range();
let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range();
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
// Check ruv
// - should be same
// - incremental
// - no change.
// Add an entry.
// Do a ruv check.
// Incremental.
// Should now be on the other partner.
}
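
The notes above chart where this test is headed. A hedged sketch of the next assertion it would likely grow, shown as comments because the entry-creation and incremental-transfer steps are not implemented in this commit; only the RUV comparison APIs used below exist today:

// let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
// // ... create an entry on server a, then commit ...
// // server a now holds a cid under its own s_uuid that server b lacks, so:
// let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range();
// let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range();
// assert!(a_ruv_range != b_ruv_range);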

View file

@ -1265,6 +1265,21 @@ impl<'a> SchemaWriteTransaction<'a> {
syntax: SyntaxType::ReferenceUuid,
},
);
self.attributes.insert(
AttrString::from("dynmember"),
SchemaAttribute {
name: AttrString::from("dynmember"),
uuid: UUID_SCHEMA_ATTR_DYNMEMBER,
description: String::from("List of dynamic members of the group"),
multivalue: true,
unique: false,
phantom: false,
sync_allowed: true,
replicated: false,
index: vec![IndexType::Equality],
syntax: SyntaxType::ReferenceUuid,
},
);
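// Note: dynmember is deliberately `replicated: false`. The dyngroup_filter
// that generates it *is* replicated, so each server can recompute the derived
// membership locally rather than receiving the computed values via replication.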
// Migration related
self.attributes.insert(
AttrString::from("version"),

View file

@ -569,6 +569,7 @@ mod tests {
let time_p2 = time_p1 + Duration::from_secs(CHANGELOG_MAX_AGE * 2);
let time_p3 = time_p2 + Duration::from_secs(CHANGELOG_MAX_AGE * 2);
trace!("test_tombstone_start");
let mut server_txn = server.write(time_p1).await;
let admin = server_txn.internal_search_uuid(UUID_ADMIN).expect("failed");