20230508 replication incremental (#1620)

This commit is contained in:
Firstyear 2023-05-23 13:25:22 +10:00 committed by GitHub
parent 1cae034c4b
commit 48c620e43a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1918 additions and 89 deletions

View file

@ -251,6 +251,7 @@ pub enum OperationError {
ReplEntryNotChanged,
ReplInvalidRUVState,
ReplDomainLevelUnsatisfiable,
ReplDomainUuidMismatch,
}
impl PartialEq for OperationError {

View file

@ -4,6 +4,7 @@
//! is to persist content safely to disk, load that content, and execute queries
//! utilising indexes in the most effective way possible.
use std::collections::BTreeMap;
use std::fs;
use std::ops::DerefMut;
use std::sync::Arc;
@ -19,10 +20,11 @@ use tracing::{trace, trace_span};
use uuid::Uuid;
use crate::be::dbentry::{DbBackup, DbEntry};
use crate::entry::{Entry, EntryCommitted, EntryNew, EntrySealed};
use crate::entry::Entry;
use crate::filter::{Filter, FilterPlan, FilterResolved, FilterValidResolved};
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::ReplCidRange;
use crate::repl::ruv::{
ReplicationUpdateVector, ReplicationUpdateVectorReadTransaction,
ReplicationUpdateVectorTransaction, ReplicationUpdateVectorWriteTransaction,
@ -179,7 +181,7 @@ impl IdRawEntry {
.map(|dbe| (self.id, dbe))
}
fn into_entry(self) -> Result<Entry<EntrySealed, EntryCommitted>, OperationError> {
fn into_entry(self) -> Result<EntrySealedCommitted, OperationError> {
let db_e = serde_json::from_slice(self.data.as_slice()).map_err(|e| {
admin_error!(?e, id = %self.id, "Serde JSON Error");
let raw_str = String::from_utf8_lossy(self.data.as_slice());
@ -203,7 +205,7 @@ pub trait BackendTransaction {
/// Recursively apply a filter, transforming into IdList's on the way. This builds a query
/// execution log, so that it can be examined how an operation proceeded.
#[allow(clippy::cognitive_complexity)]
#[instrument(level = "debug", name = "be::filter2idl", skip_all)]
// #[instrument(level = "debug", name = "be::filter2idl", skip_all)]
fn filter2idl(
&mut self,
filt: &FilterResolved,
@ -569,7 +571,6 @@ pub trait BackendTransaction {
erl: &Limits,
filt: &Filter<FilterValidResolved>,
) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
let _entered = trace_span!("be::search").entered();
// Unlike DS, even if we don't get the index back, we can just pass
// to the in-memory filter test and be done.
@ -668,7 +669,8 @@ pub trait BackendTransaction {
// Using the indexes, resolve the IdList here, or AllIds.
// Also get if the filter was 100% resolved or not.
let (idl, fplan) = self.filter2idl(filt.to_inner(), FILTER_EXISTS_TEST_THRESHOLD)?;
let (idl, fplan) = trace_span!("be::exists -> filter2idl")
.in_scope(|| self.filter2idl(filt.to_inner(), FILTER_EXISTS_TEST_THRESHOLD))?;
debug!(filter_executed_plan = ?fplan);
@ -719,14 +721,34 @@ pub trait BackendTransaction {
} // end match idl
}
/// Resolve a set of replication CID ranges (one range per server uuid) into
/// the concrete set of entries whose changes fall within those ranges.
/// The ranges are resolved to entry ids by the RUV, then the entries are
/// fetched from the id layer.
///
/// Errors are passed through from the id layer (`get_identry`).
fn retrieve_range(
    &mut self,
    ranges: &BTreeMap<Uuid, ReplCidRange>,
) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
    // First pass the ranges to the ruv to resolve to an absolute set of
    // entry id's.
    let idl = self.get_ruv().range_to_idl(ranges);
    // Because of how this works, I think that it's not possible for the idl
    // to have any missing ids.
    //
    // If it was possible, we could just & with allids to remove the extraneous
    // values.

    // Make it an id list for the backend.
    let id_list = IdList::Indexed(idl);

    self.get_idlayer().get_identry(&id_list).map_err(|e| {
        admin_error!(?e, "get_identry failed");
        e
    })
}
/// Run the id layer's consistency verification, returning one result per
/// check performed (empty vec means no inconsistencies were reported).
fn verify(&mut self) -> Vec<Result<(), ConsistencyError>> {
    self.get_idlayer().verify()
}
fn verify_entry_index(
&mut self,
e: &Entry<EntrySealed, EntryCommitted>,
) -> Result<(), ConsistencyError> {
fn verify_entry_index(&mut self, e: &EntrySealedCommitted) -> Result<(), ConsistencyError> {
// First, check our references in name2uuid, uuid2spn and uuid2rdn
if e.mask_recycled_ts().is_some() {
let e_uuid = e.get_uuid();
@ -951,12 +973,16 @@ impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
}
impl<'a> BackendWriteTransaction<'a> {
/// Access the replication update vector (RUV) of this write transaction in
/// mutable form, so callers can validate and update its ranges (used at the
/// end of refresh / incremental application).
pub(crate) fn get_ruv_write(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
    &mut self.ruv
}
#[instrument(level = "debug", name = "be::create", skip_all)]
pub fn create(
&mut self,
cid: &Cid,
entries: Vec<Entry<EntrySealed, EntryNew>>,
) -> Result<Vec<Entry<EntrySealed, EntryCommitted>>, OperationError> {
entries: Vec<EntrySealedNew>,
) -> Result<Vec<EntrySealedCommitted>, OperationError> {
if entries.is_empty() {
admin_error!("No entries provided to BE to create, invalid server call!");
return Err(OperationError::EmptyRequest);
@ -1012,8 +1038,8 @@ impl<'a> BackendWriteTransaction<'a> {
/// related to this entry as that could cause an infinite replication loop!
pub fn refresh(
&mut self,
entries: Vec<Entry<EntrySealed, EntryNew>>,
) -> Result<Vec<Entry<EntrySealed, EntryCommitted>>, OperationError> {
entries: Vec<EntrySealedNew>,
) -> Result<Vec<EntrySealedCommitted>, OperationError> {
if entries.is_empty() {
admin_error!("No entries provided to BE to create, invalid server call!");
return Err(OperationError::EmptyRequest);
@ -1090,6 +1116,126 @@ impl<'a> BackendWriteTransaction<'a> {
.try_for_each(|(pre, post)| self.entry_index(Some(pre.as_ref()), Some(post)))
}
/// Prepare the database side of an incremental replication apply.
///
/// For each incoming replication entry, locate the matching database entry
/// by uuid. If no entry exists yet, allocate a fresh id and synthesise a
/// *stub* sealed entry (empty attrs, stubbed change state) so the merge
/// logic always has a (ctx, db) pair to work with.
///
/// The returned vec is parallel to `entry_meta`: element `i` is the db
/// entry (real or stub) for `entry_meta[i]`.
///
/// # Errors
/// * `OperationError::InvalidDbState` if the uuid index is corrupt (missing,
///   or containing more than one id for a uuid) or an indexed id cannot be
///   loaded.
/// * Id-layer errors are passed through.
// NOTE(review): the original declared an unused lifetime parameter `<'x>`;
// it is not referenced by any argument or the return type, so it is removed
// here (clippy: extra_unused_lifetimes). Call sites are unaffected.
#[instrument(level = "debug", name = "be::incremental_prepare", skip_all)]
pub fn incremental_prepare(
    &mut self,
    entry_meta: &[EntryIncrementalNew],
) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
    let mut ret_entries = Vec::with_capacity(entry_meta.len());
    // Track the pre-loop max id so we only write it back if we actually
    // allocated new ids.
    let id_max_pre = self.idlayer.get_id2entry_max_id()?;
    let mut id_max = id_max_pre;

    for ctx_ent in entry_meta.iter() {
        let ctx_ent_uuid = ctx_ent.get_uuid();
        let idx_key = ctx_ent_uuid.as_hyphenated().to_string();

        // uuid is equality-indexed, so this resolves to 0 or 1 ids in a
        // healthy database.
        let idl = self
            .get_idlayer()
            .get_idl("uuid", IndexType::Equality, &idx_key)?;

        let entry = match idl {
            Some(idl) if idl.is_empty() => {
                // Create the stub entry, we just need it to have an id number
                // allocated.
                id_max += 1;

                Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
                    id_max, ctx_ent,
                ))
                // Okay, entry ready to go.
            }
            Some(idl) if idl.len() == 1 => {
                // Get the entry from this idl.
                let mut entries = self
                    .get_idlayer()
                    .get_identry(&IdList::Indexed(idl))
                    .map_err(|e| {
                        admin_error!(?e, "get_identry failed");
                        e
                    })?;

                if let Some(entry) = entries.pop() {
                    // Return it.
                    entry
                } else {
                    // The index claimed the id exists but id2entry disagrees.
                    error!("Invalid entry state, index was unable to locate entry");
                    return Err(OperationError::InvalidDbState);
                }
                // Done, entry is ready to go
            }
            Some(idl) => {
                // BUG - duplicate uuid!
                error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must have only a single or no values. Contains {}", idl.len());
                return Err(OperationError::InvalidDbState);
            }
            None => {
                // BUG - corrupt index.
                error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must be present");
                return Err(OperationError::InvalidDbState);
            }
        };

        ret_entries.push(entry);
    }

    // Persist the new high-water mark only if stubs were allocated.
    if id_max != id_max_pre {
        self.idlayer.set_id2entry_max_id(id_max);
    }

    Ok(ret_entries)
}
/// Commit the outcome of an incremental replication to the backend.
///
/// * `update_entries` — pairs of `(post, pre)`: the merged entry to write,
///   and the pre-existing db entry it replaces (needed for index updates).
/// * `create_entries` — brand new entries (e.g. conflict entries) that have
///   never had an id; ids are allocated here, then they are written,
///   recorded in the RUV, and indexed.
#[instrument(level = "debug", name = "be::incremental_apply", skip_all)]
pub fn incremental_apply(
    &mut self,
    update_entries: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
    create_entries: Vec<EntrySealedNew>,
) -> Result<(), OperationError> {
    // For the values in create_cands, create these with similar code to the refresh
    // path.
    if !create_entries.is_empty() {
        // Assign id's to all the new entries.
        let mut id_max = self.idlayer.get_id2entry_max_id()?;
        let c_entries: Vec<_> = create_entries
            .into_iter()
            .map(|e| {
                id_max += 1;
                e.into_sealed_committed_id(id_max)
            })
            .collect();

        self.idlayer.write_identries(c_entries.iter())?;

        self.idlayer.set_id2entry_max_id(id_max);

        // Update the RUV with all the changestates of the affected entries.
        for e in c_entries.iter() {
            self.get_ruv().update_entry_changestate(e)?;
        }

        // Now update the indexes as required.
        // (pre = None marks these as creations to entry_index.)
        for e in c_entries.iter() {
            self.entry_index(None, Some(e))?
        }
    }

    // Otherwise this is a cid-less copy of modify.
    if !update_entries.is_empty() {
        // Write the post (merged) entries in place over their ids.
        self.get_idlayer()
            .write_identries(update_entries.iter().map(|(up, _)| up))?;

        for (e, _) in update_entries.iter() {
            self.get_ruv().update_entry_changestate(e)?;
        }

        // Index delta is computed from the (pre, post) pair.
        for (post, pre) in update_entries.iter() {
            self.entry_index(Some(pre.as_ref()), Some(post))?
        }
    }

    Ok(())
}
#[instrument(level = "debug", name = "be::reap_tombstones", skip_all)]
pub fn reap_tombstones(&mut self, cid: &Cid) -> Result<usize, OperationError> {
// We plan to clear the RUV up to this cid. So we need to build an IDL
@ -1204,8 +1350,8 @@ impl<'a> BackendWriteTransaction<'a> {
#[allow(clippy::cognitive_complexity)]
fn entry_index(
&mut self,
pre: Option<&Entry<EntrySealed, EntryCommitted>>,
post: Option<&Entry<EntrySealed, EntryCommitted>>,
pre: Option<&EntrySealedCommitted>,
post: Option<&EntrySealedCommitted>,
) -> Result<(), OperationError> {
let (e_uuid, e_id, uuid_same) = match (pre, post) {
(None, None) => {

View file

@ -51,10 +51,8 @@ use crate::idm::ldap::ldap_vattr_map;
use crate::modify::{Modify, ModifyInvalid, ModifyList, ModifyValid};
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::ReplEntryV1;
// use crate::repl::entry::EntryChangelog;
use crate::repl::entry::EntryChangeState;
use crate::repl::proto::{ReplEntryV1, ReplIncrementalEntryV1};
use crate::schema::{SchemaAttribute, SchemaClass, SchemaTransaction};
use crate::value::{
@ -66,11 +64,15 @@ pub type EntryInitNew = Entry<EntryInit, EntryNew>;
// Convenience aliases for the common (validity-state, lifecycle-state)
// combinations of Entry.
pub type EntryInvalidNew = Entry<EntryInvalid, EntryNew>;
pub type EntryRefreshNew = Entry<EntryRefresh, EntryNew>;
pub type EntrySealedNew = Entry<EntrySealed, EntryNew>;
pub type EntryValidCommitted = Entry<EntryValid, EntryCommitted>;
pub type EntrySealedCommitted = Entry<EntrySealed, EntryCommitted>;
pub type EntryInvalidCommitted = Entry<EntryInvalid, EntryCommitted>;
pub type EntryReducedCommitted = Entry<EntryReduced, EntryCommitted>;
pub type EntryTuple = (Arc<EntrySealedCommitted>, EntryInvalidCommitted);

// Entries that arrived via incremental replication, before and after they
// are associated with a committed database id.
pub type EntryIncrementalNew = Entry<EntryIncremental, EntryNew>;
pub type EntryIncrementalCommitted = Entry<EntryIncremental, EntryCommitted>;
// Entry should have a lifecycle of types. This is Raw (modifiable) and Entry (verified).
// This way, we can move between them, but only certain actions are possible on either
// This means modifications happen on Raw, but to move to Entry, you schema normalise.
@ -82,16 +84,16 @@ pub type EntryTuple = (Arc<EntrySealedCommitted>, EntryInvalidCommitted);
// This is specifically important for the commit to the backend, as we only want to
// commit validated types.
// Lifecycle state: has never been in the DB, so doesn't have an ID.
#[derive(Clone, Debug)]
pub struct EntryNew; // new
// It's been in the DB, so it has an id
#[derive(Clone, Debug)]
pub struct EntryCommitted {
    // Backend-assigned entry id (allocated from the id2entry max id).
    id: u64,
}
// It's been in the DB, so it has an id
// pub struct EntryPurged;
#[derive(Clone, Debug)]
pub struct EntryInit;
@ -114,6 +116,14 @@ pub struct EntryRefresh {
ecstate: EntryChangeState,
}
// Alternate path - this entry came from an incremental replication.
#[derive(Clone, Debug)]
pub struct EntryIncremental {
    // Must have a uuid, else we can't proceed at all.
    uuid: Uuid,
    // Replication change state rehydrated from the supplier's payload.
    ecstate: EntryChangeState,
}
/* |
* | The changes made within this entry are validated by the schema.
* V
@ -629,7 +639,188 @@ impl<STATE> Entry<EntryRefresh, STATE> {
attrs: self.attrs,
};
ne.validate(schema)
ne.validate(schema).map(|()| ne)
}
}
impl<STATE> Entry<EntryIncremental, STATE> {
    /// The uuid of the entry this incremental replication context refers to.
    pub fn get_uuid(&self) -> Uuid {
        self.valid.uuid
    }
}
impl Entry<EntryIncremental, EntryNew> {
    /// A copy of this entry's change state with the same "at" anchor but no
    /// per-attribute change records — used to seed stub entries during
    /// incremental prepare.
    fn stub_ecstate(&self) -> EntryChangeState {
        self.valid.ecstate.stub()
    }

    /// Reconstruct an in-memory incremental entry (uuid, change state,
    /// attrs) from the wire form supplied by the replication peer.
    pub fn rehydrate(repl_inc_entry: &ReplIncrementalEntryV1) -> Result<Self, OperationError> {
        let (uuid, ecstate, attrs) = repl_inc_entry.rehydrate()?;

        Ok(Entry {
            valid: EntryIncremental { uuid, ecstate },
            state: EntryNew,
            attrs,
        })
    }

    /// True when this incoming entry and the db entry with the same uuid were
    /// *created independently* — detected by differing "at" cids. Such pairs
    /// cannot be merged and one side must become a conflict entry.
    pub(crate) fn is_add_conflict(&self, db_entry: &EntrySealedCommitted) -> bool {
        debug_assert!(self.valid.uuid == db_entry.valid.uuid);
        // This is a conflict if the state 'at' is not identical
        self.valid.ecstate.at() != db_entry.valid.ecstate.at()
    }

    /// Merge this incoming entry (left) with the database entry (right) into
    /// a single committed incremental entry, per the change-state rules:
    /// tombstones win over live states; for live/live the per-attribute cids
    /// decide which side's value survives.
    ///
    /// NOTE(review): still work-in-progress — the both-sides-changed and
    /// right-only-changed attribute cases are `todo!()`, and `_schema` is not
    /// yet consulted.
    pub(crate) fn merge_state(
        &self,
        db_ent: &EntrySealedCommitted,
        _schema: &dyn SchemaTransaction,
    ) -> EntryIncrementalCommitted {
        use crate::repl::entry::State;

        // Paranoid check.
        debug_assert!(self.valid.uuid == db_ent.valid.uuid);

        // First, determine if either side is a tombstone. This is needed so that only
        // when both sides are live
        let self_cs = &self.valid.ecstate;
        let db_cs = db_ent.get_changestate();

        match (self_cs.current(), db_cs.current()) {
            (
                State::Live {
                    at: at_left,
                    changes: changes_left,
                },
                State::Live {
                    at: at_right,
                    changes: changes_right,
                },
            ) => {
                // Add-conflicts must have been filtered out before merging.
                debug_assert!(at_left == at_right);
                // Given the current db entry, compare and merge our attributes to
                // form a resultant entry attr and ecstate
                //
                // To shortcut this we dedup the attr set and then iterate.
                let mut attr_set: Vec<_> =
                    changes_left.keys().chain(changes_right.keys()).collect();
                attr_set.sort_unstable();
                attr_set.dedup();

                // Make a new ecstate and attrs set.
                let mut changes = BTreeMap::default();
                let mut eattrs = Eattrs::default();

                // Now we have the set of attrs from both sides. Lets see what state they are in!
                for attr_name in attr_set.into_iter() {
                    match (changes_left.get(attr_name), changes_right.get(attr_name)) {
                        (Some(_cid_left), Some(_cid_right)) => {
                            // This is the normal / usual and most "fun" case. Here we need to determine
                            // which side is latest and then do a valueset merge. This is also
                            // needing schema awareness depending on the attribute!
                            todo!();
                        }
                        (Some(cid_left), None) => {
                            // Keep the value on the left.
                            changes.insert(attr_name.clone(), cid_left.clone());
                            if let Some(valueset) = self.attrs.get(attr_name) {
                                eattrs.insert(attr_name.clone(), valueset.clone());
                            }
                        }
                        (None, Some(_cid_right)) => {
                            // Keep the value on the right.
                            todo!();
                        }
                        (None, None) => {
                            // Should be impossible! At least one side or the other must have a change.
                            debug_assert!(false);
                        }
                    }
                }

                let ecstate = EntryChangeState::build(State::Live {
                    at: at_left.clone(),
                    changes,
                });

                Entry {
                    valid: EntryIncremental {
                        uuid: self.valid.uuid,
                        ecstate,
                    },
                    state: EntryCommitted {
                        id: db_ent.state.id,
                    },
                    attrs: eattrs,
                }
            }
            (State::Tombstone { at: left_at }, State::Tombstone { at: right_at }) => {
                // Due to previous checks, this must be equal!
                debug_assert!(left_at == right_at);
                debug_assert!(self.attrs == db_ent.attrs);
                // Doesn't matter which side we take.
                Entry {
                    valid: EntryIncremental {
                        uuid: self.valid.uuid,
                        ecstate: self.valid.ecstate.clone(),
                    },
                    state: EntryCommitted {
                        id: db_ent.state.id,
                    },
                    attrs: self.attrs.clone(),
                }
            }
            (State::Tombstone { .. }, State::Live { .. }) => {
                // Keep the left side - a tombstone supersedes the live entry.
                Entry {
                    valid: EntryIncremental {
                        uuid: self.valid.uuid,
                        ecstate: self.valid.ecstate.clone(),
                    },
                    state: EntryCommitted {
                        id: db_ent.state.id,
                    },
                    attrs: self.attrs.clone(),
                }
            }
            (State::Live { .. }, State::Tombstone { .. }) => {
                // Keep the right side - the db tombstone supersedes our live state.
                Entry {
                    valid: EntryIncremental {
                        uuid: db_ent.valid.uuid,
                        ecstate: db_ent.valid.ecstate.clone(),
                    },
                    state: EntryCommitted {
                        id: db_ent.state.id,
                    },
                    attrs: db_ent.attrs.clone(),
                }
            }
        }
    }
}
impl Entry<EntryIncremental, EntryCommitted> {
    /// Schema-validate a merged incremental entry. Unlike the normal
    /// validation paths this must never return an error: replication has to
    /// proceed, so a schema-failing entry is instead moved to a conflict
    /// state in place.
    ///
    /// NOTE(review): the conflict branch is incomplete — it adds the
    /// "conflict" class then hits `todo!()`.
    pub(crate) fn validate_repl(self, schema: &dyn SchemaTransaction) -> EntryValidCommitted {
        // Unlike the other method of schema validation, we can't return an error
        // here when schema fails - we need to in-place move the entry to a
        // conflict state so that the replication can proceed.
        let mut ne = Entry {
            valid: EntryValid {
                uuid: self.valid.uuid,
                ecstate: self.valid.ecstate,
            },
            state: self.state,
            attrs: self.attrs,
        };

        if let Err(e) = ne.validate(schema) {
            warn!(uuid = ?self.valid.uuid, err = ?e, "Entry failed schema check, moving to a conflict state");
            ne.add_ava_int("class", Value::new_class("conflict"));
            todo!();
        }
        ne
    }
}
@ -664,7 +855,7 @@ impl<STATE> Entry<EntryInvalid, STATE> {
attrs: self.attrs,
};
ne.validate(schema)
ne.validate(schema).map(|()| ne)
}
}
@ -859,11 +1050,19 @@ impl Entry<EntrySealed, EntryCommitted> {
self
}
/*
pub fn get_changelog_mut(&mut self) -> &mut EntryChangelog {
&mut self.valid.eclog
/// Build a placeholder sealed+committed entry for an incoming incremental
/// entry that does not yet exist in the database: same uuid, a stubbed
/// (changes-free) change state, the given freshly-allocated id, and empty
/// attrs. The real content is filled in by the later merge/apply steps.
pub(crate) fn stub_sealed_committed_id(
    id: u64,
    ctx_ent: &EntryIncrementalNew,
) -> EntrySealedCommitted {
    let uuid = ctx_ent.get_uuid();
    let ecstate = ctx_ent.stub_ecstate();

    Entry {
        valid: EntrySealed { uuid, ecstate },
        state: EntryCommitted { id },
        attrs: Default::default(),
    }
}
*/
/// Insert a claim to this entry. This claim can NOT be persisted to disk, this is only
/// used during a single Event session.
@ -1441,10 +1640,7 @@ impl Entry<EntrySealed, EntryCommitted> {
}
impl<STATE> Entry<EntryValid, STATE> {
fn validate(
self,
schema: &dyn SchemaTransaction,
) -> Result<Entry<EntryValid, STATE>, SchemaError> {
fn validate(&self, schema: &dyn SchemaTransaction) -> Result<(), SchemaError> {
let schema_classes = schema.get_classes();
let schema_attributes = schema.get_attributes();
@ -1666,7 +1862,7 @@ impl<STATE> Entry<EntryValid, STATE> {
}
// Well, we got here, so okay!
Ok(self)
Ok(())
}
pub fn invalidate(self, cid: Cid, ecstate: EntryChangeState) -> Entry<EntryInvalid, STATE> {

View file

@ -68,9 +68,10 @@ pub mod prelude {
pub use crate::be::Limits;
pub use crate::constants::*;
pub use crate::entry::{
Entry, EntryCommitted, EntryInit, EntryInitNew, EntryInvalid, EntryInvalidCommitted,
EntryInvalidNew, EntryNew, EntryReduced, EntryReducedCommitted, EntryRefresh,
EntryRefreshNew, EntrySealed, EntrySealedCommitted, EntrySealedNew, EntryTuple, EntryValid,
Entry, EntryCommitted, EntryIncrementalCommitted, EntryIncrementalNew, EntryInit,
EntryInitNew, EntryInvalid, EntryInvalidCommitted, EntryInvalidNew, EntryNew, EntryReduced,
EntryReducedCommitted, EntryRefresh, EntryRefreshNew, EntrySealed, EntrySealedCommitted,
EntrySealedNew, EntryTuple, EntryValid,
};
pub use crate::event::{CreateEvent, DeleteEvent, ExistsEvent, ModifyEvent, SearchEvent};
pub use crate::filter::{

View file

@ -146,6 +146,31 @@ trait Plugin {
Err(OperationError::InvalidState)
}
/// Default hook run over the (incoming, database) candidate pairs before an
/// incremental replication update is applied. Plugins override this to
/// adjust or conflict-mark candidates.
///
/// Deliberately returns Ok (unlike other unimplemented hooks which return
/// InvalidState) so plugins without incremental support yet do not abort
/// replication — only a log entry is emitted.
fn pre_repl_incremental(
    _qs: &mut QueryServerWriteTransaction,
    _cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
) -> Result<(), OperationError> {
    admin_error!(
        "plugin {} has an unimplemented pre_repl_incremental!",
        Self::id()
    );
    // Err(OperationError::InvalidState)
    Ok(())
}
/// Default hook run after an incremental replication update has been
/// applied, with the pre-change and post-change entry sets. Plugins override
/// this to react to the applied changes.
///
/// Deliberately returns Ok rather than InvalidState so that plugins without
/// incremental support do not abort replication — only a log entry is
/// emitted.
fn post_repl_incremental(
    _qs: &mut QueryServerWriteTransaction,
    _pre_cand: &[Arc<EntrySealedCommitted>],
    _cand: &[EntrySealedCommitted],
) -> Result<(), OperationError> {
    admin_error!(
        "plugin {} has an unimplemented post_repl_incremental!",
        Self::id()
    );
    // Err(OperationError::InvalidState)
    Ok(())
}
fn verify(_qs: &mut QueryServerReadTransaction) -> Vec<Result<(), ConsistencyError>> {
admin_error!("plugin {} has an unimplemented verify!", Self::id());
vec![Err(ConsistencyError::Unknown)]
@ -297,6 +322,32 @@ impl Plugins {
.and_then(|_| memberof::MemberOf::post_repl_refresh(qs, cand))
}
/// Run every plugin's pre_repl_incremental hook over the candidate pairs,
/// stopping at the first error. Hook order matters: attrunique must run
/// last.
#[instrument(level = "debug", name = "plugins::run_pre_repl_incremental", skip_all)]
pub fn run_pre_repl_incremental(
    qs: &mut QueryServerWriteTransaction,
    cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
) -> Result<(), OperationError> {
    base::Base::pre_repl_incremental(qs, cand)?;
    // Not yet enabled for the incremental path:
    // jwskeygen::JwsKeygen::pre_repl_incremental(qs, cand, me)?;
    // gidnumber::GidNumber::pre_repl_incremental(qs, cand, me)?;
    domain::Domain::pre_repl_incremental(qs, cand)?;
    spn::Spn::pre_repl_incremental(qs, cand)?;
    session::SessionConsistency::pre_repl_incremental(qs, cand)?;
    // attr unique should always be last
    attrunique::AttrUnique::pre_repl_incremental(qs, cand)
}
/// Run every plugin's post_repl_incremental hook over the pre/post entry
/// sets, stopping at the first error.
#[instrument(level = "debug", name = "plugins::run_post_repl_incremental", skip_all)]
pub fn run_post_repl_incremental(
    qs: &mut QueryServerWriteTransaction,
    pre_cand: &[Arc<EntrySealedCommitted>],
    cand: &[EntrySealedCommitted],
) -> Result<(), OperationError> {
    refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)?;
    spn::Spn::post_repl_incremental(qs, pre_cand, cand)?;
    memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand)
}
#[instrument(level = "debug", name = "plugins::run_verify", skip_all)]
pub fn run_verify(
qs: &mut QueryServerReadTransaction,

View file

@ -4,6 +4,8 @@ use crate::plugins::Plugins;
use crate::prelude::*;
use crate::repl::proto::ReplRuvRange;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
use std::collections::BTreeMap;
use std::sync::Arc;
impl<'a> QueryServerReadTransaction<'a> {
// Get the current state of "where we are up to"
@ -37,15 +39,319 @@ impl<'a> QueryServerReadTransaction<'a> {
let ruv_snapshot = self.get_be_txn().get_ruv();
// What's the current set of ranges?
ruv_snapshot.current_ruv_range()
ruv_snapshot
.current_ruv_range()
.map(|ranges| ReplRuvRange::V1 { ranges })
}
}
impl<'a> QueryServerWriteTransaction<'a> {
// Apply the state changes if they are valid.
/// Apply one batch of incremental replication entries (schema, meta, or
/// general) to this consumer: rehydrate the wire entries, pair each with its
/// db entry (or a fresh stub), split off add-conflicts, merge the remainder,
/// run pre/post plugins, schema-validate, and commit through the backend.
///
/// NOTE(review): the add-conflict resolution branch is still `todo!()`, so
/// a batch containing an add-conflict currently panics.
fn consumer_incremental_apply_entries(
    &mut self,
    ctx_entries: &[ReplIncrementalEntryV1],
) -> Result<(), OperationError> {
    trace!(?ctx_entries);

    // No action needed for this if the entries are empty.
    if ctx_entries.is_empty() {
        debug!("No entries to act upon");
        return Ok(());
    }

    /*
     * Incremental is very similar to modify in how we have to treat the entries
     * with a pre and post state. However we need an incremental prepare so that
     * when new entries are provided to us we can merge to a stub and then commit
     * it correctly. This takes an extra backend interface that prepares the
     * entry stubs for us.
     */

    // I think we need to rehydrate all the repl content to a partial
    // entry. This way all the types are consistent and ready.
    let ctx_entries: Vec<_> = ctx_entries.iter().map(
        EntryIncrementalNew::rehydrate
    )
    .collect::<Result<Vec<_>, _>>()
    .map_err(|e| {
        error!(err = ?e, "Unable to process replication incremental entries to valid entry states for replication");
        e
    })?;

    // For each incoming entry, fetch the matching db entry or a stub with a
    // newly allocated id. The result is parallel to ctx_entries.
    let db_entries = self.be_txn.incremental_prepare(&ctx_entries).map_err(|e| {
        error!("Failed to access entries from db");
        e
    })?;

    // Need to probably handle conflicts here in this phase. I think they
    // need to be pushed to a separate list where they are then "created"
    // as a conflict.

    // First find if entries are in a conflict state.
    let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
        .iter()
        .zip(db_entries.into_iter())
        .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));

    // Now we have a set of conflicts and a set of entries to proceed.
    //
    //  /- entries that need to be created as conflicts.
    //  |                /- entries that survive and need update to the db in place.
    //  v                v
    let (conflict_create, conflict_update): (
        Vec<EntrySealedNew>,
        Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
    ) = conflicts
        .into_iter()
        .map(|(_ctx_ent, _db_ent)| {
            // Determine which of the entries must become the conflict
            // and which will now persist. There are two possible cases.
            //
            // 1. The ReplIncremental is after the DBEntry, and becomes the conflict.
            //    This means we just update the db entry with itself.
            //
            // 2. The ReplIncremental is before the DBEntry, and becomes live.
            //    This means we have to take the DBEntry as it exists, convert
            //    it to a new entry. Then we have to take the repl incremental
            //    entry and place it into the update queue.
            todo!();
        })
        .unzip();

    let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
        .into_iter()
        .map(|(ctx_ent, db_ent)| {
            // This now is the set of entries that are able to be updated. Merge
            // their attribute sets/states per the change state rules.
            // This must create an EntryInvalidCommitted
            let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema);
            (merge_ent, db_ent)
        })
        .collect();

    // To be consistent to Modify, we need to run pre-modify here.
    let mut all_updates = conflict_update
        .into_iter()
        .chain(proceed_update.into_iter())
        .collect::<Vec<_>>();

    // Plugins can mark entries into a conflict status.
    Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
        admin_error!(
            "Refresh operation failed (pre_repl_incremental plugin), {:?}",
            e
        );
        e
    })?;

    // Now we have to schema check our data and separate to schema_valid and
    // invalid.
    let all_updates_valid = all_updates
        .into_iter()
        .map(|(ctx_ent, db_ent)| {
            // Check the schema
            //
            // In these cases when an entry fails schema, we mark it to
            // a conflict state and then retain it in the update process.
            //
            // The marking is done INSIDE this function!
            let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
            (sealed_ent, db_ent)
        })
        .collect::<Vec<_>>();

    /*
    .collect::<Result<Vec<_>, _>>()
    .map_err(|e| {
        error!(err = ?e, "Failed to validate schema of incremental entries");
        OperationError::SchemaViolation(e)
    })?;
    */

    // We now have three sets!
    //
    // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid)
    // * schema_invalid - entries that were merged and their attribute state has now become invalid to schema.
    // * schema_valid - entries that were merged and are schema valid.
    //
    // From these sets, we will move conflict_create and schema_invalid into the replication masked
    // state. However schema_valid needs to be processed to check for plugin rules as well. If
    // anything hits one of these states we need to have a way to handle this too in a consistent
    // manner.
    //

    // Then similar to modify, we need the pre and post candidates.

    // We need to unzip the schema_valid and invalid entries.

    self.be_txn
        .incremental_apply(&all_updates_valid, conflict_create)
        .map_err(|e| {
            admin_error!("betxn create failure {:?}", e);
            e
        })?;

    // Plugins need these unzipped
    let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid.into_iter().unzip();

    // We don't need to process conflict_creates here, since they are all conflicting
    // uuids which means that the uuids are all *here* so they will trigger anything
    // that requires processing anyway.
    Plugins::run_post_repl_incremental(self, pre_cand.as_slice(), cand.as_slice()).map_err(
        |e| {
            admin_error!(
                "Refresh operation failed (post_repl_incremental plugin), {:?}",
                e
            );
            e
        },
    )?;

    // Record the changed uuids and raise the reload flags that the applied
    // entries require (acp / oauth2), mirroring the modify path.
    self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));

    if !self.changed_acp {
        self.changed_acp = cand
            .iter()
            .chain(pre_cand.iter().map(|e| e.as_ref()))
            .any(|e| e.attribute_equality("class", &PVCLASS_ACP))
    }
    if !self.changed_oauth2 {
        self.changed_oauth2 = cand
            .iter()
            .chain(pre_cand.iter().map(|e| e.as_ref()))
            .any(|e| e.attribute_equality("class", &PVCLASS_OAUTH2_RS));
    }

    trace!(
        schema_reload = ?self.changed_schema,
        acp_reload = ?self.changed_acp,
        oauth2_reload = ?self.changed_oauth2,
        domain_reload = ?self.changed_domain,
    );

    Ok(())
}
/// Consumer entry point: apply an incremental replication context received
/// from a supplier, dispatching on the context variant.
///
/// NOTE(review): RefreshRequired and UnwillingToSupply are still `todo!()` -
/// presumably the former should trigger a full refresh; confirm intent
/// before relying on these paths.
pub fn consumer_apply_changes(
    &mut self,
    ctx: &ReplIncrementalContext,
) -> Result<(), OperationError> {
    match ctx {
        ReplIncrementalContext::NoChangesAvailable => {
            // The supplier had nothing newer than our RUV - nothing to do.
            info!("no changes are available");
            Ok(())
        }
        ReplIncrementalContext::RefreshRequired => {
            todo!();
        }
        ReplIncrementalContext::UnwillingToSupply => {
            todo!();
        }
        ReplIncrementalContext::V1 {
            domain_version,
            domain_uuid,
            ranges,
            schema_entries,
            meta_entries,
            entries,
        } => self.consumer_apply_changes_v1(
            *domain_version,
            *domain_uuid,
            ranges,
            schema_entries,
            meta_entries,
            entries,
        ),
    }
}
#[instrument(level = "debug", skip_all)]
pub fn consumer_apply_changes(&mut self) -> Result<(), OperationError> {
fn consumer_apply_changes_v1(
&mut self,
ctx_domain_version: DomainVersion,
ctx_domain_uuid: Uuid,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_schema_entries: &[ReplIncrementalEntryV1],
ctx_meta_entries: &[ReplIncrementalEntryV1],
ctx_entries: &[ReplIncrementalEntryV1],
) -> Result<(), OperationError> {
if ctx_domain_version < DOMAIN_MIN_LEVEL {
error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
return Err(OperationError::ReplDomainLevelUnsatisfiable);
} else if ctx_domain_version > DOMAIN_MAX_LEVEL {
error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
return Err(OperationError::ReplDomainLevelUnsatisfiable);
};
// Assert that the d_uuid matches the repl domain uuid.
let db_uuid = self.be_txn.get_db_d_uuid();
if db_uuid != ctx_domain_uuid {
error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
return Err(OperationError::ReplDomainUuidMismatch);
}
debug!(
"Proceeding to apply incremental from domain {:?} at level {}",
ctx_domain_uuid, ctx_domain_version
);
// == ⚠️ Below this point we begin to make changes! ==
// Apply the schema entries first.
self.consumer_incremental_apply_entries(ctx_schema_entries)
.map_err(|e| {
error!("Failed to apply incremental schema entries");
e
})?;
// We need to reload schema now!
self.reload_schema().map_err(|e| {
error!("Failed to reload schema");
e
})?;
// Apply meta entries now.
self.consumer_incremental_apply_entries(ctx_meta_entries)
.map_err(|e| {
error!("Failed to apply incremental schema entries");
e
})?;
// This is re-loaded in case the domain name changed on the remote. Also needed for changing
// the domain version.
self.reload_domain_info().map_err(|e| {
error!("Failed to reload domain info");
e
})?;
// Trigger for post commit hooks. Should we detect better in the entry
// apply phases?
self.changed_schema = true;
self.changed_domain = true;
// Update all other entries now.
self.consumer_incremental_apply_entries(ctx_entries)
.map_err(|e| {
error!("Failed to apply incremental schema entries");
e
})?;
// Finally, confirm that the ranges that we have added match the ranges from our
// context. Note that we get this in a writeable form!
let ruv = self.be_txn.get_ruv_write();
ruv.refresh_validate_ruv(ctx_ranges).map_err(|e| {
error!("RUV ranges were not rebuilt correctly.");
e
})?;
ruv.refresh_update_ruv(ctx_ranges).map_err(|e| {
error!("Unable to update RUV with supplier ranges.");
e
})?;
Ok(())
}
@ -57,12 +363,14 @@ impl<'a> QueryServerWriteTransaction<'a> {
ReplRefreshContext::V1 {
domain_version,
domain_uuid,
ranges,
schema_entries,
meta_entries,
entries,
} => self.consumer_apply_refresh_v1(
*domain_version,
*domain_uuid,
ranges,
schema_entries,
meta_entries,
entries,
@ -97,7 +405,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
.map(|e| {
e.validate(&self.schema)
.map_err(|e| {
admin_error!("Schema Violation in create validate {:?}", e);
admin_error!("Schema Violation in refresh validate {:?}", e);
OperationError::SchemaViolation(e)
})
.map(|e| {
@ -107,8 +415,6 @@ impl<'a> QueryServerWriteTransaction<'a> {
})
.collect::<Result<Vec<EntrySealedNew>, _>>()?;
// Do not run plugs!
let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
admin_error!("betxn create failure {:?}", e);
e
@ -133,6 +439,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
&mut self,
ctx_domain_version: DomainVersion,
ctx_domain_uuid: Uuid,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
ctx_schema_entries: &[ReplEntryV1],
ctx_meta_entries: &[ReplEntryV1],
ctx_entries: &[ReplEntryV1],
@ -202,7 +509,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
// Apply the domain info entry / system info / system config entry?
self.consumer_refresh_create_entries(ctx_meta_entries)
.map_err(|e| {
error!("Failed to refresh schema entries");
error!("Failed to refresh meta entries");
e
})?;
@ -229,7 +536,19 @@ impl<'a> QueryServerWriteTransaction<'a> {
e
})?;
// Run post repl plugins
// Finally, confirm that the ranges that we have recreated match the ranges from our
// context. Note that we get this in a writeable form!
let ruv = self.be_txn.get_ruv_write();
ruv.refresh_validate_ruv(ctx_ranges).map_err(|e| {
error!("RUV ranges were not rebuilt correctly.");
e
})?;
ruv.refresh_update_ruv(ctx_ranges).map_err(|e| {
error!("Unable to update RUV with supplier ranges.");
e
})?;
Ok(())
}

View file

@ -2,14 +2,18 @@ use super::cid::Cid;
use crate::entry::Eattrs;
use crate::prelude::*;
use crate::schema::SchemaTransaction;
// use crate::valueset;
use std::collections::BTreeMap;
#[derive(Debug, Clone)]
pub enum State {
Live { changes: BTreeMap<AttrString, Cid> },
Tombstone { at: Cid },
Live {
at: Cid,
changes: BTreeMap<AttrString, Cid>,
},
Tombstone {
at: Cid,
},
}
#[derive(Debug, Clone)]
@ -25,7 +29,10 @@ impl EntryChangeState {
.map(|attr| (attr, cid.clone()))
.collect();
let st = State::Live { changes };
let st = State::Live {
at: cid.clone(),
changes,
};
EntryChangeState { st }
}
@ -45,19 +52,47 @@ impl EntryChangeState {
.map(|attr| (attr, cid.clone()))
.collect();
State::Live { changes }
State::Live {
at: cid.clone(),
changes,
}
};
EntryChangeState { st }
}
pub(crate) fn build(st: State) -> Self {
EntryChangeState { st }
}
pub fn current(&self) -> &State {
&self.st
}
/// Return the CID at which this entry was created (Live) or
/// tombstoned (Tombstone).
pub fn at(&self) -> &Cid {
    // Both variants carry an `at` field, so a single or-pattern
    // binds it regardless of state.
    match &self.st {
        State::Live { at, .. } | State::Tombstone { at } => at,
    }
}
pub(crate) fn stub(&self) -> Self {
let st = match &self.st {
State::Live { at, changes: _ } => State::Live {
at: at.clone(),
changes: Default::default(),
},
State::Tombstone { at } => State::Tombstone { at: at.clone() },
};
EntryChangeState { st }
}
pub fn change_ava(&mut self, cid: &Cid, attr: &str) {
match &mut self.st {
State::Live { ref mut changes } => {
State::Live {
at: _,
ref mut changes,
} => {
if let Some(change) = changes.get_mut(attr) {
// Update the cid.
if change != cid {
@ -75,7 +110,7 @@ impl EntryChangeState {
pub fn tombstone(&mut self, cid: &Cid) {
match &mut self.st {
State::Live { changes: _ } => self.st = State::Tombstone { at: cid.clone() },
State::Live { at: _, changes: _ } => self.st = State::Tombstone { at: cid.clone() },
State::Tombstone { .. } => {} // no-op
};
}
@ -97,14 +132,14 @@ impl EntryChangeState {
pub fn contains_tail_cid(&self, cid: &Cid) -> bool {
// This is slow? Is it needed?
match &self.st {
State::Live { changes } => changes.values().any(|change| change == cid),
State::Live { at: _, changes } => changes.values().any(|change| change == cid),
State::Tombstone { at } => at == cid,
}
}
pub fn cid_iter(&self) -> Vec<&Cid> {
match &self.st {
State::Live { changes } => {
State::Live { at: _, changes } => {
let mut v: Vec<_> = changes.values().collect();
v.sort_unstable();
v.dedup();
@ -119,7 +154,7 @@ impl EntryChangeState {
F: FnMut(&AttrString, &mut Cid) -> bool,
{
match &mut self.st {
State::Live { changes } => changes.retain(f),
State::Live { at: _, changes } => changes.retain(f),
State::Tombstone { .. } => {}
}
}
@ -139,7 +174,9 @@ impl EntryChangeState {
.unwrap_or(false);
match (&self.st, is_ts) {
(State::Live { changes }, false) => {
(State::Live { at, changes }, false) => {
// Every change must be after at.
// Check that all attrs from expected, have a value in our changes.
let inconsistent: Vec<_> = expected_attrs
.keys()
@ -160,7 +197,18 @@ impl EntryChangeState {
* cases here, which is why we pretty much don't allow schema to be deleted
* but we have to handle it here due to a test case that simulates this.
*/
let desync = schema.is_replicated(attr) && !changes.contains_key(*attr);
let change_cid_present = if let Some(change_cid) = changes.get(*attr) {
if change_cid < at {
warn!("changestate has a change that occurs before entry was created! {attr:?} {change_cid:?} {at:?}");
results.push(Err(ConsistencyError::ChangeStateDesynchronised(entry_id)));
}
true
} else {
false
};
// Only assert this when we actually have replication requirements.
let desync = schema.is_replicated(attr) && !change_cid_present;
if desync {
debug!(%entry_id, %attr, %desync);
}
@ -195,12 +243,14 @@ impl PartialEq for EntryChangeState {
match (&self.st, &rhs.st) {
(
State::Live {
at: at_left,
changes: changes_left,
},
State::Live {
at: at_right,
changes: changes_right,
},
) => changes_left.eq(changes_right),
) => at_left.eq(at_right) && changes_left.eq(changes_right),
(State::Tombstone { at: at_left }, State::Tombstone { at: at_right }) => {
at_left.eq(at_right)
}

View file

@ -386,6 +386,8 @@ pub struct ReplAttrStateV1 {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum ReplStateV1 {
Live {
at: ReplCidV1,
// Also add AT here for breaking entry origin on conflict.
attrs: BTreeMap<String, ReplAttrStateV1>,
},
Tombstone {
@ -408,7 +410,7 @@ impl ReplEntryV1 {
let uuid = entry.get_uuid();
let st = match cs.current() {
State::Live { changes } => {
State::Live { at, changes } => {
let live_attrs = entry.get_ava();
let attrs = changes
@ -440,7 +442,10 @@ impl ReplEntryV1 {
})
.collect();
ReplStateV1::Live { attrs }
ReplStateV1::Live {
at: at.into(),
attrs,
}
}
State::Tombstone { at } => ReplStateV1::Tombstone { at: at.into() },
};
@ -450,8 +455,8 @@ impl ReplEntryV1 {
pub fn rehydrate(&self) -> Result<(EntryChangeState, Eattrs), OperationError> {
match &self.st {
ReplStateV1::Live { attrs } => {
trace!("{:#?}", attrs);
ReplStateV1::Live { at, attrs } => {
trace!("{:?} {:#?}", at, attrs);
// We need to build two sets, one for the Entry Change States, and one for the
// Eattrs.
let mut changes = BTreeMap::default();
@ -484,8 +489,10 @@ impl ReplEntryV1 {
}
}
let at: Cid = at.into();
let ecstate = EntryChangeState {
st: State::Live { changes },
st: State::Live { at, changes },
};
Ok((ecstate, eattrs))
}
@ -511,6 +518,130 @@ impl ReplEntryV1 {
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
// I think partial entries should be separate? This clearly implies a refresh.
pub struct ReplIncrementalEntryV1 {
pub(crate) uuid: Uuid,
// Change State
pub(crate) st: ReplStateV1,
}
impl ReplIncrementalEntryV1 {
/// Build an incremental replication view of `entry`, reduced to only the
/// attribute changes that fall inside the supplier/consumer CID window
/// described by `ctx_range` (keyed by originating server uuid).
///
/// Live entries produce a `ReplStateV1::Live` containing only the in-range,
/// replicated attributes; tombstones are always sent whole regardless of
/// the range.
pub fn new(
    entry: &EntrySealedCommitted,
    schema: &SchemaReadTransaction,
    ctx_range: &BTreeMap<Uuid, ReplCidRange>,
) -> ReplIncrementalEntryV1 {
    let cs = entry.get_changestate();
    let uuid = entry.get_uuid();
    let st = match cs.current() {
        State::Live { at, changes } => {
            // Only put attributes into the change state that were changed within the range that was
            // requested.
            let live_attrs = entry.get_ava();
            let attrs = changes
                .iter()
                .filter_map(|(attr_name, cid)| {
                    // If the cid is within the ctx range. Non-replicated
                    // (schema-excluded) attributes are never supplied.
                    let within = schema.is_replicated(attr_name)
                        && ctx_range
                            .get(&cid.s_uuid)
                            .map(|repl_range| {
                                // Supply anything up to and including.
                                cid.ts <= repl_range.ts_max &&
                                // ts_min is always what the consumer already has.
                                cid.ts > repl_range.ts_min
                            })
                            // If not present in the range, assume it's not needed.
                            .unwrap_or(false);
                    // Then setup to supply it.
                    if within {
                        let live_attr = live_attrs.get(attr_name.as_str());
                        let cid = cid.into();
                        // An empty valueset is transmitted as `attr: None`,
                        // which represents a deletion of the attribute.
                        let attr = live_attr.and_then(|maybe| {
                            if maybe.len() > 0 {
                                Some(maybe.to_repl_v1())
                            } else {
                                None
                            }
                        });
                        Some((attr_name.to_string(), ReplAttrStateV1 { cid, attr }))
                    } else {
                        None
                    }
                })
                .collect();
            ReplStateV1::Live {
                at: at.into(),
                attrs,
            }
        }
        // Don't care what the at is - send the tombstone.
        State::Tombstone { at } => ReplStateV1::Tombstone { at: at.into() },
    };
    ReplIncrementalEntryV1 { uuid, st }
}
/// Reconstruct the entry uuid, change state, and attribute map from this
/// incremental replication message.
///
/// Returns `OperationError::InvalidEntryState` if the wire data contains a
/// duplicated attribute, and propagates valueset restore failures.
/// Attributes present in the change state with `attr: None` contribute a
/// change CID but no value (they represent deletions). Tombstones rehydrate
/// with an empty attribute set.
pub fn rehydrate(&self) -> Result<(Uuid, EntryChangeState, Eattrs), OperationError> {
    match &self.st {
        ReplStateV1::Live { at, attrs } => {
            trace!("{:?} {:#?}", at, attrs);
            let mut changes = BTreeMap::default();
            let mut eattrs = Eattrs::default();
            for (attr_name, ReplAttrStateV1 { cid, attr }) in attrs.iter() {
                let astring: AttrString = attr_name.as_str().into();
                let cid: Cid = cid.into();
                // Only a present attr carries a value to restore; a None
                // attr still records the change cid below.
                if let Some(attr_value) = attr {
                    let v = valueset::from_repl_v1(attr_value).map_err(|e| {
                        error!("Unable to restore valueset for {}", attr_name);
                        e
                    })?;
                    // A duplicate insert indicates corrupt wire data, since
                    // attrs is keyed by attribute name.
                    if eattrs.insert(astring.clone(), v).is_some() {
                        error!(
                            "Impossible eattrs state, attribute {} appears to be duplicated!",
                            attr_name
                        );
                        return Err(OperationError::InvalidEntryState);
                    }
                }
                if changes.insert(astring, cid).is_some() {
                    error!(
                        "Impossible changes state, attribute {} appears to be duplicated!",
                        attr_name
                    );
                    return Err(OperationError::InvalidEntryState);
                }
            }
            let at: Cid = at.into();
            let ecstate = EntryChangeState {
                st: State::Live { at, changes },
            };
            Ok((self.uuid, ecstate, eattrs))
        }
        ReplStateV1::Tombstone { at } => {
            let at: Cid = at.into();
            // Tombstones carry no live attributes.
            let eattrs = Eattrs::default();
            let ecstate = EntryChangeState {
                st: State::Tombstone { at },
            };
            Ok((self.uuid, ecstate, eattrs))
        }
    }
}
}
// From / Into Entry
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@ -519,8 +650,31 @@ pub enum ReplRefreshContext {
V1 {
domain_version: DomainVersion,
domain_uuid: Uuid,
// We need to send the current state of the ranges to populate into
// the ranges so that lookups and ranges work properly.
ranges: BTreeMap<Uuid, ReplCidRange>,
schema_entries: Vec<ReplEntryV1>,
meta_entries: Vec<ReplEntryV1>,
entries: Vec<ReplEntryV1>,
},
}
/// The supplier's response to an incremental replication request.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ReplIncrementalContext {
    /// The consumer is already up to date - nothing to send.
    NoChangesAvailable,
    /// The consumer has lagged too far behind and must perform a full
    /// refresh before incremental replication can resume.
    RefreshRequired,
    /// The supplier refuses to send changes (e.g. the consumer appears to
    /// be ahead of the supplier); the topology needs investigation.
    UnwillingToSupply,
    /// The incremental change set.
    V1 {
        domain_version: DomainVersion,
        domain_uuid: Uuid,
        // We need to send the current state of the ranges to populate into
        // the ranges so that lookups and ranges work properly, and the
        // consumer ends with the same state as we have (or at least merges)
        // it with this.
        ranges: BTreeMap<Uuid, ReplCidRange>,
        schema_entries: Vec<ReplIncrementalEntryV1>,
        meta_entries: Vec<ReplIncrementalEntryV1>,
        entries: Vec<ReplIncrementalEntryV1>,
    },
}

View file

@ -10,7 +10,7 @@ use kanidm_proto::v1::ConsistencyError;
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::{ReplCidRange, ReplRuvRange};
use crate::repl::proto::ReplCidRange;
use std::fmt;
#[derive(Default)]
@ -29,6 +29,34 @@ pub struct ReplicationUpdateVector {
ranged: BptreeMap<Uuid, BTreeSet<Duration>>,
}
/// The status of replication after investigating the RUV states.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum RangeDiffStatus {
/// Ok - can proceed with replication, supplying the following
/// ranges of changes to the consumer.
Ok(BTreeMap<Uuid, ReplCidRange>),
/// Refresh - The consumer is lagging and is missing a set of changes
/// that are required to proceed. The consumer *MUST* be refreshed
/// immediately.
Refresh {
lag_range: BTreeMap<Uuid, ReplCidRange>,
},
/// Unwilling - The consumer is advanced beyond our state, and supplying
/// changes to them may introduce inconsistency in replication. This
/// server should be investigated immediately.
Unwilling {
adv_range: BTreeMap<Uuid, ReplCidRange>,
},
/// Critical - The consumer is lagging and missing changes, but also is
/// in possession of changes advancing it beyond our current state. This
/// is a critical fault in replication and the topology must be
/// investigated immediately.
Critical {
lag_range: BTreeMap<Uuid, ReplCidRange>,
adv_range: BTreeMap<Uuid, ReplCidRange>,
},
}
impl ReplicationUpdateVector {
pub fn write(&self) -> ReplicationUpdateVectorWriteTransaction<'_> {
ReplicationUpdateVectorWriteTransaction {
@ -43,6 +71,105 @@ impl ReplicationUpdateVector {
ranged: self.ranged.read(),
}
}
/// Compare a consumer's RUV ranges against a supplier's and decide what
/// the supplier should provide.
///
/// For each server uuid known to the supplier:
/// * consumer max < supplier min  -> the consumer has lagged; refresh needed.
/// * supplier max < consumer min  -> the consumer is ahead; unwilling to supply.
/// * consumer max < supplier max  -> supply the window (consumer max, supplier max].
/// * otherwise the consumer is current for that server; nothing to send.
/// Servers absent from the consumer are supplied from zero.
///
/// Returns `Ok` with the per-server windows to supply, or `Refresh` /
/// `Unwilling` / `Critical` describing the detected lag/advance ranges.
pub(crate) fn range_diff(
    consumer_range: &BTreeMap<Uuid, ReplCidRange>,
    supplier_range: &BTreeMap<Uuid, ReplCidRange>,
) -> RangeDiffStatus {
    // We need to build a new set of ranges that express the difference between
    // these two states.
    let mut diff_range = BTreeMap::default();
    let mut lag_range = BTreeMap::default();
    let mut adv_range = BTreeMap::default();
    let mut consumer_lagging = false;
    let mut supplier_lagging = false;
    // We need to look at each uuid in the *supplier* and assert if they are present
    // on the *consumer*.
    //
    // If there are s_uuids with the same max, we don't add it to the
    // diff
    for (supplier_s_uuid, supplier_cid_range) in supplier_range.iter() {
        match consumer_range.get(supplier_s_uuid) {
            Some(consumer_cid_range) => {
                // The two windows just have to overlap. If they over lap
                // meaning that consumer max > supplier min, then if supplier
                // max > consumer max, then the range between consumer max
                // and supplier max must be supplied.
                //
                //   consumer min         consumer max
                //      <-- supplier min         supplier max -->
                //
                // In other words if we have:
                //
                //   consumer min         consumer max
                //                                      supplier min         supplier max
                //
                // then because there has been too much lag between consumer and
                // the supplier then there is a risk of changes being dropped or
                // missing. In the future we could alter this to force the resend
                // of zero -> supplier max, but I think thought is needed to
                // ensure no corruption in this case.
                if consumer_cid_range.ts_max < supplier_cid_range.ts_min {
                    consumer_lagging = true;
                    lag_range.insert(
                        *supplier_s_uuid,
                        ReplCidRange {
                            ts_min: supplier_cid_range.ts_min,
                            ts_max: consumer_cid_range.ts_max,
                        },
                    );
                } else if supplier_cid_range.ts_max < consumer_cid_range.ts_min {
                    // It could be valid in this case to ignore this instead
                    // of erroring as changelog trim has occurred? Thought needed.
                    supplier_lagging = true;
                    adv_range.insert(
                        *supplier_s_uuid,
                        ReplCidRange {
                            ts_min: supplier_cid_range.ts_max,
                            ts_max: consumer_cid_range.ts_min,
                        },
                    );
                } else if consumer_cid_range.ts_max < supplier_cid_range.ts_max {
                    // We require the changes from consumer max -> supplier max.
                    diff_range.insert(
                        *supplier_s_uuid,
                        ReplCidRange {
                            ts_min: consumer_cid_range.ts_max,
                            ts_max: supplier_cid_range.ts_max,
                        },
                    );
                }
                // else ...
                // In this case there is no action required since consumer_cid_range.ts_max
                // must be greater than or equal to supplier max.
            }
            None => {
                // The consumer does not have any content from this
                // server. Select from Zero -> max of the supplier.
                diff_range.insert(
                    *supplier_s_uuid,
                    ReplCidRange {
                        ts_min: Duration::ZERO,
                        ts_max: supplier_cid_range.ts_max,
                    },
                );
            }
        }
    }
    // Collapse the lag/advance flags into the final verdict. Both flags set
    // means the topology is simultaneously behind and ahead of us.
    match (consumer_lagging, supplier_lagging) {
        (false, false) => RangeDiffStatus::Ok(diff_range),
        (true, false) => RangeDiffStatus::Refresh { lag_range },
        (false, true) => RangeDiffStatus::Unwilling { adv_range },
        (true, true) => RangeDiffStatus::Critical {
            lag_range,
            adv_range,
        },
    }
}
}
pub struct ReplicationUpdateVectorWriteTransaction<'a> {
@ -69,9 +196,8 @@ pub trait ReplicationUpdateVectorTransaction {
fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>>;
fn current_ruv_range(&self) -> Result<ReplRuvRange, OperationError> {
let ranges = self
.range_snapshot()
fn current_ruv_range(&self) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
self.range_snapshot()
.iter()
.map(|(s_uuid, range)| match (range.first(), range.last()) {
(Some(first), Some(last)) => Ok((
@ -89,9 +215,55 @@ pub trait ReplicationUpdateVectorTransaction {
Err(OperationError::InvalidState)
}
})
.collect::<Result<BTreeMap<_, _>, _>>()?;
.collect::<Result<BTreeMap<_, _>, _>>()
}
Ok(ReplRuvRange::V1 { ranges })
/// Convert a set of per-server CID ranges into the IDL of entry ids whose
/// changes fall inside those ranges, by walking our RUV and unioning the
/// id lists of every matching CID.
fn range_to_idl(&self, ctx_ranges: &BTreeMap<Uuid, ReplCidRange>) -> IDLBitRange {
    let mut idl = IDLBitRange::new();
    // Force the set to be compressed, saves on seeks during
    // inserts.
    idl.compress();
    let range = self.range_snapshot();
    let ruv = self.ruv_snapshot();

    // The range we have has a collection of s_uuid containing low -> high ranges.
    // We need to convert this to absolute ranges of all the idlbitranges that
    // relate to the entries we have.

    for (s_uuid, ctx_range) in ctx_ranges {
        // For each server and range low to high, iterate over
        // the list of CID's in the main RUV.

        let ruv_range = match range.get(s_uuid) {
            Some(r) => r,
            None => {
                // This is valid because if we clean up a server range on
                // this node, but the other server isn't aware yet, so we
                // just no-op this. The changes we have will still be
                // correctly found and sent.
                debug!(?s_uuid, "range not found in ruv.");
                continue;
            }
        };

        // Select everything strictly after ts_min. The upper bound is
        // Unbounded rather than Included(ctx_range.ts_max) - presumably
        // our local range never exceeds the requested max here, but
        // NOTE(review): confirm nothing above ts_max can be present.
        for ts in ruv_range.range((Excluded(ctx_range.ts_min), Unbounded)) {
            let cid = Cid {
                ts: *ts,
                s_uuid: *s_uuid,
            };

            if let Some(ruv_idl) = ruv.get(&cid) {
                ruv_idl.into_iter().for_each(|id| idl.insert_id(id))
            }
            // If the cid isn't found, it may have been trimmed, but that's okay.
        }
    }

    idl
}
fn verify(
@ -243,6 +415,67 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
self.ranged.clear();
}
pub(crate) fn refresh_validate_ruv(
&self,
ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
) -> Result<(), OperationError> {
// Assert that the ruv that currently exists, is a valid data set of
// the supplied consumer range - especially check that when a uuid exists in
// our ruv, that it's maximum matches the ctx ruv.
//
// Since the ctx range comes from the supplier, when we rebuild due to the
// state machine then some values may not exist since they were replaced. But
// the server uuid maximums must exist.
let mut valid = true;
for (server_uuid, server_range) in self.ranged.iter() {
match ctx_ranges.get(server_uuid) {
Some(ctx_range) => {
let ctx_ts = &ctx_range.ts_max;
match server_range.last() {
Some(s_ts) if ctx_ts == s_ts => {
// Ok
trace!(?server_uuid, ?ctx_ts, ?s_ts, "valid");
}
Some(s_ts) => {
valid = false;
warn!(?server_uuid, ?ctx_ts, ?s_ts, "inconsistent s_uuid in ruv");
}
None => {
valid = false;
warn!(?server_uuid, ?ctx_ts, "inconsistent server range in ruv");
}
}
}
None => {
valid = false;
error!(?server_uuid, "s_uuid absent from in ruv");
}
}
}
if valid {
Ok(())
} else {
Err(OperationError::ReplInvalidRUVState)
}
}
/// Merge the supplier's RUV maximums into our ranged index. Each server
/// uuid in `ctx_ranges` has its `ts_max` recorded; servers we have not
/// seen before gain a fresh single-element range.
pub(crate) fn refresh_update_ruv(
    &mut self,
    ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
) -> Result<(), OperationError> {
    for (ctx_s_uuid, ctx_range) in ctx_ranges.iter() {
        match self.ranged.get_mut(ctx_s_uuid) {
            // Known server - add the supplier maximum to the
            // timestamps we already hold (a no-op if present).
            Some(s_range) => {
                s_range.insert(ctx_range.ts_max);
            }
            // Unknown server - begin its range at the supplier maximum.
            None => {
                self.ranged.insert(*ctx_s_uuid, btreeset!(ctx_range.ts_max));
            }
        }
    }
    Ok(())
}
pub fn rebuild(&mut self, entries: &[Arc<EntrySealedCommitted>]) -> Result<(), OperationError> {
// Drop everything.
self.clear();
@ -362,9 +595,62 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
}
*/
/*
How to handle changelog trimming? If we trim a server out from the RUV as a whole, we
need to be sure we don't oversupply changes the consumer already has. How can we do
this cleanly? Or do we just deal with it because our own local trim will occur soon after?
The situation would be
A: 1 -> 3
B: 1 -> 3
Assuming A trims first:
A:
B: 1 -> 3
Then on A <- B, B would try to supply 1->3 to A assuming it is not present. However,
the trim would occur soon after on B causing:
A:
B:
And then the supply would stop. So either A needs to retain the max/min in it's range
to allow the comparison here to continue even if it's ruv is cleaned. Or, we need to
have a delayed trim on the range that is 2x the normal trim range to give a buffer?
Mostly longer ruv/cid ranges aren't an issue for us, so could we just maek these ranges
really large?
*/
// Problem Cases
/*
What about generations? There is a "live" generation which can be replicated and a
former generation of ranges that previously existed. To replicate:
// The consumer must have content within the current live range.
consumer.live_max < supplier.live_max
consumer.live_max >= supplier.live_min
// The consumer must have all content that was formerly known.
consumer.live_min >= supplier.former_max
// I don't think we care what
*/
/*
B and C must be sequential to an s_uuid.
Former (trimmed) | Live (current)
A <-> B | C <-> D
0 <-> A | B <-> B
*/
pub fn trim_up_to(&mut self, cid: &Cid) -> Result<IDLBitRange, OperationError> {
let mut idl = IDLBitRange::new();
let mut remove_suuid = Vec::default();
// let mut remove_suuid = Vec::default();
// Here we can use the for_each here to be trimming the
// range set since that is not ordered by time, we need
@ -378,12 +664,26 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
match self.ranged.get_mut(&cid.s_uuid) {
Some(server_range) => {
// Remove returns a bool if the element WAS present.
let last = match server_range.last() {
Some(l) => *l,
None => {
error!("Impossible State - The RUV should not be empty");
return Err(OperationError::InvalidState);
}
};
if cid.ts != last {
if !server_range.remove(&cid.ts) {
error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
return Err(OperationError::InvalidState);
}
} else {
trace!("skipping maximum cid for s_uuid");
}
if server_range.is_empty() {
remove_suuid.push(cid.s_uuid);
// remove_suuid.push(cid.s_uuid);
error!("Impossible State - The RUV should not be cleared for a s_uuid!");
return Err(OperationError::InvalidState);
}
}
None => {
@ -393,10 +693,12 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
}
}
/*
for s_uuid in remove_suuid {
let x = self.ranged.remove(&s_uuid);
assert!(x.map(|y| y.is_empty()).unwrap_or(false))
}
*/
// Trim all cid's up to this value, and return the range of IDs
// that are affected.
@ -410,3 +712,350 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
self.ranged.commit();
}
}
#[cfg(test)]
mod tests {
use super::RangeDiffStatus;
use super::ReplCidRange;
use super::ReplicationUpdateVector;
use std::collections::BTreeMap;
use std::time::Duration;
const UUID_A: uuid::Uuid = uuid::uuid!("13b530b0-efdd-4934-8fb7-9c35c8aab79e");
const UUID_B: uuid::Uuid = uuid::uuid!("16327cf8-6a34-4a17-982c-b2eaa6d02d00");
const UUID_C: uuid::Uuid = uuid::uuid!("2ed717e3-15be-41e6-b966-10a1f6d7ea1c");
#[test]
fn test_ruv_range_diff_1() {
    // Two empty RUVs must diff to an empty Ok range set, in both directions.
    let ctx_a = BTreeMap::default();
    let ctx_b = BTreeMap::default();

    assert_eq!(
        ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b),
        RangeDiffStatus::Ok(BTreeMap::default())
    );

    // And the inverse direction behaves identically.
    assert_eq!(
        ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a),
        RangeDiffStatus::Ok(BTreeMap::default())
    );
}
#[test]
fn test_ruv_range_diff_2() {
let ctx_a = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(3),
}
));
let ctx_b = BTreeMap::default();
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Ok(BTreeMap::default());
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Ok(btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::ZERO,
ts_max: Duration::from_secs(3),
}
)));
assert_eq!(result, expect);
}
#[test]
fn test_ruv_range_diff_3() {
let ctx_a = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(3),
}
));
let ctx_b = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(3),
}
));
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Ok(BTreeMap::default());
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Ok(BTreeMap::default());
assert_eq!(result, expect);
}
#[test]
fn test_ruv_range_diff_4() {
let ctx_a = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(3),
}
));
let ctx_b = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(4),
}
));
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Ok(btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(3),
ts_max: Duration::from_secs(4),
}
)));
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Ok(BTreeMap::default());
assert_eq!(result, expect);
}
#[test]
fn test_ruv_range_diff_5() {
let ctx_a = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(5),
ts_max: Duration::from_secs(7),
}
));
let ctx_b = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(4),
}
));
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Unwilling {
adv_range: btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(4),
ts_max: Duration::from_secs(5),
}
)),
};
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Refresh {
lag_range: btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(5),
ts_max: Duration::from_secs(4),
}
)),
};
assert_eq!(result, expect);
}
#[test]
fn test_ruv_range_diff_6() {
let ctx_a = btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(4),
}
));
let ctx_b = btreemap!(
(
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(3),
}
),
(
UUID_B,
ReplCidRange {
ts_min: Duration::from_secs(2),
ts_max: Duration::from_secs(4),
}
)
);
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Ok(btreemap!((
UUID_B,
ReplCidRange {
ts_min: Duration::ZERO,
ts_max: Duration::from_secs(4),
}
)));
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Ok(btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(3),
ts_max: Duration::from_secs(4),
}
)));
assert_eq!(result, expect);
}
#[test]
fn test_ruv_range_diff_7() {
let ctx_a = btreemap!(
(
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(4),
}
),
(
UUID_C,
ReplCidRange {
ts_min: Duration::from_secs(2),
ts_max: Duration::from_secs(5),
}
)
);
let ctx_b = btreemap!(
(
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(3),
}
),
(
UUID_B,
ReplCidRange {
ts_min: Duration::from_secs(2),
ts_max: Duration::from_secs(4),
}
),
(
UUID_C,
ReplCidRange {
ts_min: Duration::from_secs(3),
ts_max: Duration::from_secs(4),
}
)
);
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Ok(btreemap!((
UUID_B,
ReplCidRange {
ts_min: Duration::ZERO,
ts_max: Duration::from_secs(4),
}
)));
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Ok(btreemap!(
(
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(3),
ts_max: Duration::from_secs(4),
}
),
(
UUID_C,
ReplCidRange {
ts_min: Duration::from_secs(4),
ts_max: Duration::from_secs(5),
}
)
));
assert_eq!(result, expect);
}
#[test]
fn test_ruv_range_diff_8() {
let ctx_a = btreemap!(
(
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(4),
ts_max: Duration::from_secs(6),
}
),
(
UUID_B,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(2),
}
)
);
let ctx_b = btreemap!(
(
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(1),
ts_max: Duration::from_secs(2),
}
),
(
UUID_B,
ReplCidRange {
ts_min: Duration::from_secs(4),
ts_max: Duration::from_secs(6),
}
)
);
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
let expect = RangeDiffStatus::Critical {
adv_range: btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(2),
ts_max: Duration::from_secs(4),
}
)),
lag_range: btreemap!((
UUID_B,
ReplCidRange {
ts_min: Duration::from_secs(4),
ts_max: Duration::from_secs(2),
}
)),
};
assert_eq!(result, expect);
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
let expect = RangeDiffStatus::Critical {
adv_range: btreemap!((
UUID_B,
ReplCidRange {
ts_min: Duration::from_secs(2),
ts_max: Duration::from_secs(4),
}
)),
lag_range: btreemap!((
UUID_A,
ReplCidRange {
ts_min: Duration::from_secs(4),
ts_max: Duration::from_secs(2),
}
)),
};
assert_eq!(result, expect);
}
}

View file

@ -1,4 +1,8 @@
use super::proto::{ReplEntryV1, ReplRefreshContext};
use super::proto::{
ReplEntryV1, ReplIncrementalContext, ReplIncrementalEntryV1, ReplRefreshContext, ReplRuvRange,
};
use super::ruv::{RangeDiffStatus, ReplicationUpdateVector, ReplicationUpdateVectorTransaction};
use crate::be::BackendTransaction;
use crate::prelude::*;
impl<'a> QueryServerReadTransaction<'a> {
@ -10,8 +14,124 @@ impl<'a> QueryServerReadTransaction<'a> {
// * Which entry attr-states need to be sent, if any
#[instrument(level = "debug", skip_all)]
pub fn supplier_provide_changes(&mut self) -> Result<(), OperationError> {
Ok(())
/// As the supplier, compute the incremental change set for a consumer given
/// the consumer's RUV range (`ctx_ruv`).
///
/// Compares the consumer ranges to our own via `range_diff`, then either
/// short-circuits (`NoChangesAvailable`, `RefreshRequired`,
/// `UnwillingToSupply`) or fetches the affected entries from the backend,
/// partitions them into schema / meta / other entries, and reduces each to
/// an incremental entry scoped to the supplied ranges.
pub fn supplier_provide_changes(
    &mut self,
    ctx_ruv: ReplRuvRange,
) -> Result<ReplIncrementalContext, OperationError> {
    // Convert types if needed. This way we can compare ruv's correctly.
    let ctx_ranges = match ctx_ruv {
        ReplRuvRange::V1 { ranges } => ranges,
    };

    let our_ranges = self
        .get_be_txn()
        .get_ruv()
        .current_ruv_range()
        .map_err(|e| {
            error!(err = ?e, "Unable to access supplier RUV range");
            e
        })?;

    // Compare this to our internal ranges - work out the list of entry
    // id's that are now different.
    let supply_ranges = ReplicationUpdateVector::range_diff(&ctx_ranges, &our_ranges);

    // If empty, return an empty set of changes!

    let ranges = match supply_ranges {
        RangeDiffStatus::Ok(ranges) => ranges,
        RangeDiffStatus::Refresh { lag_range } => {
            error!("Replication - Consumer is lagging and must be refreshed.");
            debug!(?lag_range);
            return Ok(ReplIncrementalContext::RefreshRequired);
        }
        RangeDiffStatus::Unwilling { adv_range } => {
            error!("Replication - Supplier is lagging and must be investigated.");
            debug!(?adv_range);
            return Ok(ReplIncrementalContext::UnwillingToSupply);
        }
        RangeDiffStatus::Critical {
            lag_range,
            adv_range,
        } => {
            error!("Replication Critical - Servers are advanced of us, and also lagging! This must be immediately investigated!");
            debug!(?lag_range);
            debug!(?adv_range);
            // Critical is also treated as unwilling: supplying could corrupt.
            return Ok(ReplIncrementalContext::UnwillingToSupply);
        }
    };

    debug!(?ranges, "these ranges will be supplied");

    if ranges.is_empty() {
        return Ok(ReplIncrementalContext::NoChangesAvailable);
    }

    // From the set of change id's, fetch those entries.
    // This is done by supplying the ranges to the be which extracts
    // the entries affected by the idls in question.
    let entries = self.get_be_txn().retrieve_range(&ranges).map_err(|e| {
        admin_error!(?e, "backend failure");
        OperationError::Backend
    })?;

    // Separate the entries into schema, meta and remaining.
    // Schema entries: anything whose class is an attribute or class type.
    let (schema_entries, rem_entries): (Vec<_>, Vec<_>) = entries.into_iter().partition(|e| {
        e.get_ava_set("class")
            .map(|cls| {
                cls.contains(&PVCLASS_ATTRIBUTETYPE as &PartialValue)
                    || cls.contains(&PVCLASS_CLASSTYPE as &PartialValue)
            })
            .unwrap_or(false)
    });

    // Meta entries: the domain info / system info / system config entries.
    let (meta_entries, entries): (Vec<_>, Vec<_>) = rem_entries.into_iter().partition(|e| {
        e.get_ava_set("uuid")
            .map(|uset| {
                uset.contains(&PVUUID_DOMAIN_INFO as &PartialValue)
                    || uset.contains(&PVUUID_SYSTEM_INFO as &PartialValue)
                    || uset.contains(&PVUUID_SYSTEM_CONFIG as &PartialValue)
            })
            .unwrap_or(false)
    });

    trace!(?schema_entries);
    trace!(?meta_entries);
    trace!(?entries);

    // For each entry, determine the changes that exist on the entry that fall
    // into the ruv range - reduce to a incremental set of changes.

    let schema = self.get_schema();
    let domain_version = self.d_info.d_vers;
    let domain_uuid = self.d_info.d_uuid;

    let schema_entries: Vec<_> = schema_entries
        .into_iter()
        .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
        .collect();

    let meta_entries: Vec<_> = meta_entries
        .into_iter()
        .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
        .collect();

    let entries: Vec<_> = entries
        .into_iter()
        .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
        .collect();

    // Build the incremental context.
    Ok(ReplIncrementalContext::V1 {
        domain_version,
        domain_uuid,
        ranges,
        schema_entries,
        meta_entries,
        entries,
    })
}
#[instrument(level = "debug", skip_all)]
@ -25,6 +145,16 @@ impl<'a> QueryServerReadTransaction<'a> {
let domain_version = self.d_info.d_vers;
let domain_uuid = self.d_info.d_uuid;
// What is the set of data we are providing?
let ranges = self
.get_be_txn()
.get_ruv()
.current_ruv_range()
.map_err(|e| {
error!(err = ?e, "Unable to access supplier RUV range");
e
})?;
// * the domain uuid
// * the set of schema entries
// * the set of non-schema entries
@ -93,6 +223,7 @@ impl<'a> QueryServerReadTransaction<'a> {
Ok(ReplRefreshContext::V1 {
domain_version,
domain_uuid,
ranges,
schema_entries,
meta_entries,
entries,

View file

@ -19,6 +19,22 @@ fn repl_initialise(
// Need same d_uuid
assert_eq!(from.get_domain_uuid(), to.get_domain_uuid());
// Ruvs are the same now
let a_ruv_range = from
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range A");
let b_ruv_range = to
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range B");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
Ok(())
}
@ -105,13 +121,6 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer)
// Done! The entry content are identical as are their replication metadata. We are good
// to go!
let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range();
let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range();
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
// Both servers will be post-test validated.
}
@ -126,25 +135,147 @@ async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServe
.and_then(|_| server_a_txn.commit())
.is_ok());
// - incremental - no changes should be present
let mut server_a_txn = server_a.read().await;
let a_ruv_range = server_a_txn
.consumer_get_state()
.expect("Unable to access RUV range");
// End the read.
drop(server_a_txn);
// Get the changes.
let changes = server_b_txn
.supplier_provide_changes(a_ruv_range)
.expect("Unable to generate supplier changes");
// Check the changes = should be empty.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let a_ruv_range = server_a_txn.get_be_txn().get_ruv().current_ruv_range();
server_a_txn
.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
let b_ruv_range = server_b_txn.get_be_txn().get_ruv().current_ruv_range();
// Do a ruv check - should still be the same.
let a_ruv_range = server_a_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range A");
let b_ruv_range = server_b_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range B");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
// Check ruv
// - should be same
// - incremental
// - no change.
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
// Add an entry.
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
("class", Value::new_class("object")),
("class", Value::new_class("person")),
("name", Value::new_iname("testperson1")),
("uuid", Value::Uuid(t_uuid)),
("description", Value::new_utf8s("testperson1")),
("displayname", Value::new_utf8s("testperson1"))
),])
.is_ok());
server_b_txn.commit().expect("Failed to commit");
// Do a ruv check.
// let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.read().await;
// Assert the entry is not on A.
assert_eq!(
server_a_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
let a_ruv_range = server_a_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range A");
let b_ruv_range = server_b_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range B");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range != b_ruv_range);
// Now setup the consumer state for the next incremental replication.
let a_ruv_range = server_a_txn
.consumer_get_state()
.expect("Unable to access RUV range");
// End the read.
drop(server_a_txn);
// Incremental.
// Should now be on the other partner.
// Get the changes.
let changes = server_b_txn
.supplier_provide_changes(a_ruv_range)
.expect("Unable to generate supplier changes");
// Check the changes = should be empty.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
server_a_txn
.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
// RUV should be consistent again.
let a_ruv_range = server_a_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range A");
let b_ruv_range = server_b_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range B");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
// Test RUV content when a server's changes have been trimmed out and are not present
// in a refresh.
// Test change of a domain name over incremental.
// Test schema addition / change over incremental.
// Test change of domain version over incremental.
// Test that when a group has a member A, and the group is then conflicted, the
// membership of A is removed when the group is moved to the conflict state.
// Multiple tombstone states / directions.
// Test that referential integrity deletes references when a tombstone is
// replicated over. May need the consumer to have some extra groups that need cleanup.
// Test add then delete on an attribute, and that it replicates the empty state to
// the other side.
// Test memberof over replication boundaries.

View file

@ -134,7 +134,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
// do the CORRECT thing and recommit as we may find later we always
// want to add CSN's or other.
let res: Result<Vec<Entry<EntrySealed, EntryCommitted>>, OperationError> = candidates
let res: Result<Vec<EntrySealedCommitted>, OperationError> = candidates
.into_iter()
.map(|entry| {
entry