diff --git a/server/lib/src/be/mod.rs b/server/lib/src/be/mod.rs index 8df271b5b..fb3e7801b 100644 --- a/server/lib/src/be/mod.rs +++ b/server/lib/src/be/mod.rs @@ -990,6 +990,7 @@ impl<'a> BackendWriteTransaction<'a> { // This auto compresses. let ruv_idl = IDLBitRange::from_iter(c_entries.iter().map(|e| e.get_id())); + // We don't need to skip this like in mod since creates always go to the ruv self.get_ruv().insert_change(cid, ruv_idl)?; self.idlayer.write_identries(c_entries.iter())?; @@ -1005,10 +1006,10 @@ impl<'a> BackendWriteTransaction<'a> { } #[instrument(level = "debug", name = "be::create", skip_all)] - /// This is similar to create, but used in the replication path as it skips the - /// modification of the RUV and the checking of CIDs since these actions are not - /// required during a replication refresh (else we'd create an infinite replication - /// loop.) + /// This is similar to create, but used in the replication path as it records all + /// the CID's in the entry to the RUV, but without applying the current CID as + /// a new value in the RUV. We *do not* want to apply the current CID in the RUV + /// related to this entry as that could cause an infinite replication loop! pub fn refresh( &mut self, entries: Vec>, @@ -1032,6 +1033,11 @@ impl<'a> BackendWriteTransaction<'a> { self.idlayer.set_id2entry_max_id(id_max); + // Update the RUV with all the changestates of the affected entries. + for e in c_entries.iter() { + self.get_ruv().update_entry_changestate(e)?; + } + // Now update the indexes as required. for e in c_entries.iter() { self.entry_index(None, Some(e))? 
@@ -1054,21 +1060,24 @@ impl<'a> BackendWriteTransaction<'a> { assert!(post_entries.len() == pre_entries.len()); - post_entries.iter().try_for_each(|e| { - if e.get_changestate().contains_tail_cid(cid) { - Ok(()) - } else { - admin_error!( - "Entry changelog does not contain a change related to this transaction" - ); - Err(OperationError::ReplEntryNotChanged) - } - })?; + let post_entries_iter = post_entries.iter().filter(|e| { + trace!(?cid); + trace!(changestate = ?e.get_changestate()); + // If True - This means that at least one attribute that *is* replicated was changed + // on this entry, so we need to update and add this to the RUV! + // + // If False - This means that the entry in question was updated but the changes are all + // non-replicated so we DO NOT update the RUV here! + e.get_changestate().contains_tail_cid(cid) + }); // All good, lets update the RUV. // This auto compresses. - let ruv_idl = IDLBitRange::from_iter(post_entries.iter().map(|e| e.get_id())); - self.get_ruv().insert_change(cid, ruv_idl)?; + let ruv_idl = IDLBitRange::from_iter(post_entries_iter.map(|e| e.get_id())); + + if !ruv_idl.is_empty() { + self.get_ruv().insert_change(cid, ruv_idl)?; + } // Now, given the list of id's, update them self.get_idlayer().write_identries(post_entries.iter())?; @@ -1484,6 +1493,7 @@ impl<'a> BackendWriteTransaction<'a> { } pub(crate) fn danger_delete_all_db_content(&mut self) -> Result<(), OperationError> { + self.get_ruv().clear(); unsafe { self.get_idlayer() .purge_id2entry() diff --git a/server/lib/src/constants/schema.rs b/server/lib/src/constants/schema.rs index 255eee2fd..f2622031d 100644 --- a/server/lib/src/constants/schema.rs +++ b/server/lib/src/constants/schema.rs @@ -1535,6 +1535,9 @@ pub const JSON_SCHEMA_CLASS_DYNGROUP: &str = r#" "systemmust": [ "dyngroup_filter" ], + "systemmay": [ + "dynmember" + ], "systemsupplements": [ "group" ], diff --git a/server/lib/src/constants/uuids.rs b/server/lib/src/constants/uuids.rs index 
006bf9fee..8da00a538 100644 --- a/server/lib/src/constants/uuids.rs +++ b/server/lib/src/constants/uuids.rs @@ -226,6 +226,7 @@ pub const UUID_SCHEMA_ATTR_REPLICATED: Uuid = uuid!("00000000-0000-0000-0000-fff pub const UUID_SCHEMA_ATTR_PRIVATE_COOKIE_KEY: Uuid = uuid!("00000000-0000-0000-0000-ffff00000130"); pub const _UUID_SCHEMA_ATTR_DOMAIN_LDAP_BASEDN: Uuid = uuid!("00000000-0000-0000-0000-ffff00000131"); +pub const UUID_SCHEMA_ATTR_DYNMEMBER: Uuid = uuid!("00000000-0000-0000-0000-ffff00000132"); // System and domain infos // I'd like to strongly criticise william of the past for making poor choices about these allocations. diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs index a7e322c55..400c45eaf 100644 --- a/server/lib/src/entry.rs +++ b/server/lib/src/entry.rs @@ -576,7 +576,7 @@ impl Entry { // event for the attribute. /// Add an attribute-value-assertion to this Entry. pub fn add_ava(&mut self, attr: &str, value: Value) { - self.add_ava_int(attr, value) + self.add_ava_int(attr, value); } /// Replace the existing content of an attribute set of this Entry, with a new set of Values. @@ -1850,18 +1850,23 @@ impl Entry { // impl Entry { impl Entry { - /// This internally adds an AVA to the entry. - fn add_ava_int(&mut self, attr: &str, value: Value) { - // How do we make this turn into an ok / err? - + /// This internally adds an AVA to the entry. If the value was newly added, then true is returned. + /// If the value already existed, or was unable to be added, false is returned. Alternately, + /// you can think of this boolean as "if a write occurred to the structure", true indicating that + /// a change occurred. + fn add_ava_int(&mut self, attr: &str, value: Value) -> bool { if let Some(vs) = self.attrs.get_mut(attr) { let r = vs.insert_checked(value); debug_assert!(r.is_ok()); + // Default to the value not being present if wrong typed. 
+ r.unwrap_or(false) } else { #[allow(clippy::expect_used)] let vs = valueset::from_value_iter(std::iter::once(value)) .expect("Unable to fail - non-zero iter, and single value type!"); self.attrs.insert(AttrString::from(attr), vs); + // The attribute did not exist before, so inserting it is a write. + true } // Doesn't matter if it already exists, equality will replace. } @@ -2401,7 +2406,15 @@ where .eclog .add_ava_iter(&self.valid.cid, attr, std::iter::once(value.clone())); */ - self.add_ava_int(attr, value) + self.add_ava_int(attr, value); + } + + pub fn add_ava_if_not_exist(&mut self, attr: &str, value: Value) { + // This returns true if the value WAS changed! See add_ava_int. + if self.add_ava_int(attr, value) { + // In this case, we ONLY update the changestate if the value was NOT already present! + self.valid.ecstate.change_ava(&self.valid.cid, attr); + } } fn assert_ava(&mut self, attr: &str, value: &PartialValue) -> Result<(), OperationError> { diff --git a/server/lib/src/plugins/dyngroup.rs b/server/lib/src/plugins/dyngroup.rs index f6b670a4d..e42b5c951 100644 --- a/server/lib/src/plugins/dyngroup.rs +++ b/server/lib/src/plugins/dyngroup.rs @@ -71,12 +71,17 @@ impl DynGroup { affected_uuids.extend(uuid_iter); } + // Mark the former members as being affected also. + if let Some(uuid_iter) = pre.get_ava_as_refuuid("dynmember") { + affected_uuids.extend(uuid_iter); + } + if let Some(members) = members { // Only set something if there is actually something to do! 
- nd_group.set_ava_set("member", members); + nd_group.set_ava_set("dynmember", members); // push the entries to pre/cand } else { - nd_group.purge_ava("member"); + nd_group.purge_ava("dynmember"); } candidate_tuples.push((pre, nd_group)); @@ -191,7 +196,7 @@ impl DynGroup { matches .iter() .copied() - .for_each(|u| d_group.add_ava("member", Value::Refer(u))); + .for_each(|u| d_group.add_ava("dynmember", Value::Refer(u))); affected_uuids.extend(matches.into_iter()); affected_uuids.push(*dg_uuid); @@ -315,8 +320,8 @@ impl DynGroup { if let Some((pre, mut d_group)) = work_set.pop() { matches.iter().copied().for_each(|choice| match choice { - Ok(u) => d_group.add_ava("member", Value::Refer(u)), - Err(u) => d_group.remove_ava("member", &PartialValue::Refer(u)), + Ok(u) => d_group.add_ava("dynmember", Value::Refer(u)), + Err(u) => d_group.remove_ava("dynmember", &PartialValue::Refer(u)), }); affected_uuids.extend(matches.into_iter().map(|choice| match choice { @@ -395,7 +400,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); assert!(members.to_refer_single() == Some(UUID_TEST_GROUP)); @@ -441,7 +446,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); assert!(members.to_refer_single() == Some(UUID_TEST_GROUP)); @@ -489,7 +494,7 @@ mod tests { .expect("Internal search failure"); let d_group = cands.get(0).expect("Unable to access group."); - assert!(d_group.get_ava_set("member").is_none()); + assert!(d_group.get_ava_set("dynmember").is_none()); } ); } @@ -532,7 +537,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); assert!(members.to_refer_single() == 
Some(UUID_TEST_GROUP)); @@ -587,7 +592,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); assert!(members.to_refer_single() == Some(UUID_TEST_GROUP)); @@ -641,7 +646,7 @@ mod tests { .expect("Internal search failure"); let d_group = cands.get(0).expect("Unable to access group."); - assert!(d_group.get_ava_set("member").is_none()); + assert!(d_group.get_ava_set("dynmember").is_none()); } ); } @@ -672,7 +677,7 @@ mod tests { preload, filter!(f_eq("name", PartialValue::new_iname("test_dyngroup"))), ModifyList::new_list(vec![Modify::Present( - AttrString::from("member"), + AttrString::from("dynmember"), Value::Refer(UUID_ADMIN) )]), None, @@ -687,7 +692,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); // We assert to refer single here because we should have "removed" uuid_admin being added // at all. 
@@ -721,7 +726,7 @@ mod tests { Ok(()), preload, filter!(f_eq("name", PartialValue::new_iname("test_dyngroup"))), - ModifyList::new_list(vec![Modify::Purged(AttrString::from("member"),)]), + ModifyList::new_list(vec![Modify::Purged(AttrString::from("dynmember"),)]), None, |_| {}, |qs: &mut QueryServerWriteTransaction| { @@ -734,7 +739,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); // We assert to refer single here because we should have re-added the members assert!(members.to_refer_single() == Some(UUID_TEST_GROUP)); @@ -783,7 +788,7 @@ mod tests { let d_group = cands.get(0).expect("Unable to access group."); let members = d_group - .get_ava_set("member") + .get_ava_set("dynmember") .expect("No members on dyn group"); assert!(members.to_refer_single() == Some(UUID_TEST_GROUP)); @@ -831,7 +836,7 @@ mod tests { .expect("Internal search failure"); let d_group = cands.get(0).expect("Unable to access group."); - assert!(d_group.get_ava_set("member").is_none()); + assert!(d_group.get_ava_set("dynmember").is_none()); } ); } @@ -871,7 +876,7 @@ mod tests { .expect("Internal search failure"); let d_group = cands.get(0).expect("Unable to access group."); - assert!(d_group.get_ava_set("member").is_none()); + assert!(d_group.get_ava_set("dynmember").is_none()); } ); } diff --git a/server/lib/src/plugins/memberof.rs b/server/lib/src/plugins/memberof.rs index 8eb58cee4..fcabce34f 100644 --- a/server/lib/src/plugins/memberof.rs +++ b/server/lib/src/plugins/memberof.rs @@ -33,15 +33,18 @@ fn do_memberof( let groups = qs .internal_search(filter!(f_and!([ f_eq("class", PVCLASS_GROUP.clone()), - f_eq("member", PartialValue::Refer(uuid)) + f_or!([ + f_eq("member", PartialValue::Refer(uuid)), + f_eq("dynmember", PartialValue::Refer(uuid)) + ]) ]))) .map_err(|e| { admin_error!("internal search failure -> {:?}", e); e })?; - // Ensure we are MO 
capable. - tgte.add_ava("class", CLASS_MEMBEROF.clone()); + // Ensure we are MO capable. We only add this if it's not already present. + tgte.add_ava_if_not_exist("class", CLASS_MEMBEROF.clone()); // Clear the dmo + mos, we will recreate them now. // This is how we handle deletes/etc. tgte.purge_ava("memberof"); @@ -157,6 +160,9 @@ fn apply_memberof( if let Some(miter) = tgte.get_ava_as_refuuid("member") { group_affect.extend(miter.filter(|m| !other_cache.contains_key(m))); }; + if let Some(miter) = tgte.get_ava_as_refuuid("dynmember") { + group_affect.extend(miter.filter(|m| !other_cache.contains_key(m))); + }; // push the entries to pre/cand changes.push((pre, tgte)); @@ -261,6 +267,18 @@ impl Plugin for MemberOf { } }) .flatten() + .chain( + // Or a dyn group? + cand.iter() + .filter_map(|post| { + if post.attribute_equality("class", &PVCLASS_DYNGROUP) { + post.get_ava_as_refuuid("dynmember") + } else { + None + } + }) + .flatten(), + ) .collect(); apply_memberof(qs, group_affect) @@ -282,9 +300,13 @@ impl Plugin for MemberOf { // for each entry in the DB (live). for e in all_cand { + let uuid = e.get_uuid(); let filt_in = filter!(f_and!([ f_eq("class", PVCLASS_GROUP.clone()), - f_eq("member", PartialValue::Refer(e.get_uuid())) + f_or!([ + f_eq("member", PartialValue::Refer(uuid)), + f_eq("dynmember", PartialValue::Refer(uuid)) + ]) ])); let direct_memberof = match qs diff --git a/server/lib/src/plugins/refint.rs b/server/lib/src/plugins/refint.rs index 6da81d654..237cf477f 100644 --- a/server/lib/src/plugins/refint.rs +++ b/server/lib/src/plugins/refint.rs @@ -225,7 +225,8 @@ impl ReferentialIntegrity { let dyn_group = c.attribute_equality("class", &PVCLASS_DYNGROUP); ref_types.values().filter_map(move |rtype| { - let skip_mb = dyn_group && rtype.name == "member"; + // Skip dynamic members + let skip_mb = dyn_group && rtype.name == "dynmember"; // Skip memberOf. 
let skip_mo = rtype.name == "memberof"; if skip_mb || skip_mo { @@ -923,7 +924,7 @@ mod tests { ("class", Value::new_class("dyngroup")), ("uuid", Value::Uuid(dyn_uuid)), ("name", Value::new_iname("test_dyngroup")), - ("member", Value::Refer(inv_mb_uuid)), + ("dynmember", Value::Refer(inv_mb_uuid)), ( "dyngroup_filter", Value::JsonFilt(ProtoFilter::Eq("name".to_string(), "testgroup".to_string())) @@ -946,7 +947,7 @@ mod tests { .expect("Failed to access dyn group"); let dyn_member = dyna - .get_ava_refer("member") + .get_ava_refer("dynmember") .expect("Failed to get member attribute"); assert!(dyn_member.len() == 1); assert!(dyn_member.contains(&tgroup_uuid)); diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index 9b6e66297..33064d56b 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -1,6 +1,9 @@ use super::proto::*; +use crate::be::BackendTransaction; use crate::plugins::Plugins; use crate::prelude::*; +use crate::repl::proto::ReplRuvRange; +use crate::repl::ruv::ReplicationUpdateVectorTransaction; impl<'a> QueryServerReadTransaction<'a> { // Get the current state of "where we are up to" @@ -14,8 +17,27 @@ impl<'a> QueryServerReadTransaction<'a> { // where the RUV approach doesn't since the supplier calcs the diff. #[instrument(level = "debug", skip_all)] - pub fn consumer_get_state(&mut self) -> Result<(), OperationError> { - Ok(()) + pub fn consumer_get_state(&mut self) -> Result { + // We need the RUV as a state of + // + // [ s_uuid, cid_min, cid_max ] + // [ s_uuid, cid_min, cid_max ] + // [ s_uuid, cid_min, cid_max ] + // ... + // + // This way the remote can diff against its knowledge and work out: + // + // [ s_uuid, from_cid, to_cid ] + // [ s_uuid, from_cid, to_cid ] + // + // ... + + // The supplier will then use this to actually retrieve the set of entries, + // and the attributes that we need. + let ruv_snapshot = self.get_be_txn().get_ruv(); + + // What's the current set of ranges? 
+ ruv_snapshot.current_ruv_range() } } diff --git a/server/lib/src/repl/proto.rs b/server/lib/src/repl/proto.rs index 249cf886d..7c77d2ce5 100644 --- a/server/lib/src/repl/proto.rs +++ b/server/lib/src/repl/proto.rs @@ -52,6 +52,37 @@ impl From<&ReplCidV1> for Cid { } } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct ReplCidRange { + #[serde(rename = "m")] + pub ts_min: Duration, + #[serde(rename = "x")] + pub ts_max: Duration, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum ReplRuvRange { + V1 { + ranges: BTreeMap, + }, +} + +impl Default for ReplRuvRange { + fn default() -> Self { + ReplRuvRange::V1 { + ranges: BTreeMap::default(), + } + } +} + +impl ReplRuvRange { + pub fn is_empty(&self) -> bool { + match self { + ReplRuvRange::V1 { ranges } => ranges.is_empty(), + } + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] pub struct ReplAddressV1 { #[serde(rename = "f")] diff --git a/server/lib/src/repl/ruv.rs b/server/lib/src/repl/ruv.rs index 09f2a9d92..14c530f8f 100644 --- a/server/lib/src/repl/ruv.rs +++ b/server/lib/src/repl/ruv.rs @@ -1,46 +1,53 @@ use std::cmp::Ordering; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound::*; use std::sync::Arc; +use std::time::Duration; -use concread::bptree::{BptreeMap, BptreeMapReadTxn, BptreeMapWriteTxn}; +use concread::bptree::{BptreeMap, BptreeMapReadSnapshot, BptreeMapReadTxn, BptreeMapWriteTxn}; use idlset::v2::IDLBitRange; use kanidm_proto::v1::ConsistencyError; use crate::prelude::*; use crate::repl::cid::Cid; +use crate::repl::proto::{ReplCidRange, ReplRuvRange}; use std::fmt; +#[derive(Default)] pub struct ReplicationUpdateVector { - // This sorts by time. Should we look up by IDL or by UUID? - // I think IDL, because when we need to actually do the look ups we'll need - // to send this list to the BE to get the affected entries. + // This sorts by time. 
We store the set of entry ids that are affected in an operation. + // Due to how replication state works, it is possible that ids in these sets *may* not + // exist anymore, so these bit ranges likely need intersection with allids before use. data: BptreeMap, -} - -impl Default for ReplicationUpdateVector { - fn default() -> Self { - let data: BptreeMap = BptreeMap::new(); - ReplicationUpdateVector { data } - } + // This sorts by Server ID. It's used for the RUV to build ranges for, you guessed it, + // range queries. These are used to build the set of differences that need to be sent in + // a replication operation. + // + // TODO: We need a way to invert the cid, but without duplication. Maybe an inverted cid type? + // This way it still orders things in the right order by time stamp, it just searches by cid + // first. + ranged: BptreeMap>, } impl ReplicationUpdateVector { pub fn write(&self) -> ReplicationUpdateVectorWriteTransaction<'_> { ReplicationUpdateVectorWriteTransaction { data: self.data.write(), + ranged: self.ranged.write(), } } pub fn read(&self) -> ReplicationUpdateVectorReadTransaction<'_> { ReplicationUpdateVectorReadTransaction { data: self.data.read(), + ranged: self.ranged.read(), } } } pub struct ReplicationUpdateVectorWriteTransaction<'a> { data: BptreeMapWriteTxn<'a, Cid, IDLBitRange>, + ranged: BptreeMapWriteTxn<'a, Uuid, BTreeSet>, } impl<'a> fmt::Debug for ReplicationUpdateVectorWriteTransaction<'a> { @@ -54,10 +61,38 @@ impl<'a> fmt::Debug for ReplicationUpdateVectorWriteTransaction<'a> { pub struct ReplicationUpdateVectorReadTransaction<'a> { data: BptreeMapReadTxn<'a, Cid, IDLBitRange>, + ranged: BptreeMapReadTxn<'a, Uuid, BTreeSet>, } pub trait ReplicationUpdateVectorTransaction { - fn ruv_snapshot(&self) -> BTreeMap; + fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange>; + + fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet>; + + fn current_ruv_range(&self) -> Result { + let ranges = self + 
.range_snapshot() + .iter() + .map(|(s_uuid, range)| match (range.first(), range.last()) { + (Some(first), Some(last)) => Ok(( + *s_uuid, + ReplCidRange { + ts_min: *first, + ts_max: *last, + }, + )), + _ => { + error!( + "invalid state for server uuid {:?}, no ranges present", + s_uuid + ); + Err(OperationError::InvalidState) + } + }) + .collect::, _>>()?; + + Ok(ReplRuvRange::V1 { ranges }) + } fn verify( &self, @@ -65,7 +100,7 @@ pub trait ReplicationUpdateVectorTransaction { results: &mut Vec>, ) { // Okay rebuild the RUV in parallel. - let mut check_ruv: BTreeMap = BTreeMap::new(); + let mut check_ruv: BTreeMap = BTreeMap::default(); for entry in entries { // The DB id we need. let eid = entry.get_id(); @@ -73,6 +108,7 @@ pub trait ReplicationUpdateVectorTransaction { // We don't need the details of the change - only the cid of the // change that this entry was involved in. for cid in ecstate.cid_iter() { + // Add to the main ruv data. if let Some(idl) = check_ruv.get_mut(cid) { // We can't guarantee id order, so we have to do this properly. idl.insert_id(eid); @@ -87,7 +123,6 @@ pub trait ReplicationUpdateVectorTransaction { trace!(?check_ruv); // Get the current state let snapshot_ruv = self.ruv_snapshot(); - trace!(?snapshot_ruv); // Now compare. We want to do this checking for each CID in each, and then asserting // the content is the same. @@ -150,34 +185,72 @@ pub trait ReplicationUpdateVectorTransaction { snap_next = snap_iter.next(); } + // Assert that the content of the ranged set matches the data set and has the + // correct set of values. + let snapshot_range = self.range_snapshot(); + + for cid in snapshot_ruv.keys() { + if let Some(server_range) = snapshot_range.get(&cid.s_uuid) { + if !server_range.contains(&cid.ts) { + admin_warn!( + "{:?} is NOT consistent! 
server range is missing cid in index", + cid + ); + debug_assert!(false); + results.push(Err(ConsistencyError::RuvInconsistent( + cid.s_uuid.to_string(), + ))); + } + } else { + admin_warn!( + "{:?} is NOT consistent! server range is not present", + cid.s_uuid + ); + debug_assert!(false); + results.push(Err(ConsistencyError::RuvInconsistent( + cid.s_uuid.to_string(), + ))); + } + } + // Done! } } impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorWriteTransaction<'a> { - fn ruv_snapshot(&self) -> BTreeMap { - self.data - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect() + fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> { + self.data.to_snapshot() + } + + fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet> { + self.ranged.to_snapshot() } } impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorReadTransaction<'a> { - fn ruv_snapshot(&self) -> BTreeMap { - self.data - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect() + fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> { + self.data.to_snapshot() + } + + fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet> { + self.ranged.to_snapshot() } } impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { + pub fn clear(&mut self) { + self.data.clear(); + self.ranged.clear(); + } + pub fn rebuild(&mut self, entries: &[Arc]) -> Result<(), OperationError> { + // Drop everything. + self.clear(); // Entries and their internal changelogs are the "source of truth" for all changes // that have ever occurred and are stored on this server. So we use them to rebuild our RUV // here! let mut rebuild_ruv: BTreeMap = BTreeMap::new(); + let mut rebuild_range: BTreeMap> = BTreeMap::default(); for entry in entries { // The DB id we need. 
@@ -194,6 +267,14 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { idl.insert_id(eid); rebuild_ruv.insert(cid.clone(), idl); } + + if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) { + server_range.insert(cid.ts); + } else { + let mut ts_range = BTreeSet::default(); + ts_range.insert(cid.ts); + rebuild_range.insert(cid.s_uuid, ts_range); + } } } @@ -202,8 +283,8 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { idl.maybe_compress(); }); - self.data.clear(); self.data.extend(rebuild_ruv.into_iter()); + self.ranged.extend(rebuild_range.into_iter()); Ok(()) } @@ -217,6 +298,44 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { } else { self.data.insert(cid.clone(), idl); } + + if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) { + server_range.insert(cid.ts); + } else { + let mut range = BTreeSet::default(); + range.insert(cid.ts); + self.ranged.insert(cid.s_uuid, range); + } + + Ok(()) + } + + pub fn update_entry_changestate( + &mut self, + entry: &EntrySealedCommitted, + ) -> Result<(), OperationError> { + let eid = entry.get_id(); + let ecstate = entry.get_changestate(); + + for cid in ecstate.cid_iter() { + if let Some(idl) = self.data.get_mut(cid) { + // We can't guarantee id order, so we have to do this properly. 
+ idl.insert_id(eid); + } else { + let mut idl = IDLBitRange::new(); + idl.insert_id(eid); + self.data.insert(cid.clone(), idl); + } + + if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) { + server_range.insert(cid.ts); + } else { + let mut ts_range = BTreeSet::default(); + ts_range.insert(cid.ts); + self.ranged.insert(cid.s_uuid, ts_range); + } + } + Ok(()) } @@ -245,12 +364,39 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { pub fn trim_up_to(&mut self, cid: &Cid) -> Result { let mut idl = IDLBitRange::new(); + let mut remove_suuid = Vec::default(); - self.data - .range((Unbounded, Excluded(cid))) - .for_each(|(_, ex_idl)| { - idl = ex_idl as &_ | &idl; - }); + // We iterate with a plain for loop here so that we can also trim the + // range set as we go. Since that set is not ordered by time, we need + // to do fragmented searches over it no matter what we + // try to do. + + for (cid, ex_idl) in self.data.range((Unbounded, Excluded(cid))) { + idl = ex_idl as &_ | &idl; + + // Remove the reverse version of the cid from the ranged index. + match self.ranged.get_mut(&cid.s_uuid) { + Some(server_range) => { + // Remove returns true if the element WAS present. + if !server_range.remove(&cid.ts) { + error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index"); + return Err(OperationError::InvalidState); + } + if server_range.is_empty() { + remove_suuid.push(cid.s_uuid); + } + } + None => { + error!("Impossible State - The RUV is corrupted due to missing sid in ranged index"); + return Err(OperationError::InvalidState); + } + } + } + + for s_uuid in remove_suuid { + let x = self.ranged.remove(&s_uuid); + assert!(x.map(|y| y.is_empty()).unwrap_or(false)) + } // Trim all cids up to this value, and return the range of IDs // that are affected. 
@@ -261,5 +407,6 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { pub fn commit(self) { self.data.commit(); + self.ranged.commit(); } } diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 2c6aa4e58..ba2d88ed9 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -1,6 +1,27 @@ +use crate::be::BackendTransaction; use crate::prelude::*; +use crate::repl::ruv::ReplicationUpdateVectorTransaction; use std::collections::BTreeMap; +fn repl_initialise( + from: &mut QueryServerReadTransaction<'_>, + to: &mut QueryServerWriteTransaction<'_>, +) -> Result<(), OperationError> { + // First, build the refresh context. + let refresh_context = from.supplier_provide_refresh()?; + + // Verify content of the refresh + // eprintln!("{:#?}", refresh_context); + + // Apply it to the server + to.consumer_apply_refresh(&refresh_context)?; + + // Need same d_uuid + assert_eq!(from.get_domain_uuid(), to.get_domain_uuid()); + + Ok(()) +} + #[qs_pair_test] async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) { // Rebuild / refresh the content of server a with the content from b. @@ -12,29 +33,13 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) let mut server_b_txn = server_b.read().await; - // First, build the refresh context. - let refresh_context = server_b_txn - .supplier_provide_refresh() - .expect("Failed to build refresh"); - - // Verify content of the refresh - // eprintln!("{:#?}", refresh_context); - - // Apply it to the server - assert!(server_a_txn - .consumer_apply_refresh(&refresh_context) + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) .and_then(|_| server_a_txn.commit()) .is_ok()); // Verify the content of server_a and server_b are identical. 
let mut server_a_txn = server_a.read().await; - // Need same d_uuid - assert_eq!( - server_a_txn.get_domain_uuid(), - server_b_txn.get_domain_uuid() - ); - let domain_entry_a = server_a_txn .internal_search_uuid(UUID_DOMAIN_INFO) .expect("Failed to access domain info"); @@ -46,8 +51,14 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) // Same d_vers / domain info. assert_eq!(domain_entry_a, domain_entry_b); - trace!("a {:#?}", domain_entry_a.get_changestate()); - trace!("b {:#?}", domain_entry_b.get_changestate()); + trace!( + "domain_changestate a {:#?}", + domain_entry_a.get_changestate() + ); + trace!( + "domain_changestate b {:#?}", + domain_entry_b.get_changestate() + ); // Compare that their change states are identical too. assert_eq!( @@ -94,6 +105,46 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) // Done! The entry content are identical as are their replication metadata. We are good // to go! + let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range(); + + let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range(); + + trace!(?a_ruv_range); + trace!(?b_ruv_range); + assert!(a_ruv_range == b_ruv_range); // Both servers will be post-test validated. 
} + +#[qs_pair_test] +async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServer) { + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + + let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range(); + + let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range(); + + trace!(?a_ruv_range); + trace!(?b_ruv_range); + assert!(a_ruv_range == b_ruv_range); + + // Check ruv + // - should be same + // - incremental + // - no change. + + // Add an entry. + + // Do a ruv check. + + // Incremental. + // Should now be on the other partner. +} diff --git a/server/lib/src/schema.rs b/server/lib/src/schema.rs index 0db931017..72f341040 100644 --- a/server/lib/src/schema.rs +++ b/server/lib/src/schema.rs @@ -1265,6 +1265,21 @@ impl<'a> SchemaWriteTransaction<'a> { syntax: SyntaxType::ReferenceUuid, }, ); + self.attributes.insert( + AttrString::from("dynmember"), + SchemaAttribute { + name: AttrString::from("dynmember"), + uuid: UUID_SCHEMA_ATTR_DYNMEMBER, + description: String::from("List of dynamic members of the group"), + multivalue: true, + unique: false, + phantom: false, + sync_allowed: true, + replicated: false, + index: vec![IndexType::Equality], + syntax: SyntaxType::ReferenceUuid, + }, + ); // Migration related self.attributes.insert( AttrString::from("version"), diff --git a/server/lib/src/server/recycle.rs b/server/lib/src/server/recycle.rs index 195d96389..08b2420c4 100644 --- a/server/lib/src/server/recycle.rs +++ b/server/lib/src/server/recycle.rs @@ -569,6 +569,7 @@ mod tests { let time_p2 = time_p1 + Duration::from_secs(CHANGELOG_MAX_AGE * 2); let time_p3 = time_p2 + Duration::from_secs(CHANGELOG_MAX_AGE * 2); + trace!("test_tombstone_start"); let mut 
server_txn = server.write(time_p1).await; let admin = server_txn.internal_search_uuid(UUID_ADMIN).expect("failed");