Split read vs write actors (#121)

This commit is contained in:
Firstyear 2019-10-15 15:34:07 +13:00 committed by GitHub
parent 764b96323c
commit 86938a7521
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 472 additions and 386 deletions

View file

@ -1 +1,2 @@
pub mod v1; pub mod v1_read;
pub mod v1_write;

View file

@ -0,0 +1,299 @@
use std::sync::Arc;
use crate::audit::AuditScope;
use crate::async_log::EventLog;
use crate::event::{AuthEvent, SearchEvent, SearchResult, WhoamiResult};
use kanidm_proto::v1::OperationError;
use crate::filter::{Filter, FilterInvalid};
use crate::idm::server::IdmServer;
use crate::server::{QueryServer, QueryServerTransaction};
use kanidm_proto::v1::Entry as ProtoEntry;
use kanidm_proto::v1::{
AuthRequest, AuthResponse, SearchRequest, SearchResponse, UserAuthToken, WhoamiResponse,
};
use actix::prelude::*;
use std::time::SystemTime;
use uuid::Uuid;
// These are used when the request (IE Get) has no intrising request
// type. Additionally, they are used in some requests where we need
// to supplement extra server state (IE userauthtokens) to a request.
//
// Generally we don't need to have the responses here because they are
// part of the protocol.
pub struct WhoamiMessage {
pub uat: Option<UserAuthToken>,
}
impl WhoamiMessage {
pub fn new(uat: Option<UserAuthToken>) -> Self {
WhoamiMessage { uat: uat }
}
}
impl Message for WhoamiMessage {
type Result = Result<WhoamiResponse, OperationError>;
}
#[derive(Debug)]
pub struct AuthMessage {
pub sessionid: Option<Uuid>,
pub req: AuthRequest,
}
impl AuthMessage {
pub fn new(req: AuthRequest, sessionid: Option<Uuid>) -> Self {
AuthMessage {
sessionid: sessionid,
req: req,
}
}
}
impl Message for AuthMessage {
type Result = Result<AuthResponse, OperationError>;
}
pub struct SearchMessage {
pub uat: Option<UserAuthToken>,
pub req: SearchRequest,
}
impl SearchMessage {
pub fn new(uat: Option<UserAuthToken>, req: SearchRequest) -> Self {
SearchMessage { uat: uat, req: req }
}
}
impl Message for SearchMessage {
type Result = Result<SearchResponse, OperationError>;
}
pub struct InternalSearchMessage {
pub uat: Option<UserAuthToken>,
pub filter: Filter<FilterInvalid>,
}
impl InternalSearchMessage {
pub fn new(uat: Option<UserAuthToken>, filter: Filter<FilterInvalid>) -> Self {
InternalSearchMessage {
uat: uat,
filter: filter,
}
}
}
impl Message for InternalSearchMessage {
type Result = Result<Vec<ProtoEntry>, OperationError>;
}
// ===========================================================
pub struct QueryServerReadV1 {
log: actix::Addr<EventLog>,
qs: QueryServer,
idms: Arc<IdmServer>,
}
impl Actor for QueryServerReadV1 {
type Context = SyncContext<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
// ctx.set_mailbox_capacity(1 << 31);
}
}
impl QueryServerReadV1 {
pub fn new(log: actix::Addr<EventLog>, qs: QueryServer, idms: Arc<IdmServer>) -> Self {
log_event!(log, "Starting query server v1 worker ...");
QueryServerReadV1 {
log: log,
qs: qs,
idms: idms,
}
}
pub fn start(
log: actix::Addr<EventLog>,
query_server: QueryServer,
idms: Arc<IdmServer>,
threads: usize,
) -> actix::Addr<QueryServerReadV1> {
SyncArbiter::start(threads, move || {
QueryServerReadV1::new(log.clone(), query_server.clone(), idms.clone())
})
}
}
// The server only recieves "Message" structures, which
// are whole self contained DB operations with all parsing
// required complete. We still need to do certain validation steps, but
// at this point our just is just to route to do_<action>
impl Handler<SearchMessage> for QueryServerReadV1 {
type Result = Result<SearchResponse, OperationError>;
fn handle(&mut self, msg: SearchMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("search");
let res = audit_segment!(&mut audit, || {
// Begin a read
let qs_read = self.qs.read();
// Make an event from the request
let srch = match SearchEvent::from_message(&mut audit, msg, &qs_read) {
Ok(s) => s,
Err(e) => {
audit_log!(audit, "Failed to begin search: {:?}", e);
return Err(e);
}
};
audit_log!(audit, "Begin event {:?}", srch);
match qs_read.search_ext(&mut audit, &srch) {
Ok(entries) => {
SearchResult::new(&mut audit, &qs_read, entries).map(|ok_sr| ok_sr.response())
}
Err(e) => Err(e),
}
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
impl Handler<AuthMessage> for QueryServerReadV1 {
type Result = Result<AuthResponse, OperationError>;
fn handle(&mut self, msg: AuthMessage, _: &mut Self::Context) -> Self::Result {
// This is probably the first function that really implements logic
// "on top" of the db server concept. In this case we check if
// the credentials provided is sufficient to say if someone is
// "authenticated" or not.
let mut audit = AuditScope::new("auth");
let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin auth event {:?}", msg);
// Destructure it.
// Convert the AuthRequest to an AuthEvent that the idm server
// can use.
let mut idm_write = self.idms.write();
let ae = try_audit!(audit, AuthEvent::from_message(msg));
let ct = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock failure!");
// Trigger a session clean *before* we take any auth steps.
// It's important to do this before to ensure that timeouts on
// the session are enforced.
idm_write.expire_auth_sessions(ct);
// Generally things like auth denied are in Ok() msgs
// so true errors should always trigger a rollback.
let r = idm_write
.auth(&mut audit, &ae, ct)
.and_then(|r| idm_write.commit().map(|_| r));
audit_log!(audit, "Sending result -> {:?}", r);
// Build the result.
r.map(|r| r.response())
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
impl Handler<WhoamiMessage> for QueryServerReadV1 {
type Result = Result<WhoamiResponse, OperationError>;
fn handle(&mut self, msg: WhoamiMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("whoami");
let res = audit_segment!(&mut audit, || {
// TODO #62: Move this to IdmServer!!!
// Begin a read
let qs_read = self.qs.read();
// Make an event from the whoami request. This will process the event and
// generate a selfuuid search.
//
// This current handles the unauthenticated check, and will
// trigger the failure, but if we can manage to work out async
// then move this to core.rs, and don't allow Option<UAT> to get
// this far.
let uat = msg.uat.clone().ok_or(OperationError::NotAuthenticated)?;
let srch = match SearchEvent::from_whoami_request(&mut audit, msg.uat, &qs_read) {
Ok(s) => s,
Err(e) => {
audit_log!(audit, "Failed to begin whoami: {:?}", e);
return Err(e);
}
};
audit_log!(audit, "Begin event {:?}", srch);
match qs_read.search_ext(&mut audit, &srch) {
Ok(mut entries) => {
// assert there is only one ...
match entries.len() {
0 => Err(OperationError::NoMatchingEntries),
1 => {
let e = entries.pop().expect("Entry length mismatch!!!");
// Now convert to a response, and return
WhoamiResult::new(&mut audit, &qs_read, e, uat)
.map(|ok_wr| ok_wr.response())
}
// Somehow we matched multiple, which should be impossible.
_ => Err(OperationError::InvalidState),
}
}
// Something else went wrong ...
Err(e) => Err(e),
}
});
// Should we log the final result?
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
impl Handler<InternalSearchMessage> for QueryServerReadV1 {
type Result = Result<Vec<ProtoEntry>, OperationError>;
fn handle(&mut self, msg: InternalSearchMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("internal_search_message");
let res = audit_segment!(&mut audit, || {
let qs_read = self.qs.read();
// Make an event from the request
let srch = match SearchEvent::from_internal_message(&mut audit, msg, &qs_read) {
Ok(s) => s,
Err(e) => {
audit_log!(audit, "Failed to begin search: {:?}", e);
return Err(e);
}
};
audit_log!(audit, "Begin event {:?}", srch);
match qs_read.search_ext(&mut audit, &srch) {
Ok(entries) => SearchResult::new(&mut audit, &qs_read, entries)
.map(|ok_sr| ok_sr.to_proto_array()),
Err(e) => Err(e),
}
});
self.log.do_send(audit);
res
}
}

View file

@ -1,70 +1,24 @@
use std::sync::Arc;
use crate::audit::AuditScope; use crate::audit::AuditScope;
use std::sync::Arc;
use crate::async_log::EventLog; use crate::async_log::EventLog;
use crate::event::{ use crate::event::{
AuthEvent, CreateEvent, DeleteEvent, ModifyEvent, PurgeRecycledEvent, PurgeTombstoneEvent, CreateEvent, DeleteEvent, ModifyEvent, PurgeRecycledEvent, PurgeTombstoneEvent,
SearchEvent, SearchResult, WhoamiResult,
}; };
use crate::idm::event::{GeneratePasswordEvent, PasswordChangeEvent}; use crate::idm::event::{GeneratePasswordEvent, PasswordChangeEvent};
use kanidm_proto::v1::OperationError; use kanidm_proto::v1::OperationError;
use crate::filter::{Filter, FilterInvalid};
use crate::idm::server::IdmServer; use crate::idm::server::IdmServer;
use crate::server::{QueryServer, QueryServerTransaction}; use crate::server::{QueryServer, QueryServerTransaction};
use kanidm_proto::v1::Entry as ProtoEntry;
use kanidm_proto::v1::{ use kanidm_proto::v1::{
AuthRequest, AuthResponse, CreateRequest, DeleteRequest, ModifyRequest, OperationResponse, CreateRequest, DeleteRequest, ModifyRequest, OperationResponse, SetAuthCredential,
SearchRequest, SearchResponse, SetAuthCredential, SingleStringRequest, UserAuthToken, SingleStringRequest, UserAuthToken,
WhoamiResponse,
}; };
use actix::prelude::*; use actix::prelude::*;
use std::time::SystemTime;
use uuid::Uuid; use uuid::Uuid;
// These are used when the request (IE Get) has no intrising request
// type. Additionally, they are used in some requests where we need
// to supplement extra server state (IE userauthtokens) to a request.
//
// Generally we don't need to have the responses here because they are
// part of the protocol.
pub struct WhoamiMessage {
pub uat: Option<UserAuthToken>,
}
impl WhoamiMessage {
pub fn new(uat: Option<UserAuthToken>) -> Self {
WhoamiMessage { uat: uat }
}
}
impl Message for WhoamiMessage {
type Result = Result<WhoamiResponse, OperationError>;
}
#[derive(Debug)]
pub struct AuthMessage {
pub sessionid: Option<Uuid>,
pub req: AuthRequest,
}
impl AuthMessage {
pub fn new(req: AuthRequest, sessionid: Option<Uuid>) -> Self {
AuthMessage {
sessionid: sessionid,
req: req,
}
}
}
impl Message for AuthMessage {
type Result = Result<AuthResponse, OperationError>;
}
pub struct CreateMessage { pub struct CreateMessage {
pub uat: Option<UserAuthToken>, pub uat: Option<UserAuthToken>,
pub req: CreateRequest, pub req: CreateRequest,
@ -110,39 +64,6 @@ impl Message for ModifyMessage {
type Result = Result<OperationResponse, OperationError>; type Result = Result<OperationResponse, OperationError>;
} }
pub struct SearchMessage {
pub uat: Option<UserAuthToken>,
pub req: SearchRequest,
}
impl SearchMessage {
pub fn new(uat: Option<UserAuthToken>, req: SearchRequest) -> Self {
SearchMessage { uat: uat, req: req }
}
}
impl Message for SearchMessage {
type Result = Result<SearchResponse, OperationError>;
}
pub struct InternalSearchMessage {
pub uat: Option<UserAuthToken>,
pub filter: Filter<FilterInvalid>,
}
impl InternalSearchMessage {
pub fn new(uat: Option<UserAuthToken>, filter: Filter<FilterInvalid>) -> Self {
InternalSearchMessage {
uat: uat,
filter: filter,
}
}
}
impl Message for InternalSearchMessage {
type Result = Result<Vec<ProtoEntry>, OperationError>;
}
pub struct IdmAccountSetPasswordMessage { pub struct IdmAccountSetPasswordMessage {
pub uat: Option<UserAuthToken>, pub uat: Option<UserAuthToken>,
pub cleartext: String, pub cleartext: String,
@ -188,26 +109,26 @@ impl Message for InternalCredentialSetMessage {
type Result = Result<Option<String>, OperationError>; type Result = Result<Option<String>, OperationError>;
} }
// =========================================================== pub struct QueryServerWriteV1 {
pub struct QueryServerV1 {
log: actix::Addr<EventLog>, log: actix::Addr<EventLog>,
qs: QueryServer, qs: QueryServer,
idms: Arc<IdmServer>, idms: Arc<IdmServer>,
} }
impl Actor for QueryServerV1 { impl Actor for QueryServerWriteV1 {
type Context = SyncContext<Self>; type Context = SyncContext<Self>;
fn started(&mut self, _ctx: &mut Self::Context) { fn started(&mut self, _ctx: &mut Self::Context) {
// How much backlog we want to allow outstanding before we start to throw
// errors?
// ctx.set_mailbox_capacity(1 << 31); // ctx.set_mailbox_capacity(1 << 31);
} }
} }
impl QueryServerV1 { impl QueryServerWriteV1 {
pub fn new(log: actix::Addr<EventLog>, qs: QueryServer, idms: Arc<IdmServer>) -> Self { pub fn new(log: actix::Addr<EventLog>, qs: QueryServer, idms: Arc<IdmServer>) -> Self {
log_event!(log, "Starting query server v1 worker ..."); log_event!(log, "Starting query server v1 worker ...");
QueryServerV1 { QueryServerWriteV1 {
log: log, log: log,
qs: qs, qs: qs,
idms: idms, idms: idms,
@ -217,55 +138,15 @@ impl QueryServerV1 {
pub fn start( pub fn start(
log: actix::Addr<EventLog>, log: actix::Addr<EventLog>,
query_server: QueryServer, query_server: QueryServer,
idms: IdmServer, idms: Arc<IdmServer>,
threads: usize, ) -> actix::Addr<QueryServerWriteV1> {
) -> actix::Addr<QueryServerV1> { SyncArbiter::start(1, move || {
let idms_arc = Arc::new(idms); QueryServerWriteV1::new(log.clone(), query_server.clone(), idms.clone())
SyncArbiter::start(threads, move || {
QueryServerV1::new(log.clone(), query_server.clone(), idms_arc.clone())
}) })
} }
} }
// The server only recieves "Message" structures, which impl Handler<CreateMessage> for QueryServerWriteV1 {
// are whole self contained DB operations with all parsing
// required complete. We still need to do certain validation steps, but
// at this point our just is just to route to do_<action>
impl Handler<SearchMessage> for QueryServerV1 {
type Result = Result<SearchResponse, OperationError>;
fn handle(&mut self, msg: SearchMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("search");
let res = audit_segment!(&mut audit, || {
// Begin a read
let qs_read = self.qs.read();
// Make an event from the request
let srch = match SearchEvent::from_message(&mut audit, msg, &qs_read) {
Ok(s) => s,
Err(e) => {
audit_log!(audit, "Failed to begin search: {:?}", e);
return Err(e);
}
};
audit_log!(audit, "Begin event {:?}", srch);
match qs_read.search_ext(&mut audit, &srch) {
Ok(entries) => {
SearchResult::new(&mut audit, &qs_read, entries).map(|ok_sr| ok_sr.response())
}
Err(e) => Err(e),
}
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
impl Handler<CreateMessage> for QueryServerV1 {
type Result = Result<OperationResponse, OperationError>; type Result = Result<OperationResponse, OperationError>;
fn handle(&mut self, msg: CreateMessage, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: CreateMessage, _: &mut Self::Context) -> Self::Result {
@ -293,7 +174,7 @@ impl Handler<CreateMessage> for QueryServerV1 {
} }
} }
impl Handler<ModifyMessage> for QueryServerV1 { impl Handler<ModifyMessage> for QueryServerWriteV1 {
type Result = Result<OperationResponse, OperationError>; type Result = Result<OperationResponse, OperationError>;
fn handle(&mut self, msg: ModifyMessage, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: ModifyMessage, _: &mut Self::Context) -> Self::Result {
@ -319,7 +200,7 @@ impl Handler<ModifyMessage> for QueryServerV1 {
} }
} }
impl Handler<DeleteMessage> for QueryServerV1 { impl Handler<DeleteMessage> for QueryServerWriteV1 {
type Result = Result<OperationResponse, OperationError>; type Result = Result<OperationResponse, OperationError>;
fn handle(&mut self, msg: DeleteMessage, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: DeleteMessage, _: &mut Self::Context) -> Self::Result {
@ -346,169 +227,8 @@ impl Handler<DeleteMessage> for QueryServerV1 {
} }
} }
// Need an auth session storage. LRU? // IDM native types for modifications
// requires a lock ... impl Handler<InternalCredentialSetMessage> for QueryServerWriteV1 {
// needs session id, entry, etc.
impl Handler<AuthMessage> for QueryServerV1 {
type Result = Result<AuthResponse, OperationError>;
fn handle(&mut self, msg: AuthMessage, _: &mut Self::Context) -> Self::Result {
// This is probably the first function that really implements logic
// "on top" of the db server concept. In this case we check if
// the credentials provided is sufficient to say if someone is
// "authenticated" or not.
let mut audit = AuditScope::new("auth");
let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin auth event {:?}", msg);
// Destructure it.
// Convert the AuthRequest to an AuthEvent that the idm server
// can use.
let mut idm_write = self.idms.write();
let ae = try_audit!(audit, AuthEvent::from_message(msg));
let ct = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock failure!");
// Trigger a session clean *before* we take any auth steps.
// It's important to do this before to ensure that timeouts on
// the session are enforced.
idm_write.expire_auth_sessions(ct);
// Generally things like auth denied are in Ok() msgs
// so true errors should always trigger a rollback.
let r = idm_write
.auth(&mut audit, &ae, ct)
.and_then(|r| idm_write.commit().map(|_| r));
audit_log!(audit, "Sending result -> {:?}", r);
// Build the result.
r.map(|r| r.response())
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
impl Handler<WhoamiMessage> for QueryServerV1 {
type Result = Result<WhoamiResponse, OperationError>;
fn handle(&mut self, msg: WhoamiMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("whoami");
let res = audit_segment!(&mut audit, || {
// TODO #62: Move this to IdmServer!!!
// Begin a read
let qs_read = self.qs.read();
// Make an event from the whoami request. This will process the event and
// generate a selfuuid search.
//
// This current handles the unauthenticated check, and will
// trigger the failure, but if we can manage to work out async
// then move this to core.rs, and don't allow Option<UAT> to get
// this far.
let uat = msg.uat.clone().ok_or(OperationError::NotAuthenticated)?;
let srch = match SearchEvent::from_whoami_request(&mut audit, msg.uat, &qs_read) {
Ok(s) => s,
Err(e) => {
audit_log!(audit, "Failed to begin whoami: {:?}", e);
return Err(e);
}
};
audit_log!(audit, "Begin event {:?}", srch);
match qs_read.search_ext(&mut audit, &srch) {
Ok(mut entries) => {
// assert there is only one ...
match entries.len() {
0 => Err(OperationError::NoMatchingEntries),
1 => {
let e = entries.pop().expect("Entry length mismatch!!!");
// Now convert to a response, and return
WhoamiResult::new(&mut audit, &qs_read, e, uat)
.map(|ok_wr| ok_wr.response())
}
// Somehow we matched multiple, which should be impossible.
_ => Err(OperationError::InvalidState),
}
}
// Something else went wrong ...
Err(e) => Err(e),
}
});
// Should we log the final result?
// At the end of the event we send it for logging.
self.log.do_send(audit);
res
}
}
impl Handler<IdmAccountSetPasswordMessage> for QueryServerV1 {
type Result = Result<OperationResponse, OperationError>;
fn handle(&mut self, msg: IdmAccountSetPasswordMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("idm_account_set_password");
let res = audit_segment!(&mut audit, || {
let mut idms_prox_write = self.idms.proxy_write();
let pce = PasswordChangeEvent::from_idm_account_set_password(
&mut audit,
&idms_prox_write.qs_write,
msg,
)
.map_err(|e| {
audit_log!(audit, "Failed to begin idm_account_set_password: {:?}", e);
e
})?;
idms_prox_write
.set_account_password(&mut audit, &pce)
.and_then(|_| idms_prox_write.commit(&mut audit))
.map(|_| OperationResponse::new(()))
});
self.log.do_send(audit);
res
}
}
impl Handler<InternalSearchMessage> for QueryServerV1 {
type Result = Result<Vec<ProtoEntry>, OperationError>;
fn handle(&mut self, msg: InternalSearchMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("internal_search_message");
let res = audit_segment!(&mut audit, || {
let qs_read = self.qs.read();
// Make an event from the request
let srch = match SearchEvent::from_internal_message(&mut audit, msg, &qs_read) {
Ok(s) => s,
Err(e) => {
audit_log!(audit, "Failed to begin search: {:?}", e);
return Err(e);
}
};
audit_log!(audit, "Begin event {:?}", srch);
match qs_read.search_ext(&mut audit, &srch) {
Ok(entries) => SearchResult::new(&mut audit, &qs_read, entries)
.map(|ok_sr| ok_sr.to_proto_array()),
Err(e) => Err(e),
}
});
self.log.do_send(audit);
res
}
}
impl Handler<InternalCredentialSetMessage> for QueryServerV1 {
type Result = Result<Option<String>, OperationError>; type Result = Result<Option<String>, OperationError>;
fn handle(&mut self, msg: InternalCredentialSetMessage, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: InternalCredentialSetMessage, _: &mut Self::Context) -> Self::Result {
@ -583,9 +303,37 @@ impl Handler<InternalCredentialSetMessage> for QueryServerV1 {
} }
} }
impl Handler<IdmAccountSetPasswordMessage> for QueryServerWriteV1 {
type Result = Result<OperationResponse, OperationError>;
fn handle(&mut self, msg: IdmAccountSetPasswordMessage, _: &mut Self::Context) -> Self::Result {
let mut audit = AuditScope::new("idm_account_set_password");
let res = audit_segment!(&mut audit, || {
let mut idms_prox_write = self.idms.proxy_write();
let pce = PasswordChangeEvent::from_idm_account_set_password(
&mut audit,
&idms_prox_write.qs_write,
msg,
)
.map_err(|e| {
audit_log!(audit, "Failed to begin idm_account_set_password: {:?}", e);
e
})?;
idms_prox_write
.set_account_password(&mut audit, &pce)
.and_then(|_| idms_prox_write.commit(&mut audit))
.map(|_| OperationResponse::new(()))
});
self.log.do_send(audit);
res
}
}
// These below are internal only types. // These below are internal only types.
impl Handler<PurgeTombstoneEvent> for QueryServerV1 { impl Handler<PurgeTombstoneEvent> for QueryServerWriteV1 {
type Result = (); type Result = ();
fn handle(&mut self, msg: PurgeTombstoneEvent, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: PurgeTombstoneEvent, _: &mut Self::Context) -> Self::Result {
@ -606,7 +354,7 @@ impl Handler<PurgeTombstoneEvent> for QueryServerV1 {
} }
} }
impl Handler<PurgeRecycledEvent> for QueryServerV1 { impl Handler<PurgeRecycledEvent> for QueryServerWriteV1 {
type Result = (); type Result = ();
fn handle(&mut self, msg: PurgeRecycledEvent, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: PurgeRecycledEvent, _: &mut Self::Context) -> Self::Result {

View file

@ -8,16 +8,18 @@ use actix_web::{
use bytes::BytesMut; use bytes::BytesMut;
use futures::{future, Future, Stream}; use futures::{future, Future, Stream};
use std::sync::Arc;
use time::Duration; use time::Duration;
use crate::config::Configuration; use crate::config::Configuration;
// SearchResult // SearchResult
use crate::actors::v1::QueryServerV1; use crate::actors::v1_read::QueryServerReadV1;
use crate::actors::v1::{ use crate::actors::v1_read::{AuthMessage, InternalSearchMessage, SearchMessage, WhoamiMessage};
AuthMessage, CreateMessage, DeleteMessage, IdmAccountSetPasswordMessage, use crate::actors::v1_write::QueryServerWriteV1;
InternalCredentialSetMessage, InternalSearchMessage, ModifyMessage, SearchMessage, use crate::actors::v1_write::{
WhoamiMessage, CreateMessage, DeleteMessage, IdmAccountSetPasswordMessage, InternalCredentialSetMessage,
ModifyMessage,
}; };
use crate::async_log; use crate::async_log;
use crate::audit::AuditScope; use crate::audit::AuditScope;
@ -41,7 +43,8 @@ use kanidm_proto::v1::{
use uuid::Uuid; use uuid::Uuid;
struct AppState { struct AppState {
qe: actix::Addr<QueryServerV1>, qe_r: actix::Addr<QueryServerReadV1>,
qe_w: actix::Addr<QueryServerWriteV1>,
max_size: usize, max_size: usize,
} }
@ -69,7 +72,7 @@ fn operation_error_to_response(e: OperationError) -> HttpResponse {
} }
macro_rules! json_event_post { macro_rules! json_event_post {
($req:expr, $state:expr, $message_type:ty, $request_type:ty) => {{ ($req:expr, $state:expr, $message_type:ty, $request_type:ty, $dest:expr) => {{
// This is copied every request. Is there a better way? // This is copied every request. Is there a better way?
// The issue is the fold move takes ownership of state if // The issue is the fold move takes ownership of state if
// we don't copy this here // we don't copy this here
@ -106,8 +109,7 @@ macro_rules! json_event_post {
Ok(obj) => { Ok(obj) => {
// combine request + uat -> message. // combine request + uat -> message.
let m_obj = <($message_type)>::new(uat, obj); let m_obj = <($message_type)>::new(uat, obj);
let res = $state let res = $dest
.qe
.send(m_obj) .send(m_obj)
// What is from_err? // What is from_err?
.from_err() .from_err()
@ -138,7 +140,7 @@ macro_rules! json_event_get {
// New event, feed current auth data from the token to it. // New event, feed current auth data from the token to it.
let obj = <($message_type)>::new(uat); let obj = <($message_type)>::new(uat);
let res = $state.qe.send(obj).from_err().and_then(|res| match res { let res = $state.qe_r.send(obj).from_err().and_then(|res| match res {
Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)), Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)),
Err(e) => Ok(operation_error_to_response(e)), Err(e) => Ok(operation_error_to_response(e)),
}); });
@ -152,25 +154,25 @@ macro_rules! json_event_get {
fn create( fn create(
(req, state): (HttpRequest<AppState>, State<AppState>), (req, state): (HttpRequest<AppState>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Item = HttpResponse, Error = Error> {
json_event_post!(req, state, CreateMessage, CreateRequest) json_event_post!(req, state, CreateMessage, CreateRequest, state.qe_w)
} }
fn modify( fn modify(
(req, state): (HttpRequest<AppState>, State<AppState>), (req, state): (HttpRequest<AppState>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Item = HttpResponse, Error = Error> {
json_event_post!(req, state, ModifyMessage, ModifyRequest) json_event_post!(req, state, ModifyMessage, ModifyRequest, state.qe_w)
} }
fn delete( fn delete(
(req, state): (HttpRequest<AppState>, State<AppState>), (req, state): (HttpRequest<AppState>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Item = HttpResponse, Error = Error> {
json_event_post!(req, state, DeleteMessage, DeleteRequest) json_event_post!(req, state, DeleteMessage, DeleteRequest, state.qe_w)
} }
fn search( fn search(
(req, state): (HttpRequest<AppState>, State<AppState>), (req, state): (HttpRequest<AppState>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> { ) -> impl Future<Item = HttpResponse, Error = Error> {
json_event_post!(req, state, SearchMessage, SearchRequest) json_event_post!(req, state, SearchMessage, SearchRequest, state.qe_r)
} }
fn whoami( fn whoami(
@ -192,7 +194,7 @@ fn json_rest_event_get(
// type that we send to the qs. // type that we send to the qs.
let obj = InternalSearchMessage::new(uat, filter); let obj = InternalSearchMessage::new(uat, filter);
let res = state.qe.send(obj).from_err().and_then(|res| match res { let res = state.qe_r.send(obj).from_err().and_then(|res| match res {
Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)), Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)),
Err(e) => Ok(operation_error_to_response(e)), Err(e) => Ok(operation_error_to_response(e)),
}); });
@ -212,7 +214,7 @@ fn json_rest_event_get_id(
let obj = InternalSearchMessage::new(uat, filter); let obj = InternalSearchMessage::new(uat, filter);
let res = state.qe.send(obj).from_err().and_then(|res| match res { let res = state.qe_r.send(obj).from_err().and_then(|res| match res {
Ok(mut event_result) => { Ok(mut event_result) => {
// Only send back the first result, or None // Only send back the first result, or None
Ok(HttpResponse::Ok().json(event_result.pop())) Ok(HttpResponse::Ok().json(event_result.pop()))
@ -259,7 +261,7 @@ fn json_rest_event_credential_put(
match r_obj { match r_obj {
Ok(obj) => { Ok(obj) => {
let m_obj = InternalCredentialSetMessage::new(uat, id, cred_id, obj); let m_obj = InternalCredentialSetMessage::new(uat, id, cred_id, obj);
let res = state.qe.send(m_obj).from_err().and_then(|res| match res { let res = state.qe_w.send(m_obj).from_err().and_then(|res| match res {
Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)), Ok(event_result) => Ok(HttpResponse::Ok().json(event_result)),
Err(e) => Ok(operation_error_to_response(e)), Err(e) => Ok(operation_error_to_response(e)),
}); });
@ -320,7 +322,7 @@ fn schema_attributetype_get_id(
let obj = InternalSearchMessage::new(uat, filter); let obj = InternalSearchMessage::new(uat, filter);
let res = state.qe.send(obj).from_err().and_then(|res| match res { let res = state.qe_r.send(obj).from_err().and_then(|res| match res {
Ok(mut event_result) => { Ok(mut event_result) => {
// Only send back the first result, or None // Only send back the first result, or None
Ok(HttpResponse::Ok().json(event_result.pop())) Ok(HttpResponse::Ok().json(event_result.pop()))
@ -351,7 +353,7 @@ fn schema_classtype_get_id(
let obj = InternalSearchMessage::new(uat, filter); let obj = InternalSearchMessage::new(uat, filter);
let res = state.qe.send(obj).from_err().and_then(|res| match res { let res = state.qe_r.send(obj).from_err().and_then(|res| match res {
Ok(mut event_result) => { Ok(mut event_result) => {
// Only send back the first result, or None // Only send back the first result, or None
Ok(HttpResponse::Ok().json(event_result.pop())) Ok(HttpResponse::Ok().json(event_result.pop()))
@ -447,9 +449,9 @@ fn auth(
// We probably need to know if we allocate the cookie, that this is a // We probably need to know if we allocate the cookie, that this is a
// new session, and in that case, anything *except* authrequest init is // new session, and in that case, anything *except* authrequest init is
// invalid. // invalid.
let res = let res = state
state // This may change in the future ...
.qe .qe_r
.send(auth_msg) .send(auth_msg)
.from_err() .from_err()
.and_then(move |res| match res { .and_then(move |res| match res {
@ -462,8 +464,7 @@ fn auth(
match req.session().set("uat", uat) { match req.session().set("uat", uat) {
Ok(_) => Ok(HttpResponse::Ok().json(ar)), Ok(_) => Ok(HttpResponse::Ok().json(ar)),
Err(_) => { Err(_) => {
Ok(HttpResponse::InternalServerError() Ok(HttpResponse::InternalServerError().json(()))
.json(()))
} }
} }
} }
@ -474,14 +475,11 @@ fn auth(
} }
AuthState::Continue(_) => { AuthState::Continue(_) => {
// Ensure the auth-session-id is set // Ensure the auth-session-id is set
match req match req.session().set("auth-session-id", ar.sessionid)
.session()
.set("auth-session-id", ar.sessionid)
{ {
Ok(_) => Ok(HttpResponse::Ok().json(ar)), Ok(_) => Ok(HttpResponse::Ok().json(ar)),
Err(_) => { Err(_) => {
Ok(HttpResponse::InternalServerError() Ok(HttpResponse::InternalServerError().json(()))
.json(()))
} }
} }
} }
@ -507,7 +505,8 @@ fn idm_account_set_password(
req, req,
state, state,
IdmAccountSetPasswordMessage, IdmAccountSetPasswordMessage,
SingleStringRequest SingleStringRequest,
state.qe_w
) )
} }
@ -841,12 +840,23 @@ pub fn create_server_core(config: Configuration) {
} }
log_addr.do_send(audit); log_addr.do_send(audit);
// Pass it to the actor for threading. // Arc the idms.
// Start the query server with the given be path: future config let idms_arc = Arc::new(idms);
let server_addr = QueryServerV1::start(log_addr.clone(), qs, idms, config.threads);
// Setup timed events // Pass it to the actor for threading.
let _int_addr = IntervalActor::new(server_addr.clone()).start(); // Start the read query server with the given be path: future config
let server_read_addr = QueryServerReadV1::start(
log_addr.clone(),
qs.clone(),
idms_arc.clone(),
config.threads,
);
// Start the write thread
let server_write_addr =
QueryServerWriteV1::start(log_addr.clone(), qs.clone(), idms_arc.clone());
// Setup timed events associated to the write thread
let _int_addr = IntervalActor::new(server_write_addr.clone()).start();
// Copy the max size // Copy the max size
let max_size = config.maximum_request; let max_size = config.maximum_request;
@ -857,7 +867,8 @@ pub fn create_server_core(config: Configuration) {
// start the web server // start the web server
let aws_builder = actix_web::server::new(move || { let aws_builder = actix_web::server::new(move || {
App::with_state(AppState { App::with_state(AppState {
qe: server_addr.clone(), qe_r: server_read_addr.clone(),
qe_w: server_write_addr.clone(),
max_size: max_size, max_size: max_size,
}) })
// Connect all our end points here. // Connect all our end points here.

View file

@ -13,9 +13,8 @@ use crate::server::{
}; };
use kanidm_proto::v1::OperationError; use kanidm_proto::v1::OperationError;
use crate::actors::v1::{ use crate::actors::v1_read::{AuthMessage, InternalSearchMessage, SearchMessage};
AuthMessage, CreateMessage, DeleteMessage, InternalSearchMessage, ModifyMessage, SearchMessage, use crate::actors::v1_write::{CreateMessage, DeleteMessage, ModifyMessage};
};
// Bring in schematransaction trait for validate // Bring in schematransaction trait for validate
// use crate::schema::SchemaTransaction; // use crate::schema::SchemaTransaction;

View file

@ -1,4 +1,4 @@
use crate::actors::v1::IdmAccountSetPasswordMessage; use crate::actors::v1_write::IdmAccountSetPasswordMessage;
use crate::audit::AuditScope; use crate::audit::AuditScope;
use crate::event::Event; use crate::event::Event;
use crate::server::QueryServerWriteTransaction; use crate::server::QueryServerWriteTransaction;

View file

@ -1,17 +1,17 @@
use actix::prelude::*; use actix::prelude::*;
use std::time::Duration; use std::time::Duration;
use crate::actors::v1::QueryServerV1; use crate::actors::v1_write::QueryServerWriteV1;
use crate::constants::PURGE_TIMEOUT; use crate::constants::PURGE_TIMEOUT;
use crate::event::{PurgeRecycledEvent, PurgeTombstoneEvent}; use crate::event::{PurgeRecycledEvent, PurgeTombstoneEvent};
pub struct IntervalActor { pub struct IntervalActor {
// Store any addresses we require // Store any addresses we require
server: actix::Addr<QueryServerV1>, server: actix::Addr<QueryServerWriteV1>,
} }
impl IntervalActor { impl IntervalActor {
pub fn new(server: actix::Addr<QueryServerV1>) -> Self { pub fn new(server: actix::Addr<QueryServerWriteV1>) -> Self {
IntervalActor { server: server } IntervalActor { server: server }
} }

View file

@ -225,7 +225,9 @@ mod tests {
let preload = vec![e]; let preload = vec![e];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::AttrUnique("duplicate value detected".to_string()))), Err(OperationError::Plugin(PluginError::AttrUnique(
"duplicate value detected".to_string()
))),
preload, preload,
create, create,
None, None,
@ -253,7 +255,9 @@ mod tests {
let preload = Vec::new(); let preload = Vec::new();
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::AttrUnique("ava already exists".to_string()))), Err(OperationError::Plugin(PluginError::AttrUnique(
"ava already exists".to_string()
))),
preload, preload,
create, create,
None, None,
@ -294,7 +298,9 @@ mod tests {
let preload = vec![ea, eb]; let preload = vec![ea, eb];
run_modify_test!( run_modify_test!(
Err(OperationError::Plugin(PluginError::AttrUnique("duplicate value detected".to_string()))), Err(OperationError::Plugin(PluginError::AttrUnique(
"duplicate value detected".to_string()
))),
preload, preload,
filter!(f_or!([f_eq( filter!(f_or!([f_eq(
"name", "name",
@ -339,7 +345,9 @@ mod tests {
let preload = vec![ea, eb]; let preload = vec![ea, eb];
run_modify_test!( run_modify_test!(
Err(OperationError::Plugin(PluginError::AttrUnique("ava already exists".to_string()))), Err(OperationError::Plugin(PluginError::AttrUnique(
"ava already exists".to_string()
))),
preload, preload,
filter!(f_or!([ filter!(f_or!([
f_eq("name", PartialValue::new_iutf8s("testgroup_a")), f_eq("name", PartialValue::new_iutf8s("testgroup_a")),

View file

@ -414,7 +414,9 @@ mod tests {
let create = vec![e.clone()]; let create = vec![e.clone()];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::Base("Uuid format invalid".to_string()))), Err(OperationError::Plugin(PluginError::Base(
"Uuid format invalid".to_string()
))),
preload, preload,
create, create,
None, None,
@ -486,7 +488,9 @@ mod tests {
let create = vec![e.clone()]; let create = vec![e.clone()];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::Base("Uuid has multiple values".to_string()))), Err(OperationError::Plugin(PluginError::Base(
"Uuid has multiple values".to_string()
))),
preload, preload,
create, create,
None, None,
@ -522,7 +526,9 @@ mod tests {
let preload = vec![e]; let preload = vec![e];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::Base("Uuid duplicate found in database".to_string()))), Err(OperationError::Plugin(PluginError::Base(
"Uuid duplicate found in database".to_string()
))),
preload, preload,
create, create,
None, None,
@ -566,7 +572,9 @@ mod tests {
let create = vec![ea, eb]; let create = vec![ea, eb];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::Base("Uuid duplicate detected in request".to_string()))), Err(OperationError::Plugin(PluginError::Base(
"Uuid duplicate detected in request".to_string()
))),
preload, preload,
create, create,
None, None,
@ -691,7 +699,9 @@ mod tests {
let create = vec![e.clone()]; let create = vec![e.clone()];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::Base("Uuid must not be in protected range".to_string()))), Err(OperationError::Plugin(PluginError::Base(
"Uuid must not be in protected range".to_string()
))),
preload, preload,
create, create,
Some(JSON_ADMIN_V1), Some(JSON_ADMIN_V1),
@ -721,7 +731,9 @@ mod tests {
let create = vec![e.clone()]; let create = vec![e.clone()];
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::Base("UUID_DOES_NOT_EXIST may not exist!".to_string()))), Err(OperationError::Plugin(PluginError::Base(
"UUID_DOES_NOT_EXIST may not exist!".to_string()
))),
preload, preload,
create, create,
None, None,

View file

@ -277,7 +277,9 @@ mod tests {
let create = vec![e.clone()]; let create = vec![e.clone()];
let preload = Vec::new(); let preload = Vec::new();
run_create_test!( run_create_test!(
Err(OperationError::Plugin(PluginError::ReferentialIntegrity("Uuid referenced not found in database".to_string()))), Err(OperationError::Plugin(PluginError::ReferentialIntegrity(
"Uuid referenced not found in database".to_string()
))),
preload, preload,
create, create,
None, None,
@ -433,7 +435,9 @@ mod tests {
let preload = vec![eb]; let preload = vec![eb];
run_modify_test!( run_modify_test!(
Err(OperationError::Plugin(PluginError::ReferentialIntegrity("Uuid referenced not found in database".to_string()))), Err(OperationError::Plugin(PluginError::ReferentialIntegrity(
"Uuid referenced not found in database".to_string()
))),
preload, preload,
filter!(f_eq("name", PartialValue::new_iutf8s("testgroup_b"))), filter!(f_eq("name", PartialValue::new_iutf8s("testgroup_b"))),
ModifyList::new_list(vec![Modify::Present( ModifyList::new_list(vec![Modify::Present(
@ -548,7 +552,9 @@ mod tests {
let preload = vec![ea, eb]; let preload = vec![ea, eb];
run_modify_test!( run_modify_test!(
Err(OperationError::Plugin(PluginError::ReferentialIntegrity("Uuid referenced not found in database".to_string()))), Err(OperationError::Plugin(PluginError::ReferentialIntegrity(
"Uuid referenced not found in database".to_string()
))),
preload, preload,
filter!(f_eq("name", PartialValue::new_iutf8s("testgroup_b"))), filter!(f_eq("name", PartialValue::new_iutf8s("testgroup_b"))),
ModifyList::new_list(vec![Modify::Present( ModifyList::new_list(vec![Modify::Present(

View file

@ -1,6 +1,6 @@
use std::time::Duration; use std::time::Duration;
use uuid::{Builder, Uuid};
use std::time::SystemTime; use std::time::SystemTime;
use uuid::{Builder, Uuid};
use rand::distributions::Alphanumeric; use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@ -27,7 +27,9 @@ pub fn password_from_random() -> String {
#[allow(dead_code)] #[allow(dead_code)]
pub fn uuid_from_now(sid: &SID) -> Uuid { pub fn uuid_from_now(sid: &SID) -> Uuid {
let d = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); let d = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
uuid_from_duration(d, sid) uuid_from_duration(d, sid)
} }