fix RUV on startup, improve filter output (#2211)

This commit is contained in:
Firstyear 2023-10-11 21:14:27 +10:00 committed by GitHub
parent d9da1eeca0
commit fbc62ea51e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 167 additions and 20 deletions

View file

@ -41,7 +41,7 @@ pub struct QueryServerWriteV1 {
impl QueryServerWriteV1 { impl QueryServerWriteV1 {
pub fn new(idms: Arc<IdmServer>) -> Self { pub fn new(idms: Arc<IdmServer>) -> Self {
info!("Starting query server v1 worker ..."); debug!("Starting a query server v1 worker ...");
QueryServerWriteV1 { idms } QueryServerWriteV1 { idms }
} }

View file

@ -778,11 +778,7 @@ impl CoreHandle {
// Wait on the handles. // Wait on the handles.
while let Some((handle_name, handle)) = self.handles.pop() { while let Some((handle_name, handle)) = self.handles.pop() {
if let Err(error) = handle.await { if let Err(error) = handle.await {
eprintln!( eprintln!("Task {} failed to finish: {:?}", handle_name, error);
"Task {} failed to finish: {:?}",
handle_name,
error
);
} }
} }

View file

@ -10,6 +10,7 @@
use std::cmp::{Ordering, PartialOrd}; use std::cmp::{Ordering, PartialOrd};
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::fmt;
use std::hash::Hash; use std::hash::Hash;
use std::iter; use std::iter;
use std::num::NonZeroU8; use std::num::NonZeroU8;
@ -111,7 +112,7 @@ pub enum FC<'a> {
} }
/// This is the filters internal representation /// This is the filters internal representation
#[derive(Debug, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)] #[derive(Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
enum FilterComp { enum FilterComp {
// This is attr - value // This is attr - value
Eq(AttrString, PartialValue), Eq(AttrString, PartialValue),
@ -127,6 +128,61 @@ enum FilterComp {
// Not(Box<FilterComp>), // Not(Box<FilterComp>),
} }
impl fmt::Debug for FilterComp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FilterComp::Eq(attr, pv) => {
write!(f, "{} eq {:?}", attr, pv)
}
FilterComp::Sub(attr, pv) => {
write!(f, "{} sub {:?}", attr, pv)
}
FilterComp::Pres(attr) => {
write!(f, "{} pres", attr)
}
FilterComp::LessThan(attr, pv) => {
write!(f, "{} lt {:?}", attr, pv)
}
FilterComp::And(list) => {
write!(f, "(")?;
for (i, fc) in list.iter().enumerate() {
write!(f, "{:?}", fc)?;
if i != list.len() - 1 {
write!(f, " and ")?;
}
}
write!(f, ")")
}
FilterComp::Or(list) => {
write!(f, "(")?;
for (i, fc) in list.iter().enumerate() {
write!(f, "{:?}", fc)?;
if i != list.len() - 1 {
write!(f, " or ")?;
}
}
write!(f, ")")
}
FilterComp::Inclusion(list) => {
write!(f, "(")?;
for (i, fc) in list.iter().enumerate() {
write!(f, "{:?}", fc)?;
if i != list.len() - 1 {
write!(f, " inc ")?;
}
}
write!(f, ")")
}
FilterComp::AndNot(inner) => {
write!(f, "and not ( {:?} )", inner)
}
FilterComp::SelfUuid => {
write!(f, "uuid eq self")
}
}
}
}
/// This is the fully resolved internal representation. Note the lack of Not and selfUUID /// This is the fully resolved internal representation. Note the lack of Not and selfUUID
/// because these are resolved into And(Pres(class), AndNot(term)) and Eq(uuid, ...). /// because these are resolved into And(Pres(class), AndNot(term)) and Eq(uuid, ...).
/// Importantly, we make this accessible to Entry so that it can then match on filters /// Importantly, we make this accessible to Entry so that it can then match on filters
@ -137,7 +193,7 @@ enum FilterComp {
/// where small value - faster index, larger value - slower index. This metadata is extremely /// where small value - faster index, larger value - slower index. This metadata is extremely
/// important for the query optimiser to make decisions about how to re-arrange queries /// important for the query optimiser to make decisions about how to re-arrange queries
/// correctly. /// correctly.
#[derive(Debug, Clone, Eq)] #[derive(Clone, Eq)]
pub enum FilterResolved { pub enum FilterResolved {
// This is attr - value - indexed slope factor // This is attr - value - indexed slope factor
Eq(AttrString, PartialValue, Option<NonZeroU8>), Eq(AttrString, PartialValue, Option<NonZeroU8>),
@ -151,6 +207,58 @@ pub enum FilterResolved {
AndNot(Box<FilterResolved>, Option<NonZeroU8>), AndNot(Box<FilterResolved>, Option<NonZeroU8>),
} }
impl fmt::Debug for FilterResolved {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FilterResolved::Eq(attr, pv, idx) => {
write!(f, "{} ({:?}) eq {:?}", attr, idx, pv)
}
FilterResolved::Sub(attr, pv, idx) => {
write!(f, "{} ({:?}) sub {:?}", attr, idx, pv)
}
FilterResolved::Pres(attr, idx) => {
write!(f, "{} ({:?}) pres", attr, idx)
}
FilterResolved::LessThan(attr, pv, idx) => {
write!(f, "{} ({:?}) lt {:?}", attr, idx, pv)
}
FilterResolved::And(list, idx) => {
write!(f, "({:?} ", idx)?;
for (i, fc) in list.iter().enumerate() {
write!(f, "{:?}", fc)?;
if i != list.len() - 1 {
write!(f, " and ")?;
}
}
write!(f, ")")
}
FilterResolved::Or(list, idx) => {
write!(f, "({:?} ", idx)?;
for (i, fc) in list.iter().enumerate() {
write!(f, "{:?}", fc)?;
if i != list.len() - 1 {
write!(f, " or ")?;
}
}
write!(f, ")")
}
FilterResolved::Inclusion(list, idx) => {
write!(f, "({:?} ", idx)?;
for (i, fc) in list.iter().enumerate() {
write!(f, "{:?}", fc)?;
if i != list.len() - 1 {
write!(f, " inc ")?;
}
}
write!(f, ")")
}
FilterResolved::AndNot(inner, idx) => {
write!(f, "and not ({:?} {:?} )", idx, inner)
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct FilterInvalid { pub struct FilterInvalid {
inner: FilterComp, inner: FilterComp,
@ -161,7 +269,7 @@ pub struct FilterValid {
inner: FilterComp, inner: FilterComp,
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Clone, PartialEq, Eq)]
pub struct FilterValidResolved { pub struct FilterValidResolved {
inner: FilterResolved, inner: FilterResolved,
} }
@ -214,11 +322,29 @@ pub enum FilterPlan {
/// helps to prevent errors at compile time to assert `Filters` are secuerly. checked /// helps to prevent errors at compile time to assert `Filters` are secuerly. checked
/// ///
/// [`Entry`]: ../entry/struct.Entry.html /// [`Entry`]: ../entry/struct.Entry.html
#[derive(Debug, Clone, Hash, Ord, Eq, PartialOrd, PartialEq)] #[derive(Clone, Hash, Ord, Eq, PartialOrd, PartialEq)]
pub struct Filter<STATE> { pub struct Filter<STATE> {
state: STATE, state: STATE,
} }
impl fmt::Debug for Filter<FilterValidResolved> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Filter(Valid) {:?}", self.state.inner)
}
}
impl fmt::Debug for Filter<FilterValid> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Filter(Valid) {:?}", self.state.inner)
}
}
impl fmt::Debug for Filter<FilterInvalid> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Filter(Invalid) {:?}", self.state.inner)
}
}
impl Filter<FilterValidResolved> { impl Filter<FilterValidResolved> {
// Does this need mut self? Aren't we returning // Does this need mut self? Aren't we returning
// a new copied filter? // a new copied filter?

View file

@ -319,10 +319,10 @@ impl<'a> Oauth2ResourceServersWriteTransaction<'a> {
.into_iter() .into_iter()
.map(|ent| { .map(|ent| {
let uuid = ent.get_uuid(); let uuid = ent.get_uuid();
admin_info!(?uuid, "Checking oauth2 configuration"); trace!(?uuid, "Checking oauth2 configuration");
// From each entry, attempt to make an oauth2 configuration. // From each entry, attempt to make an oauth2 configuration.
if !ent.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into()) { if !ent.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into()) {
admin_error!("Missing class oauth2_resource_server"); error!("Missing class oauth2_resource_server");
// Check we have oauth2_resource_server class // Check we have oauth2_resource_server class
return Err(OperationError::InvalidEntryState); return Err(OperationError::InvalidEntryState);
} }

View file

@ -6,7 +6,7 @@ use crate::prelude::*;
use kanidm_proto::v1::OperationError; use kanidm_proto::v1::OperationError;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Eq, PartialOrd, Ord, Hash)] #[derive(Serialize, Deserialize, PartialEq, Clone, Eq, PartialOrd, Ord, Hash)]
pub struct Cid { pub struct Cid {
// Mental note: Derive ord always checks in order of struct fields. // Mental note: Derive ord always checks in order of struct fields.
pub ts: Duration, pub ts: Duration,
@ -27,6 +27,12 @@ impl From<DbCidV1> for Cid {
} }
} }
impl fmt::Debug for Cid {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:032}-{}", self.ts.as_nanos(), self.s_uuid)
}
}
impl fmt::Display for Cid { impl fmt::Display for Cid {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:032}-{}", self.ts.as_nanos(), self.s_uuid) write!(f, "{:032}-{}", self.ts.as_nanos(), self.s_uuid)
@ -34,7 +40,6 @@ impl fmt::Display for Cid {
} }
impl Cid { impl Cid {
#[cfg(test)]
pub(crate) fn new(s_uuid: Uuid, ts: Duration) -> Self { pub(crate) fn new(s_uuid: Uuid, ts: Duration) -> Self {
Cid { s_uuid, ts } Cid { s_uuid, ts }
} }

View file

@ -232,7 +232,8 @@ impl<'a> fmt::Debug for ReplicationUpdateVectorWriteTransaction<'a> {
writeln!(f, "RUV RANGE DUMP")?; writeln!(f, "RUV RANGE DUMP")?;
self.ranged self.ranged
.iter() .iter()
.try_for_each(|(s_uuid, ts)| writeln!(f, "* [{s_uuid} {ts:?}]")) .flat_map(|(s_uuid, ts_set)| ts_set.iter().map(|ts| Cid::new(*s_uuid, *ts)))
.try_for_each(|cid| writeln!(f, "[{cid}]"))
} }
} }
@ -561,6 +562,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
/// db entries are restored, their changesets will re-populate the data that we /// db entries are restored, their changesets will re-populate the data that we
/// need in the RUV at these points. The reason we need these ranges without IDL /// need in the RUV at these points. The reason we need these ranges without IDL
/// is so that trim and replication works properly. /// is so that trim and replication works properly.
#[instrument(level = "debug", name = "ruv::restore", skip_all)]
pub(crate) fn restore<I>(&mut self, iter: I) -> Result<(), OperationError> pub(crate) fn restore<I>(&mut self, iter: I) -> Result<(), OperationError>
where where
I: IntoIterator<Item = Cid>, I: IntoIterator<Item = Cid>,
@ -589,6 +591,7 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
Ok(()) Ok(())
} }
#[instrument(level = "debug", name = "ruv::rebuild", skip_all)]
pub fn rebuild(&mut self, entries: &[Arc<EntrySealedCommitted>]) -> Result<(), OperationError> { pub fn rebuild(&mut self, entries: &[Arc<EntrySealedCommitted>]) -> Result<(), OperationError> {
// Entries and their internal changestates are the "source of truth" for all changes // Entries and their internal changestates are the "source of truth" for all changes
// that have ever occurred and are stored on this server. So we use them to rebuild our RUV // that have ever occurred and are stored on this server. So we use them to rebuild our RUV
@ -628,7 +631,16 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
}); });
self.data.extend(rebuild_ruv); self.data.extend(rebuild_ruv);
self.ranged.extend(rebuild_range);
// Warning - you can't extend here because this is keyed by UUID. You need
// to go through each key and then merge the sets.
rebuild_range.into_iter().for_each(|(s_uuid, ts_set)| {
if let Some(ex_ts_set) = self.ranged.get_mut(&s_uuid) {
ex_ts_set.extend(ts_set)
} else {
self.ranged.insert(s_uuid, ts_set);
}
});
Ok(()) Ok(())
} }
@ -786,6 +798,8 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
Some(l) => *l, Some(l) => *l,
None => { None => {
error!("Impossible State - The RUV should not be empty"); error!("Impossible State - The RUV should not be empty");
error!(ruv = ?self);
error!(?cid);
return Err(OperationError::InvalidState); return Err(OperationError::InvalidState);
} }
}; };
@ -793,19 +807,25 @@ impl<'a> ReplicationUpdateVectorWriteTransaction<'a> {
if cid.ts != last { if cid.ts != last {
if !server_range.remove(&cid.ts) { if !server_range.remove(&cid.ts) {
error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index"); error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
error!(ruv = ?self);
error!(?cid);
return Err(OperationError::InvalidState); return Err(OperationError::InvalidState);
} }
} else { } else {
trace!("skip trimming maximum cid for s_uuid {}", cid.s_uuid); debug!("skip trimming maximum cid for s_uuid {}", cid.s_uuid);
} }
if server_range.is_empty() { if server_range.is_empty() {
// remove_suuid.push(cid.s_uuid); // remove_suuid.push(cid.s_uuid);
error!("Impossible State - The RUV should not be cleared for a s_uuid!"); error!("Impossible State - The RUV should not be cleared for a s_uuid!");
error!(ruv = ?self);
error!(?cid);
return Err(OperationError::InvalidState); return Err(OperationError::InvalidState);
} }
} }
None => { None => {
error!("Impossible State - The RUV is corrupted due to missing sid in ranged index"); error!("Impossible State - The RUV is corrupted due to missing sid in ranged index");
error!(ruv = ?self);
error!(?cid);
return Err(OperationError::InvalidState); return Err(OperationError::InvalidState);
} }
} }

View file

@ -14,7 +14,7 @@ impl<'a> QueryServerWriteTransaction<'a> {
self.be_txn self.be_txn
.reap_tombstones(&trim_cid) .reap_tombstones(&trim_cid)
.map_err(|e| { .map_err(|e| {
admin_error!(err = ?e, "Tombstone purge operation failed (backend)"); error!(err = ?e, "Tombstone purge operation failed (backend)");
e e
}) })
.map(|_| { .map(|_| {

View file

@ -612,7 +612,7 @@ pub fn from_result_value_iter(
mut iter: impl Iterator<Item = Result<Value, OperationError>>, mut iter: impl Iterator<Item = Result<Value, OperationError>>,
) -> Result<ValueSet, OperationError> { ) -> Result<ValueSet, OperationError> {
let Some(init) = iter.next() else { let Some(init) = iter.next() else {
admin_error!("Empty value iterator"); trace!("Empty value iterator");
return Err(OperationError::InvalidValueState); return Err(OperationError::InvalidValueState);
}; };
@ -673,7 +673,7 @@ pub fn from_result_value_iter(
pub fn from_value_iter(mut iter: impl Iterator<Item = Value>) -> Result<ValueSet, OperationError> { pub fn from_value_iter(mut iter: impl Iterator<Item = Value>) -> Result<ValueSet, OperationError> {
let Some(init) = iter.next() else { let Some(init) = iter.next() else {
admin_error!("Empty value iterator"); trace!("Empty value iterator");
return Err(OperationError::InvalidValueState); return Err(OperationError::InvalidValueState);
}; };