Add tombstone lifecycle functionaly

This commit is contained in:
William Brown 2019-02-22 16:15:48 +10:00
parent dc0d37d701
commit ea5af4f369
11 changed files with 386 additions and 13 deletions

View file

@ -1,3 +1,6 @@
pub static PURGE_TIMEOUT: u64 = 3600;
pub static UUID_ADMIN: &'static str = "00000000-0000-0000-0000-000000000000";
pub static UUID_ANONYMOUS: &'static str = "00000000-0000-0000-0000-ffffffffffff";
@ -20,7 +23,6 @@ pub static JSON_SYSTEM_INFO_V1: &'static str = r#"{
"state": null,
"attrs": {
"class": ["object", "system_info"],
"name": ["system_info"],
"uuid": ["00000000-0000-0000-0000-ffffff000001"],
"description": ["System info and metadata object."],
"version": ["1"],
@ -50,6 +52,10 @@ pub static UUID_SCHEMA_CLASS_OBJECT: &'static str = "579bb16d-1d85-4f8e-bb3b-6fc
pub static UUID_SCHEMA_CLASS_EXTENSIBLEOBJECT: &'static str =
"0fb2171d-372b-4d0d-9194-9a4d6846c324";
pub static UUID_SCHEMA_CLASS_RECYCLED: &'static str = "813bb7e3-dadf-413d-acc4-197b03d55a4f";
pub static UUID_SCHEMA_CLASS_TOMBSTONE: &'static str = "848a1224-0c3c-465f-abd0-10a32e21830e";
// system supplementary
pub static UUID_SCHEMA_ATTR_DISPLAYNAME: &'static str = "201bc966-954b-48f5-bf25-99ffed759861";
pub static UUID_SCHEMA_ATTR_MAIL: &'static str = "fae94676-720b-461b-9438-bfe8cfd7e6cd";

View file

@ -4,6 +4,7 @@ use actix_web::{
error, http, middleware, App, AsyncResponder, Error, FutureResponse, HttpMessage, HttpRequest,
HttpResponse, Path, Result, State,
};
use actix::Actor;
use bytes::BytesMut;
use futures::{future, Future, Stream};
@ -17,8 +18,10 @@ use super::log;
use super::proto_v1::{
AuthRequest, AuthResponse, CreateRequest, DeleteRequest, ModifyRequest, SearchRequest,
};
use super::interval::IntervalActor;
use super::server;
struct AppState {
qe: actix::Addr<server::QueryServer>,
max_size: usize,
@ -213,6 +216,10 @@ pub fn create_server_core(config: Configuration) {
// Start the query server with the given be path: future config
let server_addr = server::start(log_addr.clone(), config.db_path.as_str(), config.threads);
// Setup timed events
let _int_addr = IntervalActor::new(server_addr.clone()).start();
// Copy the max size
let max_size = config.maximum_request;

View file

@ -2,7 +2,7 @@ use super::filter::{Filter, FilterInvalid};
use super::proto_v1::Entry as ProtoEntry;
use super::proto_v1::{
AuthRequest, AuthResponse, AuthStatus, CreateRequest, DeleteRequest, ModifyRequest, Response,
SearchRequest, SearchResponse,
SearchRequest, SearchResponse
};
use actix::prelude::*;
use entry::{Entry, EntryCommitted, EntryInvalid, EntryNew, EntryValid};
@ -72,7 +72,21 @@ impl SearchEvent {
pub fn from_request(request: SearchRequest) -> Self {
SearchEvent {
internal: false,
filter: Filter::from(&request.filter),
filter: Filter::And(vec![
Filter::AndNot(Box::new(
Filter::Or(vec![
Filter::Eq(
"class".to_string(),
"tombstone".to_string(),
),
Filter::Eq(
"class".to_string(),
"recycled".to_string(),
)
])
)),
Filter::from(&request.filter)
]),
class: (),
}
}
@ -80,7 +94,21 @@ impl SearchEvent {
pub fn new_impersonate(filter: Filter<FilterInvalid>) -> Self {
SearchEvent {
internal: false,
filter: filter,
filter: Filter::And(vec![
Filter::AndNot(Box::new(
Filter::Or(vec![
Filter::Eq(
"class".to_string(),
"tombstone".to_string(),
),
Filter::Eq(
"class".to_string(),
"recycled".to_string(),
)
])
)),
filter
]),
class: (),
}
}
@ -171,7 +199,10 @@ impl Message for DeleteEvent {
impl DeleteEvent {
pub fn from_request(request: DeleteRequest) -> Self {
unimplemented!()
DeleteEvent {
filter: Filter::from(&request.filter),
internal: false,
}
}
#[cfg(test)]
@ -203,7 +234,25 @@ impl Message for ModifyEvent {
impl ModifyEvent {
pub fn from_request(request: ModifyRequest) -> Self {
unimplemented!()
ModifyEvent {
filter: Filter::And(vec![
Filter::AndNot(Box::new(
Filter::Or(vec![
Filter::Eq(
"class".to_string(),
"tombstone".to_string(),
),
Filter::Eq(
"class".to_string(),
"recycled".to_string(),
)
])
)),
Filter::from(&request.filter)
]),
modlist: ModifyList::from(&request.modlist),
internal: false
}
}
#[cfg(test)]
@ -246,3 +295,17 @@ impl AuthResult {
}
}
}
#[derive(Debug)]
pub struct PurgeEvent {}
impl Message for PurgeEvent {
type Result = ();
}
impl PurgeEvent {
pub fn new() -> Self {
PurgeEvent {}
}
}

38
src/lib/interval.rs Normal file
View file

@ -0,0 +1,38 @@
use std::time::Duration;
use actix::prelude::*;
use server::QueryServer;
use event::PurgeEvent;
use constants::PURGE_TIMEOUT;
pub struct IntervalActor {
// Store any addresses we require
server: actix::Addr<QueryServer>,
}
impl IntervalActor {
pub fn new(server: actix::Addr<QueryServer>) -> Self {
IntervalActor {
server: server,
}
}
// Define new events here
fn purge_tombstones(&mut self) {
// Make a purge request ...
let pe = PurgeEvent::new();
self.server.do_send(pe)
}
}
impl Actor for IntervalActor {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_secs(PURGE_TIMEOUT), move |act, _ctx| {
act.purge_tombstones();
});
}
}

View file

@ -39,6 +39,7 @@ mod constants;
mod entry;
mod event;
mod identity;
mod interval;
mod modify;
mod plugins;
mod schema;

View file

@ -1,3 +1,6 @@
use proto_v1::ModifyList as ProtoModifyList;
use proto_v1::Modify as ProtoModify;
#[derive(Serialize, Deserialize, Debug)]
pub enum Modify {
// This value *should* exist.
@ -8,6 +11,22 @@ pub enum Modify {
Purged(String),
}
impl Modify {
pub fn from(m: &ProtoModify) -> Self {
match m {
ProtoModify::Present(a, v) => {
Modify::Present(a.clone(), v.clone())
}
ProtoModify::Removed(a, v) => {
Modify::Removed(a.clone(), v.clone())
}
ProtoModify::Purged(a) => {
Modify::Purged(a.clone())
}
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ModifyList {
// And ordered list of changes to apply. Should this be state based?
@ -30,4 +49,16 @@ impl ModifyList {
pub fn len(&self) -> usize {
self.mods.len()
}
pub fn from(ml: &ProtoModifyList) -> Self {
// For each ProtoModify, do a from.
ModifyList {
mods: ml.mods.iter()
.map(|pm| {
Modify::from(pm)
})
.collect()
}
}
}

View file

@ -7,6 +7,7 @@ use schema::{SchemaTransaction, SchemaWriteTransaction};
mod base;
mod protected;
mod recycle;
trait Plugin {
fn id() -> &'static str;

View file

View file

@ -27,6 +27,25 @@ pub enum Filter {
AndNot(Box<Filter>),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Modify {
Present(String, String),
Removed(String, String),
Purged(String),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ModifyList {
pub mods: Vec<Modify>,
}
impl ModifyList {
pub fn new_list(mods: Vec<Modify>) -> Self {
ModifyList { mods: mods }
}
}
// FIXME: Do I need proto filter?
// Probably yes, don't be shit william.
@ -88,6 +107,17 @@ impl DeleteRequest {
#[derive(Debug, Serialize, Deserialize)]
pub struct ModifyRequest {
// Probably needs a modlist?
pub filter: Filter,
pub modlist: ModifyList,
}
impl ModifyRequest {
pub fn new(filter: Filter, modlist: ModifyList) -> Self {
ModifyRequest {
filter: filter,
modlist: modlist,
}
}
}
// Login is a multi-step process potentially. First the client says who they

View file

@ -582,7 +582,7 @@ impl SchemaInner {
may: vec![],
systemmust: vec![
String::from("class"),
String::from("name"),
// String::from("name"),
String::from("uuid"),
],
must: vec![],
@ -593,13 +593,41 @@ impl SchemaInner {
SchemaClass {
name: String::from("extensibleobject"),
uuid: Uuid::parse_str(UUID_SCHEMA_CLASS_EXTENSIBLEOBJECT).unwrap(),
description: String::from("A class type that turns off all rules ..."),
description: String::from("A class type that has green hair and turns off all rules ..."),
systemmay: vec![],
may: vec![],
systemmust: vec![],
must: vec![],
},
);
/* These two classes are core to the entry lifecycle for recycling and tombstoning */
s.classes.insert(
String::from("recycled"),
SchemaClass {
name: String::from("recycled"),
uuid: Uuid::parse_str(UUID_SCHEMA_CLASS_RECYCLED).unwrap(),
description: String::from("An object that has been deleted, but still recoverable via the revive operation. Recycled objects are not modifiable, only revivable."),
systemmay: vec![],
may: vec![],
systemmust: vec![],
must: vec![],
},
);
s.classes.insert(
String::from("tombstone"),
SchemaClass {
name: String::from("tombstone"),
uuid: Uuid::parse_str(UUID_SCHEMA_CLASS_TOMBSTONE).unwrap(),
description: String::from("An object that is purged from the recycle bin. This is a system internal state. Tombstones have no attributes beside UUID."),
systemmay: vec![],
may: vec![],
systemmust: vec![
String::from("class"),
String::from("uuid"),
],
must: vec![],
},
);
match s.validate(&mut au) {
Ok(_) => Ok(s),
@ -753,7 +781,10 @@ impl SchemaInner {
// String::from("gidnumber"),
],
may: vec![],
systemmust: vec![String::from("displayname")],
systemmust: vec![
String::from("displayname"),
String::from("name")
],
must: vec![],
},
);
@ -769,7 +800,10 @@ impl SchemaInner {
// String::from("password"),
],
may: vec![],
systemmust: vec![String::from("displayname")],
systemmust: vec![
String::from("displayname"),
String::from("name")
],
must: vec![],
},
);
@ -784,7 +818,9 @@ impl SchemaInner {
// String::from("gidnumber"),
],
may: vec![],
systemmust: vec![],
systemmust: vec![
String::from("name"),
],
must: vec![],
},
);

View file

@ -14,7 +14,7 @@ use entry::{Entry, EntryCommitted, EntryInvalid, EntryNew, EntryValid};
use error::{OperationError, SchemaError};
use event::{
AuthEvent, AuthResult, CreateEvent, DeleteEvent, ExistsEvent, ModifyEvent, OpResult,
SearchEvent, SearchResult,
SearchEvent, SearchResult, PurgeEvent,
};
use filter::{Filter, FilterInvalid};
use log::EventLog;
@ -399,10 +399,14 @@ impl<'a> QueryServerWriteTransaction<'a> {
.for_each(|cand| audit_log!(au, "delete: intent candidate {:?}", cand));
// Now, delete only what you can see
// TODO: Delete actually just modifies these to add class -> recycled
let mut audit_be = AuditScope::new("backend_delete");
let res = self
.be_txn
// Change this to an update, not delete.
.delete(&mut audit_be, &pre_candidates)
.map(|_| ())
.map_err(|e| match e {
@ -424,6 +428,59 @@ impl<'a> QueryServerWriteTransaction<'a> {
res
}
pub fn purge_tombstones(&self, au: &mut AuditScope) -> Result<(), OperationError> {
// delete everything that is a tombstone.
// Search for tombstones
let ts = match self.internal_search(
au,
Filter::Eq("class".to_string(), "tombstone".to_string())
) {
Ok(r) => {
r
}
Err(e) => {
return Err(e)
}
};
// TODO: Has an appropriate amount of time/condition past (ie replication events?)
// Delete them
let mut audit_be = AuditScope::new("backend_delete");
let res = self
.be_txn
// Change this to an update, not delete.
.delete(&mut audit_be, &ts)
.map(|_| ())
.map_err(|e| match e {
BackendError::EmptyRequest => OperationError::EmptyRequest,
BackendError::EntryMissingId => OperationError::InvalidRequestState,
});
au.append_scope(audit_be);
if res.is_err() {
// be_txn is dropped, ie aborted here.
audit_log!(au, "Tombstone purge operation failed (backend), {:?}", res);
return res;
}
// Send result
audit_log!(au, "Tombstone purge operation success");
res
}
pub fn purge_recycle(&self) -> Result<(), OperationError> {
// Send everything that is recycled to tombstone
unimplemented!()
}
pub fn revive_recycled(&self) -> Result<(), OperationError> {
// Revive an entry to live.
unimplemented!()
}
pub fn modify(&self, au: &mut AuditScope, me: &ModifyEvent) -> Result<(), OperationError> {
// Get the candidates.
// Modify applies a modlist to a filter, so we need to internal search
@ -747,6 +804,8 @@ impl Actor for QueryServer {
ctx.set_mailbox_capacity(1 << 31);
}
*/
}
// The server only recieves "Event" structures, which
@ -868,6 +927,29 @@ impl Handler<AuthEvent> for QueryServer {
}
}
impl Handler<PurgeEvent> for QueryServer {
type Result = ();
fn handle(&mut self, msg: PurgeEvent, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("purge tombstones");
let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin purge tombstone event {:?}", msg);
let qs_write = self.write();
let res = match qs_write.purge_tombstones(&mut audit) {
Ok(()) => {
qs_write.commit(&mut audit);
Ok(OpResult {})
}
Err(e) => Err(e),
};
audit_log!(audit, "Purge tombstones result: {:?}", res);
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
}
}
// Auth requests? How do we structure these ...
#[cfg(test)]
@ -892,7 +974,9 @@ mod tests {
use super::super::modify::{Modify, ModifyList};
use super::super::proto_v1::Entry as ProtoEntry;
use super::super::proto_v1::Filter as ProtoFilter;
use super::super::proto_v1::{CreateRequest, SearchRequest};
use super::super::proto_v1::{CreateRequest, SearchRequest, DeleteRequest, ModifyRequest};
use super::super::proto_v1::Modify as ProtoModify;
use super::super::proto_v1::ModifyList as ProtoModifyList;
use super::super::schema::Schema;
use super::super::server::{
QueryServer, QueryServerReadTransaction, QueryServerWriteTransaction,
@ -1265,4 +1349,80 @@ mod tests {
future::ok(())
})
}
#[test]
fn test_qs_tombstone() {
run_test!(|_log, mut server: QueryServer, audit: &mut AuditScope| {
let mut server_txn = server.write();
let filt_ts = ProtoFilter::Eq(
String::from("class"),
String::from("tombstone")
);
let filt_i_ts = Filter::Eq(
String::from("class"),
String::from("tombstone")
);
// Create fake external requests. Probably from admin later
let me_ts = ModifyEvent::from_request(
ModifyRequest::new(
filt_ts.clone(),
ProtoModifyList::new_list(vec![
ProtoModify::Present(String::from("class"), String::from("tombstone")),
]),
)
);
let de_ts = DeleteEvent::from_request(DeleteRequest::new(filt_ts.clone()));
let se_ts = SearchEvent::from_request(SearchRequest::new(filt_ts.clone()));
// First, create a tombstone
let e_ts: Entry<EntryInvalid, EntryNew> = serde_json::from_str(
r#"{
"valid": null,
"state": null,
"attrs": {
"class": ["tombstone", "object"],
"uuid": ["9557f49c-97a5-4277-a9a5-097d17eb8317"]
}
}"#,
)
.unwrap();
let ce = CreateEvent::from_vec(vec![e_ts]);
let cr = server_txn.create(audit, &ce);
assert!(cr.is_ok());
// Can it be seen (external search)
let r1 = server_txn.search(audit, &se_ts).unwrap();
assert!(r1.len() == 0);
// Can it be deleted (external delete)
// Should be err-no candidates.
assert!(server_txn.delete(audit, &de_ts).is_err());
// Can it be modified? (external modify)
// Should be err-no candidates
assert!(server_txn.modify(audit, &me_ts).is_err());
// Can it be seen (internal search)
// Internal search should see it.
let r2 = server_txn.internal_search(audit, filt_i_ts.clone()).unwrap();
assert!(r2.len() == 1);
// Now purge
assert!(server_txn.purge_tombstones(audit).is_ok());
// Assert it's gone
// Internal search should not see it.
let r3 = server_txn.internal_search(audit, filt_i_ts).unwrap();
assert!(r3.len() == 0);
assert!(server_txn.commit(audit).is_ok());
future::ok(())
})
}
}