Add verification of name indexes (#433)

This commit is contained in:
Firstyear 2021-05-06 21:12:02 +10:00 committed by GitHub
parent 1eb777485e
commit 644eb0b0d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 189 additions and 45 deletions

5
Cargo.lock generated
View file

@ -624,9 +624,9 @@ dependencies = [
[[package]]
name = "concread"
version = "0.2.9"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953d6851b04c62e12dd1d07fb99bf87094284e321db47ac3a4d861bb4054444b"
checksum = "7a695f8f543f6c58f519d0006c069d244c269ef64291b9eead6ebe30ffc294f4"
dependencies = [
"ahash 0.7.2",
"crossbeam",
@ -636,6 +636,7 @@ dependencies = [
"parking_lot",
"rand 0.8.3",
"smallvec",
"tokio",
]
[[package]]

View file

@ -51,6 +51,8 @@ pub enum ConsistencyError {
DuplicateUniqueAttribute(String),
InvalidSpn(u64),
SqliteIntegrityFailure,
BackendAllIdsSync,
BackendIndexSync,
}
#[derive(Serialize, Deserialize, Debug)]

View file

@ -62,7 +62,7 @@ structopt = { version = "0.3", default-features = false }
time = { version = "0.2", features = ["serde", "std"] }
hashbrown = "0.11"
concread = "^0.2.9"
concread = "^0.2.12"
# concread = { version = "^0.2.9", path = "../../concread" }
# concread = { version = "^0.2.9", features = ["simd_support"] }

View file

@ -281,6 +281,30 @@ macro_rules! uuid2rdn {
}};
}
macro_rules! verify {
(
$self:expr,
$audit:expr
) => {{
let mut r = $self.db.verify();
if r.is_empty() && !$self.is_dirty() {
// Check allids.
match $self.db.get_allids($audit) {
Ok(db_allids) => {
if !db_allids.is_compressed() || !(*($self).allids).is_compressed() {
r.push(Err(ConsistencyError::BackendAllIdsSync))
}
if db_allids != (*($self).allids) {
r.push(Err(ConsistencyError::BackendAllIdsSync))
}
}
Err(_) => r.push(Err(ConsistencyError::Unknown)),
};
};
r
}};
}
pub trait IdlArcSqliteTransaction {
fn get_identry(
&mut self,
@ -313,7 +337,9 @@ pub trait IdlArcSqliteTransaction {
fn get_db_d_uuid(&self) -> Result<Option<Uuid>, OperationError>;
fn verify(&self) -> Vec<Result<(), ConsistencyError>>;
fn verify(&self, audit: &mut AuditScope) -> Vec<Result<(), ConsistencyError>>;
fn is_dirty(&self) -> bool;
fn name2uuid(
&mut self,
@ -378,8 +404,12 @@ impl<'a> IdlArcSqliteTransaction for IdlArcSqliteReadTransaction<'a> {
self.db.get_db_d_uuid()
}
fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
self.db.verify()
fn verify(&self, audit: &mut AuditScope) -> Vec<Result<(), ConsistencyError>> {
verify!(self, audit)
}
fn is_dirty(&self) -> bool {
false
}
fn name2uuid(
@ -451,8 +481,12 @@ impl<'a> IdlArcSqliteTransaction for IdlArcSqliteWriteTransaction<'a> {
self.db.get_db_d_uuid()
}
fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
self.db.verify()
fn verify(&self, audit: &mut AuditScope) -> Vec<Result<(), ConsistencyError>> {
verify!(self, audit)
}
fn is_dirty(&self) -> bool {
self.entry_cache.is_dirty()
}
fn name2uuid(
@ -804,6 +838,9 @@ impl<'a> IdlArcSqliteWriteTransaction<'a> {
pub unsafe fn purge_id2entry(&mut self, audit: &mut AuditScope) -> Result<(), OperationError> {
self.db.purge_id2entry(audit).map(|()| {
let mut ids = IDLBitRange::new();
ids.compress();
std::mem::swap(self.allids.deref_mut(), &mut ids);
self.entry_cache.clear();
})
}

View file

@ -411,6 +411,40 @@ pub trait IdlSqliteTransaction {
})
}
fn get_allids(&self, au: &mut AuditScope) -> Result<IDLBitRange, OperationError> {
ltrace!(au, "Building allids...");
let mut stmt = self
.get_conn()
.prepare("SELECT id FROM id2entry")
.map_err(|e| {
ladmin_error!(au, "SQLite Error {:?}", e);
OperationError::SqliteError
})?;
let res = stmt.query_map([], |row| row.get(0)).map_err(|e| {
ladmin_error!(au, "SQLite Error {:?}", e);
OperationError::SqliteError
})?;
let mut ids: Result<IDLBitRange, _> = res
.map(|v| {
v.map_err(|e| {
ladmin_error!(au, "SQLite Error {:?}", e);
OperationError::SqliteError
})
.and_then(|id: i64| {
// Convert the idsqlite to id raw
id.try_into().map_err(|e| {
ladmin_error!(au, "I64 Parse Error {:?}", e);
OperationError::SqliteError
})
})
})
.collect();
if let Ok(i) = &mut ids {
i.compress()
}
ids
}
// This allow is critical as it resolves a life time issue in stmt.
#[allow(clippy::let_and_return)]
fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
@ -1093,32 +1127,6 @@ impl IdlSqliteWriteTransaction {
})
}
pub(crate) fn get_allids(&self, au: &mut AuditScope) -> Result<IDLBitRange, OperationError> {
ltrace!(au, "Building allids...");
let mut stmt = self.conn.prepare("SELECT id FROM id2entry").map_err(|e| {
ladmin_error!(au, "SQLite Error {:?}", e);
OperationError::SqliteError
})?;
let res = stmt.query_map([], |row| row.get(0)).map_err(|e| {
ladmin_error!(au, "SQLite Error {:?}", e);
OperationError::SqliteError
})?;
res.map(|v| {
v.map_err(|e| {
ladmin_error!(au, "SQLite Error {:?}", e);
OperationError::SqliteError
})
.and_then(|id: i64| {
// Convert the idsqlite to id raw
id.try_into().map_err(|e| {
ladmin_error!(au, "I64 Parse Error {:?}", e);
OperationError::SqliteError
})
})
})
.collect()
}
pub fn setup(&self, audit: &mut AuditScope) -> Result<(), OperationError> {
// This stores versions of components. For example:
// ----------------------

View file

@ -676,9 +676,107 @@ pub trait BackendTransaction {
}) // end audit segment
}
fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
// Vec::new()
self.get_idlayer().verify()
fn verify(&self, audit: &mut AuditScope) -> Vec<Result<(), ConsistencyError>> {
self.get_idlayer().verify(audit)
}
fn verify_entry_index(
&self,
audit: &mut AuditScope,
e: &Entry<EntrySealed, EntryCommitted>,
) -> Result<(), ConsistencyError> {
// First, check our references in name2uuid, uuid2spn and uuid2rdn
if e.mask_recycled_ts().is_some() {
let e_uuid = e.get_uuid();
// We only check these on live entries.
let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(None, Some(&e));
let n2u_set = match (n2u_add, n2u_rem) {
(Some(set), None) => set,
(_, _) => {
ladmin_error!(audit, "Invalid idx_name2uuid_diff state");
return Err(ConsistencyError::BackendIndexSync);
}
};
// If the set.len > 1, check each item.
n2u_set.iter().try_for_each(|name| {
match self.get_idlayer().name2uuid(audit, name) {
Ok(Some(idx_uuid)) => {
if &idx_uuid == e_uuid {
Ok(())
} else {
ladmin_error!(
audit,
"Invalid name2uuid state -> incorrect uuid association"
);
return Err(ConsistencyError::BackendIndexSync);
}
}
r => {
ladmin_error!(audit, "Invalid name2uuid state -> {:?}", r);
return Err(ConsistencyError::BackendIndexSync);
}
}
})?;
let spn = e.get_uuid2spn();
match self.get_idlayer().uuid2spn(audit, &e_uuid) {
Ok(Some(idx_spn)) => {
if spn != idx_spn {
ladmin_error!(audit, "Invalid uuid2spn state -> incorrect idx spn value");
return Err(ConsistencyError::BackendIndexSync);
}
}
r => {
ladmin_error!(audit, "Invalid uuid2spn state -> {:?}", r);
return Err(ConsistencyError::BackendIndexSync);
}
};
let rdn = e.get_uuid2rdn();
match self.get_idlayer().uuid2rdn(audit, &e_uuid) {
Ok(Some(idx_rdn)) => {
if rdn != idx_rdn {
ladmin_error!(audit, "Invalid uuid2rdn state -> incorrect idx rdn value");
return Err(ConsistencyError::BackendIndexSync);
}
}
r => {
ladmin_error!(audit, "Invalid uuid2rdn state -> {:?}", r);
return Err(ConsistencyError::BackendIndexSync);
}
};
}
// Check the other entry:attr indexes are valid
//
// This is acutally pretty hard to check, because we can check a value *should*
// exist, but not that a value should NOT be present in the index. Thought needed ...
// Got here? Ok!
Ok(())
}
fn verify_indexes(&self, audit: &mut AuditScope) -> Vec<Result<(), ConsistencyError>> {
let idl = IDL::ALLIDS;
let entries = match self.get_idlayer().get_identry(audit, &idl) {
Ok(s) => s,
Err(e) => {
ladmin_error!(audit, "get_identry failure {:?}", e);
return vec![Err(ConsistencyError::Unknown)];
}
};
let r = entries
.iter()
.try_for_each(|e| self.verify_entry_index(audit, e));
if r.is_err() {
vec![r]
} else {
Vec::new()
}
}
fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> {
@ -1278,7 +1376,7 @@ impl<'a> BackendWriteTransaction<'a> {
// Reindex now we are loaded.
self.reindex(audit)?;
let vr = self.verify();
let vr = self.verify(audit);
if vr.is_empty() {
Ok(())
} else {

View file

@ -921,7 +921,7 @@ impl Entry<EntrySealed, EntryCommitted> {
}
#[inline]
fn get_uuid2rdn(&self) -> String {
pub(crate) fn get_uuid2rdn(&self) -> String {
self.attrs
.get("spn")
.and_then(|vs| {

View file

@ -40,7 +40,8 @@ impl ReferentialIntegrity {
.to_ref_uuid()
.map(|uuid| f_eq("uuid", PartialValue::new_uuid(*uuid)))
.ok_or_else(|| {
ladmin_error!(au, "ref value could not convert to reference uuid");
ladmin_error!(au, "reference value could not convert to reference uuid.");
ladmin_error!(au, "If you are sure the name/uuid/spn exist, and that this is in error, you should run a verify task.");
OperationError::InvalidAttribute(
"uuid could not become reference value".to_string(),
)

View file

@ -758,7 +758,7 @@ impl<'a> QueryServerReadTransaction<'a> {
// If we fail after backend, we need to return NOW because we can't
// assert any other faith in the DB states.
// * backend
let be_errs = self.get_be_txn().verify();
let be_errs = self.get_be_txn().verify(au);
if !be_errs.is_empty() {
return be_errs;
@ -772,14 +772,11 @@ impl<'a> QueryServerReadTransaction<'a> {
}
// * Indexing (req be + sch )
/*
idx_errs = self.get_be_txn()
.verify_indexes();
let idx_errs = self.get_be_txn().verify_indexes(au);
if !idx_errs.is_empty() {
return idx_errs;
}
*/
// Ok BE passed, lets move on to the content.
// Most of our checks are in the plugins, so we let them