From 00a86583a97edefed4af305b8fde17608283b06a Mon Sep 17 00:00:00 2001 From: William Brown Date: Tue, 20 Nov 2018 17:26:49 +1000 Subject: [PATCH] Working create functionality --- src/event.rs | 11 +++-- src/main.rs | 130 +++++++++++++++++++++++++++----------------------- src/proto.rs | 20 ++++++++ src/server.rs | 7 +-- 4 files changed, 101 insertions(+), 67 deletions(-) diff --git a/src/event.rs b/src/event.rs index 6e02b9511..88bd8bf1d 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,4 +1,5 @@ use super::filter::Filter; +use super::proto::{CreateRequest, SearchRequest}; use actix::prelude::*; use entry::Entry; @@ -27,9 +28,9 @@ impl Message for SearchEvent { } impl SearchEvent { - pub fn new(filter: Filter) -> Self { + pub fn new(request: SearchRequest) -> Self { SearchEvent { - filter: filter, + filter: request.filter, class: (), } } @@ -50,7 +51,9 @@ impl Message for CreateEvent { } impl CreateEvent { - pub fn new(entries: Vec) -> Self { - CreateEvent { entries: entries } + pub fn new(request: CreateRequest) -> Self { + CreateEvent { + entries: request.entries, + } } } diff --git a/src/main.rs b/src/main.rs index 214fba8e0..b0930e8ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,11 @@ -extern crate serde; -extern crate serde_json; -// #[macro_use] extern crate actix; extern crate actix_web; extern crate bytes; extern crate env_logger; extern crate futures; +extern crate serde; extern crate serde_derive; +extern crate serde_json; extern crate uuid; // use actix::prelude::*; @@ -20,18 +19,72 @@ use futures::{future, Future, Stream}; #[macro_use] extern crate rsidm; -use rsidm::event; +use rsidm::event::{CreateEvent, EventResult, SearchEvent}; use rsidm::filter::Filter; use rsidm::log; -use rsidm::proto::SearchRequest; +use rsidm::proto::{CreateRequest, SearchRequest}; use rsidm::server; -const MAX_SIZE: usize = 262_144; //256k +const MAX_SIZE: usize = 262_144; //256k - this is the upper bound on create/search etc. struct AppState { qe: actix::Addr, } +macro_rules! json_event_decode { + ($req:expr, $state:expr, $event_type:ty, $message_type:ty) => {{ + // HttpRequest::payload() is stream of Bytes objects + $req.payload() + // `Future::from_err` acts like `?` in that it coerces the error type from + // the future into the final error type + .from_err() + // `fold` will asynchronously read each chunk of the request body and + // call supplied closure, then it resolves to result of closure + .fold(BytesMut::new(), move |mut body, chunk| { + // limit max size of in-memory payload + if (body.len() + chunk.len()) > MAX_SIZE { + Err(error::ErrorBadRequest("overflow")) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + // `Future::and_then` can be used to merge an asynchronous workflow with a + // synchronous workflow + .and_then( + move |body| -> Box> { + // body is loaded, now we can deserialize serde-json + // let r_obj = serde_json::from_slice::(&body); + let r_obj = serde_json::from_slice::<$message_type>(&body); + + // Send to the db for create + match r_obj { + Ok(obj) => { + let res = $state + .qe + .send( + // Could make this a .into_inner() and move? + // event::SearchEvent::new(obj.filter), + <($event_type)>::new(obj), + ) + .from_err() + .and_then(|res| match res { + Ok(entries) => Ok(HttpResponse::Ok().json(entries)), + Err(_) => Ok(HttpResponse::InternalServerError().into()), + }); + + Box::new(res) + } + Err(e) => Box::new(future::err(error::ErrorBadRequest(format!( + "Json Decode Failed: {:?}", + e + )))), + } + }, + ) + }}; +} + // Handle the various end points we need to expose /// simple handle @@ -55,14 +108,14 @@ fn class_list((_name, state): (Path, State)) -> FutureResponse // // FIXME: Don't use SEARCHEVENT here!!!! // - event::SearchEvent::new(filt), + SearchEvent::new(SearchRequest::new(filt)), ) // TODO: How to time this part of the code? // What does this do? .from_err() .and_then(|res| match res { // What type is entry? - Ok(event::EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)), + Ok(EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)), Ok(_) => Ok(HttpResponse::Ok().into()), // Can we properly report this? Err(_) => Ok(HttpResponse::InternalServerError().into()), @@ -71,56 +124,16 @@ fn class_list((_name, state): (Path, State)) -> FutureResponse .responder() } +fn create( + (req, state): (HttpRequest, State), +) -> impl Future { + json_event_decode!(req, state, CreateEvent, CreateRequest) +} + fn search( (req, state): (HttpRequest, State), ) -> impl Future { - // HttpRequest::payload() is stream of Bytes objects - req.payload() - // `Future::from_err` acts like `?` in that it coerces the error type from - // the future into the final error type - .from_err() - // `fold` will asynchronously read each chunk of the request body and - // call supplied closure, then it resolves to result of closure - .fold(BytesMut::new(), move |mut body, chunk| { - // limit max size of in-memory payload - if (body.len() + chunk.len()) > MAX_SIZE { - Err(error::ErrorBadRequest("overflow")) - } else { - body.extend_from_slice(&chunk); - Ok(body) - } - }) - // `Future::and_then` can be used to merge an asynchronous workflow with a - // synchronous workflow - .and_then( - move |body| -> Box> { - // body is loaded, now we can deserialize serde-json - let r_obj = serde_json::from_slice::(&body); - - // Send to the db for create - match r_obj { - Ok(obj) => { - let res = state - .qe - .send( - // Could make this a .into_inner() and move? - event::SearchEvent::new(obj.filter), - ) - .from_err() - .and_then(|res| match res { - Ok(entries) => Ok(HttpResponse::Ok().json(entries)), - Err(_) => Ok(HttpResponse::InternalServerError().into()), - }); - - Box::new(res) - } - Err(e) => Box::new(future::err(error::ErrorBadRequest(format!( - "Json Decode Failed: {:?}", - e - )))), - } - }, - ) + json_event_decode!(req, state, SearchEvent, SearchRequest) } fn main() { @@ -149,13 +162,10 @@ fn main() { // Connect all our end points here. .middleware(middleware::Logger::default()) .resource("/", |r| r.f(index)) - // - /* + // curl --header "Content-Type: application/json" --request POST --data '{ "entries": [ {"attrs": {"class": ["group"], "name": ["testgroup"], "description": ["testperson"]}}]}' http://127.0.0.1:8080/create .resource("/create", |r| { - r.method(http::Method::POST) - .with(create) + r.method(http::Method::POST).with_async(create) }) - */ // curl --header "Content-Type: application/json" --request POST --data '{ "filter" : { "Eq": ["class", "user"] }}' http://127.0.0.1:8080/search .resource("/search", |r| { r.method(http::Method::POST).with_async(search) diff --git a/src/proto.rs b/src/proto.rs index 74fac771c..0ae934b55 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,6 +1,26 @@ +use super::entry::Entry; use super::filter::Filter; +// These proto implementations are here because they have public definitions + #[derive(Debug, Serialize, Deserialize)] pub struct SearchRequest { pub filter: Filter, } + +impl SearchRequest { + pub fn new(filter: Filter) -> Self { + SearchRequest { filter: filter } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateRequest { + pub entries: Vec, +} + +impl CreateRequest { + pub fn new(entries: Vec) -> Self { + CreateRequest { entries: entries } + } +} diff --git a/src/server.rs b/src/server.rs index 0e56dea25..27d0e57cb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -169,6 +169,7 @@ mod tests { use super::super::event::{CreateEvent, SearchEvent}; use super::super::filter::Filter; use super::super::log; + use super::super::proto::{CreateRequest, SearchRequest}; use super::super::schema::Schema; use super::super::server::QueryServer; @@ -202,8 +203,8 @@ mod tests { run_test!(|_log, mut server: QueryServer, audit: &mut AuditEvent| { let filt = Filter::Pres(String::from("name")); - let se1 = SearchEvent::new(filt.clone()); - let se2 = SearchEvent::new(filt); + let se1 = SearchEvent::new(SearchRequest::new(filt.clone())); + let se2 = SearchEvent::new(SearchRequest::new(filt)); let e: Entry = serde_json::from_str( r#"{ @@ -219,7 +220,7 @@ mod tests { let expected = vec![e]; - let ce = CreateEvent::new(expected.clone()); + let ce = CreateEvent::new(CreateRequest::new(expected.clone())); let r1 = server.search(audit, &se1).unwrap(); assert!(r1.len() == 0);