From 86938a7521717b6cad227a3c63dde794b41ad7f9 Mon Sep 17 00:00:00 2001 From: Firstyear Date: Tue, 15 Oct 2019 15:34:07 +1300 Subject: [PATCH] Split read vs write actors (#121) --- kanidmd/src/lib/actors/mod.rs | 3 +- kanidmd/src/lib/actors/v1_read.rs | 299 +++++++++++++++ kanidmd/src/lib/actors/{v1.rs => v1_write.rs} | 350 +++--------------- kanidmd/src/lib/core.rs | 135 +++---- kanidmd/src/lib/event.rs | 5 +- kanidmd/src/lib/idm/event.rs | 2 +- kanidmd/src/lib/interval.rs | 6 +- kanidmd/src/lib/plugins/attrunique.rs | 16 +- kanidmd/src/lib/plugins/base.rs | 24 +- kanidmd/src/lib/plugins/refint.rs | 12 +- kanidmd/src/lib/utils.rs | 6 +- 11 files changed, 472 insertions(+), 386 deletions(-) create mode 100644 kanidmd/src/lib/actors/v1_read.rs rename kanidmd/src/lib/actors/{v1.rs => v1_write.rs} (53%) diff --git a/kanidmd/src/lib/actors/mod.rs b/kanidmd/src/lib/actors/mod.rs index a3a6d96c3..e38d2bcd7 100644 --- a/kanidmd/src/lib/actors/mod.rs +++ b/kanidmd/src/lib/actors/mod.rs @@ -1 +1,2 @@ -pub mod v1; +pub mod v1_read; +pub mod v1_write; diff --git a/kanidmd/src/lib/actors/v1_read.rs b/kanidmd/src/lib/actors/v1_read.rs new file mode 100644 index 000000000..b6166963f --- /dev/null +++ b/kanidmd/src/lib/actors/v1_read.rs @@ -0,0 +1,299 @@ +use std::sync::Arc; + +use crate::audit::AuditScope; + +use crate::async_log::EventLog; +use crate::event::{AuthEvent, SearchEvent, SearchResult, WhoamiResult}; +use kanidm_proto::v1::OperationError; + +use crate::filter::{Filter, FilterInvalid}; +use crate::idm::server::IdmServer; +use crate::server::{QueryServer, QueryServerTransaction}; + +use kanidm_proto::v1::Entry as ProtoEntry; +use kanidm_proto::v1::{ + AuthRequest, AuthResponse, SearchRequest, SearchResponse, UserAuthToken, WhoamiResponse, +}; + +use actix::prelude::*; +use std::time::SystemTime; +use uuid::Uuid; + +// These are used when the request (IE Get) has no intrising request +// type. Additionally, they are used in some requests where we need +// to supplement extra server state (IE userauthtokens) to a request. +// +// Generally we don't need to have the responses here because they are +// part of the protocol. + +pub struct WhoamiMessage { + pub uat: Option, +} + +impl WhoamiMessage { + pub fn new(uat: Option) -> Self { + WhoamiMessage { uat: uat } + } +} + +impl Message for WhoamiMessage { + type Result = Result; +} + +#[derive(Debug)] +pub struct AuthMessage { + pub sessionid: Option, + pub req: AuthRequest, +} + +impl AuthMessage { + pub fn new(req: AuthRequest, sessionid: Option) -> Self { + AuthMessage { + sessionid: sessionid, + req: req, + } + } +} + +impl Message for AuthMessage { + type Result = Result; +} + +pub struct SearchMessage { + pub uat: Option, + pub req: SearchRequest, +} + +impl SearchMessage { + pub fn new(uat: Option, req: SearchRequest) -> Self { + SearchMessage { uat: uat, req: req } + } +} + +impl Message for SearchMessage { + type Result = Result; +} + +pub struct InternalSearchMessage { + pub uat: Option, + pub filter: Filter, +} + +impl InternalSearchMessage { + pub fn new(uat: Option, filter: Filter) -> Self { + InternalSearchMessage { + uat: uat, + filter: filter, + } + } +} + +impl Message for InternalSearchMessage { + type Result = Result, OperationError>; +} + +// =========================================================== + +pub struct QueryServerReadV1 { + log: actix::Addr, + qs: QueryServer, + idms: Arc, +} + +impl Actor for QueryServerReadV1 { + type Context = SyncContext; + + fn started(&mut self, _ctx: &mut Self::Context) { + // ctx.set_mailbox_capacity(1 << 31); + } +} + +impl QueryServerReadV1 { + pub fn new(log: actix::Addr, qs: QueryServer, idms: Arc) -> Self { + log_event!(log, "Starting query server v1 worker ..."); + QueryServerReadV1 { + log: log, + qs: qs, + idms: idms, + } + } + + pub fn start( + log: actix::Addr, + query_server: QueryServer, + idms: Arc, + threads: usize, + ) -> actix::Addr { + SyncArbiter::start(threads, move || { + QueryServerReadV1::new(log.clone(), query_server.clone(), idms.clone()) + }) + } +} + +// The server only recieves "Message" structures, which +// are whole self contained DB operations with all parsing +// required complete. We still need to do certain validation steps, but +// at this point our just is just to route to do_ + +impl Handler for QueryServerReadV1 { + type Result = Result; + + fn handle(&mut self, msg: SearchMessage, _: &mut Self::Context) -> Self::Result { + let mut audit = AuditScope::new("search"); + let res = audit_segment!(&mut audit, || { + // Begin a read + let qs_read = self.qs.read(); + + // Make an event from the request + let srch = match SearchEvent::from_message(&mut audit, msg, &qs_read) { + Ok(s) => s, + Err(e) => { + audit_log!(audit, "Failed to begin search: {:?}", e); + return Err(e); + } + }; + + audit_log!(audit, "Begin event {:?}", srch); + + match qs_read.search_ext(&mut audit, &srch) { + Ok(entries) => { + SearchResult::new(&mut audit, &qs_read, entries).map(|ok_sr| ok_sr.response()) + } + Err(e) => Err(e), + } + }); + // At the end of the event we send it for logging. + self.log.do_send(audit); + res + } +} + +impl Handler for QueryServerReadV1 { + type Result = Result; + + fn handle(&mut self, msg: AuthMessage, _: &mut Self::Context) -> Self::Result { + // This is probably the first function that really implements logic + // "on top" of the db server concept. In this case we check if + // the credentials provided is sufficient to say if someone is + // "authenticated" or not. + let mut audit = AuditScope::new("auth"); + let res = audit_segment!(&mut audit, || { + audit_log!(audit, "Begin auth event {:?}", msg); + + // Destructure it. + // Convert the AuthRequest to an AuthEvent that the idm server + // can use. + + let mut idm_write = self.idms.write(); + + let ae = try_audit!(audit, AuthEvent::from_message(msg)); + + let ct = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Clock failure!"); + + // Trigger a session clean *before* we take any auth steps. + // It's important to do this before to ensure that timeouts on + // the session are enforced. + idm_write.expire_auth_sessions(ct); + + // Generally things like auth denied are in Ok() msgs + // so true errors should always trigger a rollback. + let r = idm_write + .auth(&mut audit, &ae, ct) + .and_then(|r| idm_write.commit().map(|_| r)); + + audit_log!(audit, "Sending result -> {:?}", r); + // Build the result. + r.map(|r| r.response()) + }); + // At the end of the event we send it for logging. + self.log.do_send(audit); + res + } +} + +impl Handler for QueryServerReadV1 { + type Result = Result; + + fn handle(&mut self, msg: WhoamiMessage, _: &mut Self::Context) -> Self::Result { + let mut audit = AuditScope::new("whoami"); + let res = audit_segment!(&mut audit, || { + // TODO #62: Move this to IdmServer!!! + // Begin a read + let qs_read = self.qs.read(); + + // Make an event from the whoami request. This will process the event and + // generate a selfuuid search. + // + // This current handles the unauthenticated check, and will + // trigger the failure, but if we can manage to work out async + // then move this to core.rs, and don't allow Option to get + // this far. + let uat = msg.uat.clone().ok_or(OperationError::NotAuthenticated)?; + + let srch = match SearchEvent::from_whoami_request(&mut audit, msg.uat, &qs_read) { + Ok(s) => s, + Err(e) => { + audit_log!(audit, "Failed to begin whoami: {:?}", e); + return Err(e); + } + }; + + audit_log!(audit, "Begin event {:?}", srch); + + match qs_read.search_ext(&mut audit, &srch) { + Ok(mut entries) => { + // assert there is only one ... + match entries.len() { + 0 => Err(OperationError::NoMatchingEntries), + 1 => { + let e = entries.pop().expect("Entry length mismatch!!!"); + // Now convert to a response, and return + WhoamiResult::new(&mut audit, &qs_read, e, uat) + .map(|ok_wr| ok_wr.response()) + } + // Somehow we matched multiple, which should be impossible. + _ => Err(OperationError::InvalidState), + } + } + // Something else went wrong ... + Err(e) => Err(e), + } + }); + // Should we log the final result? + // At the end of the event we send it for logging. + self.log.do_send(audit); + res + } +} + +impl Handler for QueryServerReadV1 { + type Result = Result, OperationError>; + + fn handle(&mut self, msg: InternalSearchMessage, _: &mut Self::Context) -> Self::Result { + let mut audit = AuditScope::new("internal_search_message"); + let res = audit_segment!(&mut audit, || { + let qs_read = self.qs.read(); + + // Make an event from the request + let srch = match SearchEvent::from_internal_message(&mut audit, msg, &qs_read) { + Ok(s) => s, + Err(e) => { + audit_log!(audit, "Failed to begin search: {:?}", e); + return Err(e); + } + }; + + audit_log!(audit, "Begin event {:?}", srch); + + match qs_read.search_ext(&mut audit, &srch) { + Ok(entries) => SearchResult::new(&mut audit, &qs_read, entries) + .map(|ok_sr| ok_sr.to_proto_array()), + Err(e) => Err(e), + } + }); + self.log.do_send(audit); + res + } +} diff --git a/kanidmd/src/lib/actors/v1.rs b/kanidmd/src/lib/actors/v1_write.rs similarity index 53% rename from kanidmd/src/lib/actors/v1.rs rename to kanidmd/src/lib/actors/v1_write.rs index 3910858c9..632693ca8 100644 --- a/kanidmd/src/lib/actors/v1.rs +++ b/kanidmd/src/lib/actors/v1_write.rs @@ -1,70 +1,24 @@ -use std::sync::Arc; - use crate::audit::AuditScope; +use std::sync::Arc; use crate::async_log::EventLog; use crate::event::{ - AuthEvent, CreateEvent, DeleteEvent, ModifyEvent, PurgeRecycledEvent, PurgeTombstoneEvent, - SearchEvent, SearchResult, WhoamiResult, + CreateEvent, DeleteEvent, ModifyEvent, PurgeRecycledEvent, PurgeTombstoneEvent, }; use crate::idm::event::{GeneratePasswordEvent, PasswordChangeEvent}; use kanidm_proto::v1::OperationError; -use crate::filter::{Filter, FilterInvalid}; use crate::idm::server::IdmServer; use crate::server::{QueryServer, QueryServerTransaction}; -use kanidm_proto::v1::Entry as ProtoEntry; use kanidm_proto::v1::{ - AuthRequest, AuthResponse, CreateRequest, DeleteRequest, ModifyRequest, OperationResponse, - SearchRequest, SearchResponse, SetAuthCredential, SingleStringRequest, UserAuthToken, - WhoamiResponse, + CreateRequest, DeleteRequest, ModifyRequest, OperationResponse, SetAuthCredential, + SingleStringRequest, UserAuthToken, }; use actix::prelude::*; -use std::time::SystemTime; use uuid::Uuid; -// These are used when the request (IE Get) has no intrising request -// type. Additionally, they are used in some requests where we need -// to supplement extra server state (IE userauthtokens) to a request. -// -// Generally we don't need to have the responses here because they are -// part of the protocol. - -pub struct WhoamiMessage { - pub uat: Option, -} - -impl WhoamiMessage { - pub fn new(uat: Option) -> Self { - WhoamiMessage { uat: uat } - } -} - -impl Message for WhoamiMessage { - type Result = Result; -} - -#[derive(Debug)] -pub struct AuthMessage { - pub sessionid: Option, - pub req: AuthRequest, -} - -impl AuthMessage { - pub fn new(req: AuthRequest, sessionid: Option) -> Self { - AuthMessage { - sessionid: sessionid, - req: req, - } - } -} - -impl Message for AuthMessage { - type Result = Result; -} - pub struct CreateMessage { pub uat: Option, pub req: CreateRequest, @@ -110,39 +64,6 @@ impl Message for ModifyMessage { type Result = Result; } -pub struct SearchMessage { - pub uat: Option, - pub req: SearchRequest, -} - -impl SearchMessage { - pub fn new(uat: Option, req: SearchRequest) -> Self { - SearchMessage { uat: uat, req: req } - } -} - -impl Message for SearchMessage { - type Result = Result; -} - -pub struct InternalSearchMessage { - pub uat: Option, - pub filter: Filter, -} - -impl InternalSearchMessage { - pub fn new(uat: Option, filter: Filter) -> Self { - InternalSearchMessage { - uat: uat, - filter: filter, - } - } -} - -impl Message for InternalSearchMessage { - type Result = Result, OperationError>; -} - pub struct IdmAccountSetPasswordMessage { pub uat: Option, pub cleartext: String, @@ -188,26 +109,26 @@ impl Message for InternalCredentialSetMessage { type Result = Result, OperationError>; } -// =========================================================== - -pub struct QueryServerV1 { +pub struct QueryServerWriteV1 { log: actix::Addr, qs: QueryServer, idms: Arc, } -impl Actor for QueryServerV1 { +impl Actor for QueryServerWriteV1 { type Context = SyncContext; fn started(&mut self, _ctx: &mut Self::Context) { + // How much backlog we want to allow outstanding before we start to throw + // errors? // ctx.set_mailbox_capacity(1 << 31); } } -impl QueryServerV1 { +impl QueryServerWriteV1 { pub fn new(log: actix::Addr, qs: QueryServer, idms: Arc) -> Self { log_event!(log, "Starting query server v1 worker ..."); - QueryServerV1 { + QueryServerWriteV1 { log: log, qs: qs, idms: idms, @@ -217,55 +138,15 @@ impl QueryServerV1 { pub fn start( log: actix::Addr, query_server: QueryServer, - idms: IdmServer, - threads: usize, - ) -> actix::Addr { - let idms_arc = Arc::new(idms); - SyncArbiter::start(threads, move || { - QueryServerV1::new(log.clone(), query_server.clone(), idms_arc.clone()) + idms: Arc, + ) -> actix::Addr { + SyncArbiter::start(1, move || { + QueryServerWriteV1::new(log.clone(), query_server.clone(), idms.clone()) }) } } -// The server only recieves "Message" structures, which -// are whole self contained DB operations with all parsing -// required complete. We still need to do certain validation steps, but -// at this point our just is just to route to do_ - -impl Handler for QueryServerV1 { - type Result = Result; - - fn handle(&mut self, msg: SearchMessage, _: &mut Self::Context) -> Self::Result { - let mut audit = AuditScope::new("search"); - let res = audit_segment!(&mut audit, || { - // Begin a read - let qs_read = self.qs.read(); - - // Make an event from the request - let srch = match SearchEvent::from_message(&mut audit, msg, &qs_read) { - Ok(s) => s, - Err(e) => { - audit_log!(audit, "Failed to begin search: {:?}", e); - return Err(e); - } - }; - - audit_log!(audit, "Begin event {:?}", srch); - - match qs_read.search_ext(&mut audit, &srch) { - Ok(entries) => { - SearchResult::new(&mut audit, &qs_read, entries).map(|ok_sr| ok_sr.response()) - } - Err(e) => Err(e), - } - }); - // At the end of the event we send it for logging. - self.log.do_send(audit); - res - } -} - -impl Handler for QueryServerV1 { +impl Handler for QueryServerWriteV1 { type Result = Result; fn handle(&mut self, msg: CreateMessage, _: &mut Self::Context) -> Self::Result { @@ -293,7 +174,7 @@ impl Handler for QueryServerV1 { } } -impl Handler for QueryServerV1 { +impl Handler for QueryServerWriteV1 { type Result = Result; fn handle(&mut self, msg: ModifyMessage, _: &mut Self::Context) -> Self::Result { @@ -319,7 +200,7 @@ impl Handler for QueryServerV1 { } } -impl Handler for QueryServerV1 { +impl Handler for QueryServerWriteV1 { type Result = Result; fn handle(&mut self, msg: DeleteMessage, _: &mut Self::Context) -> Self::Result { @@ -346,169 +227,8 @@ impl Handler for QueryServerV1 { } } -// Need an auth session storage. LRU? -// requires a lock ... -// needs session id, entry, etc. - -impl Handler for QueryServerV1 { - type Result = Result; - - fn handle(&mut self, msg: AuthMessage, _: &mut Self::Context) -> Self::Result { - // This is probably the first function that really implements logic - // "on top" of the db server concept. In this case we check if - // the credentials provided is sufficient to say if someone is - // "authenticated" or not. - let mut audit = AuditScope::new("auth"); - let res = audit_segment!(&mut audit, || { - audit_log!(audit, "Begin auth event {:?}", msg); - - // Destructure it. - // Convert the AuthRequest to an AuthEvent that the idm server - // can use. - - let mut idm_write = self.idms.write(); - - let ae = try_audit!(audit, AuthEvent::from_message(msg)); - - let ct = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Clock failure!"); - - // Trigger a session clean *before* we take any auth steps. - // It's important to do this before to ensure that timeouts on - // the session are enforced. - idm_write.expire_auth_sessions(ct); - - // Generally things like auth denied are in Ok() msgs - // so true errors should always trigger a rollback. - let r = idm_write - .auth(&mut audit, &ae, ct) - .and_then(|r| idm_write.commit().map(|_| r)); - - audit_log!(audit, "Sending result -> {:?}", r); - // Build the result. - r.map(|r| r.response()) - }); - // At the end of the event we send it for logging. - self.log.do_send(audit); - res - } -} - -impl Handler for QueryServerV1 { - type Result = Result; - - fn handle(&mut self, msg: WhoamiMessage, _: &mut Self::Context) -> Self::Result { - let mut audit = AuditScope::new("whoami"); - let res = audit_segment!(&mut audit, || { - // TODO #62: Move this to IdmServer!!! - // Begin a read - let qs_read = self.qs.read(); - - // Make an event from the whoami request. This will process the event and - // generate a selfuuid search. - // - // This current handles the unauthenticated check, and will - // trigger the failure, but if we can manage to work out async - // then move this to core.rs, and don't allow Option to get - // this far. - let uat = msg.uat.clone().ok_or(OperationError::NotAuthenticated)?; - - let srch = match SearchEvent::from_whoami_request(&mut audit, msg.uat, &qs_read) { - Ok(s) => s, - Err(e) => { - audit_log!(audit, "Failed to begin whoami: {:?}", e); - return Err(e); - } - }; - - audit_log!(audit, "Begin event {:?}", srch); - - match qs_read.search_ext(&mut audit, &srch) { - Ok(mut entries) => { - // assert there is only one ... - match entries.len() { - 0 => Err(OperationError::NoMatchingEntries), - 1 => { - let e = entries.pop().expect("Entry length mismatch!!!"); - // Now convert to a response, and return - WhoamiResult::new(&mut audit, &qs_read, e, uat) - .map(|ok_wr| ok_wr.response()) - } - // Somehow we matched multiple, which should be impossible. - _ => Err(OperationError::InvalidState), - } - } - // Something else went wrong ... - Err(e) => Err(e), - } - }); - // Should we log the final result? - // At the end of the event we send it for logging. - self.log.do_send(audit); - res - } -} - -impl Handler for QueryServerV1 { - type Result = Result; - - fn handle(&mut self, msg: IdmAccountSetPasswordMessage, _: &mut Self::Context) -> Self::Result { - let mut audit = AuditScope::new("idm_account_set_password"); - let res = audit_segment!(&mut audit, || { - let mut idms_prox_write = self.idms.proxy_write(); - - let pce = PasswordChangeEvent::from_idm_account_set_password( - &mut audit, - &idms_prox_write.qs_write, - msg, - ) - .map_err(|e| { - audit_log!(audit, "Failed to begin idm_account_set_password: {:?}", e); - e - })?; - - idms_prox_write - .set_account_password(&mut audit, &pce) - .and_then(|_| idms_prox_write.commit(&mut audit)) - .map(|_| OperationResponse::new(())) - }); - self.log.do_send(audit); - res - } -} - -impl Handler for QueryServerV1 { - type Result = Result, OperationError>; - - fn handle(&mut self, msg: InternalSearchMessage, _: &mut Self::Context) -> Self::Result { - let mut audit = AuditScope::new("internal_search_message"); - let res = audit_segment!(&mut audit, || { - let qs_read = self.qs.read(); - - // Make an event from the request - let srch = match SearchEvent::from_internal_message(&mut audit, msg, &qs_read) { - Ok(s) => s, - Err(e) => { - audit_log!(audit, "Failed to begin search: {:?}", e); - return Err(e); - } - }; - - audit_log!(audit, "Begin event {:?}", srch); - - match qs_read.search_ext(&mut audit, &srch) { - Ok(entries) => SearchResult::new(&mut audit, &qs_read, entries) - .map(|ok_sr| ok_sr.to_proto_array()), - Err(e) => Err(e), - } - }); - self.log.do_send(audit); - res - } -} - -impl Handler for QueryServerV1 { +// IDM native types for modifications +impl Handler for QueryServerWriteV1 { type Result = Result, OperationError>; fn handle(&mut self, msg: InternalCredentialSetMessage, _: &mut Self::Context) -> Self::Result { @@ -583,9 +303,37 @@ impl Handler for QueryServerV1 { } } +impl Handler for QueryServerWriteV1 { + type Result = Result; + + fn handle(&mut self, msg: IdmAccountSetPasswordMessage, _: &mut Self::Context) -> Self::Result { + let mut audit = AuditScope::new("idm_account_set_password"); + let res = audit_segment!(&mut audit, || { + let mut idms_prox_write = self.idms.proxy_write(); + + let pce = PasswordChangeEvent::from_idm_account_set_password( + &mut audit, + &idms_prox_write.qs_write, + msg, + ) + .map_err(|e| { + audit_log!(audit, "Failed to begin idm_account_set_password: {:?}", e); + e + })?; + + idms_prox_write + .set_account_password(&mut audit, &pce) + .and_then(|_| idms_prox_write.commit(&mut audit)) + .map(|_| OperationResponse::new(())) + }); + self.log.do_send(audit); + res + } +} + // These below are internal only types. -impl Handler for QueryServerV1 { +impl Handler for QueryServerWriteV1 { type Result = (); fn handle(&mut self, msg: PurgeTombstoneEvent, _: &mut Self::Context) -> Self::Result { @@ -606,7 +354,7 @@ impl Handler for QueryServerV1 { } } -impl Handler for QueryServerV1 { +impl Handler for QueryServerWriteV1 { type Result = (); fn handle(&mut self, msg: PurgeRecycledEvent, _: &mut Self::Context) -> Self::Result { diff --git a/kanidmd/src/lib/core.rs b/kanidmd/src/lib/core.rs index 4f4e64c22..f9953ef58 100644 --- a/kanidmd/src/lib/core.rs +++ b/kanidmd/src/lib/core.rs @@ -8,16 +8,18 @@ use actix_web::{ use bytes::BytesMut; use futures::{future, Future, Stream}; +use std::sync::Arc; use time::Duration; use crate::config::Configuration; // SearchResult -use crate::actors::v1::QueryServerV1; -use crate::actors::v1::{ - AuthMessage, CreateMessage, DeleteMessage, IdmAccountSetPasswordMessage, - InternalCredentialSetMessage, InternalSearchMessage, ModifyMessage, SearchMessage, - WhoamiMessage, +use crate::actors::v1_read::QueryServerReadV1; +use crate::actors::v1_read::{AuthMessage, InternalSearchMessage, SearchMessage, WhoamiMessage}; +use crate::actors::v1_write::QueryServerWriteV1; +use crate::actors::v1_write::{ + CreateMessage, DeleteMessage, IdmAccountSetPasswordMessage, InternalCredentialSetMessage, + ModifyMessage, }; use crate::async_log; use crate::audit::AuditScope; @@ -41,7 +43,8 @@ use kanidm_proto::v1::{ use uuid::Uuid; struct AppState { - qe: actix::Addr, + qe_r: actix::Addr, + qe_w: actix::Addr, max_size: usize, } @@ -69,7 +72,7 @@ fn operation_error_to_response(e: OperationError) -> HttpResponse { } macro_rules! json_event_post { - ($req:expr, $state:expr, $message_type:ty, $request_type:ty) => {{ + ($req:expr, $state:expr, $message_type:ty, $request_type:ty, $dest:expr) => {{ // This is copied every request. Is there a better way? // The issue is the fold move takes ownership of state if // we don't copy this here @@ -106,8 +109,7 @@ macro_rules! json_event_post { Ok(obj) => { // combine request + uat -> message. let m_obj = <($message_type)>::new(uat, obj); - let res = $state - .qe + let res = $dest .send(m_obj) // What is from_err? .from_err() @@ -138,7 +140,7 @@ macro_rules! json_event_get { // New event, feed current auth data from the token to it. let obj = <($message_type)>::new(uat); - let res = $state.qe.send(obj).from_err().and_then(|res| match res { + let res = $state.qe_r.send(obj).from_err().and_then(|res| match res { Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)), Err(e) => Ok(operation_error_to_response(e)), }); @@ -152,25 +154,25 @@ macro_rules! json_event_get { fn create( (req, state): (HttpRequest, State), ) -> impl Future { - json_event_post!(req, state, CreateMessage, CreateRequest) + json_event_post!(req, state, CreateMessage, CreateRequest, state.qe_w) } fn modify( (req, state): (HttpRequest, State), ) -> impl Future { - json_event_post!(req, state, ModifyMessage, ModifyRequest) + json_event_post!(req, state, ModifyMessage, ModifyRequest, state.qe_w) } fn delete( (req, state): (HttpRequest, State), ) -> impl Future { - json_event_post!(req, state, DeleteMessage, DeleteRequest) + json_event_post!(req, state, DeleteMessage, DeleteRequest, state.qe_w) } fn search( (req, state): (HttpRequest, State), ) -> impl Future { - json_event_post!(req, state, SearchMessage, SearchRequest) + json_event_post!(req, state, SearchMessage, SearchRequest, state.qe_r) } fn whoami( @@ -192,7 +194,7 @@ fn json_rest_event_get( // type that we send to the qs. let obj = InternalSearchMessage::new(uat, filter); - let res = state.qe.send(obj).from_err().and_then(|res| match res { + let res = state.qe_r.send(obj).from_err().and_then(|res| match res { Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)), Err(e) => Ok(operation_error_to_response(e)), }); @@ -212,7 +214,7 @@ fn json_rest_event_get_id( let obj = InternalSearchMessage::new(uat, filter); - let res = state.qe.send(obj).from_err().and_then(|res| match res { + let res = state.qe_r.send(obj).from_err().and_then(|res| match res { Ok(mut event_result) => { // Only send back the first result, or None Ok(HttpResponse::Ok().json(event_result.pop())) @@ -259,7 +261,7 @@ fn json_rest_event_credential_put( match r_obj { Ok(obj) => { let m_obj = InternalCredentialSetMessage::new(uat, id, cred_id, obj); - let res = state.qe.send(m_obj).from_err().and_then(|res| match res { + let res = state.qe_w.send(m_obj).from_err().and_then(|res| match res { Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)), Err(e) => Ok(operation_error_to_response(e)), }); @@ -320,7 +322,7 @@ fn schema_attributetype_get_id( let obj = InternalSearchMessage::new(uat, filter); - let res = state.qe.send(obj).from_err().and_then(|res| match res { + let res = state.qe_r.send(obj).from_err().and_then(|res| match res { Ok(mut event_result) => { // Only send back the first result, or None Ok(HttpResponse::Ok().json(event_result.pop())) @@ -351,7 +353,7 @@ fn schema_classtype_get_id( let obj = InternalSearchMessage::new(uat, filter); - let res = state.qe.send(obj).from_err().and_then(|res| match res { + let res = state.qe_r.send(obj).from_err().and_then(|res| match res { Ok(mut event_result) => { // Only send back the first result, or None Ok(HttpResponse::Ok().json(event_result.pop())) @@ -447,48 +449,44 @@ fn auth( // We probably need to know if we allocate the cookie, that this is a // new session, and in that case, anything *except* authrequest init is // invalid. - let res = - state - .qe - .send(auth_msg) - .from_err() - .and_then(move |res| match res { - Ok(ar) => { - match &ar.state { - AuthState::Success(uat) => { - // Remove the auth-session-id - req.session().remove("auth-session-id"); - // Set the uat into the cookie - match req.session().set("uat", uat) { - Ok(_) => Ok(HttpResponse::Ok().json(ar)), - Err(_) => { - Ok(HttpResponse::InternalServerError() - .json(())) - } + let res = state + // This may change in the future ... + .qe_r + .send(auth_msg) + .from_err() + .and_then(move |res| match res { + Ok(ar) => { + match &ar.state { + AuthState::Success(uat) => { + // Remove the auth-session-id + req.session().remove("auth-session-id"); + // Set the uat into the cookie + match req.session().set("uat", uat) { + Ok(_) => Ok(HttpResponse::Ok().json(ar)), + Err(_) => { + Ok(HttpResponse::InternalServerError().json(())) } } - AuthState::Denied(_) => { - // Remove the auth-session-id - req.session().remove("auth-session-id"); - Ok(HttpResponse::Unauthorized().json(ar)) - } - AuthState::Continue(_) => { - // Ensure the auth-session-id is set - match req - .session() - .set("auth-session-id", ar.sessionid) - { - Ok(_) => Ok(HttpResponse::Ok().json(ar)), - Err(_) => { - Ok(HttpResponse::InternalServerError() - .json(())) - } + } + AuthState::Denied(_) => { + // Remove the auth-session-id + req.session().remove("auth-session-id"); + Ok(HttpResponse::Unauthorized().json(ar)) + } + AuthState::Continue(_) => { + // Ensure the auth-session-id is set + match req.session().set("auth-session-id", ar.sessionid) + { + Ok(_) => Ok(HttpResponse::Ok().json(ar)), + Err(_) => { + Ok(HttpResponse::InternalServerError().json(())) } } } } - Err(e) => Ok(operation_error_to_response(e)), - }); + } + Err(e) => Ok(operation_error_to_response(e)), + }); Box::new(res) } Err(e) => Box::new(future::err(error::ErrorBadRequest(format!( @@ -507,7 +505,8 @@ fn idm_account_set_password( req, state, IdmAccountSetPasswordMessage, - SingleStringRequest + SingleStringRequest, + state.qe_w ) } @@ -841,12 +840,23 @@ pub fn create_server_core(config: Configuration) { } log_addr.do_send(audit); - // Pass it to the actor for threading. - // Start the query server with the given be path: future config - let server_addr = QueryServerV1::start(log_addr.clone(), qs, idms, config.threads); + // Arc the idms. + let idms_arc = Arc::new(idms); - // Setup timed events - let _int_addr = IntervalActor::new(server_addr.clone()).start(); + // Pass it to the actor for threading. + // Start the read query server with the given be path: future config + let server_read_addr = QueryServerReadV1::start( + log_addr.clone(), + qs.clone(), + idms_arc.clone(), + config.threads, + ); + // Start the write thread + let server_write_addr = + QueryServerWriteV1::start(log_addr.clone(), qs.clone(), idms_arc.clone()); + + // Setup timed events associated to the write thread + let _int_addr = IntervalActor::new(server_write_addr.clone()).start(); // Copy the max size let max_size = config.maximum_request; @@ -857,7 +867,8 @@ pub fn create_server_core(config: Configuration) { // start the web server let aws_builder = actix_web::server::new(move || { App::with_state(AppState { - qe: server_addr.clone(), + qe_r: server_read_addr.clone(), + qe_w: server_write_addr.clone(), max_size: max_size, }) // Connect all our end points here. diff --git a/kanidmd/src/lib/event.rs b/kanidmd/src/lib/event.rs index 359d99546..5db844e11 100644 --- a/kanidmd/src/lib/event.rs +++ b/kanidmd/src/lib/event.rs @@ -13,9 +13,8 @@ use crate::server::{ }; use kanidm_proto::v1::OperationError; -use crate::actors::v1::{ - AuthMessage, CreateMessage, DeleteMessage, InternalSearchMessage, ModifyMessage, SearchMessage, -}; +use crate::actors::v1_read::{AuthMessage, InternalSearchMessage, SearchMessage}; +use crate::actors::v1_write::{CreateMessage, DeleteMessage, ModifyMessage}; // Bring in schematransaction trait for validate // use crate::schema::SchemaTransaction; diff --git a/kanidmd/src/lib/idm/event.rs b/kanidmd/src/lib/idm/event.rs index cc3e12a33..3dbb3a39c 100644 --- a/kanidmd/src/lib/idm/event.rs +++ b/kanidmd/src/lib/idm/event.rs @@ -1,4 +1,4 @@ -use crate::actors::v1::IdmAccountSetPasswordMessage; +use crate::actors::v1_write::IdmAccountSetPasswordMessage; use crate::audit::AuditScope; use crate::event::Event; use crate::server::QueryServerWriteTransaction; diff --git a/kanidmd/src/lib/interval.rs b/kanidmd/src/lib/interval.rs index fa3c23fd2..e9b1729b5 100644 --- a/kanidmd/src/lib/interval.rs +++ b/kanidmd/src/lib/interval.rs @@ -1,17 +1,17 @@ use actix::prelude::*; use std::time::Duration; -use crate::actors::v1::QueryServerV1; +use crate::actors::v1_write::QueryServerWriteV1; use crate::constants::PURGE_TIMEOUT; use crate::event::{PurgeRecycledEvent, PurgeTombstoneEvent}; pub struct IntervalActor { // Store any addresses we require - server: actix::Addr, + server: actix::Addr, } impl IntervalActor { - pub fn new(server: actix::Addr) -> Self { + pub fn new(server: actix::Addr) -> Self { IntervalActor { server: server } } diff --git a/kanidmd/src/lib/plugins/attrunique.rs b/kanidmd/src/lib/plugins/attrunique.rs index 3bad8d517..0ff15d5e2 100644 --- a/kanidmd/src/lib/plugins/attrunique.rs +++ b/kanidmd/src/lib/plugins/attrunique.rs @@ -225,7 +225,9 @@ mod tests { let preload = vec![e]; run_create_test!( - Err(OperationError::Plugin(PluginError::AttrUnique("duplicate value detected".to_string()))), + Err(OperationError::Plugin(PluginError::AttrUnique( + "duplicate value detected".to_string() + ))), preload, create, None, @@ -253,7 +255,9 @@ mod tests { let preload = Vec::new(); run_create_test!( - Err(OperationError::Plugin(PluginError::AttrUnique("ava already exists".to_string()))), + Err(OperationError::Plugin(PluginError::AttrUnique( + "ava already exists".to_string() + ))), preload, create, None, @@ -294,7 +298,9 @@ mod tests { let preload = vec![ea, eb]; run_modify_test!( - Err(OperationError::Plugin(PluginError::AttrUnique("duplicate value detected".to_string()))), + Err(OperationError::Plugin(PluginError::AttrUnique( + "duplicate value detected".to_string() + ))), preload, filter!(f_or!([f_eq( "name", @@ -339,7 +345,9 @@ mod tests { let preload = vec![ea, eb]; run_modify_test!( - Err(OperationError::Plugin(PluginError::AttrUnique("ava already exists".to_string()))), + Err(OperationError::Plugin(PluginError::AttrUnique( + "ava already exists".to_string() + ))), preload, filter!(f_or!([ f_eq("name", PartialValue::new_iutf8s("testgroup_a")), diff --git a/kanidmd/src/lib/plugins/base.rs b/kanidmd/src/lib/plugins/base.rs index 22e4b9649..2bf9b8ce9 100644 --- a/kanidmd/src/lib/plugins/base.rs +++ b/kanidmd/src/lib/plugins/base.rs @@ -414,7 +414,9 @@ mod tests { let create = vec![e.clone()]; run_create_test!( - Err(OperationError::Plugin(PluginError::Base("Uuid format invalid".to_string()))), + Err(OperationError::Plugin(PluginError::Base( + "Uuid format invalid".to_string() + ))), preload, create, None, @@ -486,7 +488,9 @@ mod tests { let create = vec![e.clone()]; run_create_test!( - Err(OperationError::Plugin(PluginError::Base("Uuid has multiple values".to_string()))), + Err(OperationError::Plugin(PluginError::Base( + "Uuid has multiple values".to_string() + ))), preload, create, None, @@ -522,7 +526,9 @@ mod tests { let preload = vec![e]; run_create_test!( - Err(OperationError::Plugin(PluginError::Base("Uuid duplicate found in database".to_string()))), + Err(OperationError::Plugin(PluginError::Base( + "Uuid duplicate found in database".to_string() + ))), preload, create, None, @@ -566,7 +572,9 @@ mod tests { let create = vec![ea, eb]; run_create_test!( - Err(OperationError::Plugin(PluginError::Base("Uuid duplicate detected in request".to_string()))), + Err(OperationError::Plugin(PluginError::Base( + "Uuid duplicate detected in request".to_string() + ))), preload, create, None, @@ -691,7 +699,9 @@ mod tests { let create = vec![e.clone()]; run_create_test!( - Err(OperationError::Plugin(PluginError::Base("Uuid must not be in protected range".to_string()))), + Err(OperationError::Plugin(PluginError::Base( + "Uuid must not be in protected range".to_string() + ))), preload, create, Some(JSON_ADMIN_V1), @@ -721,7 +731,9 @@ mod tests { let create = vec![e.clone()]; run_create_test!( - Err(OperationError::Plugin(PluginError::Base("UUID_DOES_NOT_EXIST may not exist!".to_string()))), + Err(OperationError::Plugin(PluginError::Base( + "UUID_DOES_NOT_EXIST may not exist!".to_string() + ))), preload, create, None, diff --git a/kanidmd/src/lib/plugins/refint.rs b/kanidmd/src/lib/plugins/refint.rs index 6ac95245b..7838d53e3 100644 --- a/kanidmd/src/lib/plugins/refint.rs +++ b/kanidmd/src/lib/plugins/refint.rs @@ -277,7 +277,9 @@ mod tests { let create = vec![e.clone()]; let preload = Vec::new(); run_create_test!( - Err(OperationError::Plugin(PluginError::ReferentialIntegrity("Uuid referenced not found in database".to_string()))), + Err(OperationError::Plugin(PluginError::ReferentialIntegrity( + "Uuid referenced not found in database".to_string() + ))), preload, create, None, @@ -433,7 +435,9 @@ mod tests { let preload = vec![eb]; run_modify_test!( - Err(OperationError::Plugin(PluginError::ReferentialIntegrity("Uuid referenced not found in database".to_string()))), + Err(OperationError::Plugin(PluginError::ReferentialIntegrity( + "Uuid referenced not found in database".to_string() + ))), preload, filter!(f_eq("name", PartialValue::new_iutf8s("testgroup_b"))), ModifyList::new_list(vec![Modify::Present( @@ -548,7 +552,9 @@ mod tests { let preload = vec![ea, eb]; run_modify_test!( - Err(OperationError::Plugin(PluginError::ReferentialIntegrity("Uuid referenced not found in database".to_string()))), + Err(OperationError::Plugin(PluginError::ReferentialIntegrity( + "Uuid referenced not found in database".to_string() + ))), preload, filter!(f_eq("name", PartialValue::new_iutf8s("testgroup_b"))), ModifyList::new_list(vec![Modify::Present( diff --git a/kanidmd/src/lib/utils.rs b/kanidmd/src/lib/utils.rs index 0e5d0ea5c..ccfc8e592 100644 --- a/kanidmd/src/lib/utils.rs +++ b/kanidmd/src/lib/utils.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use uuid::{Builder, Uuid}; use std::time::SystemTime; +use uuid::{Builder, Uuid}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; @@ -27,7 +27,9 @@ pub fn password_from_random() -> String { #[allow(dead_code)] pub fn uuid_from_now(sid: &SID) -> Uuid { - let d = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let d = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); uuid_from_duration(d, sid) }