From c779443454a171c2990781e499c2ef630be9d503 Mon Sep 17 00:00:00 2001 From: Firstyear Date: Sat, 5 Oct 2024 19:53:39 +1000 Subject: [PATCH] Fix Increment Replication Post Upgrade (#3089) --- server/lib/src/repl/consumer.rs | 44 +++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index 175fc8d73..0fb8fae27 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -89,8 +89,32 @@ impl<'a> QueryServerWriteTransaction<'a> { // server for some conflicts, but we still need to know the UUID is IN the conflict // state for plugins. We also need to do this here before the conflict_update // set is consumed by later steps. - let mut conflict_uuids: BTreeSet<_> = - conflict_update.iter().map(|(_, e)| e.get_uuid()).collect(); + // + // ⚠️ When we upgrade between two nodes, migrations will often create *new* system + // entries on both nodes. Until both nodes upgrade they can't replicate. This creates + // a situation where both nodes have identical entry content for system entries, but + // the entries that were created now are conflicts. Normally this is okay, because the + // first node to upgrade will have it's entries persisted, and the other nodes duplicate + // entries will be removed. However, just through the nature of being in the conflict + // state, these entries are then added to the conflict_uuid set. This conflict_uuid set + // is used by referential integrity to remove uuids from references so that group + // memberships don't accidentally leak to recipients that were not intended. + // + // To avoid this, we remove any system entries from this conflict set, so that they are + // exempt from this conflict handling which allows upgrades to work. + let mut conflict_uuids: BTreeSet<_> = conflict_update + .iter() + .filter_map(|(_, e)| { + let u = e.get_uuid(); + if u >= DYNAMIC_RANGE_MINIMUM_UUID { + // It is a user created node, process the conflict within plugins + Some(u) + } else { + // It is in a system range, do not process this entry + None + } + }) + .collect(); // Filter out None from conflict_create let conflict_create: Vec = conflict_create.into_iter().flatten().collect(); @@ -128,10 +152,7 @@ impl<'a> QueryServerWriteTransaction<'a> { // inconsistent, then we fix it immediately. This hook remains for cases in future // where we may wish to have session cleanup performed for example. Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| { - admin_error!( - "Refresh operation failed (pre_repl_incremental plugin), {:?}", - e - ); + admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e); e })?; @@ -177,7 +198,7 @@ impl<'a> QueryServerWriteTransaction<'a> { ) .map_err(|e| { error!( - "Refresh operation failed (post_repl_incremental_conflict plugin), {:?}", + "Operation failed (post_repl_incremental_conflict plugin), {:?}", e ); e @@ -213,10 +234,7 @@ impl<'a> QueryServerWriteTransaction<'a> { &conflict_uuids, ) .map_err(|e| { - error!( - "Refresh operation failed (post_repl_incremental plugin), {:?}", - e - ); + error!("Operation failed (post_repl_incremental plugin), {:?}", e); e })?; @@ -403,7 +421,7 @@ impl<'a> QueryServerWriteTransaction<'a> { let meta_changed = self .consumer_incremental_apply_entries(ctx_meta_entries) .inspect_err(|err| { - error!(?err, "Failed to apply incremental schema entries"); + error!(?err, "Failed to apply incremental meta entries"); })?; // This is re-loaded in case the domain name changed on the remote @@ -420,7 +438,7 @@ impl<'a> QueryServerWriteTransaction<'a> { // Update all other entries now. self.consumer_incremental_apply_entries(ctx_entries) .inspect_err(|err| { - error!(?err, "Failed to apply incremental schema entries"); + error!(?err, "Failed to apply incremental meta entries"); })?; // Reload the domain version, doing any needed migrations.