Minor improvements to incoming replication ()

Firstyear 2023-11-02 11:21:21 +10:00 committed by GitHub
parent 85c2b0fd82
commit 9e5449a644
4 changed files with 55 additions and 2 deletions
proto/src
server/lib/src


@@ -274,6 +274,7 @@ pub enum OperationError {
    ReplInvalidRUVState,
    ReplDomainLevelUnsatisfiable,
    ReplDomainUuidMismatch,
    ReplServerUuidSplitDataState,
    TransactionAlreadyCommitted,
    /// when you ask for a gid that's lower than a safe minimum
    GidOverlapsSystemMin(u32),
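
As an aside, a minimal standalone sketch of how a caller might surface the new ReplServerUuidSplitDataState variant. The trimmed-down enum and the message text are illustrative stand-ins, not the real kanidm_proto type or output:

// Standalone sketch, not part of the commit above. The enum is a stand-in
// for the real OperationError in kanidm_proto.
#[derive(Debug)]
enum OperationError {
    ReplServerUuidSplitDataState,
    Other,
}

fn describe_repl_error(err: &OperationError) -> &'static str {
    match err {
        OperationError::ReplServerUuidSplitDataState => {
            // Mirrors the intent of the new variant: replication data for this
            // server's UUID has diverged (e.g. after a backup restore) and the
            // consumer must be refreshed before replication can continue.
            "Replication state has diverged for this server's UUID; refresh this consumer."
        }
        _ => "Replication failed for another reason.",
    }
}

fn main() {
    println!("{}", describe_repl_error(&OperationError::ReplServerUuidSplitDataState));
}

The diff further down returns this variant from the new RUV preflight check.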


@@ -98,3 +98,7 @@ pub const GRACE_WINDOW: Duration = Duration::from_secs(300);
/// How long access tokens should last. This is NOT the length
/// of the refresh token, which is bound to the issuing session.
pub const OAUTH2_ACCESS_TOKEN_EXPIRY: u32 = 15 * 60;
/// The amount of time a supplier's clock can be "ahead" before
/// we warn about possible clock synchronisation issues.
pub const REPL_SUPPLIER_ADVANCE_WINDOW: Duration = Duration::from_secs(600);
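
For context, a small self-contained sketch of how an advance window like this is typically applied: compare a supplier-reported timestamp against the local time plus the window, and warn when it is exceeded. Only the constant comes from the diff; the helper, its names, and the use of SystemTime (rather than kanidm's Cid timestamps) are assumptions for illustration:

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Matches the constant added above.
const REPL_SUPPLIER_ADVANCE_WINDOW: Duration = Duration::from_secs(600);

// Sketch: is a supplier timestamp (as a Duration since the UNIX epoch)
// suspiciously far ahead of our local time?
fn supplier_clock_suspicious(supplier_ts: Duration, local_ts: Duration) -> bool {
    supplier_ts > local_ts + REPL_SUPPLIER_ADVANCE_WINDOW
}

fn main() {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");

    // A supplier 15 minutes ahead of us trips the warning threshold.
    let ahead = now + Duration::from_secs(15 * 60);
    assert!(supplier_clock_suspicious(ahead, now));

    // A supplier within the 10 minute window does not.
    let ok = now + Duration::from_secs(60);
    assert!(!supplier_clock_suspicious(ok, now));
}

In the diff below, the same comparison is made against the transaction's Cid timestamp rather than the wall clock.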


@@ -313,13 +313,22 @@ impl<'a> QueryServerWriteTransaction<'a> {
            return Err(OperationError::ReplDomainUuidMismatch);
        }

        // Preflight checks of the incoming RUV to ensure it's in a good state.
        let txn_cid = self.get_cid().clone();
        let ruv = self.be_txn.get_ruv_write();

        ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
            .map_err(|e| {
                error!("Incoming RUV failed preflight checks, unable to proceed.");
                e
            })?;

        // == ⚠️ Below this point we begin to make changes! ==
        debug!(
            "Proceeding to apply incremental from domain {:?} at level {}",
            ctx_domain_uuid, ctx_domain_version
        );

        debug!("Applying schema entries");

        // Apply the schema entries first.
        self.consumer_incremental_apply_entries(ctx_schema_entries)
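
The map_err block above is the usual log-and-propagate idiom. On Rust 1.76 or later the same effect can be had with Result::inspect_err, which avoids returning the error by hand; a standalone sketch with placeholder functions and a placeholder error type, not the kanidm APIs:

// Standalone sketch, not part of the commit above.
#[derive(Debug)]
struct PreflightError;

fn incremental_preflight(ok: bool) -> Result<(), PreflightError> {
    if ok { Ok(()) } else { Err(PreflightError) }
}

fn apply_incremental(ok: bool) -> Result<(), PreflightError> {
    // Equivalent to the map_err(|e| { error!(...); e }) pattern in the diff,
    // using inspect_err (stable since Rust 1.76) to log without rebuilding
    // the error value.
    incremental_preflight(ok)
        .inspect_err(|_e| eprintln!("Incoming RUV failed preflight checks, unable to proceed."))?;

    // == Below this point we would begin to make changes. ==
    Ok(())
}

fn main() {
    assert!(apply_incremental(true).is_ok());
    assert!(apply_incremental(false).is_err());
}

Either way the failure propagates before the "Below this point we begin to make changes" marker, so a rejected RUV never partially applies.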


@@ -480,6 +480,45 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
        self.ranged.clear();
    }

    pub(crate) fn incremental_preflight_validate_ruv(
        &self,
        ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
        txn_cid: &Cid,
    ) -> Result<(), OperationError> {
        // Check that the incoming ranges, for our server's id, do not exceed
        // our server's max state. This can occur if we restore from a backup
        // where the replication state is older than what our partners have,
        // meaning that the context may have diverged in a way we can't then
        // resolve.
        if let Some(our_cid_range_max) = self
            .ranged
            .get(&txn_cid.s_uuid)
            .and_then(|range| range.last().copied())
        {
            if let Some(incoming_cid_range) = ctx_ranges.get(&txn_cid.s_uuid) {
                if incoming_cid_range.ts_max > our_cid_range_max {
                    error!("The incoming data contains changes matching this server's UUID, and those changes are newer than the local version. This can occur if the server was restored from a backup taken before changes it had already sent to other servers. Replication is unable to proceed as data corruption may occur. You must refresh this consumer immediately to continue.");
                    return Err(OperationError::ReplServerUuidSplitDataState);
                }
            }
        }

        let warn_time = txn_cid.ts + REPL_SUPPLIER_ADVANCE_WINDOW;

        for (s_uuid, incoming_cid_range) in ctx_ranges.iter() {
            if incoming_cid_range.ts_max > warn_time {
                // TODO: This would be a great place for fault management to pick up this warning
                warn!(
                    "Incoming changes from {:?} are more than {} seconds in the future.",
                    s_uuid,
                    REPL_SUPPLIER_ADVANCE_WINDOW.as_secs()
                );
            }
        }

        Ok(())
    }
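
To make the two preflight rules concrete, here is a self-contained sketch that re-implements them over simplified stand-in types and exercises the split-data-state case. The field names (ts_max, s_uuid, ts) follow what the diff uses, but the types themselves are simplified guesses at the real ReplCidRange and Cid shapes, and the uuid crate (with the v4 feature) is assumed:

// Standalone sketch, not part of the commit above.
// Cargo.toml assumption: uuid = { version = "1", features = ["v4"] }
use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
use uuid::Uuid;

// Stand-ins shaped after what the diff accesses; the real kanidm types carry more fields.
struct ReplCidRange {
    ts_max: Duration,
}

struct Cid {
    s_uuid: Uuid,
    ts: Duration,
}

const REPL_SUPPLIER_ADVANCE_WINDOW: Duration = Duration::from_secs(600);

#[derive(Debug, PartialEq)]
enum PreflightError {
    ServerUuidSplitDataState,
}

// Same two rules as incremental_preflight_validate_ruv, over stand-in types:
// 1. the supplier must not hold changes for *our* server UUID that are newer
//    than anything we hold locally (backup-restore divergence is an error);
// 2. suppliers whose newest change is far ahead of our transaction time only
//    produce a warning, not an error.
fn preflight(
    ranged: &BTreeMap<Uuid, BTreeSet<Duration>>,
    ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,
    txn_cid: &Cid,
) -> Result<(), PreflightError> {
    if let Some(our_max) = ranged.get(&txn_cid.s_uuid).and_then(|r| r.last().copied()) {
        if let Some(incoming) = ctx_ranges.get(&txn_cid.s_uuid) {
            if incoming.ts_max > our_max {
                return Err(PreflightError::ServerUuidSplitDataState);
            }
        }
    }

    let warn_time = txn_cid.ts + REPL_SUPPLIER_ADVANCE_WINDOW;
    for (s_uuid, incoming) in ctx_ranges {
        if incoming.ts_max > warn_time {
            eprintln!("supplier {s_uuid} appears to be ahead of our clock");
        }
    }
    Ok(())
}

fn main() {
    let our_uuid = Uuid::new_v4();
    let txn_cid = Cid { s_uuid: our_uuid, ts: Duration::from_secs(1_000) };

    let mut ranged = BTreeMap::new();
    ranged.insert(our_uuid, BTreeSet::from([Duration::from_secs(900)]));

    // Supplier claims changes for our UUID newer than anything we hold: rejected.
    let mut ctx = BTreeMap::new();
    ctx.insert(our_uuid, ReplCidRange { ts_max: Duration::from_secs(950) });
    assert_eq!(
        preflight(&ranged, &ctx, &txn_cid),
        Err(PreflightError::ServerUuidSplitDataState)
    );

    // Supplier only claims changes we already have: accepted.
    ctx.insert(our_uuid, ReplCidRange { ts_max: Duration::from_secs(800) });
    assert_eq!(preflight(&ranged, &ctx, &txn_cid), Ok(()));
}

In the real transaction this check runs before any changes are applied, so a failed preflight leaves the consumer untouched.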
    pub(crate) fn refresh_validate_ruv(
        &self,
        ctx_ranges: &BTreeMap<Uuid, ReplCidRange>,