From f053ff7fba622d2314a22970865c9d61e3c67ab4 Mon Sep 17 00:00:00 2001 From: Firstyear Date: Thu, 12 Sep 2024 11:42:16 +1000 Subject: [PATCH] CreatedAt/ModifiedAt fix (#3034) * fix(repl): CreatedAt/ModifiedAt attributes --- proto/src/attribute.rs | 3 + proto/src/constants.rs | 1 + server/lib/src/be/mod.rs | 2 +- server/lib/src/constants/uuids.rs | 1 + server/lib/src/entry.rs | 123 +++++++++++++++++++++--------- server/lib/src/repl/entry.rs | 14 ++-- server/lib/src/repl/proto.rs | 28 ++++++- server/lib/src/repl/tests.rs | 16 +++- server/lib/src/schema.rs | 23 +++++- 9 files changed, 159 insertions(+), 52 deletions(-) diff --git a/proto/src/attribute.rs b/proto/src/attribute.rs index b9f05da56..5f2946bf1 100644 --- a/proto/src/attribute.rs +++ b/proto/src/attribute.rs @@ -39,6 +39,7 @@ pub enum Attribute { ClassName, Cn, CookiePrivateKey, + CreatedAtCid, CredentialUpdateIntentToken, CredentialTypeMinimum, DeniedName, @@ -253,6 +254,7 @@ impl Attribute { Attribute::ClassName => ATTR_CLASSNAME, Attribute::Cn => ATTR_CN, Attribute::CookiePrivateKey => ATTR_COOKIE_PRIVATE_KEY, + Attribute::CreatedAtCid => ATTR_CREATED_AT_CID, Attribute::CredentialUpdateIntentToken => ATTR_CREDENTIAL_UPDATE_INTENT_TOKEN, Attribute::CredentialTypeMinimum => ATTR_CREDENTIAL_TYPE_MINIMUM, Attribute::DeniedName => ATTR_DENIED_NAME, @@ -431,6 +433,7 @@ impl Attribute { ATTR_CLASSNAME => Attribute::ClassName, ATTR_CN => Attribute::Cn, ATTR_COOKIE_PRIVATE_KEY => Attribute::CookiePrivateKey, + ATTR_CREATED_AT_CID => Attribute::CreatedAtCid, ATTR_CREDENTIAL_UPDATE_INTENT_TOKEN => Attribute::CredentialUpdateIntentToken, ATTR_CREDENTIAL_TYPE_MINIMUM => Attribute::CredentialTypeMinimum, ATTR_DENIED_NAME => Attribute::DeniedName, diff --git a/proto/src/constants.rs b/proto/src/constants.rs index 9794ca693..03ff5b67c 100644 --- a/proto/src/constants.rs +++ b/proto/src/constants.rs @@ -81,6 +81,7 @@ pub const ATTR_CLASS: &str = "class"; pub const ATTR_CLASSNAME: &str = "classname"; pub const ATTR_CN: &str = "cn"; pub const ATTR_COOKIE_PRIVATE_KEY: &str = "cookie_private_key"; +pub const ATTR_CREATED_AT_CID: &str = "created_at_cid"; pub const ATTR_CREDENTIAL_UPDATE_INTENT_TOKEN: &str = "credential_update_intent_token"; pub const ATTR_CREDENTIAL_TYPE_MINIMUM: &str = "credential_type_minimum"; pub const ATTR_DENIED_NAME: &str = "denied_name"; diff --git a/server/lib/src/be/mod.rs b/server/lib/src/be/mod.rs index 73096c33d..6c5721c46 100644 --- a/server/lib/src/be/mod.rs +++ b/server/lib/src/be/mod.rs @@ -146,7 +146,7 @@ impl BackendConfig { path: "".to_string(), db_name, fstype: FsType::Generic, - arcsize: Some(1024), + arcsize: Some(2048), } } } diff --git a/server/lib/src/constants/uuids.rs b/server/lib/src/constants/uuids.rs index 4c9b96a4b..7dc456ece 100644 --- a/server/lib/src/constants/uuids.rs +++ b/server/lib/src/constants/uuids.rs @@ -318,6 +318,7 @@ pub const UUID_SCHEMA_CLASS_APPLICATION: Uuid = uuid!("00000000-0000-0000-0000-f pub const UUID_SCHEMA_ATTR_LINKED_GROUP: Uuid = uuid!("00000000-0000-0000-0000-ffff00000182"); pub const UUID_SCHEMA_ATTR_APPLICATION_PASSWORD: Uuid = uuid!("00000000-0000-0000-0000-ffff00000183"); +pub const UUID_SCHEMA_ATTR_CREATED_AT_CID: Uuid = uuid!("00000000-0000-0000-0000-ffff00000184"); // System and domain infos // I'd like to strongly criticise william of the past for making poor choices about these allocations. diff --git a/server/lib/src/entry.rs b/server/lib/src/entry.rs index 35d039b0f..531d92bba 100644 --- a/server/lib/src/entry.rs +++ b/server/lib/src/entry.rs @@ -173,7 +173,7 @@ pub(crate) fn compare_attrs(left: &Eattrs, right: &Eattrs) -> bool { let allkeys: Set<&Attribute> = left .keys() .chain(right.keys()) - .filter(|k| *k != &Attribute::LastModifiedCid) + .filter(|k| *k != &Attribute::LastModifiedCid && *k != &Attribute::CreatedAtCid) .collect(); allkeys.into_iter().all(|k| { @@ -526,7 +526,7 @@ impl Entry { /// Assign the Change Identifier to this Entry, allowing it to be modified and then /// written to the `Backend` pub fn assign_cid( - self, + mut self, cid: Cid, schema: &dyn SchemaTransaction, ) -> Entry { @@ -537,14 +537,18 @@ impl Entry { */ let ecstate = EntryChangeState::new(&cid, &self.attrs, schema); - let mut ent = Entry { + // Since the entry is now created, and modified here, we set the initial CID + // values. + let cv = vs_cid![cid.clone()]; + let _ = self.attrs.insert(Attribute::LastModifiedCid, cv); + let cv = vs_cid![cid.clone()]; + let _ = self.attrs.insert(Attribute::CreatedAtCid, cv); + + Entry { valid: EntryInvalid { cid, ecstate }, state: EntryNew, attrs: self.attrs, - }; - // trace!("trigger_last_changed - assign_cid"); - ent.trigger_last_changed(); - ent + } } /// Compare this entry to another. @@ -652,7 +656,17 @@ impl Entry { impl Entry { pub fn from_repl_entry_v1(repl_entry: &ReplEntryV1) -> Result { // From the entry, we have to rebuild the ecstate and the attrs. - let (ecstate, attrs) = repl_entry.rehydrate()?; + let (ecstate, mut attrs) = repl_entry.rehydrate()?; + + // During seal, these values will be re-written, but we need them present for + // schema validation. + let last_mod_cid = ecstate.get_max_cid(); + let cv = vs_cid![last_mod_cid.clone()]; + let _ = attrs.insert(Attribute::LastModifiedCid, cv); + + let create_at_cid = ecstate.at(); + let cv = vs_cid![create_at_cid.clone()]; + let _ = attrs.insert(Attribute::CreatedAtCid, cv); Ok(Entry { valid: EntryRefresh { ecstate }, @@ -756,6 +770,10 @@ impl Entry { // 1. The incoming ReplIncremental is after DBentry. This means RI is the // conflicting node. We take no action and just return the db_ent // as the valid state. + // + // Since we are returning the existing database entry, we already have + // locally applies the needed LastModifiedCid and CreatedAtCid. We + // can proceed with no other changes. if at_left > at_right { trace!("RI > DE, return DE"); ( @@ -797,9 +815,6 @@ impl Entry { attrs: db_ent.attrs.clone(), }; - // Setup the last changed to now. - cnf_ent.trigger_last_changed(); - // Move the current uuid to source_uuid cnf_ent.add_ava(Attribute::SourceUuid, Value::Uuid(db_ent.valid.uuid)); @@ -810,6 +825,15 @@ impl Entry { cnf_ent.add_ava(Attribute::Class, EntryClass::Recycled.into()); cnf_ent.add_ava(Attribute::Class, EntryClass::Conflict.into()); + // Bypass add_ava here so that we don't update the ecstate with the + // metadata of these attrs. + // Setup the last changed to now. + let cv = vs_cid![cid.clone()]; + let _ = cnf_ent.attrs.insert(Attribute::LastModifiedCid, cv); + // Set the created_at to now, since we are creating a new conflict entry here. + let cv = vs_cid![cid.clone()]; + let _ = cnf_ent.attrs.insert(Attribute::CreatedAtCid, cv); + // Now we have to internally bypass some states. // This is okay because conflict entries aren't subject // to schema anyway. @@ -833,17 +857,31 @@ impl Entry { None }; + // Since we are going to make the incoming node, we need to now + // populate it's last-mod and created attributes. + + let mut attrs = self.attrs.clone(); + let ecstate = self_cs.clone(); + + let last_mod_cid = ecstate.get_max_cid(); + let cv = vs_cid![last_mod_cid.clone()]; + let _ = attrs.insert(Attribute::LastModifiedCid, cv); + + let create_at_cid = ecstate.at(); + let cv = vs_cid![create_at_cid.clone()]; + let _ = attrs.insert(Attribute::CreatedAtCid, cv); + ( conflict, Entry { valid: EntryIncremental { uuid: self.valid.uuid, - ecstate: self_cs.clone(), + ecstate, }, state: EntryCommitted { id: db_ent.state.id, }, - attrs: self.attrs.clone(), + attrs, }, ) } @@ -856,7 +894,7 @@ impl Entry { pub(crate) fn merge_state( &self, db_ent: &EntrySealedCommitted, - _schema: &dyn SchemaTransaction, + schema: &dyn SchemaTransaction, trim_cid: &Cid, ) -> EntryIncrementalCommitted { use crate::repl::entry::State; @@ -982,11 +1020,22 @@ impl Entry { } } - let ecstate = EntryChangeState::build(State::Live { + let mut ecstate = EntryChangeState::build(State::Live { at: at_left.clone(), changes, }); + // Similar to the process of "seal", remove anything that isn't + // replicated from the ecstate (should be a no-op), and then update + // the created/mod cid's. + ecstate.retain(|k, _| schema.is_replicated(k)); + + let cv = vs_cid![ecstate.get_max_cid().clone()]; + let _ = eattrs.insert(Attribute::LastModifiedCid, cv); + + let cv = vs_cid![ecstate.at().clone()]; + let _ = eattrs.insert(Attribute::CreatedAtCid, cv); + Entry { valid: EntryIncremental { uuid: self.valid.uuid, @@ -1005,10 +1054,12 @@ impl Entry { let mut attrs_new: Eattrs = Map::new(); let class_ava = vs_iutf8![EntryClass::Object.into(), EntryClass::Tombstone.into()]; let last_mod_ava = vs_cid![left_at.clone()]; + let created_ava = vs_cid![left_at.clone()]; attrs_new.insert(Attribute::Uuid, vs_uuid![self.valid.uuid]); attrs_new.insert(Attribute::Class, class_ava); attrs_new.insert(Attribute::LastModifiedCid, last_mod_ava); + attrs_new.insert(Attribute::CreatedAtCid, created_ava); Entry { valid: EntryIncremental { @@ -1054,10 +1105,12 @@ impl Entry { let mut attrs_new: Eattrs = Map::new(); let class_ava = vs_iutf8![EntryClass::Object.into(), EntryClass::Tombstone.into()]; let last_mod_ava = vs_cid![at.clone()]; + let created_ava = vs_cid![at.clone()]; attrs_new.insert(Attribute::Uuid, vs_uuid![db_ent.valid.uuid]); attrs_new.insert(Attribute::Class, class_ava); attrs_new.insert(Attribute::LastModifiedCid, last_mod_ava); + attrs_new.insert(Attribute::CreatedAtCid, created_ava); Entry { valid: EntryIncremental { @@ -1346,7 +1399,7 @@ impl Entry { impl Entry { #[cfg(test)] pub(crate) fn get_last_changed(&self) -> Cid { - self.valid.ecstate.get_tail_cid() + self.valid.ecstate.get_max_cid().clone() } /// State transititon to allow self to self for certain test macros. @@ -1969,10 +2022,12 @@ impl Entry { let class_ava = vs_iutf8![EntryClass::Object.into(), EntryClass::Tombstone.into()]; let last_mod_ava = vs_cid![cid.clone()]; + let created_ava = vs_cid![cid.clone()]; attrs_new.insert(Attribute::Uuid, vs_uuid![self.get_uuid()]); attrs_new.insert(Attribute::Class, class_ava); attrs_new.insert(Attribute::LastModifiedCid, last_mod_ava); + attrs_new.insert(Attribute::CreatedAtCid, created_ava); // ⚠️ No return from this point! ecstate.tombstone(&cid); @@ -2255,7 +2310,7 @@ impl Entry { Ok(()) } - pub fn seal(self, schema: &dyn SchemaTransaction) -> Entry { + pub fn seal(mut self, schema: &dyn SchemaTransaction) -> Entry { let EntryValid { uuid, mut ecstate } = self.valid; // Remove anything from the ecstate that is not a replicated attribute in the schema. @@ -2263,6 +2318,18 @@ impl Entry { // replicating things that only touched or changed phantom attrs. ecstate.retain(|k, _| schema.is_replicated(k)); + // Update the last changed time. + let last_mod_cid = ecstate.get_max_cid(); + let cv = vs_cid![last_mod_cid.clone()]; + let _ = self.attrs.insert(Attribute::LastModifiedCid, cv); + + // Update created-at time. This is needed for migrations currently. It could + // be alternately in the entry create path, but it makes more sense here as + // we get the create_at time from the replication metadata + let create_at_cid = ecstate.at(); + let cv = vs_cid![create_at_cid.clone()]; + let _ = self.attrs.insert(Attribute::CreatedAtCid, cv); + Entry { valid: EntrySealed { uuid, ecstate }, state: self.state, @@ -2293,16 +2360,6 @@ where state: self.state, attrs: self.attrs, } - /* Setup our last changed time. */ - // We can't actually trigger last changed here. This creates a replication loop - // inside of memberof plugin which invalidates. For now we treat last_changed - // more as "create" so we only trigger it via assign_cid in the create path - // and in conflict entry creation. - /* - trace!("trigger_last_changed - invalidate"); - ent.trigger_last_changed(); - ent - */ } pub fn get_uuid(&self) -> Uuid { @@ -2538,8 +2595,10 @@ impl Entry { /// Update the last_changed flag of this entry to the given change identifier. #[cfg(test)] fn set_last_changed(&mut self, cid: Cid) { - let cv = vs_cid![cid]; + let cv = vs_cid![cid.clone()]; let _ = self.attrs.insert(Attribute::LastModifiedCid, cv); + let cv = vs_cid![cid]; + let _ = self.attrs.insert(Attribute::CreatedAtCid, cv); } pub(crate) fn get_display_id(&self) -> String { @@ -3115,14 +3174,6 @@ impl Entry where STATE: Clone, { - fn trigger_last_changed(&mut self) { - self.valid - .ecstate - .change_ava(&self.valid.cid, &Attribute::LastModifiedCid); - let cv = vs_cid![self.valid.cid.clone()]; - let _ = self.attrs.insert(Attribute::LastModifiedCid, cv); - } - // This should always work? It's only on validate that we'll build // a list of syntax violations ... // If this already exists, we silently drop the event. This is because diff --git a/server/lib/src/repl/entry.rs b/server/lib/src/repl/entry.rs index 620f7a43d..4f7c26dfb 100644 --- a/server/lib/src/repl/entry.rs +++ b/server/lib/src/repl/entry.rs @@ -209,13 +209,11 @@ impl EntryChangeState { } } - #[cfg(test)] - pub(crate) fn get_tail_cid(&self) -> Cid { - #![allow(clippy::expect_used)] - self.cid_iter() - .pop() - .cloned() - .expect("Failed to get tail cid") + pub(crate) fn get_max_cid(&self) -> &Cid { + match &self.st { + State::Live { at, changes } => changes.values().max().unwrap_or(at), + State::Tombstone { at } => at, + } } #[cfg(test)] @@ -226,7 +224,7 @@ impl EntryChangeState { } } - pub fn cid_iter(&self) -> Vec<&Cid> { + pub(crate) fn cid_iter(&self) -> Vec<&Cid> { match &self.st { State::Live { at: _, changes } => { let mut v: Vec<_> = changes.values().collect(); diff --git a/server/lib/src/repl/proto.rs b/server/lib/src/repl/proto.rs index 7cd03a5b1..8c1de1f4f 100644 --- a/server/lib/src/repl/proto.rs +++ b/server/lib/src/repl/proto.rs @@ -15,6 +15,7 @@ use base64urlsafedata::Base64UrlSafeData; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; use std::collections::{BTreeMap, BTreeSet}; +use std::fmt; use webauthn_rs::prelude::{ AttestationCaList, AttestedPasskey as AttestedPasskeyV4, Passkey as PasskeyV4, @@ -69,7 +70,7 @@ impl From<&ReplCidV1> for Cid { /// and also includes the list of all CIDs that occur between those two points. This allows these /// extra change "anchors" to be injected into the consumer RUV during an incremental. Once /// inserted, these anchors prevent RUV trimming from creating "jumps" due to idle servers. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, PartialEq, Eq)] pub struct ReplAnchoredCidRange { #[serde(rename = "m")] pub ts_min: Duration, @@ -79,9 +80,21 @@ pub struct ReplAnchoredCidRange { pub ts_max: Duration, } +impl fmt::Debug for ReplAnchoredCidRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:032} --{}-> {:032}", + self.ts_min.as_nanos(), + self.anchors.len(), + self.ts_max.as_nanos() + ) + } +} + /// A CID range. This contains the minimum and maximum values of a range. This is used for /// querying the RUV to select all elements in this range. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, PartialEq, Eq)] pub struct ReplCidRange { #[serde(rename = "m")] pub ts_min: Duration, @@ -89,6 +102,17 @@ pub struct ReplCidRange { pub ts_max: Duration, } +impl fmt::Debug for ReplCidRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:032} -> {:032}", + self.ts_min.as_nanos(), + self.ts_max.as_nanos() + ) + } +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum ReplRuvRange { V1 { diff --git a/server/lib/src/repl/tests.rs b/server/lib/src/repl/tests.rs index 809ed8c38..ef37ff74e 100644 --- a/server/lib/src/repl/tests.rs +++ b/server/lib/src/repl/tests.rs @@ -2424,8 +2424,9 @@ async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: .is_ok()); drop(server_b_txn); - // First, we need to create a group on b that will conflict - let mut server_b_txn = server_b.write(duration_from_epoch_now()).await.unwrap(); + // First, we need to create a group on b that will conflict. This needs to be + // at a time point earlier than A. + let mut server_b_txn = server_b.write(ct).await.unwrap(); let g_uuid = Uuid::new_v4(); assert!(server_b_txn @@ -2439,8 +2440,10 @@ async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: server_b_txn.commit().expect("Failed to commit"); + // Advance the clock so that A's operation is later than B + let ct = ct + Duration::from_secs(1); // Now on a, use the same uuid, make the user and a group as it's member. - let mut server_a_txn = server_a.write(duration_from_epoch_now()).await.unwrap(); + let mut server_a_txn = server_a.write(ct).await.unwrap(); let t_uuid = Uuid::new_v4(); assert!(server_a_txn .internal_create(vec![entry_init!( @@ -2459,6 +2462,7 @@ async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: (Attribute::Class, EntryClass::Object.to_value()), (Attribute::Class, EntryClass::Group.to_value()), (Attribute::Name, Value::new_iname("testgroup1")), + // This UUID is what will conflict (Attribute::Uuid, Value::Uuid(g_uuid)), (Attribute::Member, Value::Refer(t_uuid)) ),]) @@ -2468,6 +2472,7 @@ async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: // Now do A -> B. B should show that the second group was a conflict and // the membership drops. + let ct = ct + Duration::from_secs(1); let mut server_a_txn = server_a.read().await.unwrap(); let mut server_b_txn = server_b.write(ct).await.unwrap(); @@ -2491,7 +2496,10 @@ async fn test_repl_increment_memberof_conflict(server_a: &QueryServer, server_b: server_b_txn.commit().expect("Failed to commit"); drop(server_a_txn); - // Now B -> A. A will now reflect the conflict as well. + // Now B -> A. A will now reflect the conflict as well, causing the local group on A + // to become a conflict, and the group from B will take over. This causes the membership + // on test user to be dropped. + let ct = ct + Duration::from_secs(1); let mut server_b_txn = server_b.read().await.unwrap(); let mut server_a_txn = server_a.write(ct).await.unwrap(); diff --git a/server/lib/src/schema.rs b/server/lib/src/schema.rs index 9dd07d173..82738c9d7 100644 --- a/server/lib/src/schema.rs +++ b/server/lib/src/schema.rs @@ -904,6 +904,23 @@ impl<'a> SchemaWriteTransaction<'a> { syntax: SyntaxType::Uuid, }, ); + self.attributes.insert( + Attribute::CreatedAtCid, + SchemaAttribute { + name: Attribute::CreatedAtCid, + uuid: UUID_SCHEMA_ATTR_CREATED_AT_CID, + description: String::from("The cid when this entry was created"), + multivalue: false, + // Uniqueness is handled by base.rs, not attrunique here due to + // needing to check recycled objects too. + unique: false, + phantom: false, + sync_allowed: false, + replicated: false, + index: vec![], + syntax: SyntaxType::Cid, + }, + ); self.attributes.insert( Attribute::LastModifiedCid, SchemaAttribute { @@ -916,7 +933,7 @@ impl<'a> SchemaWriteTransaction<'a> { unique: false, phantom: false, sync_allowed: false, - replicated: true, + replicated: false, index: vec![], syntax: SyntaxType::Cid, }, @@ -1978,6 +1995,7 @@ impl<'a> SchemaWriteTransaction<'a> { Attribute::Class, Attribute::Uuid, Attribute::LastModifiedCid, + Attribute::CreatedAtCid, ], ..Default::default() }, @@ -2996,6 +3014,7 @@ mod tests { Attribute::Class, Attribute::Uuid, Attribute::LastModifiedCid, + Attribute::CreatedAtCid, ], systemsupplements: vec![EntryClass::Service.into(), EntryClass::Person.into()], ..Default::default() @@ -3009,6 +3028,7 @@ mod tests { Attribute::Class, Attribute::Uuid, Attribute::LastModifiedCid, + Attribute::CreatedAtCid, ], ..Default::default() }; @@ -3021,6 +3041,7 @@ mod tests { Attribute::Class, Attribute::Uuid, Attribute::LastModifiedCid, + Attribute::CreatedAtCid, ], excludes: vec![EntryClass::Person.into()], ..Default::default()