diff --git a/Cargo.toml b/Cargo.toml index 56e1ce060..930a7a612 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ actix = "0.7" actix-web = "0.7" bytes = "0.4" env_logger = "0.5" +reqwest = "0.9" tokio = "0.1" diff --git a/src/be/mod.rs b/src/be/mod.rs index 6d4b3f5d1..6a76bf6de 100644 --- a/src/be/mod.rs +++ b/src/be/mod.rs @@ -154,7 +154,7 @@ impl Backend { } // Take filter, and AuditEvent ref? - pub fn search(&self, au: &mut AuditEvent, filt: &Filter) -> Result, ()> { + pub fn search(&self, au: &mut AuditEvent, filt: &Filter) -> Result, BackendError> { // Do things // Alloc a vec for the entries. // FIXME: Make this actually a good size for the result set ... diff --git a/src/config.rs b/src/config.rs index e69de29bb..500b8ea2e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -0,0 +1,21 @@ +#[derive(Serialize, Deserialize, Debug)] +pub struct Configuration { + pub address: String, + pub threads: usize, + pub db_path: String, + pub maximum_request: usize, + // db type later +} + +impl Configuration { + pub fn new() -> Self { + Configuration { + address: String::from("127.0.0.1:8080"), + threads: 8, + db_path: String::from(""), + maximum_request: 262144, // 256k + // log type + // log path + } + } +} diff --git a/src/core.rs b/src/core.rs new file mode 100644 index 000000000..4c9d0bc90 --- /dev/null +++ b/src/core.rs @@ -0,0 +1,175 @@ +use actix::SystemRunner; +use actix_web::{ + error, http, middleware, App, AsyncResponder, Error, FutureResponse, HttpMessage, HttpRequest, + HttpResponse, Path, State, +}; + +use bytes::BytesMut; +use futures::{future, Future, Stream}; + +use super::config::Configuration; +use super::event::{CreateEvent, EventResult, SearchEvent}; +use super::filter::Filter; +use super::log; +use super::proto::{CreateRequest, SearchRequest}; +use super::server; + +struct AppState { + qe: actix::Addr, + max_size: usize, +} + +macro_rules! json_event_decode { + ($req:expr, $state:expr, $event_type:ty, $message_type:ty) => {{ + // This is copied every request. Is there a better way? + // The issue is the fold move takes ownership of state if + // we don't copy this here + let max_size = $state.max_size; + + // HttpRequest::payload() is stream of Bytes objects + $req.payload() + // `Future::from_err` acts like `?` in that it coerces the error type from + // the future into the final error type + .from_err() + // `fold` will asynchronously read each chunk of the request body and + // call supplied closure, then it resolves to result of closure + .fold(BytesMut::new(), move |mut body, chunk| { + // limit max size of in-memory payload + if (body.len() + chunk.len()) > max_size { + Err(error::ErrorBadRequest("overflow")) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + // `Future::and_then` can be used to merge an asynchronous workflow with a + // synchronous workflow + .and_then( + move |body| -> Box> { + // body is loaded, now we can deserialize serde-json + // let r_obj = serde_json::from_slice::(&body); + let r_obj = serde_json::from_slice::<$message_type>(&body); + + // Send to the db for create + match r_obj { + Ok(obj) => { + let res = $state + .qe + .send( + // Could make this a .into_inner() and move? 
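+                                // NOTE: the query server actor replies with a
+                                // Result<EventResult, OperationError>; on success the
+                                // EventResult is serialised straight back as JSON, and an
+                                // OperationError is serialised as the JSON body of the
+                                // 500 response rather than an empty error.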
+ // event::SearchEvent::new(obj.filter), + <($event_type)>::new(obj), + ) + .from_err() + .and_then(|res| match res { + Ok(entries) => Ok(HttpResponse::Ok().json(entries)), + Err(e) => Ok(HttpResponse::InternalServerError().json(e)), + }); + + Box::new(res) + } + Err(e) => Box::new(future::err(error::ErrorBadRequest(format!( + "Json Decode Failed: {:?}", + e + )))), + } + }, + ) + }}; +} + +// Handle the various end points we need to expose + +/// simple handle +fn index(req: &HttpRequest) -> HttpResponse { + println!("{:?}", req); + + HttpResponse::Ok().body("Hello\n") +} + +fn class_list((_name, state): (Path, State)) -> FutureResponse { + // println!("request to class_list"); + let filt = Filter::Pres(String::from("objectclass")); + + state + .qe + .send( + // This is where we need to parse the request into an event + // LONG TERM + // Make a search REQUEST, and create the audit struct here, then + // pass it to the server + // + // FIXME: Don't use SEARCHEVENT here!!!! + // + SearchEvent::new(SearchRequest::new(filt)), + ) + // TODO: How to time this part of the code? + // What does this do? + .from_err() + .and_then(|res| match res { + // What type is entry? + Ok(EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)), + Ok(_) => Ok(HttpResponse::Ok().into()), + // Can we properly report this? + Err(_) => Ok(HttpResponse::InternalServerError().into()), + }) + // What does this do? + .responder() +} + +fn create( + (req, state): (HttpRequest, State), +) -> impl Future { + json_event_decode!(req, state, CreateEvent, CreateRequest) +} + +fn search( + (req, state): (HttpRequest, State), +) -> impl Future { + json_event_decode!(req, state, SearchEvent, SearchRequest) +} + +pub fn create_server_core(config: Configuration) { + // Configure the middleware logger + ::std::env::set_var("RUST_LOG", "actix_web=info"); + env_logger::init(); + + // Until this point, we probably want to write to stderr + // Start up the logging system: for now it just maps to stderr + let log_addr = log::start(); + log_event!(log_addr, "Starting rsidm with configuration: {:?}", config); + + // Start the query server with the given be path: future config + let server_addr = server::start(log_addr.clone(), config.db_path.as_str(), config.threads); + // Copy the max size + let max_size = config.maximum_request; + + // start the web server + actix_web::server::new(move || { + App::with_state(AppState { + qe: server_addr.clone(), + max_size: max_size, + }) + // Connect all our end points here. + .middleware(middleware::Logger::default()) + .resource("/", |r| r.f(index)) + // curl --header "Content-Type: application/json" --request POST --data '{ "entries": [ {"attrs": {"class": ["group"], "name": ["testgroup"], "description": ["testperson"]}}]}' http://127.0.0.1:8080/create + .resource("/create", |r| { + r.method(http::Method::POST).with_async(create) + }) + // curl --header "Content-Type: application/json" --request POST --data '{ "filter" : { "Eq": ["class", "user"] }}' http://127.0.0.1:8080/search + .resource("/search", |r| { + r.method(http::Method::POST).with_async(search) + }) + // Add an ldap compat search function type? 
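+            // For example (the class name is not yet used by the handler; it presence-filters on objectclass):
+            // curl --request GET http://127.0.0.1:8080/list/group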
+ .resource("/list/{class_list}", |r| { + r.method(http::Method::GET).with(class_list) + }) + .resource("/list/{class_list}/", |r| { + r.method(http::Method::GET).with(class_list) + }) + }) + .bind(config.address) + .unwrap() + .start(); +} diff --git a/src/error.rs b/src/error.rs index 60c45f1f7..884616190 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,3 +8,10 @@ pub enum SchemaError { InvalidAttributeSyntax, EmptyFilter, } + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub enum OperationError { + EmptyRequest, + Backend, + SchemaViolation, +} diff --git a/src/event.rs b/src/event.rs index 88bd8bf1d..2c60fdd9c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -2,6 +2,7 @@ use super::filter::Filter; use super::proto::{CreateRequest, SearchRequest}; use actix::prelude::*; use entry::Entry; +use error::OperationError; // Should the event Result have the log items? // FIXME: Remove seralising here - each type should @@ -24,7 +25,7 @@ pub struct SearchEvent { } impl Message for SearchEvent { - type Result = Result; + type Result = Result; } impl SearchEvent { @@ -47,7 +48,7 @@ pub struct CreateEvent { } impl Message for CreateEvent { - type Result = Result; + type Result = Result; } impl CreateEvent { diff --git a/src/lib.rs b/src/lib.rs index ca3b26429..9704d4fd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,9 @@ extern crate r2d2_sqlite; extern crate rusqlite; extern crate uuid; +extern crate bytes; +extern crate env_logger; + // use actix::prelude::*; // use actix_web::{ // http, middleware, App, AsyncResponder, FutureResponse, HttpRequest, HttpResponse, Path, State, @@ -25,6 +28,8 @@ pub mod log; #[macro_use] mod audit; mod be; +pub mod config; +pub mod core; pub mod entry; pub mod error; pub mod event; diff --git a/src/main.rs b/src/main.rs index 8c4850698..972301120 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,187 +1,19 @@ -extern crate actix; -extern crate actix_web; -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate serde; -extern crate serde_derive; -extern crate serde_json; -extern crate uuid; - -// use actix::prelude::*; -use actix_web::{ - error, http, middleware, App, AsyncResponder, Error, FutureResponse, HttpMessage, HttpRequest, - HttpResponse, Path, State, -}; - -use bytes::BytesMut; -use futures::{future, Future, Stream}; - #[macro_use] +extern crate actix; + extern crate rsidm; -use rsidm::event::{CreateEvent, EventResult, SearchEvent}; -use rsidm::filter::Filter; -use rsidm::log; -use rsidm::proto::{CreateRequest, SearchRequest}; -use rsidm::server; - -const MAX_SIZE: usize = 262_144; //256k - this is the upper bound on create/search etc. - -struct AppState { - qe: actix::Addr, -} - -macro_rules! 
json_event_decode { - ($req:expr, $state:expr, $event_type:ty, $message_type:ty) => {{ - // HttpRequest::payload() is stream of Bytes objects - $req.payload() - // `Future::from_err` acts like `?` in that it coerces the error type from - // the future into the final error type - .from_err() - // `fold` will asynchronously read each chunk of the request body and - // call supplied closure, then it resolves to result of closure - .fold(BytesMut::new(), move |mut body, chunk| { - // limit max size of in-memory payload - if (body.len() + chunk.len()) > MAX_SIZE { - Err(error::ErrorBadRequest("overflow")) - } else { - body.extend_from_slice(&chunk); - Ok(body) - } - }) - // `Future::and_then` can be used to merge an asynchronous workflow with a - // synchronous workflow - .and_then( - move |body| -> Box> { - // body is loaded, now we can deserialize serde-json - // let r_obj = serde_json::from_slice::(&body); - let r_obj = serde_json::from_slice::<$message_type>(&body); - - // Send to the db for create - match r_obj { - Ok(obj) => { - let res = $state - .qe - .send( - // Could make this a .into_inner() and move? - // event::SearchEvent::new(obj.filter), - <($event_type)>::new(obj), - ) - .from_err() - .and_then(|res| match res { - Ok(entries) => Ok(HttpResponse::Ok().json(entries)), - Err(_) => Ok(HttpResponse::InternalServerError().into()), - }); - - Box::new(res) - } - Err(e) => Box::new(future::err(error::ErrorBadRequest(format!( - "Json Decode Failed: {:?}", - e - )))), - } - }, - ) - }}; -} - -// Handle the various end points we need to expose - -/// simple handle -fn index(req: &HttpRequest) -> HttpResponse { - println!("{:?}", req); - - HttpResponse::Ok().body("Hello\n") -} - -fn class_list((_name, state): (Path, State)) -> FutureResponse { - // println!("request to class_list"); - let filt = Filter::Pres(String::from("objectclass")); - - state - .qe - .send( - // This is where we need to parse the request into an event - // LONG TERM - // Make a search REQUEST, and create the audit struct here, then - // pass it to the server - // - // FIXME: Don't use SEARCHEVENT here!!!! - // - SearchEvent::new(SearchRequest::new(filt)), - ) - // TODO: How to time this part of the code? - // What does this do? - .from_err() - .and_then(|res| match res { - // What type is entry? - Ok(EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)), - Ok(_) => Ok(HttpResponse::Ok().into()), - // Can we properly report this? - Err(_) => Ok(HttpResponse::InternalServerError().into()), - }) - // What does this do? - .responder() -} - -fn create( - (req, state): (HttpRequest, State), -) -> impl Future { - json_event_decode!(req, state, CreateEvent, CreateRequest) -} - -fn search( - (req, state): (HttpRequest, State), -) -> impl Future { - json_event_decode!(req, state, SearchEvent, SearchRequest) -} +use rsidm::config::Configuration; +use rsidm::core::create_server_core; fn main() { - // Configure the middleware logger - ::std::env::set_var("RUST_LOG", "actix_web=info"); - env_logger::init(); - - let sys = actix::System::new("rsidm-server"); - // read the config (if any?) // How do we make the config accesible to all threads/workers? clone it? // Make it an Arc? 
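+    // For now the answer is neither: Configuration is moved into create_server_core
+    // by value, which copies maximum_request into the actix-web closure and reads
+    // db_path before the workers start. An Arc<Configuration> could come later.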
- // Until this point, we probably want to write to stderr - // Start up the logging system: for now it just maps to stderr - let log_addr = log::start(); + // FIXME: Pass config to the server core + let config = Configuration::new(); + let sys = actix::System::new("rsidm-server"); - // Start the query server with the given be path: future config - let server_addr = server::start(log_addr.clone(), "test.db", 8); - - // start the web server - actix_web::server::new(move || { - App::with_state(AppState { - qe: server_addr.clone(), - }) - // Connect all our end points here. - .middleware(middleware::Logger::default()) - .resource("/", |r| r.f(index)) - // curl --header "Content-Type: application/json" --request POST --data '{ "entries": [ {"attrs": {"class": ["group"], "name": ["testgroup"], "description": ["testperson"]}}]}' http://127.0.0.1:8080/create - .resource("/create", |r| { - r.method(http::Method::POST).with_async(create) - }) - // curl --header "Content-Type: application/json" --request POST --data '{ "filter" : { "Eq": ["class", "user"] }}' http://127.0.0.1:8080/search - .resource("/search", |r| { - r.method(http::Method::POST).with_async(search) - }) - // Add an ldap compat search function type? - .resource("/list/{class_list}", |r| { - r.method(http::Method::GET).with(class_list) - }) - .resource("/list/{class_list}/", |r| { - r.method(http::Method::GET).with(class_list) - }) - }) - .bind("127.0.0.1:8080") - .unwrap() - .start(); - - log_event!(log_addr, "Starting rsidm on http://127.0.0.1:8080"); + create_server_core(config); let _ = sys.run(); } diff --git a/src/proto.rs b/src/proto.rs index 0ae934b55..e6a612bc0 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -3,6 +3,11 @@ use super::filter::Filter; // These proto implementations are here because they have public definitions +// FIXME: We probably need a proto entry to transform our +// server core entry into. + +// FIXME: Proto Response as well here + #[derive(Debug, Serialize, Deserialize)] pub struct SearchRequest { pub filter: Filter, diff --git a/src/server.rs b/src/server.rs index ae5ed8e64..bc85fcd8b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,10 @@ use actix::prelude::*; use audit::AuditEvent; -use be::Backend; +use be::{Backend, BackendError}; + use entry::Entry; +use error::OperationError; use event::{CreateEvent, EventResult, SearchEvent}; use log::EventLog; use schema::Schema; @@ -56,24 +58,33 @@ impl QueryServer { // Actually conduct a search request // This is the core of the server, as it processes the entire event // applies all parts required in order and more. - pub fn search(&mut self, au: &mut AuditEvent, se: &SearchEvent) -> Result, ()> { - match self.be.search(au, &se.filter) { - Ok(r) => Ok(r), - Err(_) => Err(()), - } + pub fn search( + &mut self, + au: &mut AuditEvent, + se: &SearchEvent, + ) -> Result, OperationError> { + let res = self + .be + .search(au, &se.filter) + .map(|r| r) + .map_err(|_| OperationError::Backend); + // We'll add ACI later + res } // What should this take? // This should probably take raw encoded entries? Or sohuld they // be handled by fe? 
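+    // Below, each entry is validated against the schema before the backend is touched;
+    // a validation failure maps to OperationError::SchemaViolation, and backend errors
+    // map from BackendError (EmptyRequest is preserved, anything else becomes Backend).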
- pub fn create(&mut self, au: &mut AuditEvent, ce: &CreateEvent) -> Result<(), ()> { + pub fn create(&mut self, au: &mut AuditEvent, ce: &CreateEvent) -> Result<(), OperationError> { // Start a txn // Run any pre checks // FIXME: Normalise all entries incoming let r = ce.entries.iter().fold(Ok(()), |acc, e| { if acc.is_ok() { - self.schema.validate_entry(e).map_err(|_| ()) + self.schema + .validate_entry(e) + .map_err(|_| OperationError::SchemaViolation) } else { acc } @@ -83,12 +94,18 @@ impl QueryServer { } // We may change from ce.entries later to something else? - match self.be.create(au, &ce.entries) { - Ok(_) => Ok(()), - Err(_) => Err(()), - } + let res = self + .be + .create(au, &ce.entries) + .map(|_| ()) + .map_err(|e| match e { + BackendError::EmptyRequest => OperationError::EmptyRequest, + _ => OperationError::Backend, + }); + // Run and post checks // Commit/Abort the txn + res } } @@ -108,7 +125,7 @@ impl Actor for QueryServer { // at this point our just is just to route to do_ impl Handler for QueryServer { - type Result = Result; + type Result = Result; fn handle(&mut self, msg: SearchEvent, _: &mut Self::Context) -> Self::Result { let mut audit = AuditEvent::new(); @@ -135,7 +152,7 @@ impl Handler for QueryServer { } impl Handler for QueryServer { - type Result = Result; + type Result = Result; fn handle(&mut self, msg: CreateEvent, _: &mut Self::Context) -> Self::Result { let mut audit = AuditEvent::new(); @@ -147,7 +164,7 @@ impl Handler for QueryServer { Err(e) => Err(e), }; - audit_log!(audit, "End create event {:?}", msg); + audit_log!(audit, "End create event {:?} -> {:?}", msg, res); audit.end_event("create"); // At the end of the event we send it for logging. self.log.do_send(audit); diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 9558c67e8..2674110eb 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -2,14 +2,24 @@ extern crate actix; use actix::prelude::*; extern crate rsidm; +use rsidm::config::Configuration; +use rsidm::core::create_server_core; +use rsidm::entry::Entry; +use rsidm::event::EventResult; use rsidm::log::{self, EventLog, LogEvent}; +use rsidm::proto::{CreateRequest, SearchRequest}; use rsidm::server::{self, QueryServer}; // use be; +extern crate reqwest; + extern crate futures; use futures::future; use futures::future::Future; +use std::sync::mpsc; +use std::thread; + extern crate tokio; // use tokio::executor::current_thread::CurrentThread; @@ -17,48 +27,61 @@ extern crate tokio; macro_rules! run_test { ($test_fn:expr) => {{ - System::run(|| { + let (tx, rx) = mpsc::channel(); + + thread::spawn(|| { // setup // Create a server config in memory for use - use test settings // Create a log: In memory - for now it's just stdout - let test_log = log::start(); - // Create the db as a temporary, see: - // https://sqlite.org/inmemorydb.html - let test_server = server::start(test_log.clone(), "", 1); + System::run(move || { + let config = Configuration::new(); + create_server_core(config); - // Do we need any fixtures? - // Yes probably, but they'll need to be futures as well ... - // later we could accept fixture as it's own future for re-use - // For now these can also bypass the FE code - // let fixture_fut = (); - - // We have to spawn every test as a future - let fut = $test_fn(test_log, test_server); - - // Now chain them ... - // Now append the server shutdown. 
- let comp_fut = fut.map_err(|_| ()).and_then(|_r| { - println!("Stopping actix ..."); - actix::System::current().stop(); - future::result(Ok(())) + // This appears to be bind random ... + // let srv = srv.bind("127.0.0.1:0").unwrap(); + let _ = tx.send(System::current()); }); - - // Run the future - tokio::spawn(comp_fut); - // We DO NOT need teardown, as sqlite is in mem - // let the tables hit the floor }); + let sys = rx.recv().unwrap(); + System::set_current(sys.clone()); + + // Do we need any fixtures? + // Yes probably, but they'll need to be futures as well ... + // later we could accept fixture as it's own future for re-use + $test_fn(); + + // We DO NOT need teardown, as sqlite is in mem + // let the tables hit the floor + let _ = sys.stop(); }}; } #[test] -fn test_schema() { - run_test!( - |log: actix::Addr, _server: actix::Addr| log.send(LogEvent { - msg: String::from("Test log event") - }) - ); +fn test_server_proto() { + run_test!(|| { + let client = reqwest::Client::new(); + + let c = CreateRequest { + entries: Vec::new(), + }; + + let mut response = client + .post("http://127.0.0.1:8080/create") + .body(serde_json::to_string(&c).unwrap()) + .send() + .unwrap(); + + println!("{:?}", response); + let r: EventResult = serde_json::from_str(response.text().unwrap().as_str()).unwrap(); + + println!("{:?}", r); + + // deserialise the response here + // check it's valid. + + () + }); } /*
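+ A possible follow-up test (sketch only; assumes the same default bind address and
+ that rsidm::filter::Filter stays publicly importable): drive /search with the same
+ client and deserialise the EventResult, mirroring the create round trip above.
+
+     use rsidm::filter::Filter;
+
+     let s = SearchRequest::new(Filter::Pres(String::from("class")));
+     let mut response = client
+         .post("http://127.0.0.1:8080/search")
+         .body(serde_json::to_string(&s).unwrap())
+         .send()
+         .unwrap();
+     let r: EventResult = serde_json::from_str(response.text().unwrap().as_str()).unwrap();
+     println!("{:?}", r);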