Add interval for purge recycled

This commit is contained in:
William Brown 2019-02-25 19:36:53 +10:00
parent 9c0aaa072a
commit 6c283164cb
3 changed files with 52 additions and 9 deletions

View file

@ -284,15 +284,28 @@ impl AuthResult {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct PurgeEvent {} pub struct PurgeTombstoneEvent {}
impl Message for PurgeEvent { impl Message for PurgeTombstoneEvent {
type Result = (); type Result = ();
} }
impl PurgeEvent { impl PurgeTombstoneEvent {
pub fn new() -> Self { pub fn new() -> Self {
PurgeEvent {} PurgeTombstoneEvent {}
}
}
#[derive(Debug)]
pub struct PurgeRecycledEvent {}
impl Message for PurgeRecycledEvent {
type Result = ();
}
impl PurgeRecycledEvent {
pub fn new() -> Self {
PurgeRecycledEvent {}
} }
} }

View file

@ -2,7 +2,7 @@ use actix::prelude::*;
use std::time::Duration; use std::time::Duration;
use constants::PURGE_TIMEOUT; use constants::PURGE_TIMEOUT;
use event::PurgeEvent; use event::{PurgeTombstoneEvent, PurgeRecycledEvent};
use server::QueryServer; use server::QueryServer;
pub struct IntervalActor { pub struct IntervalActor {
@ -18,7 +18,12 @@ impl IntervalActor {
// Define new events here // Define new events here
fn purge_tombstones(&mut self) { fn purge_tombstones(&mut self) {
// Make a purge request ... // Make a purge request ...
let pe = PurgeEvent::new(); let pe = PurgeTombstoneEvent::new();
self.server.do_send(pe)
}
fn purge_recycled(&mut self) {
let pe = PurgeRecycledEvent::new();
self.server.do_send(pe) self.server.do_send(pe)
} }
} }
@ -27,6 +32,10 @@ impl Actor for IntervalActor {
type Context = actix::Context<Self>; type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
// TODO: This timeout could be configurable from config?
ctx.run_interval(Duration::from_secs(PURGE_TIMEOUT), move |act, _ctx| {
act.purge_recycled();
});
ctx.run_interval(Duration::from_secs(PURGE_TIMEOUT), move |act, _ctx| { ctx.run_interval(Duration::from_secs(PURGE_TIMEOUT), move |act, _ctx| {
act.purge_tombstones(); act.purge_tombstones();
}); });

View file

@ -14,7 +14,7 @@ use entry::{Entry, EntryCommitted, EntryInvalid, EntryNew, EntryValid};
use error::{OperationError, SchemaError}; use error::{OperationError, SchemaError};
use event::{ use event::{
AuthEvent, AuthResult, CreateEvent, DeleteEvent, ExistsEvent, ModifyEvent, OpResult, AuthEvent, AuthResult, CreateEvent, DeleteEvent, ExistsEvent, ModifyEvent, OpResult,
PurgeEvent, ReviveRecycledEvent, SearchEvent, SearchResult, PurgeTombstoneEvent, PurgeRecycledEvent, ReviveRecycledEvent, SearchEvent, SearchResult,
}; };
use filter::{Filter, FilterInvalid}; use filter::{Filter, FilterInvalid};
use log::EventLog; use log::EventLog;
@ -1052,10 +1052,10 @@ impl Handler<AuthEvent> for QueryServer {
} }
} }
impl Handler<PurgeEvent> for QueryServer { impl Handler<PurgeTombstoneEvent> for QueryServer {
type Result = (); type Result = ();
fn handle(&mut self, msg: PurgeEvent, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: PurgeTombstoneEvent, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("purge tombstones"); let mut audit = AuditScope::new("purge tombstones");
let res = audit_segment!(&mut audit, || { let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin purge tombstone event {:?}", msg); audit_log!(audit, "Begin purge tombstone event {:?}", msg);
@ -1073,6 +1073,27 @@ impl Handler<PurgeEvent> for QueryServer {
} }
} }
impl Handler<PurgeRecycledEvent> for QueryServer {
type Result = ();
fn handle(&mut self, msg: PurgeRecycledEvent, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("purge recycled");
let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin purge recycled event {:?}", msg);
let qs_write = self.write();
let res = qs_write
.purge_recycled(&mut audit)
.map(|_| qs_write.commit(&mut audit).map(|_| OpResult {}));
audit_log!(audit, "Purge recycled result: {:?}", res);
res.expect("Invalid Server State");
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
// Auth requests? How do we structure these ... // Auth requests? How do we structure these ...
#[cfg(test)] #[cfg(test)]