68 20230828 replication of schema (#2045)

This commit is contained in:
Firstyear 2023-08-29 12:20:27 +10:00 committed by GitHub
parent 6422313f2e
commit 0f977d33b9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 182 additions and 38 deletions

View file

@ -87,7 +87,7 @@ impl ServerState {
// Get the first header value.
hv.to_str().ok()
})
.and_then(|s| Some(self.reinflate_uuid_from_bytes(s)).unwrap_or(None))
.and_then(|s| self.reinflate_uuid_from_bytes(s))
}
}

View file

@ -781,7 +781,8 @@ pub trait BackendTransaction {
}
}
r => {
admin_error!(state = ?r, "Invalid uuid2spn state");
admin_error!(state = ?r, ?e_uuid, "Invalid uuid2spn state");
trace!(entry = ?e);
return Err(ConsistencyError::BackendIndexSync);
}
};
@ -1134,10 +1135,15 @@ impl<'a> BackendWriteTransaction<'a> {
// allocated.
id_max += 1;
Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
let stub_entry = Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
id_max, ctx_ent,
))
));
// Now, the stub entry needs to be indexed. If not, uuid2spn
// isn't created, so subsequent index diffs don't work correctly.
self.entry_index(None, Some(stub_entry.as_ref()))?;
// Okay, entry ready to go.
stub_entry
}
Some(idl) if idl.len() == 1 => {
// Get the entry from this idl.

View file

@ -789,7 +789,7 @@ impl AuthSession {
handlers.push(ch);
};
if let Some(non_empty_handlers) = NonEmpty::collect(handlers.into_iter()) {
if let Some(non_empty_handlers) = NonEmpty::collect(handlers) {
AuthSessionState::Init(non_empty_handlers)
} else {
security_info!("account has no available credentials");

View file

@ -1401,7 +1401,7 @@ impl<'a> IdmServerProxyReadTransaction<'a> {
})
.flatten()
.cloned()
.chain(req_scopes.into_iter())
.chain(req_scopes)
.collect();
let consent_previously_granted =

View file

@ -408,7 +408,7 @@ impl MemberOf {
let group_affect = cand
.iter()
.map(|e| e.get_uuid())
.chain(dyngroup_change.into_iter())
.chain(dyngroup_change)
.chain(
cand.iter()
.filter_map(|e| {
@ -441,7 +441,7 @@ impl MemberOf {
let group_affect = cand
.iter()
.map(|post| post.get_uuid())
.chain(dyngroup_change.into_iter())
.chain(dyngroup_change)
.chain(
pre_cand
.iter()

View file

@ -60,7 +60,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
.iter()
.zip(db_entries.into_iter())
.zip(db_entries)
.partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
// Now we have a set of conflicts and a set of entries to proceed.
@ -100,7 +100,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
// To be consistent to Modify, we need to run pre-modify here.
let mut all_updates = conflict_update
.into_iter()
.chain(proceed_update.into_iter())
.chain(proceed_update)
.collect::<Vec<_>>();
// Plugins can mark entries into a conflict status.
@ -311,8 +311,8 @@ impl<'a> QueryServerWriteTransaction<'a> {
// Trigger for post commit hooks. Should we detect better in the entry
// apply phases?
self.changed_schema = true;
self.changed_domain = true;
// self.changed_schema = true;
// self.changed_domain = true;
debug!("Applying all context entries");
// Update all other entries now.

View file

@ -142,6 +142,14 @@ impl EntryChangeState {
self.cid_iter().pop().cloned().unwrap()
}
#[cfg(test)]
pub(crate) fn get_attr_cid(&self, attr: &Attribute) -> Option<Cid> {
match &self.st {
State::Live { at: _, changes } => changes.get(attr.as_ref()).map(|cid| cid.clone()),
State::Tombstone { at: _ } => None,
}
}
pub fn cid_iter(&self) -> Vec<&Cid> {
match &self.st {
State::Live { at: _, changes } => {

View file

@ -525,8 +525,8 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
idl.maybe_compress();
});
self.data.extend(rebuild_ruv.into_iter());
self.ranged.extend(rebuild_range.into_iter());
self.data.extend(rebuild_ruv);
self.ranged.extend(rebuild_range);
Ok(())
}

View file

@ -673,6 +673,79 @@ async fn test_repl_increment_basic_bidirectional_write(
drop(server_a_txn);
}
// Create Entry on A
// Delete an attr of the entry on A
// Should send the empty attr + changestate state to B
#[qs_pair_test]
async fn test_repl_increment_basic_deleted_attr(server_a: &QueryServer, server_b: &QueryServer) {
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
// Add an entry.
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
let t_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::Person.to_value()),
(Attribute::Name.as_ref(), Value::new_iname("testperson1")),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)),
(
Attribute::Description.as_ref(),
Value::new_utf8s("testperson1")
),
(
Attribute::DisplayName.as_ref(),
Value::new_utf8s("testperson1")
)
),])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// Delete an attribute so that the changestate doesn't reflect it's
// presence
let mut server_a_txn = server_a.write(duration_from_epoch_now()).await;
assert!(server_a_txn
.internal_modify_uuid(
t_uuid,
&ModifyList::new_purge(Attribute::Description.as_ref())
)
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// Incremental repl in the reverse direction.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(duration_from_epoch_now()).await;
// from to
repl_incremental(&mut server_a_txn, &mut server_b_txn);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
// They are consistent again.
assert!(e1.get_ava_set(Attribute::Description.as_ref()).is_none());
assert!(e1 == e2);
let e1_cs = e1.get_changestate();
let e2_cs = e2.get_changestate();
assert!(e1_cs == e2_cs);
assert!(e1_cs.get_attr_cid(&Attribute::Description).is_some());
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
}
// Create Entry on A -> B
// Write to both
// B -> A and A -> B become consistent.
@ -1935,18 +2008,84 @@ async fn test_repl_increment_domain_rename(server_a: &QueryServer, server_b: &Qu
}
// Test schema addition / change over incremental.
#[qs_pair_test]
async fn test_repl_increment_schema_dynamic(server_a: &QueryServer, server_b: &QueryServer) {
let ct = duration_from_epoch_now();
// Test change of domain version over incremental.
let mut server_a_txn = server_a.write(ct).await;
let mut server_b_txn = server_b.read().await;
assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn)
.and_then(|_| server_a_txn.commit())
.is_ok());
drop(server_b_txn);
let mut server_a_txn = server_a.write(ct).await;
// Add a new schema entry/item.
let s_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::ClassType.to_value()),
("classname", EntryClass::TestClass.to_value()),
(Attribute::Uuid.as_ref(), Value::Uuid(s_uuid)),
("description", Value::new_utf8s("Test Class")),
("may", Value::new_iutf8("name"))
)])
.is_ok());
// Schema doesn't take effect til after a commit.
server_a_txn.commit().expect("Failed to commit");
// Now use the new schema in an entry.
let mut server_a_txn = server_a.write(ct).await;
let t_uuid = Uuid::new_v4();
assert!(server_a_txn
.internal_create(vec![entry_init!(
(Attribute::Class.as_ref(), EntryClass::Object.to_value()),
(Attribute::Class.as_ref(), EntryClass::TestClass.to_value()),
(Attribute::Uuid.as_ref(), Value::Uuid(t_uuid))
)])
.is_ok());
server_a_txn.commit().expect("Failed to commit");
// Now replicate from A to B. B should not only get the new schema,
// but should accept the entry that was created.
let mut server_a_txn = server_a.read().await;
let mut server_b_txn = server_b.write(ct).await;
trace!("========================================");
repl_incremental(&mut server_a_txn, &mut server_b_txn);
let e1 = server_a_txn
.internal_search_all_uuid(s_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(s_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
let e1 = server_a_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access new entry.");
let e2 = server_b_txn
.internal_search_all_uuid(t_uuid)
.expect("Unable to access entry.");
assert!(e1 == e2);
server_b_txn.commit().expect("Failed to commit");
drop(server_a_txn);
}
// Test when a group has a member A, and then the group is conflicted, that when
// group is moved to conflict the memberShip of A is removed.
// Multiple tombstone states / directions.
// Ref int deletes references when tombstone is replicated over. May need consumer
// to have some extra groups that need cleanup
// Test add then delete on an attribute, and that it replicates the empty state to
// the other side.
// Test memberof over replication boundaries.
// Test change of domain version over incremental.

View file

@ -315,14 +315,11 @@ impl From<SchemaAttribute> for EntryInitNew {
let mut entry = EntryInitNew::new();
#[allow(clippy::expect_used)]
entry.set_ava(
"attributename",
vec![Value::new_iutf8(&value.name)].into_iter(),
);
entry.set_ava("attributename", vec![Value::new_iutf8(&value.name)]);
entry.add_ava("multivalue", Value::Bool(value.multivalue));
// syntax
entry.set_ava("syntax", vec![Value::Syntax(value.syntax)]);
entry.set_ava("unique", vec![Value::Bool(value.unique)].into_iter());
entry.set_ava("unique", vec![Value::Bool(value.unique)]);
// index
entry.set_ava("index", value.index.into_iter().map(Value::Index));
@ -338,19 +335,16 @@ impl From<SchemaAttribute> for EntryInitNew {
// description
entry.set_ava(
Attribute::Description.as_ref(),
vec![Value::new_utf8s(&value.description)].into_iter(),
vec![Value::new_utf8s(&value.description)],
);
// unique
// multivalue
// sync_allowed
entry.set_ava(
"sync_allowed",
vec![Value::Bool(value.sync_allowed)].into_iter(),
);
entry.set_ava("sync_allowed", vec![Value::Bool(value.sync_allowed)]);
// uid
entry.set_ava("uuid", vec![Value::Uuid(value.uuid)].into_iter());
entry.set_ava("uuid", vec![Value::Uuid(value.uuid)]);
entry
}
@ -496,7 +490,7 @@ impl From<SchemaClass> for EntryInitNew {
let mut entry = EntryInitNew::new();
#[allow(clippy::expect_used)]
entry.set_ava("classname", vec![Value::new_iutf8(&value.name)].into_iter());
entry.set_ava("classname", vec![Value::new_iutf8(&value.name)]);
// class
entry.set_ava(
@ -511,17 +505,14 @@ impl From<SchemaClass> for EntryInitNew {
// description
entry.set_ava(
Attribute::Description.as_ref(),
vec![Value::new_utf8s(&value.description)].into_iter(),
vec![Value::new_utf8s(&value.description)],
);
// sync_allowed
entry.set_ava(
"sync_allowed",
vec![Value::Bool(value.sync_allowed)].into_iter(),
);
entry.set_ava("sync_allowed", vec![Value::Bool(value.sync_allowed)]);
// uid
entry.set_ava("uuid", vec![Value::Uuid(value.uuid)].into_iter());
entry.set_ava("uuid", vec![Value::Uuid(value.uuid)]);
// systemmay
if !value.systemmay.is_empty() {