Working create functionality

This commit is contained in:
William Brown 2018-11-20 17:26:49 +10:00
parent f40fdee9cd
commit 00a86583a9
4 changed files with 101 additions and 67 deletions

View file

@ -1,4 +1,5 @@
use super::filter::Filter; use super::filter::Filter;
use super::proto::{CreateRequest, SearchRequest};
use actix::prelude::*; use actix::prelude::*;
use entry::Entry; use entry::Entry;
@ -27,9 +28,9 @@ impl Message for SearchEvent {
} }
impl SearchEvent { impl SearchEvent {
pub fn new(filter: Filter) -> Self { pub fn new(request: SearchRequest) -> Self {
SearchEvent { SearchEvent {
filter: filter, filter: request.filter,
class: (), class: (),
} }
} }
@ -50,7 +51,9 @@ impl Message for CreateEvent {
} }
impl CreateEvent { impl CreateEvent {
pub fn new(entries: Vec<Entry>) -> Self { pub fn new(request: CreateRequest) -> Self {
CreateEvent { entries: entries } CreateEvent {
entries: request.entries,
}
} }
} }

View file

@ -1,12 +1,11 @@
extern crate serde;
extern crate serde_json;
// #[macro_use]
extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate bytes; extern crate bytes;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
extern crate serde;
extern crate serde_derive; extern crate serde_derive;
extern crate serde_json;
extern crate uuid; extern crate uuid;
// use actix::prelude::*; // use actix::prelude::*;
@ -20,18 +19,72 @@ use futures::{future, Future, Stream};
#[macro_use] #[macro_use]
extern crate rsidm; extern crate rsidm;
use rsidm::event; use rsidm::event::{CreateEvent, EventResult, SearchEvent};
use rsidm::filter::Filter; use rsidm::filter::Filter;
use rsidm::log; use rsidm::log;
use rsidm::proto::SearchRequest; use rsidm::proto::{CreateRequest, SearchRequest};
use rsidm::server; use rsidm::server;
const MAX_SIZE: usize = 262_144; //256k const MAX_SIZE: usize = 262_144; //256k - this is the upper bound on create/search etc.
struct AppState { struct AppState {
qe: actix::Addr<server::QueryServer>, qe: actix::Addr<server::QueryServer>,
} }
macro_rules! json_event_decode {
($req:expr, $state:expr, $event_type:ty, $message_type:ty) => {{
// HttpRequest::payload() is stream of Bytes objects
$req.payload()
// `Future::from_err` acts like `?` in that it coerces the error type from
// the future into the final error type
.from_err()
// `fold` will asynchronously read each chunk of the request body and
// call supplied closure, then it resolves to result of closure
.fold(BytesMut::new(), move |mut body, chunk| {
// limit max size of in-memory payload
if (body.len() + chunk.len()) > MAX_SIZE {
Err(error::ErrorBadRequest("overflow"))
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
// `Future::and_then` can be used to merge an asynchronous workflow with a
// synchronous workflow
.and_then(
move |body| -> Box<Future<Item = HttpResponse, Error = Error>> {
// body is loaded, now we can deserialize serde-json
// let r_obj = serde_json::from_slice::<SearchRequest>(&body);
let r_obj = serde_json::from_slice::<$message_type>(&body);
// Send to the db for create
match r_obj {
Ok(obj) => {
let res = $state
.qe
.send(
// Could make this a .into_inner() and move?
// event::SearchEvent::new(obj.filter),
<($event_type)>::new(obj),
)
.from_err()
.and_then(|res| match res {
Ok(entries) => Ok(HttpResponse::Ok().json(entries)),
Err(_) => Ok(HttpResponse::InternalServerError().into()),
});
Box::new(res)
}
Err(e) => Box::new(future::err(error::ErrorBadRequest(format!(
"Json Decode Failed: {:?}",
e
)))),
}
},
)
}};
}
// Handle the various end points we need to expose // Handle the various end points we need to expose
/// simple handle /// simple handle
@ -55,14 +108,14 @@ fn class_list((_name, state): (Path<String>, State<AppState>)) -> FutureResponse
// //
// FIXME: Don't use SEARCHEVENT here!!!! // FIXME: Don't use SEARCHEVENT here!!!!
// //
event::SearchEvent::new(filt), SearchEvent::new(SearchRequest::new(filt)),
) )
// TODO: How to time this part of the code? // TODO: How to time this part of the code?
// What does this do? // What does this do?
.from_err() .from_err()
.and_then(|res| match res { .and_then(|res| match res {
// What type is entry? // What type is entry?
Ok(event::EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)), Ok(EventResult::Search { entries }) => Ok(HttpResponse::Ok().json(entries)),
Ok(_) => Ok(HttpResponse::Ok().into()), Ok(_) => Ok(HttpResponse::Ok().into()),
// Can we properly report this? // Can we properly report this?
Err(_) => Ok(HttpResponse::InternalServerError().into()), Err(_) => Ok(HttpResponse::InternalServerError().into()),
@ -71,56 +124,16 @@ fn class_list((_name, state): (Path<String>, State<AppState>)) -> FutureResponse
.responder() .responder()
} }
fn create(
(req, state): (HttpRequest<AppState>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
json_event_decode!(req, state, CreateEvent, CreateRequest)
}
fn search( fn search(
(req, state): (HttpRequest<AppState>, State<AppState>), (req, state): (HttpRequest<AppState>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Item = HttpResponse, Error = Error> {
// HttpRequest::payload() is stream of Bytes objects json_event_decode!(req, state, SearchEvent, SearchRequest)
req.payload()
// `Future::from_err` acts like `?` in that it coerces the error type from
// the future into the final error type
.from_err()
// `fold` will asynchronously read each chunk of the request body and
// call supplied closure, then it resolves to result of closure
.fold(BytesMut::new(), move |mut body, chunk| {
// limit max size of in-memory payload
if (body.len() + chunk.len()) > MAX_SIZE {
Err(error::ErrorBadRequest("overflow"))
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
// `Future::and_then` can be used to merge an asynchronous workflow with a
// synchronous workflow
.and_then(
move |body| -> Box<Future<Item = HttpResponse, Error = Error>> {
// body is loaded, now we can deserialize serde-json
let r_obj = serde_json::from_slice::<SearchRequest>(&body);
// Send to the db for create
match r_obj {
Ok(obj) => {
let res = state
.qe
.send(
// Could make this a .into_inner() and move?
event::SearchEvent::new(obj.filter),
)
.from_err()
.and_then(|res| match res {
Ok(entries) => Ok(HttpResponse::Ok().json(entries)),
Err(_) => Ok(HttpResponse::InternalServerError().into()),
});
Box::new(res)
}
Err(e) => Box::new(future::err(error::ErrorBadRequest(format!(
"Json Decode Failed: {:?}",
e
)))),
}
},
)
} }
fn main() { fn main() {
@ -149,13 +162,10 @@ fn main() {
// Connect all our end points here. // Connect all our end points here.
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
.resource("/", |r| r.f(index)) .resource("/", |r| r.f(index))
// // curl --header "Content-Type: application/json" --request POST --data '{ "entries": [ {"attrs": {"class": ["group"], "name": ["testgroup"], "description": ["testperson"]}}]}' http://127.0.0.1:8080/create
/*
.resource("/create", |r| { .resource("/create", |r| {
r.method(http::Method::POST) r.method(http::Method::POST).with_async(create)
.with(create)
}) })
*/
// curl --header "Content-Type: application/json" --request POST --data '{ "filter" : { "Eq": ["class", "user"] }}' http://127.0.0.1:8080/search // curl --header "Content-Type: application/json" --request POST --data '{ "filter" : { "Eq": ["class", "user"] }}' http://127.0.0.1:8080/search
.resource("/search", |r| { .resource("/search", |r| {
r.method(http::Method::POST).with_async(search) r.method(http::Method::POST).with_async(search)

View file

@ -1,6 +1,26 @@
use super::entry::Entry;
use super::filter::Filter; use super::filter::Filter;
// These proto implementations are here because they have public definitions
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct SearchRequest { pub struct SearchRequest {
pub filter: Filter, pub filter: Filter,
} }
impl SearchRequest {
pub fn new(filter: Filter) -> Self {
SearchRequest { filter: filter }
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateRequest {
pub entries: Vec<Entry>,
}
impl CreateRequest {
pub fn new(entries: Vec<Entry>) -> Self {
CreateRequest { entries: entries }
}
}

View file

@ -169,6 +169,7 @@ mod tests {
use super::super::event::{CreateEvent, SearchEvent}; use super::super::event::{CreateEvent, SearchEvent};
use super::super::filter::Filter; use super::super::filter::Filter;
use super::super::log; use super::super::log;
use super::super::proto::{CreateRequest, SearchRequest};
use super::super::schema::Schema; use super::super::schema::Schema;
use super::super::server::QueryServer; use super::super::server::QueryServer;
@ -202,8 +203,8 @@ mod tests {
run_test!(|_log, mut server: QueryServer, audit: &mut AuditEvent| { run_test!(|_log, mut server: QueryServer, audit: &mut AuditEvent| {
let filt = Filter::Pres(String::from("name")); let filt = Filter::Pres(String::from("name"));
let se1 = SearchEvent::new(filt.clone()); let se1 = SearchEvent::new(SearchRequest::new(filt.clone()));
let se2 = SearchEvent::new(filt); let se2 = SearchEvent::new(SearchRequest::new(filt));
let e: Entry = serde_json::from_str( let e: Entry = serde_json::from_str(
r#"{ r#"{
@ -219,7 +220,7 @@ mod tests {
let expected = vec![e]; let expected = vec![e];
let ce = CreateEvent::new(expected.clone()); let ce = CreateEvent::new(CreateRequest::new(expected.clone()));
let r1 = server.search(audit, &se1).unwrap(); let r1 = server.search(audit, &se1).unwrap();
assert!(r1.len() == 0); assert!(r1.len() == 0);