diff --git a/server/core/src/https/mod.rs b/server/core/src/https/mod.rs index 7b31dccac..f9282f711 100644 --- a/server/core/src/https/mod.rs +++ b/server/core/src/https/mod.rs @@ -87,7 +87,7 @@ impl ServerState { // Get the first header value. hv.to_str().ok() }) - .and_then(|s| Some(self.reinflate_uuid_from_bytes(s)).unwrap_or(None)) + .and_then(|s| self.reinflate_uuid_from_bytes(s)) } } diff --git a/server/lib/src/be/mod.rs b/server/lib/src/be/mod.rs index 1d0d3a71c..12b8f9637 100644 --- a/server/lib/src/be/mod.rs +++ b/server/lib/src/be/mod.rs @@ -781,7 +781,8 @@ pub trait BackendTransaction { } } r => { - admin_error!(state = ?r, "Invalid uuid2spn state"); + admin_error!(state = ?r, ?e_uuid, "Invalid uuid2spn state"); + trace!(entry = ?e); return Err(ConsistencyError::BackendIndexSync); } }; @@ -1134,10 +1135,15 @@ impl<'a> BackendWriteTransaction<'a> { // allocated. id_max += 1; - Arc::new(EntrySealedCommitted::stub_sealed_committed_id( + let stub_entry = Arc::new(EntrySealedCommitted::stub_sealed_committed_id( id_max, ctx_ent, - )) + )); + // Now, the stub entry needs to be indexed. If not, uuid2spn + // isn't created, so subsequent index diffs don't work correctly. + self.entry_index(None, Some(stub_entry.as_ref()))?; + // Okay, entry ready to go. + stub_entry } Some(idl) if idl.len() == 1 => { // Get the entry from this idl. diff --git a/server/lib/src/idm/authsession.rs b/server/lib/src/idm/authsession.rs index ec60a83c3..188493024 100644 --- a/server/lib/src/idm/authsession.rs +++ b/server/lib/src/idm/authsession.rs @@ -789,7 +789,7 @@ impl AuthSession { handlers.push(ch); }; - if let Some(non_empty_handlers) = NonEmpty::collect(handlers.into_iter()) { + if let Some(non_empty_handlers) = NonEmpty::collect(handlers) { AuthSessionState::Init(non_empty_handlers) } else { security_info!("account has no available credentials"); diff --git a/server/lib/src/idm/oauth2.rs b/server/lib/src/idm/oauth2.rs index adc96eac6..7e4ac00b5 100644 --- a/server/lib/src/idm/oauth2.rs +++ b/server/lib/src/idm/oauth2.rs @@ -1401,7 +1401,7 @@ impl<'a> IdmServerProxyReadTransaction<'a> { }) .flatten() .cloned() - .chain(req_scopes.into_iter()) + .chain(req_scopes) .collect(); let consent_previously_granted = diff --git a/server/lib/src/plugins/memberof.rs b/server/lib/src/plugins/memberof.rs index 6ac26c6c1..483d4fe50 100644 --- a/server/lib/src/plugins/memberof.rs +++ b/server/lib/src/plugins/memberof.rs @@ -408,7 +408,7 @@ impl MemberOf { let group_affect = cand .iter() .map(|e| e.get_uuid()) - .chain(dyngroup_change.into_iter()) + .chain(dyngroup_change) .chain( cand.iter() .filter_map(|e| { @@ -441,7 +441,7 @@ impl MemberOf { let group_affect = cand .iter() .map(|post| post.get_uuid()) - .chain(dyngroup_change.into_iter()) + .chain(dyngroup_change) .chain( pre_cand .iter() diff --git a/server/lib/src/repl/consumer.rs b/server/lib/src/repl/consumer.rs index ce8f67f46..c2ea4d4ad 100644 --- a/server/lib/src/repl/consumer.rs +++ b/server/lib/src/repl/consumer.rs @@ -60,7 +60,7 @@ impl<'a> QueryServerWriteTransaction<'a> { let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries .iter() - .zip(db_entries.into_iter()) + .zip(db_entries) .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref())); // Now we have a set of conflicts and a set of entries to proceed. @@ -100,7 +100,7 @@ impl<'a> QueryServerWriteTransaction<'a> { // To be consistent to Modify, we need to run pre-modify here. let mut all_updates = conflict_update .into_iter() - .chain(proceed_update.into_iter()) + .chain(proceed_update) .collect::>(); // Plugins can mark entries into a conflict status. @@ -311,8 +311,8 @@ impl<'a> QueryServerWriteTransaction<'a> { // Trigger for post commit hooks. Should we detect better in the entry // apply phases? - self.changed_schema = true; - self.changed_domain = true; + // self.changed_schema = true; + // self.changed_domain = true; debug!("Applying all context entries"); // Update all other entries now. diff --git a/server/lib/src/repl/entry.rs b/server/lib/src/repl/entry.rs index 61915aecd..367ff68fd 100644 --- a/server/lib/src/repl/entry.rs +++ b/server/lib/src/repl/entry.rs @@ -142,6 +142,14 @@ impl EntryChangeState { self.cid_iter().pop().cloned().unwrap() } + #[cfg(test)] + pub(crate) fn get_attr_cid(&self, attr: &Attribute) -> Option { + match &self.st { + State::Live { at: _, changes } => changes.get(attr.as_ref()).map(|cid| cid.clone()), + State::Tombstone { at: _ } => None, + } + } + pub fn cid_iter(&self) -> Vec<&Cid> { match &self.st { State::Live { at: _, changes } => { diff --git a/server/lib/src/repl/ruv.rs b/server/lib/src/repl/ruv.rs index 1036b21db..328dc1ca7 100644 --- a/server/lib/src/repl/ruv.rs +++ b/server/lib/src/repl/ruv.rs @@ -525,8 +525,8 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> { idl.maybe_compress(); }); - self.data.extend(rebuild_ruv.into_iter()); - self.ranged.extend(rebuild_range.into_iter()); + self.data.extend(rebuild_ruv); + self.ranged.extend(rebuild_range); Ok(()) } diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 7a1fed3b9..5789cb64e 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -673,6 +673,79 @@ async fn test_repl_increment_basic_bidirectional_write( drop(server_a_txn); } +// Create Entry on A +// Delete an attr of the entry on A +// Should send the empty attr + changestate state to B + +#[qs_pair_test] +async fn test_repl_increment_basic_deleted_attr(server_a: &QueryServer, server_b: &QueryServer) { + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + // Add an entry. + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::Person.to_value()), + (Attribute::Name.as_ref(), Value::new_iname("testperson1")), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)), + ( + Attribute::Description.as_ref(), + Value::new_utf8s("testperson1") + ), + ( + Attribute::DisplayName.as_ref(), + Value::new_utf8s("testperson1") + ) + ),]) + .is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Delete an attribute so that the changestate doesn't reflect it's + // presence + let mut server_a_txn = server_a.write(duration_from_epoch_now()).await; + assert!(server_a_txn + .internal_modify_uuid( + t_uuid, + &ModifyList::new_purge(Attribute::Description.as_ref()) + ) + .is_ok()); + server_a_txn.commit().expect("Failed to commit"); + + // Incremental repl in the reverse direction. + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(duration_from_epoch_now()).await; + + // from to + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + // They are consistent again. + assert!(e1.get_ava_set(Attribute::Description.as_ref()).is_none()); + assert!(e1 == e2); + + let e1_cs = e1.get_changestate(); + let e2_cs = e2.get_changestate(); + assert!(e1_cs == e2_cs); + assert!(e1_cs.get_attr_cid(&Attribute::Description).is_some()); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} + // Create Entry on A -> B // Write to both // B -> A and A -> B become consistent. @@ -1935,18 +2008,84 @@ async fn test_repl_increment_domain_rename(server_a: &QueryServer, server_b: &Qu } // Test schema addition / change over incremental. +#[qs_pair_test] +async fn test_repl_increment_schema_dynamic(server_a: &QueryServer, server_b: &QueryServer) { + let ct = duration_from_epoch_now(); -// Test change of domain version over incremental. + let mut server_a_txn = server_a.write(ct).await; + let mut server_b_txn = server_b.read().await; + + assert!(repl_initialise(&mut server_b_txn, &mut server_a_txn) + .and_then(|_| server_a_txn.commit()) + .is_ok()); + drop(server_b_txn); + + let mut server_a_txn = server_a.write(ct).await; + // Add a new schema entry/item. + let s_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::ClassType.to_value()), + ("classname", EntryClass::TestClass.to_value()), + (Attribute::Uuid.as_ref(), Value::Uuid(s_uuid)), + ("description", Value::new_utf8s("Test Class")), + ("may", Value::new_iutf8("name")) + )]) + .is_ok()); + // Schema doesn't take effect til after a commit. + server_a_txn.commit().expect("Failed to commit"); + + // Now use the new schema in an entry. + let mut server_a_txn = server_a.write(ct).await; + let t_uuid = Uuid::new_v4(); + assert!(server_a_txn + .internal_create(vec![entry_init!( + (Attribute::Class.as_ref(), EntryClass::Object.to_value()), + (Attribute::Class.as_ref(), EntryClass::TestClass.to_value()), + (Attribute::Uuid.as_ref(), Value::Uuid(t_uuid)) + )]) + .is_ok()); + + server_a_txn.commit().expect("Failed to commit"); + + // Now replicate from A to B. B should not only get the new schema, + // but should accept the entry that was created. + + let mut server_a_txn = server_a.read().await; + let mut server_b_txn = server_b.write(ct).await; + + trace!("========================================"); + repl_incremental(&mut server_a_txn, &mut server_b_txn); + + let e1 = server_a_txn + .internal_search_all_uuid(s_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(s_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + let e1 = server_a_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access new entry."); + let e2 = server_b_txn + .internal_search_all_uuid(t_uuid) + .expect("Unable to access entry."); + + assert!(e1 == e2); + + server_b_txn.commit().expect("Failed to commit"); + drop(server_a_txn); +} // Test when a group has a member A, and then the group is conflicted, that when // group is moved to conflict the memberShip of A is removed. -// Multiple tombstone states / directions. - // Ref int deletes references when tombstone is replicated over. May need consumer // to have some extra groups that need cleanup -// Test add then delete on an attribute, and that it replicates the empty state to -// the other side. - // Test memberof over replication boundaries. + +// Test change of domain version over incremental. diff --git a/server/lib/src/schema.rs b/server/lib/src/schema.rs index fef63e3fa..4b552c551 100644 --- a/server/lib/src/schema.rs +++ b/server/lib/src/schema.rs @@ -315,14 +315,11 @@ impl From for EntryInitNew { let mut entry = EntryInitNew::new(); #[allow(clippy::expect_used)] - entry.set_ava( - "attributename", - vec![Value::new_iutf8(&value.name)].into_iter(), - ); + entry.set_ava("attributename", vec![Value::new_iutf8(&value.name)]); entry.add_ava("multivalue", Value::Bool(value.multivalue)); // syntax entry.set_ava("syntax", vec![Value::Syntax(value.syntax)]); - entry.set_ava("unique", vec![Value::Bool(value.unique)].into_iter()); + entry.set_ava("unique", vec![Value::Bool(value.unique)]); // index entry.set_ava("index", value.index.into_iter().map(Value::Index)); @@ -338,19 +335,16 @@ impl From for EntryInitNew { // description entry.set_ava( Attribute::Description.as_ref(), - vec![Value::new_utf8s(&value.description)].into_iter(), + vec![Value::new_utf8s(&value.description)], ); // unique // multivalue // sync_allowed - entry.set_ava( - "sync_allowed", - vec![Value::Bool(value.sync_allowed)].into_iter(), - ); + entry.set_ava("sync_allowed", vec![Value::Bool(value.sync_allowed)]); // uid - entry.set_ava("uuid", vec![Value::Uuid(value.uuid)].into_iter()); + entry.set_ava("uuid", vec![Value::Uuid(value.uuid)]); entry } @@ -496,7 +490,7 @@ impl From for EntryInitNew { let mut entry = EntryInitNew::new(); #[allow(clippy::expect_used)] - entry.set_ava("classname", vec![Value::new_iutf8(&value.name)].into_iter()); + entry.set_ava("classname", vec![Value::new_iutf8(&value.name)]); // class entry.set_ava( @@ -511,17 +505,14 @@ impl From for EntryInitNew { // description entry.set_ava( Attribute::Description.as_ref(), - vec![Value::new_utf8s(&value.description)].into_iter(), + vec![Value::new_utf8s(&value.description)], ); // sync_allowed - entry.set_ava( - "sync_allowed", - vec![Value::Bool(value.sync_allowed)].into_iter(), - ); + entry.set_ava("sync_allowed", vec![Value::Bool(value.sync_allowed)]); // uid - entry.set_ava("uuid", vec![Value::Uuid(value.uuid)].into_iter()); + entry.set_ava("uuid", vec![Value::Uuid(value.uuid)]); // systemmay if !value.systemmay.is_empty() {