diff --git a/src/be/mod.rs b/src/be/mod.rs index e8f418e7c..2479ea7ae 100644 --- a/src/be/mod.rs +++ b/src/be/mod.rs @@ -1,15 +1,16 @@ //! Db executor actor use actix::prelude::*; -use serde_json; -use r2d2_sqlite::SqliteConnectionManager; use r2d2::Pool; -use rusqlite::NO_PARAMS; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::types::ToSql; +use rusqlite::NO_PARAMS; +use serde_json; // use uuid; -use super::log::EventLog; use super::entry::Entry; +use super::filter::Filter; +use super::log::EventLog; mod idl; mod mem_be; @@ -35,7 +36,7 @@ impl BackendAuditEvent { #[derive(Debug)] struct IdEntry { id: i32, - data: String + data: String, } pub enum BackendType { @@ -45,7 +46,7 @@ pub enum BackendType { #[derive(Debug, PartialEq)] pub enum BackendError { - EmptyRequest + EmptyRequest, } pub struct Backend { @@ -55,16 +56,18 @@ pub struct Backend { // In the future this will do the routing betwene the chosen backends etc. impl Backend { - pub fn new( - log: actix::Addr, - path: &str, - ) -> Self { + pub fn new(log: actix::Addr, path: &str) -> Self { // this has a ::memory() type, but will path == "" work? let manager = SqliteConnectionManager::file(path); - let pool = Pool::builder() - // Look at max_size and thread_pool here for perf later - .build(manager) - .expect("Failed to create pool"); + let builder1 = Pool::builder(); + let builder2 = if path == "" { + builder1.max_size(1) + } else { + // FIXME: Make this configurable + builder1.max_size(8) + }; + // Look at max_size and thread_pool here for perf later + let pool = builder2.build(manager).expect("Failed to create pool"); { let conn = pool.get().unwrap(); @@ -77,8 +80,9 @@ impl Backend { id INTEGER PRIMARY KEY ASC, data TEXT NOT NULL ) - ", NO_PARAMS - ).unwrap(); + ", + NO_PARAMS, + ).unwrap(); // Create a version table for migration indication @@ -101,17 +105,19 @@ impl Backend { if entries.is_empty() { // TODO: Better error // End the timer - return Err(BackendError::EmptyRequest) + return Err(BackendError::EmptyRequest); } // Turn all the entries into relevent json/cbor types // we do this outside the txn to avoid blocking needlessly. // However, it could be pointless due to the extra string allocs ... - let ser_entries: Vec = entries.iter().map(|val| { - // TODO: Should we do better than unwrap? - serde_json::to_string(&val).unwrap() - }).collect(); + let ser_entries: Vec = entries + .iter() + .map(|val| { + // TODO: Should we do better than unwrap? + serde_json::to_string(&val).unwrap() + }).collect(); log_event!(self.log, "serialising: {:?}", ser_entries); @@ -121,22 +127,14 @@ impl Backend { // Start a txn conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap(); + // write them all for ser_entry in ser_entries { - conn.execute("INSERT INTO id2entry (data) VALUES (?1)", &[&ser_entry as &ToSql]).unwrap(); + conn.execute( + "INSERT INTO id2entry (data) VALUES (?1)", + &[&ser_entry as &ToSql], + ).unwrap(); } - // write them all - let mut stmt = conn.prepare("SELECT id, data FROM id2entry").unwrap(); - let id2entry_iter = stmt - .query_map(NO_PARAMS, |row| - IdEntry { - id: row.get(0), - data: row.get(1) - } - ).unwrap(); - for row in id2entry_iter { - println!("{:?}", row); - } // TODO: update indexes (as needed) // Commit the txn conn.execute("COMMIT TRANSACTION", NO_PARAMS).unwrap(); @@ -148,14 +146,53 @@ impl Backend { } // Take filter, and AuditEvent ref? - pub fn search() { + pub fn search(&self, filter: Filter) -> Vec { + // Do things + // Alloc a vec for the entries. + // FIXME: Make this actually a good size for the result set ... + // FIXME: Actually compute indexes here. + let mut raw_entries: Vec = Vec::new(); + { + // Actually do a search now! + let conn = self.pool.get().unwrap(); + // Start a txn + conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap(); + + // read them all + let mut stmt = conn.prepare("SELECT id, data FROM id2entry").unwrap(); + let id2entry_iter = stmt + .query_map(NO_PARAMS, |row| IdEntry { + id: row.get(0), + data: row.get(1), + }).unwrap(); + for row in id2entry_iter { + println!("{:?}", row); + // FIXME: Handle this properly. + raw_entries.push(row.unwrap().data); + } + // Rollback, we should have done nothing. + conn.execute("ROLLBACK TRANSACTION", NO_PARAMS).unwrap(); + } + // Do other things + // Now, de-serialise the raw_entries back to entries + let entries: Vec = raw_entries + .iter() + .map(|val| { + // TODO: Should we do better than unwrap? + let e = serde_json::from_str(val.as_str()).unwrap(); + if filter.entry_match_no_index(e) { + Some(e) + } else { + None + } + }).collect(); + + entries } - pub fn modify() { - } + pub fn modify() {} - pub fn delete() { - } + pub fn delete() {} } impl Clone for Backend { @@ -176,14 +213,14 @@ mod tests { use actix::prelude::*; extern crate futures; - use futures::future::Future; - use futures::future::lazy; use futures::future; + use futures::future::lazy; + use futures::future::Future; extern crate tokio; - use super::super::log::{self, EventLog, LogEvent}; use super::super::entry::Entry; + use super::super::log::{self, EventLog, LogEvent}; use super::{Backend, BackendError}; macro_rules! run_test { @@ -191,23 +228,21 @@ mod tests { System::run(|| { let test_log = log::start(); - let mut be = Backend::new(test_log.clone(), "/tmp/test.db"); + let mut be = Backend::new(test_log.clone(), ""); // Could wrap another future here for the future::ok bit... let fut = $test_fn(test_log, be); - let comp_fut = fut.map_err(|()| ()) - .and_then(|r| { - println!("Stopping actix ..."); - actix::System::current().stop(); - future::result(Ok(())) - }); + let comp_fut = fut.map_err(|()| ()).and_then(|r| { + println!("Stopping actix ..."); + actix::System::current().stop(); + future::result(Ok(())) + }); tokio::spawn(comp_fut); }); }}; } - #[test] fn test_simple_create() { run_test!(|log: actix::Addr, mut be: Backend| { @@ -217,17 +252,24 @@ mod tests { log_event!(log, "{:?}", empty_result); assert_eq!(empty_result, Err(BackendError::EmptyRequest)); - let mut e: Entry = Entry::new(); - e.add_ava(String::from("userid"), String::from("william")).unwrap(); + e.add_ava(String::from("userid"), String::from("william")) + .unwrap(); assert!(e.validate()); - let single_result = be.create(vec![ - e - ]); + let single_result = be.create(vec![e]); assert!(single_result.is_ok()); + let entries = be.search(); + println!("{:?}", entries); + + // There should only be one entry so is this enough? + assert!(entries.first().is_some()); + // Later we could check consistency of the entry saved ... + + // Check it's there + future::ok(()) }); } diff --git a/src/be/sqlite_be/mod.rs b/src/be/sqlite_be/mod.rs index 7853bb748..d3b241b48 100644 --- a/src/be/sqlite_be/mod.rs +++ b/src/be/sqlite_be/mod.rs @@ -3,4 +3,3 @@ // need a way to add an index // need a way to do filters // need a way to manage idls - diff --git a/src/entry.rs b/src/entry.rs index 99eac90ea..f96815168 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -36,7 +36,7 @@ pub struct Entry { impl Entry { pub fn new() -> Self { Entry { - attrs: BTreeMap::new() + attrs: BTreeMap::new(), } } @@ -44,13 +44,12 @@ impl Entry { // a list of syntax violations ... pub fn add_ava(&mut self, attr: String, value: String) -> Result<(), ()> { // get_mut to access value - self.attrs.entry(attr) - .and_modify(|v| { v.push(value.clone()) }) - .or_insert(vec!(value)); - + self.attrs + .entry(attr) + .and_modify(|v| v.push(value.clone())) + .or_insert(vec![value]); Ok(()) - } pub fn validate(&self) -> bool { @@ -137,7 +136,7 @@ impl User { #[cfg(test)] mod tests { - use super::{User, Entry}; + use super::{Entry, User}; use serde_json; #[test] @@ -159,13 +158,13 @@ mod tests { fn test_entry_basic() { let mut e: Entry = Entry::new(); - e.add_ava(String::from("userid"), String::from("william")).unwrap(); + e.add_ava(String::from("userid"), String::from("william")) + .unwrap(); assert!(e.validate()); let d = serde_json::to_string_pretty(&e).unwrap(); println!("d: {}", d.as_str()); - } } diff --git a/src/event.rs b/src/event.rs index 70456befb..2247e13b5 100644 --- a/src/event.rs +++ b/src/event.rs @@ -76,5 +76,3 @@ pub struct AuditEvent { time_start: (), time_end: (), } - - diff --git a/src/lib.rs b/src/lib.rs index 9f647e5be..0bae7e0c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,9 @@ extern crate serde_derive; extern crate actix; extern crate actix_web; extern crate futures; -extern crate rusqlite; extern crate r2d2; extern crate r2d2_sqlite; +extern crate rusqlite; extern crate uuid; use actix::prelude::*; diff --git a/src/log.rs b/src/log.rs index 9f5ec46ca..add7a5dd8 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,6 +1,5 @@ use actix::prelude::*; - // Helper for internal logging. #[macro_export] macro_rules! log_event { diff --git a/src/server.rs b/src/server.rs index 57367ffb6..99bd897de 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,14 +2,14 @@ use actix::prelude::*; use be::Backend; use entry::Entry; -use event::{SearchEvent, CreateEvent, EventResult}; +use event::{CreateEvent, EventResult, SearchEvent}; use log::EventLog; pub fn start( log: actix::Addr, // be: actix::Addr, path: &str, - threads: usize + threads: usize, ) -> actix::Addr { // Create the BE connection // probably need a config type soon .... diff --git a/tests/integration_test.rs b/tests/integration_test.rs index a0c323e73..b743d67db 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -9,9 +9,9 @@ use rsidm::server::{self, QueryServer}; // use be; extern crate futures; -use futures::future::Future; -use futures::future::lazy; use futures::future; +use futures::future::lazy; +use futures::future::Future; extern crate tokio; use tokio::executor::current_thread::CurrentThread; @@ -41,29 +41,25 @@ macro_rules! run_test { // Now chain them ... // Now append the server shutdown. - let comp_fut = fut.map_err(|_| ()) - .and_then(|r| { - println!("Stopping actix ..."); - actix::System::current().stop(); - future::result(Ok(())) - }); + let comp_fut = fut.map_err(|_| ()).and_then(|r| { + println!("Stopping actix ..."); + actix::System::current().stop(); + future::result(Ok(())) + }); // Run the future tokio::spawn(comp_fut); // We DO NOT need teardown, as sqlite is in mem // let the tables hit the floor }); - }}; } #[test] fn test_schema() { - run_test!(|log: actix::Addr, server| { - log.send(LogEvent { - msg: String::from("Test log event") - }) - }); + run_test!(|log: actix::Addr, server| log.send(LogEvent { + msg: String::from("Test log event") + })); } /* @@ -74,4 +70,3 @@ fn test_be_create_user() { }); } */ -