Fix Increment Replication Post Upgrade (#3089)

This commit is contained in:
Firstyear 2024-10-05 19:53:39 +10:00 committed by GitHub
parent b1f8fdab6f
commit c779443454
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -89,8 +89,32 @@ impl<'a> QueryServerWriteTransaction<'a> {
// server for some conflicts, but we still need to know the UUID is IN the conflict // server for some conflicts, but we still need to know the UUID is IN the conflict
// state for plugins. We also need to do this here before the conflict_update // state for plugins. We also need to do this here before the conflict_update
// set is consumed by later steps. // set is consumed by later steps.
let mut conflict_uuids: BTreeSet<_> = //
conflict_update.iter().map(|(_, e)| e.get_uuid()).collect(); // ⚠️ When we upgrade between two nodes, migrations will often create *new* system
// entries on both nodes. Until both nodes upgrade they can't replicate. This creates
// a situation where both nodes have identical entry content for system entries, but
// the entries that were created now are conflicts. Normally this is okay, because the
// first node to upgrade will have it's entries persisted, and the other nodes duplicate
// entries will be removed. However, just through the nature of being in the conflict
// state, these entries are then added to the conflict_uuid set. This conflict_uuid set
// is used by referential integrity to remove uuids from references so that group
// memberships don't accidentally leak to recipients that were not intended.
//
// To avoid this, we remove any system entries from this conflict set, so that they are
// exempt from this conflict handling which allows upgrades to work.
let mut conflict_uuids: BTreeSet<_> = conflict_update
.iter()
.filter_map(|(_, e)| {
let u = e.get_uuid();
if u >= DYNAMIC_RANGE_MINIMUM_UUID {
// It is a user created node, process the conflict within plugins
Some(u)
} else {
// It is in a system range, do not process this entry
None
}
})
.collect();
// Filter out None from conflict_create // Filter out None from conflict_create
let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect(); let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
@ -128,10 +152,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
// inconsistent, then we fix it immediately. This hook remains for cases in future // inconsistent, then we fix it immediately. This hook remains for cases in future
// where we may wish to have session cleanup performed for example. // where we may wish to have session cleanup performed for example.
Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| { Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
admin_error!( admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
"Refresh operation failed (pre_repl_incremental plugin), {:?}",
e
);
e e
})?; })?;
@ -177,7 +198,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
) )
.map_err(|e| { .map_err(|e| {
error!( error!(
"Refresh operation failed (post_repl_incremental_conflict plugin), {:?}", "Operation failed (post_repl_incremental_conflict plugin), {:?}",
e e
); );
e e
@ -213,10 +234,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
&conflict_uuids, &conflict_uuids,
) )
.map_err(|e| { .map_err(|e| {
error!( error!("Operation failed (post_repl_incremental plugin), {:?}", e);
"Refresh operation failed (post_repl_incremental plugin), {:?}",
e
);
e e
})?; })?;
@ -403,7 +421,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
let meta_changed = self let meta_changed = self
.consumer_incremental_apply_entries(ctx_meta_entries) .consumer_incremental_apply_entries(ctx_meta_entries)
.inspect_err(|err| { .inspect_err(|err| {
error!(?err, "Failed to apply incremental schema entries"); error!(?err, "Failed to apply incremental meta entries");
})?; })?;
// This is re-loaded in case the domain name changed on the remote // This is re-loaded in case the domain name changed on the remote
@ -420,7 +438,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
// Update all other entries now. // Update all other entries now.
self.consumer_incremental_apply_entries(ctx_entries) self.consumer_incremental_apply_entries(ctx_entries)
.inspect_err(|err| { .inspect_err(|err| {
error!(?err, "Failed to apply incremental schema entries"); error!(?err, "Failed to apply incremental meta entries");
})?; })?;
// Reload the domain version, doing any needed migrations. // Reload the domain version, doing any needed migrations.