Add more replication tests, improve some handling of tombstones. (#1656)

Firstyear 2023-05-26 12:18:53 +10:00 committed by GitHub
parent 59c6723f7d
commit 2752965de1
5 changed files with 388 additions and 60 deletions

View file

@@ -758,7 +758,17 @@ impl Entry<EntryIncremental, EntryNew> {
// Due to previous checks, this must be equal!
debug_assert!(left_at == right_at);
debug_assert!(self.attrs == db_ent.attrs);
// Doesn't matter which side we take.
// We have to generate the attrs here, since on replication
// we just send the tombstone ecstate rather than attrs. Our
// db stub also lacks these attributes too.
let mut attrs_new: Eattrs = Map::new();
let class_ava = vs_iutf8!["object", "tombstone"];
let last_mod_ava = vs_cid![left_at.clone()];
attrs_new.insert(AttrString::from("uuid"), vs_uuid![self.valid.uuid]);
attrs_new.insert(AttrString::from("class"), class_ava);
attrs_new.insert(AttrString::from("last_modified_cid"), last_mod_ava);
Entry {
valid: EntryIncremental {
uuid: self.valid.uuid,
@@ -767,10 +777,11 @@ impl Entry<EntryIncremental, EntryNew> {
state: EntryCommitted {
id: db_ent.state.id,
},
attrs: self.attrs.clone(),
attrs: attrs_new,
}
}
(State::Tombstone { .. }, State::Live { .. }) => {
debug_assert!(false);
// Keep the left side.
Entry {
valid: EntryIncremental {
@@ -784,6 +795,7 @@ impl Entry<EntryIncremental, EntryNew> {
}
}
(State::Live { .. }, State::Tombstone { .. }) => {
debug_assert!(false);
// Keep the right side
Entry {
valid: EntryIncremental {

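As the new comment in this hunk explains, an incremental tombstone arrives carrying only its ecstate, so the consumer has to rebuild the attribute map itself. A minimal illustration of what that rebuilt map holds, using plain std collections as stand-ins for the crate's Eattrs type and the vs_iutf8!/vs_cid! macros (the helper name here is ours, not from this commit):

use std::collections::BTreeMap;

// Simplified stand-in for Eattrs: attribute name -> values.
// uuid comes from the entry's valid state, class is fixed for tombstones,
// and last_modified_cid records the CID at which the entry was tombstoned.
fn rebuilt_tombstone_attrs(uuid: &str, tombstoned_at_cid: &str) -> BTreeMap<&'static str, Vec<String>> {
    let mut attrs = BTreeMap::new();
    attrs.insert("uuid", vec![uuid.to_string()]);
    attrs.insert("class", vec!["object".to_string(), "tombstone".to_string()]);
    attrs.insert("last_modified_cid", vec![tombstoned_at_cid.to_string()]);
    attrs
}

Nothing else survives on a tombstone, which is why both the replicated form and the db stub can omit the original attributes.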
View file

@@ -154,6 +154,7 @@ trait Plugin {
"plugin {} has an unimplemented pre_repl_incremental!",
Self::id()
);
// debug_assert!(false);
// Err(OperationError::InvalidState)
Ok(())
}
@@ -167,6 +168,7 @@ trait Plugin {
"plugin {} has an unimplemented post_repl_incremental!",
Self::id()
);
// debug_assert!(false);
// Err(OperationError::InvalidState)
Ok(())
}
@@ -327,14 +329,11 @@ impl Plugins {
qs: &mut QueryServerWriteTransaction,
cand: &mut [(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)],
) -> Result<(), OperationError> {
base::Base::pre_repl_incremental(qs, cand)
// .and_then(|_| jwskeygen::JwsKeygen::pre_repl_incremental(qs, cand, me))
// .and_then(|_| gidnumber::GidNumber::pre_repl_incremental(qs, cand, me))
.and_then(|_| domain::Domain::pre_repl_incremental(qs, cand))
.and_then(|_| spn::Spn::pre_repl_incremental(qs, cand))
.and_then(|_| session::SessionConsistency::pre_repl_incremental(qs, cand))
// attr unique should always be last
.and_then(|_| attrunique::AttrUnique::pre_repl_incremental(qs, cand))
// Cleanup sessions on incoming replication? May not actually
// be needed ...
// session::SessionConsistency::pre_repl_incremental(qs, cand)?;
// attr unique should always be last
attrunique::AttrUnique::pre_repl_incremental(qs, cand)
}
#[instrument(level = "debug", name = "plugins::run_post_repl_incremental", skip_all)]
@@ -343,9 +342,10 @@ impl Plugins {
pre_cand: &[Arc<EntrySealedCommitted>],
cand: &[EntrySealedCommitted],
) -> Result<(), OperationError> {
refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)
.and_then(|_| spn::Spn::post_repl_incremental(qs, pre_cand, cand))
.and_then(|_| memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand))
domain::Domain::post_repl_incremental(qs, pre_cand, cand)?;
spn::Spn::post_repl_incremental(qs, pre_cand, cand)?;
refint::ReferentialIntegrity::post_repl_incremental(qs, pre_cand, cand)?;
memberof::MemberOf::post_repl_incremental(qs, pre_cand, cand)
}
#[instrument(level = "debug", name = "plugins::run_verify", skip_all)]

View file

@@ -1,48 +1,12 @@
use super::proto::*;
use crate::be::BackendTransaction;
use crate::plugins::Plugins;
use crate::prelude::*;
use crate::repl::proto::ReplRuvRange;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
use std::collections::BTreeMap;
use std::sync::Arc;
impl<'a> QueryServerReadTransaction<'a> {
// Get the current state of "where we are up to"
//
// There are two approaches we can use here. We can either store a cookie
// related to the supplier we are fetching from, or we can use our RUV state.
//
// Initially I'm using RUV state, because it lets us select exactly what has
// changed, where the cookie approach is more coarse-grained. The cookie also
// requires some more knowledge about which supplier we are communicating to,
// where the RUV approach doesn't, since the supplier calcs the diff.
#[instrument(level = "debug", skip_all)]
pub fn consumer_get_state(&mut self) -> Result<ReplRuvRange, OperationError> {
// We need the RUV as a state of
//
// [ s_uuid, cid_min, cid_max ]
// [ s_uuid, cid_min, cid_max ]
// [ s_uuid, cid_min, cid_max ]
// ...
//
// This way the remote can diff against its knowledge and work out:
//
// [ s_uuid, from_cid, to_cid ]
// [ s_uuid, from_cid, to_cid ]
//
// ...
// Which the supplier will then use to actually retrieve the set of entries
// and the attributes we need.
let ruv_snapshot = self.get_be_txn().get_ruv();
// What's the current set of ranges?
ruv_snapshot
.current_ruv_range()
.map(|ranges| ReplRuvRange::V1 { ranges })
}
pub enum ConsumerState {
Ok,
RefreshRequired,
}
impl<'a> QueryServerWriteTransaction<'a> {
@@ -79,6 +43,9 @@ impl<'a> QueryServerWriteTransaction<'a> {
e
})?;
trace!("===========================================");
trace!(?ctx_entries);
let db_entries = self.be_txn.incremental_prepare(&ctx_entries).map_err(|e| {
error!("Failed to access entries from db");
e
@@ -237,14 +204,16 @@ impl<'a> QueryServerWriteTransaction<'a> {
pub fn consumer_apply_changes(
&mut self,
ctx: &ReplIncrementalContext,
) -> Result<(), OperationError> {
) -> Result<ConsumerState, OperationError> {
match ctx {
ReplIncrementalContext::NoChangesAvailable => {
info!("no changes are available");
Ok(())
Ok(ConsumerState::Ok)
}
ReplIncrementalContext::RefreshRequired => {
todo!();
error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
Ok(ConsumerState::RefreshRequired)
}
ReplIncrementalContext::UnwillingToSupply => {
todo!();
@@ -276,7 +245,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
ctx_schema_entries: &[ReplIncrementalEntryV1],
ctx_meta_entries: &[ReplIncrementalEntryV1],
ctx_entries: &[ReplIncrementalEntryV1],
) -> Result<(), OperationError> {
) -> Result<ConsumerState, OperationError> {
if ctx_domain_version < DOMAIN_MIN_LEVEL {
error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
return Err(OperationError::ReplDomainLevelUnsatisfiable);
@@ -352,7 +321,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
e
})?;
Ok(())
Ok(ConsumerState::Ok)
}
pub fn consumer_apply_refresh(

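Since consumer_apply_changes now returns ConsumerState rather than (), whatever drives replication can react to a lagging RUV instead of hitting the old todo!(). A rough sketch of such a call site; the schedule_refresh hook and the after_incremental wrapper are hypothetical, and the automatic refresh hinted at in the error message is not part of this diff:

// Stand-in for crate::repl::consumer::ConsumerState shown above.
enum ConsumerState {
    Ok,
    RefreshRequired,
}

// Hypothetical hook: how a full refresh is actually triggered is outside
// the scope of this commit.
fn schedule_refresh() {}

fn after_incremental(state: ConsumerState) {
    match state {
        ConsumerState::Ok => {
            // Changes applied cleanly; nothing more to do this cycle.
        }
        ConsumerState::RefreshRequired => {
            // The supplier has trimmed changes past our RUV window, so an
            // incremental apply could corrupt data; fall back to a refresh.
            schedule_refresh();
        }
    }
}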
View file

@@ -1,5 +1,7 @@
use crate::be::BackendTransaction;
use crate::prelude::*;
use crate::repl::consumer::ConsumerState;
use crate::repl::proto::ReplIncrementalContext;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
use std::collections::BTreeMap;
@@ -38,6 +40,59 @@ fn repl_initialise(
Ok(())
}
fn repl_incremental(
from: &mut QueryServerReadTransaction<'_>,
to: &mut QueryServerWriteTransaction<'_>,
) {
let a_ruv_range = to
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range from");
let b_ruv_range = from
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range to");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range != b_ruv_range);
// Now setup the consumer state for the next incremental replication.
let a_ruv_range = to.consumer_get_state().expect("Unable to access RUV range");
// Incremental.
// Should now be on the other partner.
// Get the changes.
let changes = from
.supplier_provide_changes(a_ruv_range)
.expect("Unable to generate supplier changes");
// Check the changes = should be empty.
to.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
// RUV should be consistent again.
let a_ruv_range = to
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range A");
let b_ruv_range = from
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range B");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
// May need to be "is subset" in the future when we are testing
// some more complex scenarios.
assert!(a_ruv_range == b_ruv_range);
}
#[qs_pair_test]
async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer) {
// Rebuild / refresh the content of server a with the content from b.
@@ -125,8 +180,9 @@ async fn test_repl_refresh_basic(server_a: &QueryServer, server_b: &QueryServer)
// Both servers will be post-test validated.
}
// Test that adding an entry to one side replicates correctly.
#[qs_pair_test]
async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServer) {
async fn test_repl_increment_basic_entry_add(server_a: &QueryServer, server_b: &QueryServer) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
@@ -253,13 +309,245 @@ async fn test_repl_increment_basic(server_a: &QueryServer, server_b: &QueryServe
trace!(?b_ruv_range);
assert!(a_ruv_range == b_ruv_range);
server_a_txn.commit().expect("Failed to commit");
// Assert the entry is now present, and the same on both sides
let e1 = server_a_txn
.internal_search_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
// Test that adding an entry to one side, then recycling it replicates correctly.
#[qs_pair_test]
async fn test_repl_increment_basic_entry_recycle(server_a: &QueryServer, server_b: &QueryServer) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry.
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
("class", Value::new_class("object")),
("class", Value::new_class("person")),
("name", Value::new_iname("testperson1")),
("uuid", Value::Uuid(t_uuid)),
("description", Value::new_utf8s("testperson1")),
("displayname", Value::new_utf8s("testperson1"))
),])
.is_ok());
// Now recycle it.
assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
server_b_txn.commit().expect("Failed to commit");
// Assert the entry is not on A.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert_eq!(
server_a_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
// Test that adding an entry to one side, then recycling it, and tombstoning it
// replicates correctly.
#[qs_pair_test]
async fn test_repl_increment_basic_entry_tombstone(server_a: &QueryServer, server_b: &QueryServer) {
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry.
let mut server_b_txn = server_b.write(ct).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
("class", Value::new_class("object")),
("class", Value::new_class("person")),
("name", Value::new_iname("testperson1")),
("uuid", Value::Uuid(t_uuid)),
("description", Value::new_utf8s("testperson1")),
("displayname", Value::new_utf8s("testperson1"))
),])
.is_ok());
// Now recycle it.
assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
server_b_txn.commit().expect("Failed to commit");
// Now move past the recyclebin time.
let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
let mut server_b_txn = server_b.write(ct).await;
// Clean out the recycle bin.
assert!(server_b_txn.purge_recycled().is_ok());
server_b_txn.commit().expect("Failed to commit");
// Assert the entry is not on A.
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert_eq!(
server_a_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
repl_incremental(&mut server_b_txn, &mut server_a_txn);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1.attribute_equality("class", &PVCLASS_TOMBSTONE));
assert!(e1 == e2);
server_a_txn.commit().expect("Failed to commit");
drop(server_b_txn);
}
// Test that adding an entry -> tombstone then the tombstone is trimmed raises
// a replication error.
#[qs_pair_test]
async fn test_repl_increment_consumer_lagging_tombstone(
server_a: &QueryServer,
server_b: &QueryServer,
) {
let ct = duration_from_epoch_now();
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry.
let mut server_b_txn = server_b.write(ct).await;
let t_uuid = Uuid::new_v4();
assert!(server_b_txn
.internal_create(vec![entry_init!(
("class", Value::new_class("object")),
("class", Value::new_class("person")),
("name", Value::new_iname("testperson1")),
("uuid", Value::Uuid(t_uuid)),
("description", Value::new_utf8s("testperson1")),
("displayname", Value::new_utf8s("testperson1"))
),])
.is_ok());
// Now recycle it.
assert!(server_b_txn.internal_delete_uuid(t_uuid).is_ok());
server_b_txn.commit().expect("Failed to commit");
// Now move past the recyclebin time.
let ct = ct + Duration::from_secs(RECYCLEBIN_MAX_AGE + 1);
let mut server_b_txn = server_b.write(ct).await;
// Clean out the recycle bin.
assert!(server_b_txn.purge_recycled().is_ok());
server_b_txn.commit().expect("Failed to commit");
// Now move past the tombstone trim time.
let ct = ct + Duration::from_secs(CHANGELOG_MAX_AGE + 1);
let mut server_b_txn = server_b.write(ct).await;
// Clean out the recycle bin.
assert!(server_b_txn.purge_tombstones().is_ok());
server_b_txn.commit().expect("Failed to commit");
// Assert the entry is not on A *or* B.
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert_eq!(
server_a_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
assert_eq!(
server_b_txn.internal_search_uuid(t_uuid),
Err(OperationError::NoMatchingEntries)
);
// The ruvs must be different
let a_ruv_range = server_a_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range A");
let b_ruv_range = server_b_txn
.get_be_txn()
.get_ruv()
.current_ruv_range()
.expect("Failed to get RUV range B");
trace!(?a_ruv_range);
trace!(?b_ruv_range);
assert!(a_ruv_range != b_ruv_range);
let a_ruv_range = server_a_txn
.consumer_get_state()
.expect("Unable to access RUV range");
let changes = server_b_txn
.supplier_provide_changes(a_ruv_range)
.expect("Unable to generate supplier changes");
assert!(matches!(changes, ReplIncrementalContext::RefreshRequired));
let result = server_a_txn
.consumer_apply_changes(&changes)
.expect("Unable to apply changes to consumer.");
assert!(matches!(result, ConsumerState::RefreshRequired));
drop(server_a_txn);
drop(server_b_txn);
}
// Test RUV content when a server's changes have been trimmed out and are not present
// in a refresh.
// in a refresh. This is not about tombstones, this is about attribute state.
// Test change of a domain name over incremental.

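The repl_incremental helper above notes that its final RUV equality assert may need to become an "is subset" check for more complex scenarios. Purely as an illustration of what that comparison would mean, over simplified types (integers standing in for s_uuid and CID values; this is not code from the commit):

use std::collections::BTreeMap;

// b is covered by a if, for every server b has seen, a has replicated at
// least as far as b's maximum CID.
fn ruv_is_subset(b: &BTreeMap<u64, (u64, u64)>, a: &BTreeMap<u64, (u64, u64)>) -> bool {
    b.iter().all(|(s_uuid, &(_b_min, b_max))| {
        a.get(s_uuid)
            .map(|&(_a_min, a_max)| a_max >= b_max)
            .unwrap_or(false)
    })
}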
View file

@@ -27,6 +27,8 @@ use crate::filter::{Filter, FilterInvalid, FilterValid, FilterValidResolved};
use crate::plugins::dyngroup::{DynGroup, DynGroupCache};
use crate::plugins::Plugins;
use crate::repl::cid::Cid;
use crate::repl::proto::ReplRuvRange;
use crate::repl::ruv::ReplicationUpdateVectorTransaction;
use crate::schema::{
Schema, SchemaAttribute, SchemaClass, SchemaReadTransaction, SchemaTransaction,
SchemaWriteTransaction,
@@ -420,6 +422,27 @@ pub trait QueryServerTransaction<'a> {
}
}
/// Get a single entry by its UUID, even if the entry in question
/// is in a masked state (recycled, tombstoned).
#[instrument(level = "debug", skip_all)]
fn internal_search_all_uuid(
&mut self,
uuid: Uuid,
) -> Result<Arc<EntrySealedCommitted>, OperationError> {
let filter = filter_all!(f_eq("uuid", PartialValue::Uuid(uuid)));
let f_valid = filter.validate(self.get_schema()).map_err(|e| {
error!(?e, "Filter Validate - SchemaViolation");
OperationError::SchemaViolation(e)
})?;
let se = SearchEvent::new_internal(f_valid);
let mut vs = self.search(&se)?;
match vs.pop() {
Some(entry) if vs.is_empty() => Ok(entry),
_ => Err(OperationError::NoMatchingEntries),
}
}
#[instrument(level = "debug", skip_all)]
fn impersonate_search_ext_uuid(
&mut self,
@@ -775,6 +798,42 @@ pub trait QueryServerTransaction<'a> {
fn get_oauth2rs_set(&mut self) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
self.internal_search(filter!(f_eq("class", PVCLASS_OAUTH2_RS.clone(),)))
}
#[instrument(level = "debug", skip_all)]
fn consumer_get_state(&mut self) -> Result<ReplRuvRange, OperationError> {
// Get the current state of "where we are up to"
//
// There are two approaches we can use here. We can either store a cookie
// related to the supplier we are fetching from, or we can use our RUV state.
//
// Initially I'm using RUV state, because it lets us select exactly what has
// changed, where the cookie approach is more coarse-grained. The cookie also
// requires some more knowledge about which supplier we are communicating to,
// where the RUV approach doesn't, since the supplier calcs the diff.
//
// We need the RUV as a state of
//
// [ s_uuid, cid_min, cid_max ]
// [ s_uuid, cid_min, cid_max ]
// [ s_uuid, cid_min, cid_max ]
// ...
//
// This way the remote can diff against its knowledge and work out:
//
// [ s_uuid, from_cid, to_cid ]
// [ s_uuid, from_cid, to_cid ]
//
// ...
// Which the supplier will then use to actually retrieve the set of entries
// and the attributes we need.
let ruv_snapshot = self.get_be_txn().get_ruv();
// What's the current set of ranges?
ruv_snapshot
.current_ruv_range()
.map(|ranges| ReplRuvRange::V1 { ranges })
}
}
// Actually conduct a search request
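The comment block above describes the RUV exchange only in prose. As a rough sketch of the idea, with plain integers standing in for s_uuid and CID values and a free function in place of whatever the supplier actually does internally (all of this is an assumption for illustration, not an API of this crate):

use std::collections::BTreeMap;

// The consumer advertises, per originating server, the (cid_min, cid_max)
// window it currently holds.
type RuvRange = BTreeMap<u64, (u64, u64)>;

// The supplier diffs that against its own RUV and works out, per server,
// the (from_cid, to_cid) window of changes the consumer is missing.
fn supplier_diff(consumer: &RuvRange, supplier: &RuvRange) -> RuvRange {
    supplier
        .iter()
        .filter_map(|(s_uuid, &(_s_min, s_max))| match consumer.get(s_uuid) {
            // Consumer is already up to date for this server.
            Some(&(_, c_max)) if c_max >= s_max => None,
            // Consumer is partially up to date: supply (c_max, s_max].
            Some(&(_, c_max)) => Some((*s_uuid, (c_max, s_max))),
            // Consumer has never seen this server: supply everything we hold.
            None => Some((*s_uuid, (0, s_max))),
        })
        .collect()
}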