mirror of
https://github.com/kanidm/kanidm.git
synced 2025-05-23 01:13:54 +02:00
Fix RUV trim (#2466)
Fixes two major issues with replication. The first was related to server refreshes. When a server was refreshed it would retain it's server unique id. If the server had lagged and was disconnected from replication and administrator would naturally then refresh it's database. This meant that on next tombstone purge of the server, it's RUV would jump ahead causing it's refresh-supplier to now believe it was lagging (which was not the case). In the situation where a server is refreshed, we reset the servers unique replication ID which avoids the RUV having "jumps". The second issue was related to RUV trimming. A server which had older RUV entries (say from servers that have been trimmed) would "taint" and re-supply those server ID's back to nodes that wanted to trim them. This also meant that on a restart of the server, that if the node had correctly trimmed the server ID, it would be re-added in memory. This improves RUV trimming by limiting what what compare and check as a supplier to only CID's that are within the valid changelog window. This itself presented challenges with "how to determine if a server should be removed from the RUV". To achieve this we now check for "overlap" of the RUVS. If overlap isn't occurring it indicates split brain or node isolation, and replication is stopped in these cases.
This commit is contained in:
parent
d42268269a
commit
23cc2e7745
899
Cargo.lock
generated
899
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -116,7 +116,7 @@ clap_complete = "^4.4.4"
|
|||
# Forced by saffron/cron
|
||||
chrono = "^0.4.31"
|
||||
compact_jwt = { version = "^0.3.3", default-features = false }
|
||||
concread = "^0.4.3"
|
||||
concread = "^0.4.4"
|
||||
cron = "0.12.0"
|
||||
crossbeam = "0.8.1"
|
||||
criterion = "^0.5.1"
|
||||
|
|
|
@ -9,7 +9,7 @@ use chrono::Utc;
|
|||
use cron::Schedule;
|
||||
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::time::{interval, sleep, Duration};
|
||||
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
|
||||
|
||||
use crate::config::OnlineBackup;
|
||||
use crate::CoreAction;
|
||||
|
@ -28,8 +28,16 @@ impl IntervalActor {
|
|||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
|
||||
inter.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
server
|
||||
.handle_purgetombstoneevent(PurgeTombstoneEvent::new())
|
||||
.await;
|
||||
server
|
||||
.handle_purgerecycledevent(PurgeRecycledEvent::new())
|
||||
.await;
|
||||
|
||||
tokio::select! {
|
||||
Ok(action) = rx.recv() => {
|
||||
match action {
|
||||
|
@ -37,12 +45,8 @@ impl IntervalActor {
|
|||
}
|
||||
}
|
||||
_ = inter.tick() => {
|
||||
server
|
||||
.handle_purgetombstoneevent(PurgeTombstoneEvent::new())
|
||||
.await;
|
||||
server
|
||||
.handle_purgerecycledevent(PurgeRecycledEvent::new())
|
||||
.await;
|
||||
// Next iter.
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,8 +96,9 @@ async fn setup_qs_idms(
|
|||
schema: Schema,
|
||||
config: &Configuration,
|
||||
) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
|
||||
let curtime = duration_from_epoch_now();
|
||||
// Create a query_server implementation
|
||||
let query_server = QueryServer::new(be, schema, config.domain.clone())?;
|
||||
let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
|
||||
|
||||
// TODO #62: Should the IDM parts be broken out to the IdmServer?
|
||||
// What's important about this initial setup here is that it also triggers
|
||||
|
@ -107,9 +108,7 @@ async fn setup_qs_idms(
|
|||
// Now search for the schema itself, and validate that the system
|
||||
// in memory matches the BE on disk, and that it's syntactically correct.
|
||||
// Write it out if changes are needed.
|
||||
query_server
|
||||
.initialise_helper(duration_from_epoch_now())
|
||||
.await?;
|
||||
query_server.initialise_helper(curtime).await?;
|
||||
|
||||
// We generate a SINGLE idms only!
|
||||
|
||||
|
@ -124,8 +123,9 @@ async fn setup_qs(
|
|||
schema: Schema,
|
||||
config: &Configuration,
|
||||
) -> Result<QueryServer, OperationError> {
|
||||
let curtime = duration_from_epoch_now();
|
||||
// Create a query_server implementation
|
||||
let query_server = QueryServer::new(be, schema, config.domain.clone())?;
|
||||
let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
|
||||
|
||||
// TODO #62: Should the IDM parts be broken out to the IdmServer?
|
||||
// What's important about this initial setup here is that it also triggers
|
||||
|
@ -135,9 +135,7 @@ async fn setup_qs(
|
|||
// Now search for the schema itself, and validate that the system
|
||||
// in memory matches the BE on disk, and that it's syntactically correct.
|
||||
// Write it out if changes are needed.
|
||||
query_server
|
||||
.initialise_helper(duration_from_epoch_now())
|
||||
.await?;
|
||||
query_server.initialise_helper(curtime).await?;
|
||||
|
||||
Ok(query_server)
|
||||
}
|
||||
|
@ -582,6 +580,7 @@ pub async fn domain_rename_core(config: &Configuration) {
|
|||
}
|
||||
|
||||
pub async fn verify_server_core(config: &Configuration) {
|
||||
let curtime = duration_from_epoch_now();
|
||||
// setup the qs - without initialise!
|
||||
let schema_mem = match Schema::new() {
|
||||
Ok(sc) => sc,
|
||||
|
@ -599,7 +598,7 @@ pub async fn verify_server_core(config: &Configuration) {
|
|||
}
|
||||
};
|
||||
|
||||
let server = match QueryServer::new(be, schema_mem, config.domain.clone()) {
|
||||
let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
|
||||
Ok(qs) => qs,
|
||||
Err(err) => {
|
||||
error!(?err, "Failed to setup query server");
|
||||
|
|
|
@ -1914,7 +1914,7 @@ impl<'a> BackendWriteTransaction<'a> {
|
|||
})
|
||||
}
|
||||
|
||||
fn reset_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
|
||||
pub(crate) fn reset_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
|
||||
// The value is missing. Generate a new one and store it.
|
||||
let nsid = Uuid::new_v4();
|
||||
self.get_idlayer().write_db_s_uuid(nsid).map_err(|err| {
|
||||
|
|
|
@ -56,10 +56,10 @@ pub const DOMAIN_TGT_LEVEL: DomainVersion = DOMAIN_LEVEL_5;
|
|||
// The maximum supported domain functional level
|
||||
pub const DOMAIN_MAX_LEVEL: DomainVersion = DOMAIN_LEVEL_5;
|
||||
|
||||
// On test builds, define to 60 seconds
|
||||
// On test builds define to 60 seconds
|
||||
#[cfg(test)]
|
||||
pub const PURGE_FREQUENCY: u64 = 60;
|
||||
// For production, 10 minutes.
|
||||
// For production 10 minutes.
|
||||
#[cfg(not(test))]
|
||||
pub const PURGE_FREQUENCY: u64 = 600;
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ macro_rules! setup_test {
|
|||
let be = Backend::new(BackendConfig::new_test("main"), idxmeta, false)
|
||||
.expect("Failed to init BE");
|
||||
|
||||
let qs = QueryServer::new(be, schema_outer, "example.com".to_string())
|
||||
let qs = QueryServer::new(be, schema_outer, "example.com".to_string(), Duration::ZERO)
|
||||
.expect("Failed to setup Query Server");
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
|
@ -38,7 +38,7 @@ macro_rules! setup_test {
|
|||
let be = Backend::new(BackendConfig::new_test("main"), idxmeta, false)
|
||||
.expect("Failed to init BE");
|
||||
|
||||
let qs = QueryServer::new(be, schema_outer, "example.com".to_string())
|
||||
let qs = QueryServer::new(be, schema_outer, "example.com".to_string(), Duration::ZERO)
|
||||
.expect("Failed to setup Query Server");
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
|
|
|
@ -10,13 +10,13 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
fn consumer_incremental_apply_entries(
|
||||
&mut self,
|
||||
ctx_entries: &[ReplIncrementalEntryV1],
|
||||
) -> Result<(), OperationError> {
|
||||
) -> Result<bool, OperationError> {
|
||||
// trace!(?ctx_entries);
|
||||
|
||||
// No action needed for this if the entries are empty.
|
||||
if ctx_entries.is_empty() {
|
||||
debug!("No entries to act upon");
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -250,7 +250,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
changed_sync_agreement = ?self.changed_sync_agreement
|
||||
);
|
||||
|
||||
Ok(())
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn consumer_apply_changes(
|
||||
|
@ -332,43 +332,46 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
})?;
|
||||
|
||||
// == ⚠️ Below this point we begin to make changes! ==
|
||||
debug!(
|
||||
info!(
|
||||
"Proceeding to apply incremental from domain {:?} at level {}",
|
||||
ctx_domain_uuid, ctx_domain_version
|
||||
);
|
||||
|
||||
debug!(?ctx_ranges);
|
||||
|
||||
debug!("Applying schema entries");
|
||||
// Apply the schema entries first.
|
||||
self.consumer_incremental_apply_entries(ctx_schema_entries)
|
||||
let schema_changed = self
|
||||
.consumer_incremental_apply_entries(ctx_schema_entries)
|
||||
.map_err(|e| {
|
||||
error!("Failed to apply incremental schema entries");
|
||||
e
|
||||
})?;
|
||||
|
||||
// We need to reload schema now!
|
||||
self.reload_schema().map_err(|e| {
|
||||
error!("Failed to reload schema");
|
||||
e
|
||||
})?;
|
||||
if schema_changed {
|
||||
// We need to reload schema now!
|
||||
self.reload_schema().map_err(|e| {
|
||||
error!("Failed to reload schema");
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
debug!("Applying meta entries");
|
||||
// Apply meta entries now.
|
||||
self.consumer_incremental_apply_entries(ctx_meta_entries)
|
||||
let meta_changed = self
|
||||
.consumer_incremental_apply_entries(ctx_meta_entries)
|
||||
.map_err(|e| {
|
||||
error!("Failed to apply incremental schema entries");
|
||||
e
|
||||
})?;
|
||||
|
||||
// This is re-loaded in case the domain name changed on the remote
|
||||
self.reload_domain_info().map_err(|e| {
|
||||
error!("Failed to reload domain info");
|
||||
e
|
||||
})?;
|
||||
|
||||
// Trigger for post commit hooks. Should we detect better in the entry
|
||||
// apply phases?
|
||||
// self.changed_schema = true;
|
||||
// self.changed_domain = true;
|
||||
if meta_changed {
|
||||
self.reload_domain_info().map_err(|e| {
|
||||
error!("Failed to reload domain info");
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
debug!("Applying all context entries");
|
||||
// Update all other entries now.
|
||||
|
@ -384,10 +387,12 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
// this is because the supplier will already be sending us everything that
|
||||
// was just migrated. As a result, we only need to apply the migrations to entries
|
||||
// that were not on the supplier, and therefore need updates here.
|
||||
self.reload_domain_info_version().map_err(|e| {
|
||||
error!("Failed to reload domain info version");
|
||||
e
|
||||
})?;
|
||||
if meta_changed {
|
||||
self.reload_domain_info_version().map_err(|e| {
|
||||
error!("Failed to reload domain info version");
|
||||
e
|
||||
})?;
|
||||
}
|
||||
|
||||
// Finally, confirm that the ranges that we have added match the ranges from our
|
||||
// context. Note that we get this in a writeable form!
|
||||
|
@ -519,9 +524,10 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
e
|
||||
})?;
|
||||
|
||||
// Do we need to reset our s_uuid to avoid potential RUV conflicts?
|
||||
// - I don't think so, since the refresh is supplying and rebuilding
|
||||
// our local state.
|
||||
// We need to reset our server uuid now. This is so that any other servers
|
||||
// which had our former server_uuid in their RUV, is able to start to age it
|
||||
// out and trim it.
|
||||
self.reset_server_uuid()?;
|
||||
|
||||
// Delete all entries - *proper delete, not just tombstone!*
|
||||
|
||||
|
|
|
@ -57,13 +57,19 @@ pub(crate) enum RangeDiffStatus {
|
|||
lag_range: BTreeMap<Uuid, ReplCidRange>,
|
||||
adv_range: BTreeMap<Uuid, ReplCidRange>,
|
||||
},
|
||||
/// No RUV Overlap - The consumer has likely desynchronised and no longer has
|
||||
/// common overlap with it's RUV to ours. This can indicate it has trimmed
|
||||
/// content we may have, or may have become part of a split brain situation.
|
||||
/// For replication to proceed, there must be *at least* one common overlapping
|
||||
/// point in the RUV.
|
||||
NoRUVOverlap,
|
||||
}
|
||||
|
||||
impl ReplicationUpdateVector {
|
||||
pub fn write(&self) -> ReplicationUpdateVectorWriteTransaction<'_> {
|
||||
ReplicationUpdateVectorWriteTransaction {
|
||||
// Need to take the write first.
|
||||
cleared: false,
|
||||
// Need to take the write first, then the read to guarantee ordering.
|
||||
added: Some(BTreeSet::default()),
|
||||
data: self.data.write(),
|
||||
data_pre: self.data.read(),
|
||||
ranged: self.ranged.write(),
|
||||
|
@ -89,6 +95,7 @@ impl ReplicationUpdateVector {
|
|||
|
||||
let mut consumer_lagging = false;
|
||||
let mut supplier_lagging = false;
|
||||
let mut valid_content_overlap = false;
|
||||
|
||||
// We need to look at each uuid in the *supplier* and assert if they are present
|
||||
// on the *consumer*.
|
||||
|
@ -99,6 +106,10 @@ impl ReplicationUpdateVector {
|
|||
for (supplier_s_uuid, supplier_cid_range) in supplier_range.iter() {
|
||||
match consumer_range.get(supplier_s_uuid) {
|
||||
Some(consumer_cid_range) => {
|
||||
// We have the same server uuid in our RUV's so some content overlap
|
||||
// must exist (or has existed);
|
||||
valid_content_overlap = true;
|
||||
|
||||
// The two windows just have to overlap. If they over lap
|
||||
// meaning that consumer max > supplier min, then if supplier
|
||||
// max > consumer max, then the range between consumer max
|
||||
|
@ -169,7 +180,6 @@ impl ReplicationUpdateVector {
|
|||
},
|
||||
);
|
||||
}
|
||||
// else ...
|
||||
//
|
||||
// /-- The consumer has changes we don't have.
|
||||
// | So we don't need to supply
|
||||
|
@ -204,6 +214,10 @@ impl ReplicationUpdateVector {
|
|||
}
|
||||
}
|
||||
|
||||
if !valid_content_overlap {
|
||||
return RangeDiffStatus::NoRUVOverlap;
|
||||
}
|
||||
|
||||
match (consumer_lagging, supplier_lagging) {
|
||||
(false, false) => RangeDiffStatus::Ok(diff_range),
|
||||
(true, false) => RangeDiffStatus::Refresh { lag_range },
|
||||
|
@ -217,7 +231,7 @@ impl ReplicationUpdateVector {
|
|||
}
|
||||
|
||||
pub struct ReplicationUpdateVectorWriteTransaction<'a> {
|
||||
cleared: bool,
|
||||
added: Option<BTreeSet<Cid>>,
|
||||
data: BptreeMapWriteTxn<'a, Cid, IDLBitRange>,
|
||||
data_pre: BptreeMapReadTxn<'a, Cid, IDLBitRange>,
|
||||
ranged: BptreeMapWriteTxn<'a, Uuid, BTreeSet<Duration>>,
|
||||
|
@ -266,6 +280,40 @@ pub trait ReplicationUpdateVectorTransaction {
|
|||
}
|
||||
}
|
||||
|
||||
/// Return a filtered view of our RUV ranges. This acts similar to "trim" where any s_uuid
|
||||
/// where the max cid is less than trim_cid will be excluded from the view.
|
||||
fn filter_ruv_range(
|
||||
&self,
|
||||
trim_cid: &Cid,
|
||||
) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
|
||||
self.range_snapshot()
|
||||
.iter()
|
||||
.filter_map(|(s_uuid, range)| match (range.first(), range.last()) {
|
||||
(Some(first), Some(last)) => {
|
||||
if last < &trim_cid.ts {
|
||||
None
|
||||
} else {
|
||||
Some(Ok((
|
||||
*s_uuid,
|
||||
ReplCidRange {
|
||||
ts_min: *first,
|
||||
ts_max: *last,
|
||||
},
|
||||
)))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error!(
|
||||
"invalid state for server uuid {:?}, no ranges present",
|
||||
s_uuid
|
||||
);
|
||||
Some(Err(OperationError::InvalidState))
|
||||
}
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()
|
||||
}
|
||||
|
||||
/// Return the complete set of RUV ranges present on this replica
|
||||
fn current_ruv_range(&self) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
|
||||
self.range_snapshot()
|
||||
.iter()
|
||||
|
@ -515,7 +563,7 @@ impl<'a> ReplicationUpdateVectorTransaction for ReplicationUpdateVectorReadTrans
|
|||
|
||||
impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
||||
pub fn clear(&mut self) {
|
||||
self.cleared = true;
|
||||
self.added = None;
|
||||
self.data.clear();
|
||||
self.ranged.clear();
|
||||
}
|
||||
|
@ -633,6 +681,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
self.insert_change(&cid, IDLBitRange::default())?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -676,8 +725,11 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
// Entries and their internal changestates are the "source of truth" for all changes
|
||||
// that have ever occurred and are stored on this server. So we use them to rebuild our RUV
|
||||
// here!
|
||||
let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
|
||||
let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();
|
||||
//
|
||||
// We only update RUV items where an anchor exists.
|
||||
|
||||
// let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
|
||||
// let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();
|
||||
|
||||
for entry in entries {
|
||||
// The DB id we need.
|
||||
|
@ -686,15 +738,19 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
// We don't need the details of the change - only the cid of the
|
||||
// change that this entry was involved in.
|
||||
for cid in ecstate.cid_iter() {
|
||||
if let Some(idl) = rebuild_ruv.get_mut(cid) {
|
||||
// if let Some(idl) = rebuild_ruv.get_mut(cid) {
|
||||
if let Some(idl) = self.data.get_mut(cid) {
|
||||
// We can't guarantee id order, so we have to do this properly.
|
||||
idl.insert_id(eid);
|
||||
} else {
|
||||
let mut idl = IDLBitRange::new();
|
||||
idl.insert_id(eid);
|
||||
rebuild_ruv.insert(cid.clone(), idl);
|
||||
/*
|
||||
} else {
|
||||
let mut idl = IDLBitRange::new();
|
||||
idl.insert_id(eid);
|
||||
rebuild_ruv.insert(cid.clone(), idl);
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) {
|
||||
server_range.insert(cid.ts);
|
||||
} else {
|
||||
|
@ -702,18 +758,21 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
ts_range.insert(cid.ts);
|
||||
rebuild_range.insert(cid.s_uuid, ts_range);
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, we need to do a cleanup/compact of the IDL's if possible.
|
||||
rebuild_ruv.iter_mut().for_each(|(_k, idl)| {
|
||||
self.data.range_mut(..).for_each(|(_k, idl)| {
|
||||
idl.maybe_compress();
|
||||
});
|
||||
|
||||
self.data.extend(rebuild_ruv);
|
||||
// self.data.extend(rebuild_ruv);
|
||||
|
||||
// Warning - you can't extend here because this is keyed by UUID. You need
|
||||
// to go through each key and then merge the sets.
|
||||
|
||||
/*
|
||||
rebuild_range.into_iter().for_each(|(s_uuid, ts_set)| {
|
||||
if let Some(ex_ts_set) = self.ranged.get_mut(&s_uuid) {
|
||||
ex_ts_set.extend(ts_set)
|
||||
|
@ -721,6 +780,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
self.ranged.insert(s_uuid, ts_set);
|
||||
}
|
||||
});
|
||||
*/
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -743,6 +803,10 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
self.ranged.insert(cid.s_uuid, range);
|
||||
}
|
||||
|
||||
if let Some(added) = &mut self.added {
|
||||
added.insert(cid.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -859,30 +923,18 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
match self.ranged.get_mut(&cid.s_uuid) {
|
||||
Some(server_range) => {
|
||||
// Remove returns a bool if the element WAS present.
|
||||
let last = match server_range.last() {
|
||||
Some(l) => *l,
|
||||
None => {
|
||||
error!("Impossible State - The RUV should not be empty");
|
||||
error!(ruv = ?self);
|
||||
error!(?cid);
|
||||
return Err(OperationError::InvalidState);
|
||||
}
|
||||
};
|
||||
|
||||
if cid.ts != last {
|
||||
if !server_range.remove(&cid.ts) {
|
||||
error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
|
||||
error!(ruv = ?self);
|
||||
error!(?cid);
|
||||
return Err(OperationError::InvalidState);
|
||||
}
|
||||
} else {
|
||||
debug!("skip trimming maximum cid for s_uuid {}", cid.s_uuid);
|
||||
if !server_range.remove(&cid.ts) {
|
||||
error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
|
||||
error!(ruv = ?self);
|
||||
error!(?cid);
|
||||
return Err(OperationError::InvalidState);
|
||||
}
|
||||
|
||||
if server_range.is_empty() {
|
||||
remove_suuid.push(cid.s_uuid);
|
||||
warn!(s_uuid = ?cid.s_uuid, "disconnected server detected - this will be removed!");
|
||||
} else {
|
||||
trace!(?server_range, "retaining server");
|
||||
}
|
||||
}
|
||||
None => {
|
||||
|
@ -910,36 +962,35 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
Ok(idl)
|
||||
}
|
||||
|
||||
pub fn added(&self) -> impl Iterator<Item = Cid> + '_ {
|
||||
// Find the max from the previous dataset.
|
||||
let prev_bound = if self.cleared {
|
||||
pub fn added(&self) -> Box<dyn Iterator<Item = Cid> + '_> {
|
||||
if let Some(added) = self.added.as_ref() {
|
||||
// return what was added this txn. We previously would iterate
|
||||
// from data_pre.max() with data, but if an anchor was added that
|
||||
// pre-dated data_pre.max() it wouldn't be committed to the db ruv
|
||||
// (even though it was in the in memory ruv).
|
||||
Box::new(added.iter().map(|cid| {
|
||||
debug!(added_cid = ?cid);
|
||||
cid.clone()
|
||||
}))
|
||||
} else {
|
||||
// We have been cleared during this txn, so everything in data is
|
||||
// added.
|
||||
Unbounded
|
||||
} else if let Some((max, _)) = self.data_pre.last_key_value() {
|
||||
Excluded(max.clone())
|
||||
} else {
|
||||
// If empty, assume everything is new.
|
||||
Unbounded
|
||||
};
|
||||
|
||||
// Starting from the previous max, iterate through our data to find what
|
||||
// has been added.
|
||||
self.data.range((prev_bound, Unbounded)).map(|(cid, _)| {
|
||||
trace!(added_cid = ?cid);
|
||||
cid.clone()
|
||||
})
|
||||
Box::new(self.data.iter().map(|(cid, _)| {
|
||||
debug!(added_cid = ?cid);
|
||||
cid.clone()
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn removed(&self) -> impl Iterator<Item = Cid> + '_ {
|
||||
let prev_bound = if self.cleared {
|
||||
let prev_bound = if self.added.is_none() {
|
||||
// We have been cleared during this txn, so everything in pre is
|
||||
// removed.
|
||||
Unbounded
|
||||
} else if let Some((min, _)) = self.data.first_key_value() {
|
||||
Excluded(min.clone())
|
||||
} else {
|
||||
// If empty, assume everything is new.
|
||||
// If empty, assume everything is removed.
|
||||
Unbounded
|
||||
};
|
||||
|
||||
|
@ -948,7 +999,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
|
|||
self.data_pre
|
||||
.range((Unbounded, prev_bound))
|
||||
.map(|(cid, _)| {
|
||||
trace!(removed_cid = ?cid);
|
||||
debug!(removed_cid = ?cid);
|
||||
cid.clone()
|
||||
})
|
||||
}
|
||||
|
@ -977,12 +1028,12 @@ mod tests {
|
|||
let ctx_b = BTreeMap::default();
|
||||
|
||||
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
|
||||
let expect = RangeDiffStatus::Ok(BTreeMap::default());
|
||||
let expect = RangeDiffStatus::NoRUVOverlap;
|
||||
assert_eq!(result, expect);
|
||||
|
||||
// Test the inverse.
|
||||
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
|
||||
let expect = RangeDiffStatus::Ok(BTreeMap::default());
|
||||
let expect = RangeDiffStatus::NoRUVOverlap;
|
||||
assert_eq!(result, expect);
|
||||
}
|
||||
|
||||
|
@ -998,17 +1049,11 @@ mod tests {
|
|||
let ctx_b = BTreeMap::default();
|
||||
|
||||
let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
|
||||
let expect = RangeDiffStatus::Ok(BTreeMap::default());
|
||||
let expect = RangeDiffStatus::NoRUVOverlap;
|
||||
assert_eq!(result, expect);
|
||||
|
||||
let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
|
||||
let expect = RangeDiffStatus::Ok(btreemap!((
|
||||
UUID_A,
|
||||
ReplCidRange {
|
||||
ts_min: Duration::ZERO,
|
||||
ts_max: Duration::from_secs(3),
|
||||
}
|
||||
)));
|
||||
let expect = RangeDiffStatus::NoRUVOverlap;
|
||||
assert_eq!(result, expect);
|
||||
}
|
||||
|
||||
|
|
|
@ -105,9 +105,32 @@ impl<'a> QueryServerReadTransaction<'a> {
|
|||
return Ok(ReplIncrementalContext::DomainMismatch);
|
||||
}
|
||||
|
||||
// This is a reasonably tricky part of the code, because we are attempting to do a
|
||||
// distributed and async liveness check. What content has the consumer seen? What
|
||||
// could they have trimmed from their own RUV?
|
||||
//
|
||||
// Since tombstone purging always creates an anchor, then there are always "pings"
|
||||
// effectively going out of "empty" changes that drive the RUV forward. This assists us
|
||||
// to detect this situation.
|
||||
//
|
||||
// If a server has been replicating correctly, then it should have at least *some* overlap
|
||||
// with us since content has always advanced.
|
||||
//
|
||||
// If a server has "stalled" then it will have *no* overlap. This can manifest as a need
|
||||
// to supply all ranges as though they were new because the lagging consumer has trimmed out
|
||||
// all the old content.
|
||||
//
|
||||
// When a server is newly added it will have overlap because it will have refreshed from
|
||||
// another server.
|
||||
//
|
||||
// When a server is "trimmed" from the RUV, it no longer influences the overlap decision
|
||||
// because the other servers will have continued to advance.
|
||||
|
||||
let trim_cid = self.trim_cid().clone();
|
||||
|
||||
let supplier_ruv = self.get_be_txn().get_ruv();
|
||||
|
||||
let our_ranges = supplier_ruv.current_ruv_range().map_err(|e| {
|
||||
let our_ranges = supplier_ruv.filter_ruv_range(&trim_cid).map_err(|e| {
|
||||
error!(err = ?e, "Unable to access supplier RUV range");
|
||||
e
|
||||
})?;
|
||||
|
@ -146,6 +169,12 @@ impl<'a> QueryServerReadTransaction<'a> {
|
|||
debug!(supplier_ranges = ?our_ranges);
|
||||
return Ok(ReplIncrementalContext::UnwillingToSupply);
|
||||
}
|
||||
RangeDiffStatus::NoRUVOverlap => {
|
||||
error!("Replication Critical - Consumers RUV has desynchronsied and diverged! This must be immediately investigated!");
|
||||
debug!(consumer_ranges = ?ctx_ranges);
|
||||
debug!(supplier_ranges = ?our_ranges);
|
||||
return Ok(ReplIncrementalContext::UnwillingToSupply);
|
||||
}
|
||||
};
|
||||
|
||||
debug!("these ranges will be supplied");
|
||||
|
@ -238,11 +267,13 @@ impl<'a> QueryServerReadTransaction<'a> {
|
|||
let domain_version = self.d_info.d_vers;
|
||||
let domain_uuid = self.d_info.d_uuid;
|
||||
|
||||
let trim_cid = self.trim_cid().clone();
|
||||
|
||||
// What is the set of data we are providing?
|
||||
let ranges = self
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.filter_ruv_range(&trim_cid)
|
||||
.map_err(|e| {
|
||||
error!(err = ?e, "Unable to access supplier RUV range");
|
||||
e
|
||||
|
|
|
@ -1518,7 +1518,7 @@ async fn test_repl_increment_create_tombstone_conflict(
|
|||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
// Get a new time.
|
||||
let ct = duration_from_epoch_now();
|
||||
let ct = ct + Duration::from_secs(1);
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
assert!(server_a_txn.internal_create(vec![e_init.clone(),]).is_ok());
|
||||
// Immediately send it to the shadow realm
|
||||
|
@ -1531,7 +1531,7 @@ async fn test_repl_increment_create_tombstone_conflict(
|
|||
assert!(server_b_txn.purge_recycled().is_ok());
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 2);
|
||||
let ct = ct + Duration::from_secs(1);
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
assert!(server_a_txn.purge_recycled().is_ok());
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
|
@ -1539,8 +1539,9 @@ async fn test_repl_increment_create_tombstone_conflict(
|
|||
// Since B was tombstoned first, it is the tombstone that should persist.
|
||||
|
||||
// This means A -> B - no change on B, it's the persisting tombstone.
|
||||
let ct = ct + Duration::from_secs(1);
|
||||
let mut server_a_txn = server_a.read().await;
|
||||
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
|
||||
trace!("========================================");
|
||||
repl_incremental(&mut server_a_txn, &mut server_b_txn);
|
||||
|
@ -1913,13 +1914,15 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid(
|
|||
.supplier_provide_changes(a_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
assert!(matches!(changes, ReplIncrementalContext::RefreshRequired));
|
||||
trace!(?changes);
|
||||
|
||||
assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply));
|
||||
|
||||
let result = server_a_txn
|
||||
.consumer_apply_changes(&changes)
|
||||
.expect("Unable to apply changes to consumer.");
|
||||
|
||||
assert!(matches!(result, ConsumerState::RefreshRequired));
|
||||
assert!(matches!(result, ConsumerState::Ok));
|
||||
|
||||
drop(server_a_txn);
|
||||
drop(server_b_txn);
|
||||
|
@ -1938,6 +1941,8 @@ async fn test_repl_increment_consumer_ruv_trim_past_valid(
|
|||
.supplier_provide_changes(b_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
trace!(?changes);
|
||||
|
||||
assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply));
|
||||
|
||||
let result = server_b_txn
|
||||
|
@ -3374,6 +3379,354 @@ async fn test_repl_increment_session_new(server_a: &QueryServer, server_b: &Quer
|
|||
drop(server_b_txn);
|
||||
}
|
||||
|
||||
/// Test the process of refreshing a consumer once it has entered a lag state.
|
||||
///
|
||||
/// It was noticed in a production instance that it was possible for a consumer
|
||||
/// to enter an unrecoverable state where replication could no longer proceed.
|
||||
///
|
||||
/// We have server A and B. We will focus on A and it's RUV state.
|
||||
///
|
||||
/// - A accepts a change setting it's RUV to A: 1
|
||||
/// - A replicates to B, setting B ruv to A: 1
|
||||
/// - Now A begins to lag and exceeds the changelog max age.
|
||||
/// - At this point incremental replication will cease to function.
|
||||
/// - The correct response (and automatically occurs) is that A would be
|
||||
/// refreshed from B. This then sets A ruv to A:1 - which is significantly
|
||||
/// behind the changelog max age.
|
||||
/// - Then A does a RUV trim. This set's it's RUV to A: X where X is > 1 + CL max.
|
||||
/// - On next replication to B, the replication stops as now "B" appears to be
|
||||
/// lagging since there is no overlap of it's RUV window to A.
|
||||
///
|
||||
/// The resolution in this case is two-fold.
|
||||
/// On a server refresh, the server-replication-id must be reset and regenerated. This
|
||||
/// ensures that any RUV state to a server is now fresh and unique
|
||||
///
|
||||
/// Second, to prevent tainting the RUV with outdated information, we need to stop it
|
||||
/// propogating when consumed. At the end of each consumption, the RUV should be trimmed
|
||||
/// if and only if entries exist in it that exceed the CL max. It is only trimmed conditionally
|
||||
/// to prevent infinite replication loops since a trim implies the creation of a new anchor.
|
||||
|
||||
#[qs_pair_test]
|
||||
async fn test_repl_increment_consumer_lagging_refresh(
|
||||
server_a: &QueryServer,
|
||||
server_b: &QueryServer,
|
||||
) {
|
||||
let ct = duration_from_epoch_now();
|
||||
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
|
||||
.and_then(|_| server_a_txn.commit())
|
||||
.is_ok());
|
||||
drop(server_b_txn);
|
||||
|
||||
// - A accepts a change setting it's RUV to A: 1
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let t_uuid = Uuid::new_v4();
|
||||
assert!(server_a_txn
|
||||
.internal_create(vec![entry_init!(
|
||||
(Attribute::Class, EntryClass::Object.to_value()),
|
||||
(Attribute::Class, EntryClass::Account.to_value()),
|
||||
(Attribute::Class, EntryClass::Person.to_value()),
|
||||
(Attribute::Name, Value::new_iname("testperson1")),
|
||||
(Attribute::Uuid, Value::Uuid(t_uuid)),
|
||||
(Attribute::Description, Value::new_utf8s("testperson1")),
|
||||
(Attribute::DisplayName, Value::new_utf8s("testperson1"))
|
||||
),])
|
||||
.is_ok());
|
||||
|
||||
// Take a copy of the CID here for it's s_uuid - this allows us to
|
||||
// validate later that the s_uuid is rotated, and trimmed from the
|
||||
// RUV.
|
||||
let server_a_initial_uuid = server_a_txn.get_server_uuid();
|
||||
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
|
||||
// - A replicates to B, setting B ruv to A: 1
|
||||
let mut server_a_txn = server_a.read().await;
|
||||
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
|
||||
|
||||
repl_incremental(&mut server_a_txn, &mut server_b_txn);
|
||||
|
||||
let e1 = server_a_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access new entry.");
|
||||
let e2 = server_b_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access entry.");
|
||||
|
||||
assert!(e1.get_last_changed() == e2.get_last_changed());
|
||||
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
drop(server_a_txn);
|
||||
|
||||
// - B working properly, creates an update within the max win
|
||||
let ct_half = ct + Duration::from_secs(CHANGELOG_MAX_AGE / 2);
|
||||
|
||||
let mut server_b_txn = server_b.write(ct_half).await;
|
||||
assert!(server_b_txn.purge_tombstones().is_ok());
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
// - Now A begins to lag and exceeds the changelog max age.
|
||||
let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE + 1);
|
||||
|
||||
trace!("========================================");
|
||||
// Purge tombstones - this triggers a write anchor to be created
|
||||
// on both servers, and it will also trim the old values from the ruv.
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
assert!(server_a_txn.purge_tombstones().is_ok());
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
assert!(server_b_txn.purge_tombstones().is_ok());
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
// - At this point incremental replication will cease to function in either
|
||||
// direction.
|
||||
let ct = ct + Duration::from_secs(1);
|
||||
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
// The ruvs must be different
|
||||
let a_ruv_range = server_a_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range A");
|
||||
let b_ruv_range = server_b_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range B");
|
||||
|
||||
trace!(?a_ruv_range);
|
||||
trace!(?b_ruv_range);
|
||||
assert!(a_ruv_range != b_ruv_range);
|
||||
|
||||
let a_ruv_range = server_a_txn
|
||||
.consumer_get_state()
|
||||
.expect("Unable to access RUV range");
|
||||
|
||||
let changes = server_b_txn
|
||||
.supplier_provide_changes(a_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
trace!(?changes);
|
||||
assert!(matches!(changes, ReplIncrementalContext::UnwillingToSupply));
|
||||
|
||||
drop(server_a_txn);
|
||||
drop(server_b_txn);
|
||||
|
||||
// - The correct response (and automatically occurs) is that A would be
|
||||
// refreshed from B. This then sets A ruv to A:1 - which is significantly
|
||||
// behind the changelog max age.
|
||||
let ct = ct + Duration::from_secs(1);
|
||||
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
// First, build the refresh context.
|
||||
let refresh_context = server_b_txn
|
||||
.supplier_provide_refresh()
|
||||
.expect("Unable to supply refresh");
|
||||
|
||||
// Apply it to the server
|
||||
server_a_txn
|
||||
.consumer_apply_refresh(&refresh_context)
|
||||
.expect("Unable to apply refresh");
|
||||
|
||||
// Need same d_uuid
|
||||
assert_eq!(
|
||||
server_b_txn.get_domain_uuid(),
|
||||
server_a_txn.get_domain_uuid()
|
||||
);
|
||||
|
||||
// Assert that the server's repl uuid was rotated as part of the refresh.
|
||||
let server_a_rotated_uuid = server_a_txn.get_server_uuid();
|
||||
assert_ne!(server_a_initial_uuid, server_a_rotated_uuid);
|
||||
|
||||
// Ruvs are the same now
|
||||
let a_ruv_range = server_a_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range A");
|
||||
let b_ruv_range = server_b_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range B");
|
||||
|
||||
trace!(?a_ruv_range);
|
||||
trace!(?b_ruv_range);
|
||||
|
||||
assert!(server_a_txn.commit().is_ok());
|
||||
drop(server_b_txn);
|
||||
|
||||
// - Then A does a RUV trim. This set's it's RUV to A: X where X is > 1 + CL max.
|
||||
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
assert!(server_a_txn.purge_tombstones().is_ok());
|
||||
|
||||
let a_ruv_range = server_a_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range A");
|
||||
trace!(?a_ruv_range);
|
||||
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
|
||||
// ERROR CASE: On next replication to B, the replication stops as now "B"
|
||||
// appears to be lagging since there is no overlap of it's RUV
|
||||
// window to A.
|
||||
// EXPECTED: replication proceeds as usual as consumer was refreshed and should
|
||||
// be in sync now!
|
||||
|
||||
let ct = ct + Duration::from_secs(1);
|
||||
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
repl_incremental(&mut server_b_txn, &mut server_a_txn);
|
||||
|
||||
let e1 = server_a_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access new entry.");
|
||||
let e2 = server_b_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access entry.");
|
||||
|
||||
assert!(e1.get_last_changed() == e2.get_last_changed());
|
||||
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
drop(server_b_txn);
|
||||
|
||||
let mut server_a_txn = server_a.read().await;
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
|
||||
repl_incremental(&mut server_a_txn, &mut server_b_txn);
|
||||
|
||||
let e1 = server_a_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access new entry.");
|
||||
let e2 = server_b_txn
|
||||
.internal_search_all_uuid(t_uuid)
|
||||
.expect("Unable to access entry.");
|
||||
|
||||
assert!(e1.get_last_changed() == e2.get_last_changed());
|
||||
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
drop(server_a_txn);
|
||||
|
||||
// Now we run the incremental replication in a loop to trim out the initial server uuid.
|
||||
|
||||
let mut ct = ct;
|
||||
let changelog_quarter_life = Duration::from_secs(CHANGELOG_MAX_AGE / 4);
|
||||
let one_second = Duration::from_secs(1);
|
||||
|
||||
for i in 0..8 {
|
||||
trace!("========================================");
|
||||
trace!("repl iteration {}", i);
|
||||
// Purge tombstones.
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
assert!(server_a_txn.purge_tombstones().is_ok());
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
|
||||
ct += one_second;
|
||||
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
assert!(server_b_txn.purge_tombstones().is_ok());
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
ct += one_second;
|
||||
|
||||
// Now check incremental in both directions. Should show *no* changes
|
||||
// needed (rather than an error/lagging).
|
||||
let mut server_a_txn = server_a.write(ct).await;
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
|
||||
let a_ruv_range = server_a_txn
|
||||
.consumer_get_state()
|
||||
.expect("Unable to access RUV range");
|
||||
|
||||
trace!(?a_ruv_range);
|
||||
|
||||
let changes = server_b_txn
|
||||
.supplier_provide_changes(a_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
assert!(matches!(changes, ReplIncrementalContext::V1 { .. }));
|
||||
|
||||
let result = server_a_txn
|
||||
.consumer_apply_changes(&changes)
|
||||
.expect("Unable to apply changes to consumer.");
|
||||
|
||||
assert!(matches!(result, ConsumerState::Ok));
|
||||
|
||||
server_a_txn.commit().expect("Failed to commit");
|
||||
drop(server_b_txn);
|
||||
|
||||
ct += one_second;
|
||||
|
||||
// Reverse it!
|
||||
let mut server_a_txn = server_a.read().await;
|
||||
let mut server_b_txn = server_b.write(ct).await;
|
||||
|
||||
let b_ruv_range = server_b_txn
|
||||
.consumer_get_state()
|
||||
.expect("Unable to access RUV range");
|
||||
|
||||
trace!(?b_ruv_range);
|
||||
|
||||
let changes = server_a_txn
|
||||
.supplier_provide_changes(b_ruv_range)
|
||||
.expect("Unable to generate supplier changes");
|
||||
|
||||
assert!(matches!(changes, ReplIncrementalContext::V1 { .. }));
|
||||
|
||||
let result = server_b_txn
|
||||
.consumer_apply_changes(&changes)
|
||||
.expect("Unable to apply changes to consumer.");
|
||||
|
||||
assert!(matches!(result, ConsumerState::Ok));
|
||||
|
||||
drop(server_a_txn);
|
||||
server_b_txn.commit().expect("Failed to commit");
|
||||
|
||||
ct += changelog_quarter_life;
|
||||
}
|
||||
|
||||
// Finally, verify that the former server RUV has been trimmed out.
|
||||
let mut server_b_txn = server_b.read().await;
|
||||
let mut server_a_txn = server_a.read().await;
|
||||
|
||||
assert_ne!(server_a_initial_uuid, server_a_rotated_uuid);
|
||||
|
||||
// Ruvs are the same now
|
||||
let a_ruv_range = server_a_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range A");
|
||||
|
||||
let b_ruv_range = server_b_txn
|
||||
.get_be_txn()
|
||||
.get_ruv()
|
||||
.current_ruv_range()
|
||||
.expect("Failed to get RUV range B");
|
||||
|
||||
trace!("TRACE MARKER");
|
||||
trace!(?server_a_initial_uuid, ?server_a_rotated_uuid);
|
||||
trace!(?a_ruv_range);
|
||||
trace!(?b_ruv_range);
|
||||
|
||||
assert!(!a_ruv_range.contains_key(&server_a_initial_uuid));
|
||||
assert!(!b_ruv_range.contains_key(&server_a_initial_uuid));
|
||||
}
|
||||
|
||||
// Test change of domain version over incremental.
|
||||
//
|
||||
// todo when I have domain version migrations working.
|
||||
|
|
|
@ -78,7 +78,6 @@ pub struct SystemConfig {
|
|||
#[derive(Clone)]
|
||||
pub struct QueryServer {
|
||||
phase: Arc<CowCell<ServerPhase>>,
|
||||
s_uuid: Uuid,
|
||||
pub(crate) d_info: Arc<CowCell<DomainInfo>>,
|
||||
system_config: Arc<CowCell<SystemConfig>>,
|
||||
be: Backend,
|
||||
|
@ -89,6 +88,7 @@ pub struct QueryServer {
|
|||
resolve_filter_cache:
|
||||
Arc<ARCache<(IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>>>,
|
||||
dyngroup_cache: Arc<CowCell<DynGroupCache>>,
|
||||
cid_max: Arc<CowCell<Cid>>,
|
||||
}
|
||||
|
||||
pub struct QueryServerReadTransaction<'a> {
|
||||
|
@ -102,6 +102,9 @@ pub struct QueryServerReadTransaction<'a> {
|
|||
_db_ticket: SemaphorePermit<'a>,
|
||||
resolve_filter_cache:
|
||||
ARCacheReadTxn<'a, (IdentityId, Filter<FilterValid>), Filter<FilterValidResolved>, ()>,
|
||||
// Future we may need this.
|
||||
// cid_max: CowCellReadTxn<Cid>,
|
||||
trim_cid: Cid,
|
||||
}
|
||||
|
||||
unsafe impl<'a> Sync for QueryServerReadTransaction<'a> {}
|
||||
|
@ -114,7 +117,7 @@ pub struct QueryServerWriteTransaction<'a> {
|
|||
d_info: CowCellWriteTxn<'a, DomainInfo>,
|
||||
system_config: CowCellWriteTxn<'a, SystemConfig>,
|
||||
curtime: Duration,
|
||||
cid: Cid,
|
||||
cid: CowCellWriteTxn<'a, Cid>,
|
||||
trim_cid: Cid,
|
||||
pub(crate) be_txn: BackendWriteTransaction<'a>,
|
||||
pub(crate) schema: SchemaWriteTransaction<'a>,
|
||||
|
@ -1014,6 +1017,10 @@ impl<'a> QueryServerTransaction<'a> for QueryServerReadTransaction<'a> {
|
|||
}
|
||||
|
||||
impl<'a> QueryServerReadTransaction<'a> {
|
||||
pub(crate) fn trim_cid(&self) -> &Cid {
|
||||
&self.trim_cid
|
||||
}
|
||||
|
||||
// Verify the data content of the server is as expected. This will probably
|
||||
// call various functions for validation, including possibly plugin
|
||||
// verifications.
|
||||
|
@ -1140,15 +1147,21 @@ impl<'a> QueryServerTransaction<'a> for QueryServerWriteTransaction<'a> {
|
|||
}
|
||||
|
||||
impl QueryServer {
|
||||
pub fn new(be: Backend, schema: Schema, domain_name: String) -> Result<Self, OperationError> {
|
||||
let (s_uuid, d_uuid) = {
|
||||
pub fn new(
|
||||
be: Backend,
|
||||
schema: Schema,
|
||||
domain_name: String,
|
||||
curtime: Duration,
|
||||
) -> Result<Self, OperationError> {
|
||||
let (s_uuid, d_uuid, ts_max) = {
|
||||
let mut wr = be.write()?;
|
||||
let s_uuid = wr.get_db_s_uuid()?;
|
||||
let d_uuid = wr.get_db_d_uuid()?;
|
||||
let ts_max = wr.get_db_ts_max(curtime)?;
|
||||
#[allow(clippy::expect_used)]
|
||||
wr.commit()
|
||||
.expect("Critical - unable to commit db_s_uuid or db_d_uuid");
|
||||
(s_uuid, d_uuid)
|
||||
(s_uuid, d_uuid, ts_max)
|
||||
};
|
||||
|
||||
let pool_size = be.get_pool_size();
|
||||
|
@ -1169,6 +1182,9 @@ impl QueryServer {
|
|||
d_ldap_allow_unix_pw_bind: false,
|
||||
}));
|
||||
|
||||
let cid = Cid::new_lamport(s_uuid, curtime, &ts_max);
|
||||
let cid_max = Arc::new(CowCell::new(cid));
|
||||
|
||||
// These default to empty, but they'll be populated shortly.
|
||||
let system_config = Arc::new(CowCell::new(SystemConfig::default()));
|
||||
|
||||
|
@ -1187,7 +1203,6 @@ impl QueryServer {
|
|||
|
||||
Ok(QueryServer {
|
||||
phase,
|
||||
s_uuid,
|
||||
d_info,
|
||||
system_config,
|
||||
be,
|
||||
|
@ -1197,6 +1212,7 @@ impl QueryServer {
|
|||
write_ticket: Arc::new(Semaphore::new(1)),
|
||||
resolve_filter_cache,
|
||||
dyngroup_cache,
|
||||
cid_max,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -1221,18 +1237,27 @@ impl QueryServer {
|
|||
.expect("unable to acquire db_ticket for qsr")
|
||||
};
|
||||
|
||||
let schema = self.schema.read();
|
||||
|
||||
let cid_max = self.cid_max.read();
|
||||
#[allow(clippy::expect_used)]
|
||||
let trim_cid = cid_max
|
||||
.sub_secs(CHANGELOG_MAX_AGE)
|
||||
.expect("unable to generate trim cid");
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
QueryServerReadTransaction {
|
||||
be_txn: self
|
||||
.be
|
||||
.read()
|
||||
.expect("unable to create backend read transaction"),
|
||||
schema: self.schema.read(),
|
||||
schema,
|
||||
d_info: self.d_info.read(),
|
||||
system_config: self.system_config.read(),
|
||||
accesscontrols: self.accesscontrols.read(),
|
||||
_db_ticket: db_ticket,
|
||||
resolve_filter_cache: self.resolve_filter_cache.read(),
|
||||
trim_cid,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1265,7 +1290,7 @@ impl QueryServer {
|
|||
};
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
let mut be_txn = self
|
||||
let be_txn = self
|
||||
.be
|
||||
.write()
|
||||
.expect("unable to create backend write transaction");
|
||||
|
@ -1275,11 +1300,10 @@ impl QueryServer {
|
|||
let system_config = self.system_config.write();
|
||||
let phase = self.phase.write();
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
let ts_max = be_txn
|
||||
.get_db_ts_max(curtime)
|
||||
.expect("Unable to get db_ts_max");
|
||||
let cid = Cid::new_lamport(self.s_uuid, curtime, &ts_max);
|
||||
let mut cid = self.cid_max.write();
|
||||
// Update the cid now.
|
||||
*cid = Cid::new_lamport(cid.s_uuid, curtime, &cid.ts);
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
let trim_cid = cid
|
||||
.sub_secs(CHANGELOG_MAX_AGE)
|
||||
|
@ -1336,6 +1360,19 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
self.cid.s_uuid
|
||||
}
|
||||
|
||||
pub(crate) fn reset_server_uuid(&mut self) -> Result<(), OperationError> {
|
||||
let s_uuid = self.be_txn.reset_db_s_uuid().map_err(|err| {
|
||||
error!(?err, "Failed to reset server replication uuid");
|
||||
err
|
||||
})?;
|
||||
|
||||
debug!(?s_uuid, "reset server replication uuid");
|
||||
|
||||
self.cid.s_uuid = s_uuid;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_curtime(&self) -> Duration {
|
||||
self.curtime
|
||||
}
|
||||
|
@ -1825,6 +1862,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
|
|||
// Write the cid to the db. If this fails, we can't assume replication
|
||||
// will be stable, so return if it fails.
|
||||
be_txn.set_db_ts_max(cid.ts)?;
|
||||
cid.commit();
|
||||
|
||||
// Point of no return - everything has been validated and reloaded.
|
||||
//
|
||||
|
|
|
@ -16,7 +16,7 @@ pub async fn setup_test() -> QueryServer {
|
|||
Backend::new(BackendConfig::new_test("main"), idxmeta, false).expect("Failed to init BE");
|
||||
|
||||
// Init is called via the proc macro
|
||||
QueryServer::new(be, schema_outer, "example.com".to_string())
|
||||
QueryServer::new(be, schema_outer, "example.com".to_string(), Duration::ZERO)
|
||||
.expect("Failed to setup Query Server")
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ pub async fn setup_pair_test() -> (QueryServer, QueryServer) {
|
|||
.expect("Failed to init BE");
|
||||
|
||||
// Init is called via the proc macro
|
||||
QueryServer::new(be, schema_outer, "example.com".to_string())
|
||||
QueryServer::new(be, schema_outer, "example.com".to_string(), Duration::ZERO)
|
||||
.expect("Failed to setup Query Server")
|
||||
};
|
||||
|
||||
|
@ -50,7 +50,7 @@ pub async fn setup_pair_test() -> (QueryServer, QueryServer) {
|
|||
.expect("Failed to init BE");
|
||||
|
||||
// Init is called via the proc macro
|
||||
QueryServer::new(be, schema_outer, "example.com".to_string())
|
||||
QueryServer::new(be, schema_outer, "example.com".to_string(), Duration::ZERO)
|
||||
.expect("Failed to setup Query Server")
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue