diff --git a/Cargo.toml b/Cargo.toml index 8fbe338a7..56e1ce060 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,9 @@ path = "src/main.rs" [dependencies] actix = "0.7" actix-web = "0.7" +bytes = "0.4" +env_logger = "0.5" + tokio = "0.1" futures = "0.1" diff --git a/src/audit.rs b/src/audit.rs new file mode 100644 index 000000000..fa0545f8e --- /dev/null +++ b/src/audit.rs @@ -0,0 +1,116 @@ +use actix::prelude::*; +use std::time::SystemTime; + +#[macro_export] +macro_rules! audit_log { + ($audit:expr, $($arg:tt)*) => ({ + use std::fmt; + if cfg!(test) || cfg!(debug_assertions) { + print!("DEBUG AUDIT -> "); + println!($($arg)*) + } + $audit.raw_event( + fmt::format( + format_args!($($arg)*) + ) + ) + }) +} + +#[derive(Debug, Serialize, Deserialize)] +struct AuditInner { + name: String, + time: SystemTime, +} + +// This structure tracks and event lifecycle, and is eventually +// sent to the logging system where it's structured and written +// out to the current logging BE. +#[derive(Debug, Serialize, Deserialize)] +pub struct AuditEvent { + // vec of start/end points of various parts of the event? + // We probably need some functions for this. Is there a way in rust + // to automatically annotate line numbers of code? + events: Vec, +} + +// Allow us to be sent to the log subsystem +impl Message for AuditEvent { + type Result = (); +} + +impl AuditEvent { + pub fn new() -> Self { + AuditEvent { events: Vec::new() } + } + + pub fn start_event(&mut self, name: &str) { + self.events.push(AuditInner { + name: String::from(name), + time: SystemTime::now(), + }) + } + + pub fn raw_event(&mut self, data: String) { + self.events.push(AuditInner { + name: data, + time: SystemTime::now(), + }) + } + + pub fn end_event(&mut self, name: &str) { + self.events.push(AuditInner { + name: String::from(name), + time: SystemTime::now(), + }) + } +} + +#[cfg(test)] +mod tests { + use super::AuditEvent; + + // Create and remove. Perhaps add some core details? + #[test] + fn test_audit_simple() { + let mut au = AuditEvent::new(); + au.start_event("test"); + au.end_event("test"); + let d = serde_json::to_string_pretty(&au).unwrap(); + println!("{}", d); + } + + fn test_audit_nested_inner(au: &mut AuditEvent) { + au.start_event("inner"); + au.end_event("inner"); + } + + // Test calling nested functions and getting the details added correctly? + #[test] + fn test_audit_nested() { + let mut au = AuditEvent::new(); + au.start_event("test"); + test_audit_nested_inner(&mut au); + au.end_event("test"); + let d = serde_json::to_string_pretty(&au).unwrap(); + println!("{}", d); + } + + // Test failing to close an event + #[test] + fn test_audit_no_close() { + let mut au = AuditEvent::new(); + au.start_event("test"); + au.start_event("inner"); + let d = serde_json::to_string_pretty(&au).unwrap(); + println!("{}", d); + } + + // Test logging + // specifically, logs should be sent to this struct and posted post-op + // rather that "during" the operation. They should be structured! + // + // IMO these should be structured as json? + #[test] + fn test_audit_logging() {} +} diff --git a/src/be/mod.rs b/src/be/mod.rs index b1406d17a..0987760ea 100644 --- a/src/be/mod.rs +++ b/src/be/mod.rs @@ -1,5 +1,4 @@ //! Db executor actor -use actix::prelude::*; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; @@ -8,9 +7,9 @@ use rusqlite::NO_PARAMS; use serde_json; // use uuid; +use super::audit::AuditEvent; use super::entry::Entry; use super::filter::Filter; -use super::log::EventLog; mod idl; mod mem_be; @@ -41,10 +40,12 @@ struct IdEntry { data: String, } +/* pub enum BackendType { Memory, // isn't memory just sqlite with file :memory: ? SQLite, } +*/ #[derive(Debug, PartialEq)] pub enum BackendError { @@ -52,14 +53,14 @@ pub enum BackendError { } pub struct Backend { - log: actix::Addr, pool: Pool, } -// In the future this will do the routing betwene the chosen backends etc. +// In the future this will do the routing between the chosen backends etc. impl Backend { - pub fn new(log: actix::Addr, path: &str) -> Self { + pub fn new(audit: &mut AuditEvent, path: &str) -> Self { // this has a ::memory() type, but will path == "" work? + audit.start_event("backend_new"); let manager = SqliteConnectionManager::file(path); let builder1 = Pool::builder(); let builder2 = if path == "" { @@ -92,15 +93,17 @@ impl Backend { // Create the core db } - log_event!(log, "Starting DB worker ..."); - Backend { - log: log, - pool: pool, - } + audit_log!(audit, "Starting DB workers ..."); + audit.end_event("backend_new"); + Backend { pool: pool } } - pub fn create(&mut self, entries: &Vec) -> Result { - log_event!(self.log, "Begin create"); + pub fn create( + &mut self, + au: &mut AuditEvent, + entries: &Vec, + ) -> Result { + au.start_event("be_create"); let be_audit = BackendAuditEvent::new(); // Start be audit timer @@ -123,7 +126,7 @@ impl Backend { }) .collect(); - log_event!(self.log, "serialising: {:?}", ser_entries); + audit_log!(au, "serialising: {:?}", ser_entries); // THIS IS PROBABLY THE BIT WHERE YOU NEED DB ABSTRACTION { @@ -145,13 +148,13 @@ impl Backend { conn.execute("COMMIT TRANSACTION", NO_PARAMS).unwrap(); } - log_event!(self.log, "End create"); + au.end_event("be_create"); // End the timer? Ok(be_audit) } // Take filter, and AuditEvent ref? - pub fn search(&self, filt: &Filter) -> Result, ()> { + pub fn search(&self, au: &mut AuditEvent, filt: &Filter) -> Result, ()> { // Do things // Alloc a vec for the entries. // FIXME: Make this actually a good size for the result set ... @@ -161,6 +164,7 @@ impl Backend { // possible) to create the candidate set. // Unlike DS, even if we don't get the index back, we can just pass // to the in-memory filter test and be done. + au.start_event("be_search"); let mut raw_entries: Vec = Vec::new(); { @@ -178,7 +182,7 @@ impl Backend { }) .unwrap(); for row in id2entry_iter { - println!("{:?}", row); + audit_log!(au, "raw entry: {:?}", row); // FIXME: Handle this properly. raw_entries.push(row.unwrap().data); } @@ -200,6 +204,7 @@ impl Backend { }) .collect(); + au.end_event("be_search"); Ok(entries) } @@ -212,7 +217,6 @@ impl Clone for Backend { fn clone(&self) -> Self { // Make another Be and close the pool. Backend { - log: self.log.clone(), pool: self.pool.clone(), } } @@ -231,21 +235,25 @@ mod tests { extern crate tokio; + use super::super::audit::AuditEvent; use super::super::entry::Entry; use super::super::filter::Filter; - use super::super::log::{self, EventLog}; + use super::super::log; use super::{Backend, BackendError}; macro_rules! run_test { ($test_fn:expr) => {{ System::run(|| { + let mut audit = AuditEvent::new(); + let test_log = log::start(); - let be = Backend::new(test_log.clone(), ""); + let be = Backend::new(&mut audit, ""); // Could wrap another future here for the future::ok bit... - let fut = $test_fn(test_log, be); - let comp_fut = fut.map_err(|()| ()).and_then(|r| { + let fut = $test_fn(&mut audit, be); + let comp_fut = fut.map_err(|()| ()).and_then(move |_r| { + test_log.do_send(audit); println!("Stopping actix ..."); actix::System::current().stop(); future::result(Ok(())) @@ -258,11 +266,11 @@ mod tests { #[test] fn test_simple_create() { - run_test!(|log: actix::Addr, mut be: Backend| { - log_event!(log, "Simple Create"); + run_test!(|audit: &mut AuditEvent, mut be: Backend| { + audit_log!(audit, "Simple Create"); - let empty_result = be.create(&Vec::new()); - log_event!(log, "{:?}", empty_result); + let empty_result = be.create(audit, &Vec::new()); + audit_log!(audit, "{:?}", empty_result); assert_eq!(empty_result, Err(BackendError::EmptyRequest)); let mut e: Entry = Entry::new(); @@ -270,14 +278,13 @@ mod tests { .unwrap(); assert!(e.validate()); - let single_result = be.create(&vec![e]); + let single_result = be.create(audit, &vec![e]); assert!(single_result.is_ok()); // Construct a filter let filt = Filter::Pres(String::from("userid")); - let entries = be.search(&filt).unwrap(); - println!("{:?}", entries); + let entries = be.search(audit, &filt).unwrap(); // There should only be one entry so is this enough? assert!(entries.first().is_some()); @@ -291,24 +298,24 @@ mod tests { #[test] fn test_simple_search() { - run_test!(|log: actix::Addr, be| { - log_event!(log, "Simple Search"); + run_test!(|audit: &mut AuditEvent, be| { + audit_log!(audit, "Simple Search"); future::ok(()) }); } #[test] fn test_simple_modify() { - run_test!(|log: actix::Addr, be| { - log_event!(log, "Simple Modify"); + run_test!(|audit: &mut AuditEvent, be| { + audit_log!(audit, "Simple Modify"); future::ok(()) }); } #[test] fn test_simple_delete() { - run_test!(|log: actix::Addr, be| { - log_event!(log, "Simple Delete"); + run_test!(|audit: &mut AuditEvent, be| { + audit_log!(audit, "Simple Delete"); future::ok(()) }); } diff --git a/src/event.rs b/src/event.rs index fbffcbfa9..fcc912ccb 100644 --- a/src/event.rs +++ b/src/event.rs @@ -11,9 +11,6 @@ pub enum EventResult { Create, } -#[derive(Debug)] -pub struct RawSearchEvent {} - // At the top we get "event types" and they contain the needed // actions, and a generic event component. @@ -21,8 +18,6 @@ pub struct RawSearchEvent {} pub struct SearchEvent { pub filter: Filter, class: (), // String - // It could be better to box this later ... - event: AuditEvent, } impl Message for SearchEvent { @@ -34,10 +29,6 @@ impl SearchEvent { SearchEvent { filter: filter, class: (), - event: AuditEvent { - time_start: (), - time_end: (), - }, } } // We need event -> some kind of json event string for logging @@ -50,7 +41,6 @@ pub struct CreateEvent { // input that we plan to parse. pub entries: Vec, // It could be better to box this later ... - event: AuditEvent, } impl Message for CreateEvent { @@ -59,24 +49,6 @@ impl Message for CreateEvent { impl CreateEvent { pub fn new(entries: Vec) -> Self { - CreateEvent { - entries: entries, - event: AuditEvent { - time_start: (), - time_end: (), - }, - } + CreateEvent { entries: entries } } } - -// This structure tracks and event lifecycle, and is eventually -// sent to the logging system where it's structured and written -// out to the current logging BE. -#[derive(Debug)] -pub struct AuditEvent { - // vec of start/end points of various parts of the event? - // We probably need some functions for this. Is there a way in rust - // to automatically annotate line numbers of code? - time_start: (), - time_end: (), -} diff --git a/src/filter.rs b/src/filter.rs index d7323c859..1459449e7 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -43,7 +43,7 @@ impl Filter { // This is probably not safe, so it's for internal test cases // only because I'm familiar with the syntax ... you have been warned. - fn from_ldap_string(ldap_string: String) -> Result { + fn from_ldap_string(_ldap_string: String) -> Result { // For now return an empty filters Ok(Filter::And(Vec::new())) } diff --git a/src/lib.rs b/src/lib.rs index 219d3be32..252dafd71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,8 +20,11 @@ extern crate uuid; // This has to be before be so the import order works #[macro_use] pub mod log; -pub mod be; +#[macro_use] +mod audit; +mod be; pub mod entry; pub mod event; pub mod filter; +pub mod proto; pub mod server; diff --git a/src/log.rs b/src/log.rs index add7a5dd8..f6b708c62 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,4 +1,7 @@ use actix::prelude::*; +use serde_json; + +use super::audit::AuditEvent; // Helper for internal logging. #[macro_export] @@ -57,6 +60,15 @@ impl Handler for EventLog { } } +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: AuditEvent, _: &mut SyncContext) -> Self::Result { + let d = serde_json::to_string_pretty(&event).unwrap(); + println!("AUDIT: {}", d); + } +} + /* impl Handler for EventLog { type Result = (); diff --git a/src/main.rs b/src/main.rs index 9512e26c9..7de5b23cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,24 +3,31 @@ extern crate serde_json; // #[macro_use] extern crate actix; extern crate actix_web; +extern crate bytes; +extern crate env_logger; extern crate futures; extern crate serde_derive; extern crate uuid; // use actix::prelude::*; use actix_web::{ - http, App, AsyncResponder, FutureResponse, HttpRequest, HttpResponse, Path, State, + error, http, middleware, App, AsyncResponder, Error, FutureResponse, HttpMessage, HttpRequest, + HttpResponse, Path, State, }; -use futures::Future; +use bytes::BytesMut; +use futures::{future, Future, Stream}; #[macro_use] extern crate rsidm; use rsidm::event; use rsidm::filter::Filter; use rsidm::log; +use rsidm::proto::SearchRequest; use rsidm::server; +const MAX_SIZE: usize = 262_144; //256k + struct AppState { qe: actix::Addr, } @@ -64,7 +71,53 @@ fn class_list((_name, state): (Path, State)) -> FutureResponse .responder() } +// Based on actix web example json +fn search(req: &HttpRequest) -> Box> { + println!("{:?}", req); + // HttpRequest::payload() is stream of Bytes objects + req.payload() + .from_err() + // `fold` will asynchronously read each chunk of the request body and + // call supplied closure, then it resolves to result of closure + .fold(BytesMut::new(), move |mut body, chunk| { + // limit max size of in-memory payload + if (body.len() + chunk.len()) > MAX_SIZE { + Err(error::ErrorBadRequest("Request size too large.")) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .and_then(|body| { + // body is loaded, now we can deserialize serde-json + // FIXME: THIS IS FUCKING AWFUL + let obj = serde_json::from_slice::(&body).unwrap(); + // Dispatch a search + println!("{:?}", obj); + // We have to resolve this NOW else we break everything :( + /* + req.state().qe.send( + event::SearchEvent::new(obj.filter) + ) + .from_err() + .and_then(|res| future::result(match res { + // What type is entry? + Ok(event::EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)), + Ok(_) => Ok(HttpResponse::Ok().into()), + // Can we properly report this? + Err(_) => Ok(HttpResponse::InternalServerError().into()), + })) + */ + Ok(HttpResponse::InternalServerError().into()) + }) + .responder() +} + fn main() { + // Configure the middleware logger + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + let sys = actix::System::new("rsidm-server"); // read the config (if any?) @@ -75,10 +128,7 @@ fn main() { // Start up the logging system: for now it just maps to stderr let log_addr = log::start(); - // Starting the BE chooses the path. - // let be_addr = be::start(log_addr.clone(), be::BackendType::SQLite, "test.db", 8); - - // Start the query server with the given be + // Start the query server with the given be path: future config let server_addr = server::start(log_addr.clone(), "test.db", 8); // start the web server @@ -87,12 +137,14 @@ fn main() { qe: server_addr.clone(), }) // Connect all our end points here. - // .middleware(middleware::Logger::default()) + .middleware(middleware::Logger::default()) .resource("/", |r| r.f(index)) - .resource("/{class_list}", |r| { + .resource("/search", |r| r.method(http::Method::POST).a(search)) + // Add an ldap compat search function type? + .resource("/list/{class_list}", |r| { r.method(http::Method::GET).with(class_list) }) - .resource("/{class_list}/", |r| { + .resource("/list/{class_list}/", |r| { r.method(http::Method::GET).with(class_list) }) }) @@ -101,16 +153,9 @@ fn main() { .start(); log_event!(log_addr, "Starting rsidm on http://127.0.0.1:8080"); + // curl --header "Content-Type: application/json" --request POST --data '{"name":"xyz","number":3}' http://127.0.0.1:8080/manual // all the needed routes / views let _ = sys.run(); } - -#[cfg(test)] -mod tests { - #[test] - fn test_simple_create() { - println!("It works!"); - } -} diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 000000000..74fac771c --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,6 @@ +use super::filter::Filter; + +#[derive(Debug, Serialize, Deserialize)] +pub struct SearchRequest { + pub filter: Filter, +} diff --git a/src/server.rs b/src/server.rs index 5bc6bc68d..3f00dae8a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,22 +1,22 @@ use actix::prelude::*; +use audit::AuditEvent; use be::Backend; use entry::Entry; use event::{CreateEvent, EventResult, SearchEvent}; use log::EventLog; -pub fn start( - log: actix::Addr, - // be: actix::Addr, - path: &str, - threads: usize, -) -> actix::Addr { +pub fn start(log: actix::Addr, path: &str, threads: usize) -> actix::Addr { + let mut audit = AuditEvent::new(); + audit.start_event("server_new"); // Create the BE connection // probably need a config type soon .... - let be = Backend::new(log.clone(), path); + let be = Backend::new(&mut audit, path); // now we clone it out in the startup I think // Should the be need a log clone ref? or pass it around? // it probably needs it ... + audit.end_event("server_new"); + log.do_send(audit); SyncArbiter::start(threads, move || QueryServer::new(log.clone(), be.clone())) } @@ -46,8 +46,8 @@ impl QueryServer { // Actually conduct a search request // This is the core of the server, as it processes the entire event // applies all parts required in order and more. - pub fn search(&mut self, se: &SearchEvent) -> Result, ()> { - match self.be.search(&se.filter) { + pub fn search(&mut self, au: &mut AuditEvent, se: &SearchEvent) -> Result, ()> { + match self.be.search(au, &se.filter) { Ok(r) => Ok(r), Err(_) => Err(()), } @@ -56,11 +56,11 @@ impl QueryServer { // What should this take? // This should probably take raw encoded entries? Or sohuld they // be handled by fe? - pub fn create(&mut self, ce: &CreateEvent) -> Result<(), ()> { + pub fn create(&mut self, au: &mut AuditEvent, ce: &CreateEvent) -> Result<(), ()> { // Start a txn // Run any pre checks // We may change from ce.entries later to something else? - match self.be.create(&ce.entries) { + match self.be.create(au, &ce.entries) { Ok(_) => Ok(()), Err(_) => Err(()), } @@ -82,20 +82,25 @@ impl Handler for QueryServer { type Result = Result; fn handle(&mut self, msg: SearchEvent, _: &mut Self::Context) -> Self::Result { - log_event!(self.log, "Begin event {:?}", msg); + let mut audit = AuditEvent::new(); + audit.start_event("search"); + audit_log!(audit, "Begin event {:?}", msg); + // Parse what we need from the event? // What kind of event is it? // In the future we'll likely change search event ... // was this ok? - let res = match self.search(&msg) { + let res = match self.search(&mut audit, &msg) { Ok(entries) => Ok(EventResult::Search { entries: entries }), Err(e) => Err(e), }; - log_event!(self.log, "End event {:?}", msg); + audit_log!(audit, "End event {:?}", msg); + audit.end_event("search"); // At the end of the event we send it for logging. + self.log.do_send(audit); res } } @@ -104,14 +109,19 @@ impl Handler for QueryServer { type Result = Result; fn handle(&mut self, msg: CreateEvent, _: &mut Self::Context) -> Self::Result { - log_event!(self.log, "Begin event {:?}", msg); + let mut audit = AuditEvent::new(); + audit.start_event("create"); + audit_log!(audit, "Begin create event {:?}", msg); - let res = match self.create(&msg) { + let res = match self.create(&mut audit, &msg) { Ok(()) => Ok(EventResult::Create), Err(e) => Err(e), }; - log_event!(self.log, "End event {:?}", msg); + audit_log!(audit, "End create event {:?}", msg); + audit.end_event("create"); + // At the end of the event we send it for logging. + self.log.do_send(audit); // At the end of the event we send it for logging. res } @@ -130,6 +140,7 @@ mod tests { extern crate tokio; + use super::super::audit::AuditEvent; use super::super::be::Backend; use super::super::entry::Entry; use super::super::event::{CreateEvent, SearchEvent}; @@ -140,14 +151,16 @@ mod tests { macro_rules! run_test { ($test_fn:expr) => {{ System::run(|| { + let mut audit = AuditEvent::new(); let test_log = log::start(); - let be = Backend::new(test_log.clone(), ""); + let be = Backend::new(&mut audit, ""); let test_server = QueryServer::new(test_log.clone(), be); // Could wrap another future here for the future::ok bit... - let fut = $test_fn(test_log, test_server); - let comp_fut = fut.map_err(|()| ()).and_then(|_r| { + let fut = $test_fn(test_log.clone(), test_server, &mut audit); + let comp_fut = fut.map_err(|()| ()).and_then(move |_r| { + test_log.do_send(audit); println!("Stopping actix ..."); actix::System::current().stop(); future::result(Ok(())) @@ -160,7 +173,7 @@ mod tests { #[test] fn test_be_create_user() { - run_test!(|_log, mut server: QueryServer| { + run_test!(|_log, mut server: QueryServer, audit: &mut AuditEvent| { let filt = Filter::Pres(String::from("userid")); let se1 = SearchEvent::new(filt.clone()); @@ -174,13 +187,13 @@ mod tests { let ce = CreateEvent::new(expected.clone()); - let r1 = server.search(&se1).unwrap(); + let r1 = server.search(audit, &se1).unwrap(); assert!(r1.len() == 0); - let cr = server.create(&ce); + let cr = server.create(audit, &ce); assert!(cr.is_ok()); - let r2 = server.search(&se2).unwrap(); + let r2 = server.search(audit, &se2).unwrap(); assert!(r2.len() == 1); assert_eq!(r2, expected);