Can save and retrieve entries

This commit is contained in:
William Brown 2018-11-07 16:35:25 +10:00
parent 8844293a48
commit 51797bc125
8 changed files with 118 additions and 86 deletions

View file

@ -1,15 +1,16 @@
//! Db executor actor //! Db executor actor
use actix::prelude::*; use actix::prelude::*;
use serde_json;
use r2d2_sqlite::SqliteConnectionManager;
use r2d2::Pool; use r2d2::Pool;
use rusqlite::NO_PARAMS; use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::types::ToSql; use rusqlite::types::ToSql;
use rusqlite::NO_PARAMS;
use serde_json;
// use uuid; // use uuid;
use super::log::EventLog;
use super::entry::Entry; use super::entry::Entry;
use super::filter::Filter;
use super::log::EventLog;
mod idl; mod idl;
mod mem_be; mod mem_be;
@ -35,7 +36,7 @@ impl BackendAuditEvent {
#[derive(Debug)] #[derive(Debug)]
struct IdEntry { struct IdEntry {
id: i32, id: i32,
data: String data: String,
} }
pub enum BackendType { pub enum BackendType {
@ -45,7 +46,7 @@ pub enum BackendType {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum BackendError { pub enum BackendError {
EmptyRequest EmptyRequest,
} }
pub struct Backend { pub struct Backend {
@ -55,16 +56,18 @@ pub struct Backend {
// In the future this will do the routing betwene the chosen backends etc. // In the future this will do the routing betwene the chosen backends etc.
impl Backend { impl Backend {
pub fn new( pub fn new(log: actix::Addr<EventLog>, path: &str) -> Self {
log: actix::Addr<EventLog>,
path: &str,
) -> Self {
// this has a ::memory() type, but will path == "" work? // this has a ::memory() type, but will path == "" work?
let manager = SqliteConnectionManager::file(path); let manager = SqliteConnectionManager::file(path);
let pool = Pool::builder() let builder1 = Pool::builder();
let builder2 = if path == "" {
builder1.max_size(1)
} else {
// FIXME: Make this configurable
builder1.max_size(8)
};
// Look at max_size and thread_pool here for perf later // Look at max_size and thread_pool here for perf later
.build(manager) let pool = builder2.build(manager).expect("Failed to create pool");
.expect("Failed to create pool");
{ {
let conn = pool.get().unwrap(); let conn = pool.get().unwrap();
@ -77,7 +80,8 @@ impl Backend {
id INTEGER PRIMARY KEY ASC, id INTEGER PRIMARY KEY ASC,
data TEXT NOT NULL data TEXT NOT NULL
) )
", NO_PARAMS ",
NO_PARAMS,
).unwrap(); ).unwrap();
// Create a version table for migration indication // Create a version table for migration indication
@ -101,14 +105,16 @@ impl Backend {
if entries.is_empty() { if entries.is_empty() {
// TODO: Better error // TODO: Better error
// End the timer // End the timer
return Err(BackendError::EmptyRequest) return Err(BackendError::EmptyRequest);
} }
// Turn all the entries into relevent json/cbor types // Turn all the entries into relevent json/cbor types
// we do this outside the txn to avoid blocking needlessly. // we do this outside the txn to avoid blocking needlessly.
// However, it could be pointless due to the extra string allocs ... // However, it could be pointless due to the extra string allocs ...
let ser_entries: Vec<String> = entries.iter().map(|val| { let ser_entries: Vec<String> = entries
.iter()
.map(|val| {
// TODO: Should we do better than unwrap? // TODO: Should we do better than unwrap?
serde_json::to_string(&val).unwrap() serde_json::to_string(&val).unwrap()
}).collect(); }).collect();
@ -121,22 +127,14 @@ impl Backend {
// Start a txn // Start a txn
conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap(); conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// write them all
for ser_entry in ser_entries { for ser_entry in ser_entries {
conn.execute("INSERT INTO id2entry (data) VALUES (?1)", &[&ser_entry as &ToSql]).unwrap(); conn.execute(
"INSERT INTO id2entry (data) VALUES (?1)",
&[&ser_entry as &ToSql],
).unwrap();
} }
// write them all
let mut stmt = conn.prepare("SELECT id, data FROM id2entry").unwrap();
let id2entry_iter = stmt
.query_map(NO_PARAMS, |row|
IdEntry {
id: row.get(0),
data: row.get(1)
}
).unwrap();
for row in id2entry_iter {
println!("{:?}", row);
}
// TODO: update indexes (as needed) // TODO: update indexes (as needed)
// Commit the txn // Commit the txn
conn.execute("COMMIT TRANSACTION", NO_PARAMS).unwrap(); conn.execute("COMMIT TRANSACTION", NO_PARAMS).unwrap();
@ -148,14 +146,53 @@ impl Backend {
} }
// Take filter, and AuditEvent ref? // Take filter, and AuditEvent ref?
pub fn search() { pub fn search(&self, filter: Filter) -> Vec<Entry> {
// Do things
// Alloc a vec for the entries.
// FIXME: Make this actually a good size for the result set ...
// FIXME: Actually compute indexes here.
let mut raw_entries: Vec<String> = Vec::new();
{
// Actually do a search now!
let conn = self.pool.get().unwrap();
// Start a txn
conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// read them all
let mut stmt = conn.prepare("SELECT id, data FROM id2entry").unwrap();
let id2entry_iter = stmt
.query_map(NO_PARAMS, |row| IdEntry {
id: row.get(0),
data: row.get(1),
}).unwrap();
for row in id2entry_iter {
println!("{:?}", row);
// FIXME: Handle this properly.
raw_entries.push(row.unwrap().data);
}
// Rollback, we should have done nothing.
conn.execute("ROLLBACK TRANSACTION", NO_PARAMS).unwrap();
}
// Do other things
// Now, de-serialise the raw_entries back to entries
let entries: Vec<Entry> = raw_entries
.iter()
.map(|val| {
// TODO: Should we do better than unwrap?
let e = serde_json::from_str(val.as_str()).unwrap();
if filter.entry_match_no_index(e) {
Some(e)
} else {
None
}
}).collect();
entries
} }
pub fn modify() { pub fn modify() {}
}
pub fn delete() { pub fn delete() {}
}
} }
impl Clone for Backend { impl Clone for Backend {
@ -176,14 +213,14 @@ mod tests {
use actix::prelude::*; use actix::prelude::*;
extern crate futures; extern crate futures;
use futures::future::Future;
use futures::future::lazy;
use futures::future; use futures::future;
use futures::future::lazy;
use futures::future::Future;
extern crate tokio; extern crate tokio;
use super::super::log::{self, EventLog, LogEvent};
use super::super::entry::Entry; use super::super::entry::Entry;
use super::super::log::{self, EventLog, LogEvent};
use super::{Backend, BackendError}; use super::{Backend, BackendError};
macro_rules! run_test { macro_rules! run_test {
@ -191,12 +228,11 @@ mod tests {
System::run(|| { System::run(|| {
let test_log = log::start(); let test_log = log::start();
let mut be = Backend::new(test_log.clone(), "/tmp/test.db"); let mut be = Backend::new(test_log.clone(), "");
// Could wrap another future here for the future::ok bit... // Could wrap another future here for the future::ok bit...
let fut = $test_fn(test_log, be); let fut = $test_fn(test_log, be);
let comp_fut = fut.map_err(|()| ()) let comp_fut = fut.map_err(|()| ()).and_then(|r| {
.and_then(|r| {
println!("Stopping actix ..."); println!("Stopping actix ...");
actix::System::current().stop(); actix::System::current().stop();
future::result(Ok(())) future::result(Ok(()))
@ -207,7 +243,6 @@ mod tests {
}}; }};
} }
#[test] #[test]
fn test_simple_create() { fn test_simple_create() {
run_test!(|log: actix::Addr<EventLog>, mut be: Backend| { run_test!(|log: actix::Addr<EventLog>, mut be: Backend| {
@ -217,17 +252,24 @@ mod tests {
log_event!(log, "{:?}", empty_result); log_event!(log, "{:?}", empty_result);
assert_eq!(empty_result, Err(BackendError::EmptyRequest)); assert_eq!(empty_result, Err(BackendError::EmptyRequest));
let mut e: Entry = Entry::new(); let mut e: Entry = Entry::new();
e.add_ava(String::from("userid"), String::from("william")).unwrap(); e.add_ava(String::from("userid"), String::from("william"))
.unwrap();
assert!(e.validate()); assert!(e.validate());
let single_result = be.create(vec![ let single_result = be.create(vec![e]);
e
]);
assert!(single_result.is_ok()); assert!(single_result.is_ok());
let entries = be.search();
println!("{:?}", entries);
// There should only be one entry so is this enough?
assert!(entries.first().is_some());
// Later we could check consistency of the entry saved ...
// Check it's there
future::ok(()) future::ok(())
}); });
} }

View file

@ -3,4 +3,3 @@
// need a way to add an index // need a way to add an index
// need a way to do filters // need a way to do filters
// need a way to manage idls // need a way to manage idls

View file

@ -36,7 +36,7 @@ pub struct Entry {
impl Entry { impl Entry {
pub fn new() -> Self { pub fn new() -> Self {
Entry { Entry {
attrs: BTreeMap::new() attrs: BTreeMap::new(),
} }
} }
@ -44,13 +44,12 @@ impl Entry {
// a list of syntax violations ... // a list of syntax violations ...
pub fn add_ava(&mut self, attr: String, value: String) -> Result<(), ()> { pub fn add_ava(&mut self, attr: String, value: String) -> Result<(), ()> {
// get_mut to access value // get_mut to access value
self.attrs.entry(attr) self.attrs
.and_modify(|v| { v.push(value.clone()) }) .entry(attr)
.or_insert(vec!(value)); .and_modify(|v| v.push(value.clone()))
.or_insert(vec![value]);
Ok(()) Ok(())
} }
pub fn validate(&self) -> bool { pub fn validate(&self) -> bool {
@ -137,7 +136,7 @@ impl User {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{User, Entry}; use super::{Entry, User};
use serde_json; use serde_json;
#[test] #[test]
@ -159,13 +158,13 @@ mod tests {
fn test_entry_basic() { fn test_entry_basic() {
let mut e: Entry = Entry::new(); let mut e: Entry = Entry::new();
e.add_ava(String::from("userid"), String::from("william")).unwrap(); e.add_ava(String::from("userid"), String::from("william"))
.unwrap();
assert!(e.validate()); assert!(e.validate());
let d = serde_json::to_string_pretty(&e).unwrap(); let d = serde_json::to_string_pretty(&e).unwrap();
println!("d: {}", d.as_str()); println!("d: {}", d.as_str());
} }
} }

View file

@ -76,5 +76,3 @@ pub struct AuditEvent {
time_start: (), time_start: (),
time_end: (), time_end: (),
} }

View file

@ -5,9 +5,9 @@ extern crate serde_derive;
extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate futures; extern crate futures;
extern crate rusqlite;
extern crate r2d2; extern crate r2d2;
extern crate r2d2_sqlite; extern crate r2d2_sqlite;
extern crate rusqlite;
extern crate uuid; extern crate uuid;
use actix::prelude::*; use actix::prelude::*;

View file

@ -1,6 +1,5 @@
use actix::prelude::*; use actix::prelude::*;
// Helper for internal logging. // Helper for internal logging.
#[macro_export] #[macro_export]
macro_rules! log_event { macro_rules! log_event {

View file

@ -2,14 +2,14 @@ use actix::prelude::*;
use be::Backend; use be::Backend;
use entry::Entry; use entry::Entry;
use event::{SearchEvent, CreateEvent, EventResult}; use event::{CreateEvent, EventResult, SearchEvent};
use log::EventLog; use log::EventLog;
pub fn start( pub fn start(
log: actix::Addr<EventLog>, log: actix::Addr<EventLog>,
// be: actix::Addr<BackendActor>, // be: actix::Addr<BackendActor>,
path: &str, path: &str,
threads: usize threads: usize,
) -> actix::Addr<QueryServer> { ) -> actix::Addr<QueryServer> {
// Create the BE connection // Create the BE connection
// probably need a config type soon .... // probably need a config type soon ....

View file

@ -9,9 +9,9 @@ use rsidm::server::{self, QueryServer};
// use be; // use be;
extern crate futures; extern crate futures;
use futures::future::Future;
use futures::future::lazy;
use futures::future; use futures::future;
use futures::future::lazy;
use futures::future::Future;
extern crate tokio; extern crate tokio;
use tokio::executor::current_thread::CurrentThread; use tokio::executor::current_thread::CurrentThread;
@ -41,8 +41,7 @@ macro_rules! run_test {
// Now chain them ... // Now chain them ...
// Now append the server shutdown. // Now append the server shutdown.
let comp_fut = fut.map_err(|_| ()) let comp_fut = fut.map_err(|_| ()).and_then(|r| {
.and_then(|r| {
println!("Stopping actix ..."); println!("Stopping actix ...");
actix::System::current().stop(); actix::System::current().stop();
future::result(Ok(())) future::result(Ok(()))
@ -53,17 +52,14 @@ macro_rules! run_test {
// We DO NOT need teardown, as sqlite is in mem // We DO NOT need teardown, as sqlite is in mem
// let the tables hit the floor // let the tables hit the floor
}); });
}}; }};
} }
#[test] #[test]
fn test_schema() { fn test_schema() {
run_test!(|log: actix::Addr<EventLog>, server| { run_test!(|log: actix::Addr<EventLog>, server| log.send(LogEvent {
log.send(LogEvent {
msg: String::from("Test log event") msg: String::from("Test log event")
}) }));
});
} }
/* /*
@ -74,4 +70,3 @@ fn test_be_create_user() {
}); });
} }
*/ */