improve async (#314)

This completely removes actix and actix-web from the codebase, replacing them with tokio and http-rs/tide. Due to a temporary limitation in parts of tokio when combined with openssl/libressl, rustls is used for the web server for now; I'll switch this back once that issue is resolved. There are likely still some other clippy issues, but the next step is that I can finally run cargo outdated and bring this and the other kanidm/* dependencies up to date, since actix is no longer holding their versions back. Following this, I need to finish the clippy warnings and run cargo outdated and cargo audit.
Firstyear 2020-09-06 08:44:35 +10:00 committed by GitHub
parent 064533f8f6
commit 0041445b73
37 changed files with 3215 additions and 3648 deletions
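For readers following the migration, a minimal sketch of the tide server shape this commit moves to. Everything here is illustrative (the route, state type, and plain-TCP listener are assumptions, not the actual kanidmd wiring, whose diff this page truncates):

use tide::Request;

#[derive(Clone)]
struct AppState; // stand-in for the real server state

async fn status(_req: Request<AppState>) -> tide::Result<String> {
    Ok("ok".to_string())
}

#[async_std::main] // tide runs on async-std; needs its "attributes" feature
async fn main() -> tide::Result<()> {
    let mut app = tide::with_state(AppState);
    app.at("/status").get(status);
    // The commit itself listens through tide-rustls; plain TCP keeps the sketch small.
    app.listen("127.0.0.1:8080").await?;
    Ok(())
}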

Cargo.lock

File diff suppressed because it is too large.


@@ -44,10 +44,10 @@ IP.1 = 127.0.0.1
DEVEOF
# Make the ca
openssl req -x509 -new -newkey rsa:2048 -keyout cakey.pem -out ca.pem -days 31 -subj "/C=AU/ST=Queensland/L=Brisbane/O=INSECURE/CN=insecure.ca.localhost" -nodes
openssl genrsa -out key.pem 2048
openssl req -key key.pem -out cert.csr -days 31 -config altnames.cnf -new -extensions v3_req
openssl x509 -req -days 31 -in cert.csr -CA ca.pem -CAkey cakey.pem -CAcreateserial -out cert.pem -extfile altnames.cnf -extensions v3_req
openssl req -x509 -new -newkey rsa:4096 -sha256 -keyout cakey.pem -out ca.pem -days 31 -subj "/C=AU/ST=Queensland/L=Brisbane/O=INSECURE/CN=insecure.ca.localhost" -nodes
openssl genrsa -out key.pem 4096
openssl req -sha256 -key key.pem -out cert.csr -days 31 -config altnames.cnf -new -extensions v3_req
openssl x509 -req -days 31 -in cert.csr -CA ca.pem -CAkey cakey.pem -CAcreateserial -out cert.pem -extfile altnames.cnf -extensions v3_req -sha256
echo use ca.pem, cert.pem, and key.pem


@@ -22,7 +22,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
# users = "0.10"
[dev-dependencies]
tokio = "0.2"
actix = "0.9"
tokio = { version = "0.2", features = ["full"] }
kanidm = { path = "../kanidmd" }
futures = "0.3"
async-std = "1.6"


@@ -952,7 +952,7 @@ impl KanidmClient {
self.perform_get_request(format!("/v1/recycle_bin/{}", id).as_str())
}
pub fn recycle_bin_revive(&self, id: &str) -> Result<(), ClientError> {
pub fn recycle_bin_revive(&self, id: &str) -> Result<bool, ClientError> {
self.perform_post_request(format!("/v1/recycle_bin/{}/_revive", id).as_str(), ())
}
}


@@ -1,5 +1,4 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use kanidm::audit::LogLevel;
@@ -7,7 +6,8 @@ use kanidm::config::{Configuration, IntegrationTestConfig};
use kanidm::core::create_server_core;
use kanidm_client::{KanidmClient, KanidmClientBuilder};
use actix::prelude::*;
use async_std::task;
use tokio::sync::mpsc;
pub const ADMIN_TEST_PASSWORD: &str = "integration test admin password";
static PORT_ALLOC: AtomicUsize = AtomicUsize::new(8080);
@@ -22,7 +22,9 @@ pub fn run_test(test_fn: fn(KanidmClient) -> ()) {
.is_test(true)
.try_init();
let (tx, rx) = mpsc::channel();
let (mut ready_tx, mut ready_rx) = mpsc::channel(1);
let (mut finish_tx, mut finish_rx) = mpsc::channel(1);
let port = PORT_ALLOC.fetch_add(1, Ordering::SeqCst);
let int_config = Box::new(IntegrationTestConfig {
@@ -34,25 +36,34 @@ pub fn run_test(test_fn: fn(KanidmClient) -> ()) {
config.address = format!("127.0.0.1:{}", port);
config.secure_cookies = false;
config.integration_test_config = Some(int_config);
// config.log_level = Some(LogLevel::Verbose as u32);
config.log_level = Some(LogLevel::Quiet as u32);
config.threads = 1;
// config.log_level = Some(LogLevel::FullTrace as u32);
thread::spawn(move || {
let t_handle = thread::spawn(move || {
// Spawn a thread for the test runner, this should have a unique
// port....
let system = System::new("test-rctx");
let rctx = async move {
let sctx = create_server_core(config).await;
let _ = tx.send(sctx);
};
Arbiter::spawn(rctx);
system.run().expect("Failed to start thread");
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("failed to start tokio");
rt.block_on(async {
create_server_core(config)
.await
.expect("failed to start server core");
// We have to yield now to guarantee that the tide elements are setup.
task::yield_now().await;
ready_tx
.send(())
.await
.expect("failed in indicate readiness");
finish_rx.recv().await;
});
});
let sctx = rx.recv().unwrap().expect("failed to start ctx");
System::set_current(sctx.current());
let _ = task::block_on(ready_rx.recv()).expect("failed to start ctx");
// Do we need any fixtures?
// Yes probably, but they'll need to be futures as well ...
// later we could accept fixture as it's own future for re-use
@@ -68,5 +79,8 @@ pub fn run_test(test_fn: fn(KanidmClient) -> ()) {
// We DO NOT need teardown, as sqlite is in mem
// let the tables hit the floor
sctx.stop();
// At this point, when the channel drops, it drops the thread too.
task::block_on(finish_tx.send(())).expect("unable to send to ctx");
t_handle.join().expect("failed to join thread");
}
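The pattern above, reduced to its moving parts: the harness owns a plain OS thread, that thread drives a single-threaded tokio 0.2 runtime (basic_scheduler), and small bounded channels signal readiness and shutdown. A sketch with stand-in names, not the actual harness:

use std::thread;
use tokio::sync::mpsc;

fn spawn_server_thread() -> (mpsc::Sender<()>, thread::JoinHandle<()>) {
    let (finish_tx, mut finish_rx) = mpsc::channel::<()>(1);
    let handle = thread::spawn(move || {
        let mut rt = tokio::runtime::Builder::new()
            .basic_scheduler() // single-threaded, mirroring threads = 1 above
            .enable_all()
            .build()
            .expect("failed to start tokio");
        rt.block_on(async {
            // create_server_core(config).await would run here, then we park
            // until the test signals completion (or drops the sender).
            finish_rx.recv().await;
        });
    });
    (finish_tx, handle)
}

The test body then runs synchronously against the client, sends on the finish channel when done, and joins the thread, exactly as finish_tx and t_handle do above.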


@@ -73,4 +73,3 @@ default = [ "libsqlite3-sys/bundled" ]
[dev-dependencies]
kanidm = { path = "../kanidmd" }
actix = "0.9"


@@ -294,6 +294,11 @@ impl CacheLayer {
Some(OperationError::NoMatchingEntries),
opid,
)
| ClientError::Http(
StatusCode::NOT_FOUND,
Some(OperationError::NoMatchingEntries),
opid,
)
| ClientError::Http(
StatusCode::BAD_REQUEST,
Some(OperationError::InvalidAccountState(_)),
@@ -362,6 +367,11 @@ impl CacheLayer {
Some(OperationError::NoMatchingEntries),
opid,
)
| ClientError::Http(
StatusCode::NOT_FOUND,
Some(OperationError::NoMatchingEntries),
opid,
)
| ClientError::Http(
StatusCode::BAD_REQUEST,
Some(OperationError::InvalidAccountState(_)),
@@ -656,6 +666,11 @@ impl CacheLayer {
Some(OperationError::NoMatchingEntries),
opid,
)
| ClientError::Http(
StatusCode::NOT_FOUND,
Some(OperationError::NoMatchingEntries),
opid,
)
| ClientError::Http(
StatusCode::BAD_REQUEST,
Some(OperationError::InvalidAccountState(_)),
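All three hunks in this file add the same thing: a NOT_FOUND arm alongside the existing BAD_REQUEST arm, joined with an or-pattern so both map to the same no-matching-entries handling. An illustrative sketch with stand-in types (the real ClientError and OperationError live in kanidm_client and kanidm_proto):

// Stand-ins only; the shapes mirror the match arms above.
enum Status {
    NotFound,
    BadRequest,
}
enum OpError {
    NoMatchingEntries,
}

fn is_missing_entry(status: Status, err: Option<OpError>) -> bool {
    matches!(
        (status, err),
        (Status::NotFound, Some(OpError::NoMatchingEntries))
            | (Status::BadRequest, Some(OpError::NoMatchingEntries))
    )
}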


@@ -218,7 +218,7 @@ async fn handle_client(
Ok(())
}
#[tokio::main]
#[tokio::main(core_threads = 1, max_threads = 1)]
async fn main() {
let cuid = get_current_uid();
let ceuid = get_effective_uid();
@@ -396,89 +396,3 @@ async fn main() {
server.await;
}
// This is the actix version, but on MacOS there is an issue where it can't flush the socket properly :(
//=== A connected client session
/*
struct ClientSession {
framed: actix::io::FramedWrite<WriteHalf<UnixStream>, ClientCodec>,
}
impl Actor for ClientSession {
type Context = Context<Self>;
}
impl actix::io::WriteHandler<io::Error> for ClientSession {}
impl StreamHandler<Result<ClientRequest, io::Error>> for ClientSession {
fn handle(&mut self, msg: Result<ClientRequest, io::Error>, ctx: &mut Self::Context) {
debug!("Processing -> {:?}", msg);
match msg {
Ok(ClientRequest::SshKey(account_id)) => {
self.framed.write(ClientResponse::SshKeys(vec![]));
}
Err(e) => {
println!("Encountered an IO error, disconnecting session -> {:?}", e);
ctx.stop();
}
}
}
}
impl ClientSession {
fn new(framed: actix::io::FramedWrite<WriteHalf<UnixStream>, ClientCodec>) -> Self {
ClientSession { framed: framed }
}
}
//=== this is the accept server
struct AcceptServer;
impl Actor for AcceptServer {
type Context = Context<Self>;
}
#[derive(Message)]
#[rtype(result = "()")]
struct UdsConnect(pub UnixStream, pub SocketAddr);
impl Handler<UdsConnect> for AcceptServer {
type Result = ();
fn handle(&mut self, msg: UdsConnect, _: &mut Context<Self>) {
debug!("Accepting new client ...");
// TODO: Clone the DB actor handle here.
ClientSession::create(move |ctx| {
let (r, w) = tokio::io::split(msg.0);
ClientSession::add_stream(FramedRead::new(r, ClientCodec), ctx);
ClientSession::new(actix::io::FramedWrite::new(w, ClientCodec, ctx))
});
}
}
#[actix_rt::main]
async fn main() {
// Setup logging
::std::env::set_var("RUST_LOG", "kanidm=debug,kanidm_client=debug");
env_logger::init();
rm_if_exist(DEFAULT_SOCK_PATH);
let listener = Box::new(UnixListener::bind(DEFAULT_SOCK_PATH).expect("Failed to bind"));
AcceptServer::create(|ctx| {
ctx.add_message_stream(Box::leak(listener).incoming().map(|st| {
let st = st.unwrap();
let addr = st.peer_addr().unwrap();
UdsConnect(st, addr)
}));
AcceptServer {}
});
println!("Running ...");
tokio::signal::ctrl_c().await.unwrap();
println!("Ctrl-C received, shutting down");
System::current().stop();
}
*/


@@ -1,8 +1,6 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use actix::prelude::*;
use kanidm::audit::LogLevel;
use kanidm::config::{Configuration, IntegrationTestConfig};
use kanidm::core::create_server_core;
@@ -18,6 +16,7 @@ use kanidm_client::asynchronous::KanidmAsyncClient;
use kanidm_client::{KanidmClient, KanidmClientBuilder};
use async_std::task;
use tokio::sync::mpsc;
static PORT_ALLOC: AtomicUsize = AtomicUsize::new(18080);
const ADMIN_TEST_PASSWORD: &str = "integration test admin password";
@@ -28,7 +27,10 @@ const TESTACCOUNT1_PASSWORD_INC: &str = "never going to work";
fn run_test(fix_fn: fn(&KanidmClient) -> (), test_fn: fn(CacheLayer, KanidmAsyncClient) -> ()) {
// ::std::env::set_var("RUST_LOG", "actix_web=warn,kanidm=error");
let _ = env_logger::builder().is_test(true).try_init();
let (tx, rx) = mpsc::channel();
let (mut ready_tx, mut ready_rx) = mpsc::channel(1);
let (mut finish_tx, mut finish_rx) = mpsc::channel(1);
let port = PORT_ALLOC.fetch_add(1, Ordering::SeqCst);
let int_config = Box::new(IntegrationTestConfig {
@@ -40,24 +42,33 @@ fn run_test(fix_fn: fn(&KanidmClient) -> (), test_fn: fn(CacheLayer, KanidmAsync
config.address = format!("127.0.0.1:{}", port);
config.secure_cookies = false;
config.integration_test_config = Some(int_config);
// config.log_level = Some(LogLevel::Verbose as u32);
config.log_level = Some(LogLevel::Quiet as u32);
thread::spawn(move || {
config.threads = 1;
let t_handle = thread::spawn(move || {
// Spawn a thread for the test runner, this should have a unique
// port....
let system = System::new("test-rctx");
let rctx = async move {
let sctx = create_server_core(config).await;
let _ = tx.send(sctx);
};
Arbiter::spawn(rctx);
system.run().expect("Failed to start thread");
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("failed to start tokio");
rt.block_on(async {
create_server_core(config)
.await
.expect("failed to start server core");
// We have to yield now to guarantee that the tide elements are setup.
task::yield_now().await;
ready_tx
.send(())
.await
.expect("failed in indicate readiness");
finish_rx.recv().await;
});
});
let sctx = rx.recv().unwrap().expect("failed to start ctx");
System::set_current(sctx.current());
let _ = task::block_on(ready_rx.recv()).expect("failed to start ctx");
// Setup the client, and the address we selected.
let addr = format!("http://127.0.0.1:{}", port);
@@ -95,7 +106,8 @@ fn run_test(fix_fn: fn(&KanidmClient) -> (), test_fn: fn(CacheLayer, KanidmAsync
// We DO NOT need teardown, as sqlite is in mem
// let the tables hit the floor
sctx.stop();
task::block_on(finish_tx.send(())).expect("unable to send to ctx");
t_handle.join().expect("failed to join thread");
}
fn test_fixture(rsclient: &KanidmClient) -> () {


@@ -28,11 +28,12 @@ path = "src/server/main.rs"
[dependencies]
kanidm_proto = { path = "../kanidm_proto", version = "1.1.0-alpha" }
actix = "0.9"
actix-rt = "1.1"
actix-web = { version = "2.0", features = ["openssl"] }
actix-session = "0.3"
actix-files = "0.2"
tide = "0.13"
async-trait = "0.1"
async-h1 = "2.0"
# Temporary!
# tide-rustls = "0.1"
tide-rustls = { git = "https://github.com/http-rs/tide-rustls.git", rev = "c1f13a77e82369323274d832b8d3f33ba7c272c7" }
async-std = "1.6"
@@ -42,12 +43,16 @@ rand = "0.7"
toml = "0.5"
chrono = "0.4"
cookie = "0.14"
regex = "1"
lazy_static = "1.2.0"
tokio = "0.2"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "0.2", features = ["rt-threaded", "macros", "rt-util", "sync", "time", "net", "io-util", "signal"] }
tokio-util = { version = "0.2", features = ["codec"] }
tokio-openssl = "0.4"
openssl = "0.10"
uuid = { version = "0.8", features = ["serde", "v4" ] }
serde = "1.0"
serde_cbor = "0.11"
@@ -63,11 +68,11 @@ structopt = { version = "0.3", default-features = false }
time = "0.1"
hashbrown = "0.8"
concread = "^0.1.18"
concread = "^0.2"
# concread = { path = "../../concread", features = ["asynch"] }
# concread = { path = "../../concread" }
crossbeam = "0.7"
openssl = "0.10"
sshkeys = "0.3"
rpassword = "4.0"
@@ -81,9 +86,6 @@ base64 = "0.12"
ldap3_server = "0.1"
# ldap3_server = { path = "../../ldap3_server" }
futures-util = "0.3"
tokio-util = { version = "0.2", features = ["codec"] }
tokio-openssl = "0.4"
libc = "0.2"
users = "0.10"


@@ -5,4 +5,4 @@ db_fs_type = "zfs"
tls_ca = "../insecure/ca.pem"
tls_cert = "../insecure/cert.pem"
tls_key = "../insecure/key.pem"
log_level = "perffull"
log_level = "verbose"


@@ -1,4 +1,5 @@
use crossbeam::channel::Sender;
use tokio::sync::mpsc::UnboundedSender as Sender;
use std::sync::Arc;
use crate::audit::AuditScope;
@@ -21,7 +22,6 @@ use kanidm_proto::v1::{
UserAuthToken, WhoamiResponse,
};
use actix::prelude::*;
use std::time::SystemTime;
use uuid::Uuid;
@@ -40,16 +40,6 @@ pub struct WhoamiMessage {
pub eventid: Uuid,
}
impl WhoamiMessage {
pub fn new(uat: Option<UserAuthToken>, eventid: Uuid) -> Self {
WhoamiMessage { uat, eventid }
}
}
impl Message for WhoamiMessage {
type Result = Result<WhoamiResponse, OperationError>;
}
#[derive(Debug)]
pub struct AuthMessage {
pub sessionid: Option<Uuid>,
@@ -67,10 +57,6 @@ impl AuthMessage {
}
}
impl Message for AuthMessage {
type Result = Result<AuthResponse, OperationError>;
}
pub struct SearchMessage {
pub uat: Option<UserAuthToken>,
pub req: SearchRequest,
@@ -83,10 +69,6 @@ impl SearchMessage {
}
}
impl Message for SearchMessage {
type Result = Result<SearchResponse, OperationError>;
}
pub struct InternalSearchMessage {
pub uat: Option<UserAuthToken>,
pub filter: Filter<FilterInvalid>,
@@ -94,10 +76,6 @@ pub struct InternalSearchMessage {
pub eventid: Uuid,
}
impl Message for InternalSearchMessage {
type Result = Result<Vec<ProtoEntry>, OperationError>;
}
pub struct InternalSearchRecycledMessage {
pub uat: Option<UserAuthToken>,
pub filter: Filter<FilterInvalid>,
@@ -105,60 +83,36 @@ pub struct InternalSearchRecycledMessage {
pub eventid: Uuid,
}
impl Message for InternalSearchRecycledMessage {
type Result = Result<Vec<ProtoEntry>, OperationError>;
}
pub struct InternalRadiusReadMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
pub eventid: Uuid,
}
impl Message for InternalRadiusReadMessage {
type Result = Result<Option<String>, OperationError>;
}
pub struct InternalRadiusTokenReadMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
pub eventid: Uuid,
}
impl Message for InternalRadiusTokenReadMessage {
type Result = Result<RadiusAuthToken, OperationError>;
}
pub struct InternalUnixUserTokenReadMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
pub eventid: Uuid,
}
impl Message for InternalUnixUserTokenReadMessage {
type Result = Result<UnixUserToken, OperationError>;
}
pub struct InternalUnixGroupTokenReadMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
pub eventid: Uuid,
}
impl Message for InternalUnixGroupTokenReadMessage {
type Result = Result<UnixGroupToken, OperationError>;
}
pub struct InternalSshKeyReadMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
pub eventid: Uuid,
}
impl Message for InternalSshKeyReadMessage {
type Result = Result<Vec<String>, OperationError>;
}
pub struct InternalSshKeyTagReadMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
@@ -166,10 +120,6 @@ pub struct InternalSshKeyTagReadMessage {
pub eventid: Uuid,
}
impl Message for InternalSshKeyTagReadMessage {
type Result = Result<Option<String>, OperationError>;
}
pub struct IdmAccountUnixAuthMessage {
pub uat: Option<UserAuthToken>,
pub uuid_or_name: String,
@@ -177,31 +127,25 @@ pub struct IdmAccountUnixAuthMessage {
pub eventid: Uuid,
}
impl Message for IdmAccountUnixAuthMessage {
type Result = Result<Option<UnixUserToken>, OperationError>;
pub struct LdapRequestMessage {
pub eventid: Uuid,
pub protomsg: LdapMsg,
pub uat: Option<LdapBoundToken>,
}
// ===========================================================
pub struct QueryServerReadV1 {
log: Sender<Option<AuditScope>>,
log: Sender<AuditScope>,
log_level: Option<u32>,
qs: QueryServer,
idms: Arc<IdmServer>,
ldap: Arc<LdapServer>,
}
impl Actor for QueryServerReadV1 {
type Context = SyncContext<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
// ctx.set_mailbox_capacity(1 << 31);
}
}
impl QueryServerReadV1 {
pub fn new(
log: Sender<Option<AuditScope>>,
log: Sender<AuditScope>,
log_level: Option<u32>,
qs: QueryServer,
idms: Arc<IdmServer>,
@@ -217,40 +161,38 @@ impl QueryServerReadV1 {
}
}
pub fn start(
log: Sender<Option<AuditScope>>,
pub fn start_static(
log: Sender<AuditScope>,
log_level: Option<u32>,
query_server: QueryServer,
idms: Arc<IdmServer>,
ldap: Arc<LdapServer>,
threads: usize,
) -> actix::Addr<QueryServerReadV1> {
SyncArbiter::start(threads, move || {
QueryServerReadV1::new(
log.clone(),
log_level,
query_server.clone(),
idms.clone(),
ldap.clone(),
)
})
) -> &'static Self {
let x = Box::new(QueryServerReadV1::new(
log.clone(),
log_level,
query_server.clone(),
idms.clone(),
ldap.clone(),
));
let x_ref = Box::leak(x);
&(*x_ref)
}
}
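With the actix SyncArbiter gone, nothing owns the read handler, so start_static leaks a Box to produce a &'static reference that any spawned task can copy freely. A reduced sketch of that pattern, minus the kanidm-specific fields:

// The leak is deliberate: this value lives for the whole process.
struct ReadServer {
    name: String,
}

fn start_static(name: String) -> &'static ReadServer {
    Box::leak(Box::new(ReadServer { name }))
}

// A &'static reference is Copy, so it moves into tasks without Arc or clones:
// let server = start_static("qs-read".to_string());
// tokio::spawn(async move { let _ = &server.name; });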
// The server only receives "Message" structures, which
// are whole, self-contained DB operations with all parsing
// required complete. We still need to do certain validation steps, but
// at this point our job is just to route to do_<action>
impl Handler<SearchMessage> for QueryServerReadV1 {
type Result = Result<SearchResponse, OperationError>;
fn handle(&mut self, msg: SearchMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_search(
&self,
msg: SearchMessage,
) -> Result<SearchResponse, OperationError> {
let mut audit = AuditScope::new("search", msg.eventid, self.log_level);
// Begin a read
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(&mut audit, "actors::v1_read::handle<SearchMessage>", || {
// Begin a read
let qs_read = self.qs.read();
// Make an event from the request
let srch = match SearchEvent::from_message(&mut audit, msg, &qs_read) {
Ok(s) => s,
@@ -270,32 +212,26 @@ impl Handler<SearchMessage> for QueryServerReadV1 {
}
});
// At the end of the event we send it for logging.
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<AuthMessage> for QueryServerReadV1 {
type Result = Result<AuthResponse, OperationError>;
fn handle(&mut self, msg: AuthMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_auth(&self, msg: AuthMessage) -> Result<AuthResponse, OperationError> {
// This is probably the first function that really implements logic
// "on top" of the db server concept. In this case we check if
// the credentials provided is sufficient to say if someone is
// "authenticated" or not.
let mut audit = AuditScope::new("auth", msg.eventid, self.log_level);
let mut idm_write = self.idms.write_async().await;
let res = lperf_op_segment!(&mut audit, "actors::v1_read::handle<AuthMessage>", || {
lsecurity!(audit, "Begin auth event {:?}", msg);
// Destructure it.
// Convert the AuthRequest to an AuthEvent that the idm server
// can use.
let mut idm_write = self.idms.write();
let ae = AuthEvent::from_message(msg).map_err(|e| {
ladmin_error!(audit, "Failed to parse AuthEvent -> {:?}", e);
e
@@ -328,24 +264,22 @@ impl Handler<AuthMessage> for QueryServerReadV1 {
r.map(|r| r.response())
});
// At the end of the event we send it for logging.
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<WhoamiMessage> for QueryServerReadV1 {
type Result = Result<WhoamiResponse, OperationError>;
fn handle(&mut self, msg: WhoamiMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_whoami(
&self,
msg: WhoamiMessage,
) -> Result<WhoamiResponse, OperationError> {
let mut audit = AuditScope::new("whoami", msg.eventid, self.log_level);
// TODO #62: Move this to IdmServer!!!
// Begin a read
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(&mut audit, "actors::v1_read::handle<WhoamiMessage>", || {
// TODO #62: Move this to IdmServer!!!
// Begin a read
let qs_read = self.qs.read();
// Make an event from the whoami request. This will process the event and
// generate a selfuuid search.
//
@@ -388,25 +322,23 @@ impl Handler<WhoamiMessage> for QueryServerReadV1 {
});
// Should we log the final result?
// At the end of the event we send it for logging.
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalSearchMessage> for QueryServerReadV1 {
type Result = Result<Vec<ProtoEntry>, OperationError>;
fn handle(&mut self, msg: InternalSearchMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_internalsearch(
&self,
msg: InternalSearchMessage,
) -> Result<Vec<ProtoEntry>, OperationError> {
let mut audit = AuditScope::new("internal_search_message", msg.eventid, self.log_level);
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalSearchMessage>",
|| {
let qs_read = self.qs.read();
// Make an event from the request
let srch = match SearchEvent::from_internal_message(&mut audit, msg, &qs_read) {
Ok(s) => s,
@@ -425,33 +357,28 @@ impl Handler<InternalSearchMessage> for QueryServerReadV1 {
}
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalSearchRecycledMessage> for QueryServerReadV1 {
type Result = Result<Vec<ProtoEntry>, OperationError>;
fn handle(
&mut self,
pub async fn handle_internalsearchrecycled(
&self,
msg: InternalSearchRecycledMessage,
_: &mut Self::Context,
) -> Self::Result {
) -> Result<Vec<ProtoEntry>, OperationError> {
let mut audit = AuditScope::new(
"internal_search_recycle_message",
msg.eventid,
self.log_level,
);
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalSearchRecycledMessage>",
|| {
let qs_read = self.qs.read();
// Make an event from the request
let srch =
match SearchEvent::from_internal_recycle_message(&mut audit, msg, &qs_read) {
@@ -471,26 +398,24 @@ impl Handler<InternalSearchRecycledMessage> for QueryServerReadV1 {
}
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalRadiusReadMessage> for QueryServerReadV1 {
type Result = Result<Option<String>, OperationError>;
fn handle(&mut self, msg: InternalRadiusReadMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_internalradiusread(
&self,
msg: InternalRadiusReadMessage,
) -> Result<Option<String>, OperationError> {
let mut audit =
AuditScope::new("internal_radius_read_message", msg.eventid, self.log_level);
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalRadiusReadMessage>",
|| {
let qs_read = self.qs.read();
let target_uuid = qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
@@ -530,33 +455,28 @@ impl Handler<InternalRadiusReadMessage> for QueryServerReadV1 {
}
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalRadiusTokenReadMessage> for QueryServerReadV1 {
type Result = Result<RadiusAuthToken, OperationError>;
fn handle(
&mut self,
pub async fn handle_internalradiustokenread(
&self,
msg: InternalRadiusTokenReadMessage,
_: &mut Self::Context,
) -> Self::Result {
) -> Result<RadiusAuthToken, OperationError> {
let mut audit = AuditScope::new(
"internal_radius_token_read_message",
msg.eventid,
self.log_level,
);
let mut idm_read = self.idms.proxy_read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalRadiusTokenReadMessage>",
|| {
let mut idm_read = self.idms.proxy_read();
let target_uuid = idm_read
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
@@ -584,42 +504,40 @@ impl Handler<InternalRadiusTokenReadMessage> for QueryServerReadV1 {
idm_read.get_radiusauthtoken(&mut audit, &rate)
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalUnixUserTokenReadMessage> for QueryServerReadV1 {
type Result = Result<UnixUserToken, OperationError>;
fn handle(
&mut self,
pub async fn handle_internalunixusertokenread(
&self,
msg: InternalUnixUserTokenReadMessage,
_: &mut Self::Context,
) -> Self::Result {
) -> Result<UnixUserToken, OperationError> {
let mut audit = AuditScope::new(
"internal_unix_token_read_message",
msg.eventid,
self.log_level,
);
let mut idm_read = self.idms.proxy_read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalUnixUserTokenReadMessage>",
|| {
let mut idm_read = self.idms.proxy_read();
let target_uuid = Uuid::parse_str(msg.uuid_or_name.as_str()).or_else(|_| {
idm_read
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
ladmin_info!(&mut audit, "Error resolving as gidnumber continuing ...");
let target_uuid = idm_read
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
ladmin_info!(
&mut audit,
"Error resolving {} as gidnumber continuing ... {:?}",
msg.uuid_or_name,
e
})
})?;
);
e
})?;
// Make an event from the request
let rate = match UnixUserTokenEvent::from_parts(
@@ -640,42 +558,34 @@ impl Handler<InternalUnixUserTokenReadMessage> for QueryServerReadV1 {
idm_read.get_unixusertoken(&mut audit, &rate)
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalUnixGroupTokenReadMessage> for QueryServerReadV1 {
type Result = Result<UnixGroupToken, OperationError>;
fn handle(
&mut self,
pub async fn handle_internalunixgrouptokenread(
&self,
msg: InternalUnixGroupTokenReadMessage,
_: &mut Self::Context,
) -> Self::Result {
) -> Result<UnixGroupToken, OperationError> {
let mut audit = AuditScope::new(
"internal_unixgroup_token_read_message",
msg.eventid,
self.log_level,
);
let mut idm_read = self.idms.proxy_read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalUnixGroupTokenReadMessage>",
|| {
let mut idm_read = self.idms.proxy_read();
let target_uuid = Uuid::parse_str(msg.uuid_or_name.as_str()).or_else(|_| {
idm_read
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
ladmin_info!(&mut audit, "Error resolving as gidnumber continuing ...");
e
})
})?;
let target_uuid = idm_read
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
ladmin_info!(&mut audit, "Error resolving as gidnumber continuing ...");
e
})?;
// Make an event from the request
let rate = match UnixGroupTokenEvent::from_parts(
@@ -696,26 +606,24 @@ impl Handler<InternalUnixGroupTokenReadMessage> for QueryServerReadV1 {
idm_read.get_unixgrouptoken(&mut audit, &rate)
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalSshKeyReadMessage> for QueryServerReadV1 {
type Result = Result<Vec<String>, OperationError>;
fn handle(&mut self, msg: InternalSshKeyReadMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_internalsshkeyread(
&self,
msg: InternalSshKeyReadMessage,
) -> Result<Vec<String>, OperationError> {
let mut audit =
AuditScope::new("internal_sshkey_read_message", msg.eventid, self.log_level);
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalSshKeyReadMessage>",
|| {
let qs_read = self.qs.read();
let target_uuid = qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
@@ -759,18 +667,17 @@ }
}
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<InternalSshKeyTagReadMessage> for QueryServerReadV1 {
type Result = Result<Option<String>, OperationError>;
fn handle(&mut self, msg: InternalSshKeyTagReadMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_internalsshkeytagread(
&self,
msg: InternalSshKeyTagReadMessage,
) -> Result<Option<String>, OperationError> {
let InternalSshKeyTagReadMessage {
uat,
uuid_or_name,
@@ -779,12 +686,11 @@ } = msg;
} = msg;
let mut audit =
AuditScope::new("internal_sshkey_tag_read_message", eventid, self.log_level);
let qs_read = self.qs.read_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<InternalSshKeyTagReadMessage>",
|| {
let qs_read = self.qs.read();
let target_uuid = qs_read
.name_to_uuid(&mut audit, uuid_or_name.as_str())
.map_err(|e| {
@@ -834,35 +740,31 @@ }
}
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
impl Handler<IdmAccountUnixAuthMessage> for QueryServerReadV1 {
type Result = Result<Option<UnixUserToken>, OperationError>;
fn handle(&mut self, msg: IdmAccountUnixAuthMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_idmaccountunixauth(
&self,
msg: IdmAccountUnixAuthMessage,
) -> Result<Option<UnixUserToken>, OperationError> {
let mut audit = AuditScope::new("idm_account_unix_auth", msg.eventid, self.log_level);
let mut idm_write = self.idms.write_async().await;
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<IdmAccountUnixAuthMessage>",
|| {
let mut idm_write = self.idms.write();
// resolve the id
let target_uuid = Uuid::parse_str(msg.uuid_or_name.as_str()).or_else(|_| {
idm_write
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
ladmin_info!(&mut audit, "Error resolving as gidnumber continuing ...");
e
})
})?;
let target_uuid = idm_write
.qs_read
.name_to_uuid(&mut audit, msg.uuid_or_name.as_str())
.map_err(|e| {
ladmin_info!(&mut audit, "Error resolving as gidnumber continuing ...");
e
})?;
// Make an event from the request
let uuae = match UnixUserAuthEvent::from_parts(
&mut audit,
@@ -895,58 +797,49 @@ r
r
}
);
self.log.send(Some(audit)).map_err(|_| {
self.log.send(audit).map_err(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
OperationError::InvalidState
})?;
res
}
}
#[derive(Message)]
#[rtype(result = "Option<LdapResponseState>")]
pub struct LdapRequestMessage {
pub eventid: Uuid,
pub protomsg: LdapMsg,
pub uat: Option<LdapBoundToken>,
}
impl Handler<LdapRequestMessage> for QueryServerReadV1 {
type Result = Option<LdapResponseState>;
fn handle(&mut self, msg: LdapRequestMessage, _: &mut Self::Context) -> Self::Result {
pub async fn handle_ldaprequest(&self, msg: LdapRequestMessage) -> Option<LdapResponseState> {
let LdapRequestMessage {
eventid,
protomsg,
uat,
} = msg;
let mut audit = AuditScope::new("ldap_request_message", eventid, self.log_level);
/*
let res = lperf_op_segment!(
&mut audit,
"actors::v1_read::handle<LdapRequestMessage>",
|| {
let server_op = match ServerOps::try_from(protomsg) {
Ok(v) => v,
Err(_) => {
return LdapResponseState::Disconnect(DisconnectionNotice::gen(
LdapResultCode::ProtocolError,
format!("Invalid Request {:?}", &eventid).as_str(),
));
}
};
self.ldap
.do_op(&mut audit, &self.idms, server_op, uat, &eventid)
.unwrap_or_else(|e| {
ladmin_error!(&mut audit, "do_op failed -> {:?}", e);
LdapResponseState::Disconnect(DisconnectionNotice::gen(
LdapResultCode::Other,
format!("Internal Server Error {:?}", &eventid).as_str(),
))
})
*/
let res = match ServerOps::try_from(protomsg) {
Ok(server_op) => self
.ldap
.do_op(&mut audit, &self.idms, server_op, uat, &eventid)
.await
.unwrap_or_else(|e| {
ladmin_error!(&mut audit, "do_op failed -> {:?}", e);
LdapResponseState::Disconnect(DisconnectionNotice::gen(
LdapResultCode::Other,
format!("Internal Server Error {:?}", &eventid).as_str(),
))
}),
Err(_) => LdapResponseState::Disconnect(DisconnectionNotice::gen(
LdapResultCode::ProtocolError,
format!("Invalid Request {:?}", &eventid).as_str(),
)),
};
/*
}
);
if self.log.send(Some(audit)).is_err() {
*/
if self.log.send(audit).is_err() {
error!("Unable to commit log -> {:?}", &eventid);
Some(LdapResponseState::Disconnect(DisconnectionNotice::gen(
LdapResultCode::Other,

File diff suppressed because it is too large.


@@ -1,37 +1,19 @@
use crate::audit::AuditScope;
use crossbeam::channel::Receiver;
use tokio::sync::mpsc::UnboundedReceiver as Receiver;
pub fn run(rx: &Receiver<Option<AuditScope>>) {
info!("Log thread started ...");
pub(crate) async fn run(mut rx: Receiver<AuditScope>) {
info!("Log task started ...");
loop {
match rx.recv() {
Ok(Some(al)) => {
match rx.recv().await {
Some(al) => {
al.write_log();
}
Ok(None) => {
None => {
// Prep to shutdown, finish draining.
break;
}
Err(_) => {
// we're cooked.
error!("CRITICAL: log thread is cooked.");
}
}
}
loop {
match rx.try_recv() {
Ok(Some(al)) => {
al.write_log();
}
Ok(None) => {
// Skip this, it's a shutdown msg.
}
Err(_) => {
// we've drained.
break;
}
}
}
info!("Log thread shutdown complete.");
}
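Shutdown is simpler than in the old crossbeam version: closing the channel is the signal, so the explicit drain loop disappears. A self-contained sketch of the lifecycle, with a stand-in AuditScope:

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

struct AuditScope; // stand-in for the real type
impl AuditScope {
    fn write_log(&self) {}
}

async fn run(mut rx: UnboundedReceiver<AuditScope>) {
    while let Some(al) = rx.recv().await {
        al.write_log();
    }
    // recv() returned None: every sender dropped, so the drain is complete.
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded_channel();
    let log_task = tokio::spawn(run(rx));
    tx.send(AuditScope).expect("log task alive");
    drop(tx); // the shutdown signal: the channel closes once all senders drop
    log_task.await.expect("log task panicked");
}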


@@ -1,4 +1,3 @@
use actix::prelude::*;
use std::fmt;
// use std::ptr;
use std::cmp::Ordering;
@@ -439,13 +438,6 @@ pub struct AuditScope {
active_perf: Option<&'static mut PerfEvent>,
}
// unsafe impl Sync for AuditScope {}
// Allow us to be sent to the log subsystem
impl Message for AuditScope {
type Result = ();
}
impl AuditScope {
pub fn new(name: &str, eventid: Uuid, level: Option<u32>) -> Self {
let level = if cfg!(test) {


@@ -6,7 +6,7 @@ use crate::be::{IdRawEntry, IDL};
use crate::entry::{Entry, EntryCommitted, EntrySealed};
use crate::value::IndexType;
use crate::value::Value;
use concread::cache::arc::{Arc, ArcReadTxn, ArcWriteTxn};
use concread::arcache::{ARCache, ARCacheReadTxn, ARCacheWriteTxn};
use concread::cowcell::*;
use idlset::IDLBitRange;
use kanidm_proto::v1::{ConsistencyError, OperationError};
@@ -64,9 +64,9 @@ impl From<(&str, &IndexType, &str)> for IdlCacheKey {
pub struct IdlArcSqlite {
db: IdlSqlite,
entry_cache: Arc<u64, Box<Entry<EntrySealed, EntryCommitted>>>,
idl_cache: Arc<IdlCacheKey, Box<IDLBitRange>>,
name_cache: Arc<NameCacheKey, NameCacheValue>,
entry_cache: ARCache<u64, Box<Entry<EntrySealed, EntryCommitted>>>,
idl_cache: ARCache<IdlCacheKey, Box<IDLBitRange>>,
name_cache: ARCache<NameCacheKey, NameCacheValue>,
op_ts_max: CowCell<Option<Duration>>,
allids: CowCell<IDLBitRange>,
maxid: CowCell<u64>,
@@ -74,17 +74,17 @@
pub struct IdlArcSqliteReadTransaction<'a> {
db: IdlSqliteReadTransaction,
entry_cache: ArcReadTxn<'a, u64, Box<Entry<EntrySealed, EntryCommitted>>>,
idl_cache: ArcReadTxn<'a, IdlCacheKey, Box<IDLBitRange>>,
name_cache: ArcReadTxn<'a, NameCacheKey, NameCacheValue>,
entry_cache: ARCacheReadTxn<'a, u64, Box<Entry<EntrySealed, EntryCommitted>>>,
idl_cache: ARCacheReadTxn<'a, IdlCacheKey, Box<IDLBitRange>>,
name_cache: ARCacheReadTxn<'a, NameCacheKey, NameCacheValue>,
allids: CowCellReadTxn<IDLBitRange>,
}
pub struct IdlArcSqliteWriteTransaction<'a> {
db: IdlSqliteWriteTransaction,
entry_cache: ArcWriteTxn<'a, u64, Box<Entry<EntrySealed, EntryCommitted>>>,
idl_cache: ArcWriteTxn<'a, IdlCacheKey, Box<IDLBitRange>>,
name_cache: ArcWriteTxn<'a, NameCacheKey, NameCacheValue>,
entry_cache: ARCacheWriteTxn<'a, u64, Box<Entry<EntrySealed, EntryCommitted>>>,
idl_cache: ARCacheWriteTxn<'a, IdlCacheKey, Box<IDLBitRange>>,
name_cache: ARCacheWriteTxn<'a, NameCacheKey, NameCacheValue>,
op_ts_max: CowCellWriteTxn<'a, Option<Duration>>,
allids: CowCellWriteTxn<'a, IDLBitRange>,
maxid: CowCellWriteTxn<'a, u64>,
@@ -855,7 +855,7 @@ impl IdlArcSqlite {
fstype: FsType,
) -> Result<Self, OperationError> {
let db = IdlSqlite::new(audit, path, pool_size, fstype)?;
let entry_cache = Arc::new(
let entry_cache = ARCache::new(
DEFAULT_CACHE_TARGET,
pool_size as usize,
DEFAULT_CACHE_RMISS,
@@ -864,7 +864,7 @@ );
);
// The idl cache should have smaller items, and is critical for fast searches
// so we allow it to have a higher ratio of items relative to the entries.
let idl_cache = Arc::new(
let idl_cache = ARCache::new(
DEFAULT_CACHE_TARGET * DEFAULT_IDL_CACHE_RATIO,
pool_size as usize,
DEFAULT_CACHE_RMISS,
@@ -872,7 +872,7 @@ false,
false,
);
let name_cache = Arc::new(
let name_cache = ARCache::new(
DEFAULT_CACHE_TARGET * DEFAULT_NAME_CACHE_RATIO,
pool_size as usize,
DEFAULT_CACHE_RMISS,


@@ -1238,6 +1238,9 @@ impl IdlSqlite {
pool_size: u32,
fstype: FsType,
) -> Result<Self, OperationError> {
if path == "" {
debug_assert!(pool_size == 1);
}
// If provided, set the page size to match the tuning we want. By default we use 4096. The VACUUM
// immediately after is so that on db create the page size takes effect.
//
@@ -1245,6 +1248,9 @@ let mut flags = OpenFlags::default();
let mut flags = OpenFlags::default();
// Open with multi thread flags and locking options.
flags.insert(OpenFlags::SQLITE_OPEN_NO_MUTEX);
// TODO: This probably only needs to be run on first run OR we need a flag
// or something else. Maybe on reindex only?
let manager = SqliteConnectionManager::file(path)
.with_init(move |c| {
c.execute_batch(
@@ -1257,14 +1263,7 @@ })
})
.with_flags(flags);
let builder1 = Pool::builder();
let builder2 = if path == "" {
// We are in a debug mode, with in memory. We MUST have only
// a single DB thread, else we cause consistency issues.
builder1.max_size(1)
} else {
// Have to add 1 for the write thread, and for the interval threads
builder1.max_size(pool_size + 2)
};
let builder2 = builder1.max_size(pool_size);
// Look at max_size and thread_pool here for perf later
let pool = builder2.build(manager).map_err(|e| {
ladmin_error!(audit, "r2d2 error {:?}", e);
@@ -1275,6 +1274,9 @@ }
}
pub fn read(&self) -> IdlSqliteReadTransaction {
// When we make this async, this will allow us to backoff
// when we miss-grabbing from the conn-pool.
// async_std::task::yield_now().await
#[allow(clippy::expect_used)]
let conn = self
.pool


@@ -65,6 +65,7 @@ pub struct IdRawEntry {
#[derive(Clone)]
pub struct Backend {
pool_size: usize,
idlayer: Arc<IdlArcSqlite>,
/// This is a copy-on-write cache of the index metadata that has been
/// extracted from attributes set, in the correct format for the backend
@@ -1316,13 +1317,19 @@ impl Backend {
pub fn new(
audit: &mut AuditScope,
path: &str,
pool_size: u32,
mut pool_size: u32,
fstype: FsType,
idxmeta: Set<IdxKey>,
) -> Result<Self, OperationError> {
// If in memory, reduce pool to 1
if path == "" {
pool_size = 1;
}
// this has a ::memory() type, but will path == "" work?
lperf_trace_segment!(audit, "be::new", || {
let be = Backend {
pool_size: pool_size as usize,
idlayer: Arc::new(IdlArcSqlite::new(audit, path, pool_size, fstype)?),
idxmeta: Arc::new(CowCell::new(idxmeta)),
};
@@ -1345,6 +1352,11 @@ })
})
}
pub fn get_pool_size(&self) -> usize {
debug_assert!(self.pool_size > 0);
self.pool_size
}
pub fn read(&self) -> BackendReadTransaction {
BackendReadTransaction {
idlayer: UnsafeCell::new(self.idlayer.read()),
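The sizing rule these hunks introduce, in isolation: an in-memory sqlite (signalled by an empty path, per the diff's own convention) is capped to one connection, since separate in-memory connections would not see one shared database. A sketch using the same r2d2 builder calls, with illustrative names:

use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;

fn build_pool(path: &str, mut pool_size: u32) -> Result<Pool<SqliteConnectionManager>, r2d2::Error> {
    if path.is_empty() {
        // One connection only, else readers hit the consistency issues noted above.
        pool_size = 1;
    }
    let manager = SqliteConnectionManager::file(path);
    Pool::builder().max_size(pool_size).build(manager)
}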


@@ -1,39 +0,0 @@
use crate::audit::AuditScope;
use actix::prelude::*;
use crossbeam::channel::Sender;
use std::thread;
pub struct ServerCtx {
system: System,
log_tx: Sender<Option<AuditScope>>,
log_thread: thread::JoinHandle<()>,
}
impl ServerCtx {
pub fn new(
system: System,
log_tx: Sender<Option<AuditScope>>,
log_thread: thread::JoinHandle<()>,
) -> Self {
ServerCtx {
system,
log_tx,
log_thread,
}
}
pub fn current(&self) -> System {
self.system.clone()
}
#[allow(clippy::expect_used)]
pub fn stop(self) {
// stop the actix system
self.system.stop();
// drain the log thread
self.log_tx
.send(None)
.expect("unable to shutdown log thread!");
self.log_thread.join().expect("failed to stop log thread");
}
}

File diff suppressed because it is too large.


@@ -2,238 +2,169 @@ use crate::actors::v1_read::{LdapRequestMessage, QueryServerReadV1};
use crate::ldap::{LdapBoundToken, LdapResponseState};
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder};
use actix::prelude::*;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use ldap3_server::simple::*;
// use ldap3_server::simple::*;
use ldap3_server::LdapCodec;
// use std::convert::TryFrom;
use std::io;
use std::marker::Unpin;
use std::net;
use std::str::FromStr;
use tokio::io::{AsyncWrite, WriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::FramedRead;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio_util::codec::{FramedRead, FramedWrite};
use uuid::Uuid;
struct LdapReq(pub LdapMsg);
impl Message for LdapReq {
type Result = Result<(), ()>;
}
pub struct LdapServer {
qe_r: Addr<QueryServerReadV1>,
}
pub struct LdapSession<T>
where
T: AsyncWrite + Unpin,
{
qe_r: Addr<QueryServerReadV1>,
framed: actix::io::FramedWrite<WriteHalf<T>, LdapCodec>,
struct LdapSession {
uat: Option<LdapBoundToken>,
}
impl<T> Actor for LdapSession<T>
where
T: 'static + AsyncWrite + Unpin,
{
type Context = actix::Context<Self>;
}
impl<T> actix::io::WriteHandler<io::Error> for LdapSession<T> where T: 'static + AsyncWrite + Unpin {}
impl<T> Handler<LdapReq> for LdapSession<T>
where
T: 'static + AsyncWrite + Unpin,
{
type Result = ResponseActFuture<Self, Result<(), ()>>;
fn handle(&mut self, msg: LdapReq, _ctx: &mut Self::Context) -> Self::Result {
let protomsg = msg.0;
// Transform the LdapMsg to something the query server can work with.
// Because of the way these futures work, it's up to the qe_r to manage
// a lot of this, so we just palm off the processing to the thread pool.
let eventid = Uuid::new_v4();
let uat = self.uat.clone();
let qsf = self.qe_r.send(LdapRequestMessage {
eventid,
protomsg,
uat,
});
let qsf = actix::fut::wrap_future::<_, Self>(qsf);
let f = qsf.map(|result, actor, ctx| {
match result {
Ok(Some(LdapResponseState::Unbind)) => ctx.stop(),
Ok(Some(LdapResponseState::Disconnect(r))) => {
actor.framed.write(r);
ctx.stop()
}
Ok(Some(LdapResponseState::Bind(uat, r))) => {
actor.uat = Some(uat);
actor.framed.write(r);
}
Ok(Some(LdapResponseState::Respond(r))) => {
actor.framed.write(r);
}
Ok(Some(LdapResponseState::MultiPartResponse(v))) => {
v.into_iter().for_each(|r| actor.framed.write(r));
}
Ok(Some(LdapResponseState::BindMultiPartResponse(uat, v))) => {
actor.uat = Some(uat);
v.into_iter().for_each(|r| actor.framed.write(r));
}
Ok(None) | Err(_) => {
eprintln!("Internal server error");
ctx.stop();
}
};
Ok(())
});
Box::new(f)
}
}
impl<T> StreamHandler<Result<LdapMsg, io::Error>> for LdapSession<T>
where
T: 'static + AsyncWrite + Unpin,
{
fn handle(&mut self, msg: Result<LdapMsg, io::Error>, ctx: &mut Self::Context) {
match msg {
Ok(lm) => match ctx.address().try_send(LdapReq(lm)) {
// It's queued, we are done.
Ok(_) => {}
Err(_) => {
eprintln!("Too many queue msgs for connection");
ctx.stop()
}
},
Err(_) => {
eprintln!("Io error");
ctx.stop()
}
}
}
}
impl<T> LdapSession<T>
where
T: 'static + AsyncWrite + Unpin,
{
pub fn new(
framed: actix::io::FramedWrite<WriteHalf<T>, LdapCodec>,
qe_r: Addr<QueryServerReadV1>,
) -> Self {
impl LdapSession {
fn new() -> Self {
LdapSession {
qe_r,
framed,
// We start un-authenticated
uat: None,
}
}
}
impl Actor for LdapServer {
type Context = Context<Self>;
async fn client_process<W: AsyncWrite + Unpin, R: AsyncRead + Unpin>(
mut r: FramedRead<R, LdapCodec>,
mut w: FramedWrite<W, LdapCodec>,
_paddr: net::SocketAddr,
qe_r_ref: &'static QueryServerReadV1,
) {
// This is a connected client session. we need to associate some state to the
// session
let mut session = LdapSession::new();
// Now that we have the session we begin an event loop to process input OR
// we return.
while let Some(Ok(protomsg)) = r.next().await {
// Start the event
let eventid = Uuid::new_v4();
let uat = session.uat.clone();
let qs_result = qe_r_ref
.handle_ldaprequest(LdapRequestMessage {
eventid,
protomsg,
uat,
})
.await;
match qs_result {
Some(LdapResponseState::Unbind) => return,
Some(LdapResponseState::Disconnect(rmsg)) => {
if let Err(_) = w.send(rmsg).await {
break;
}
break;
}
Some(LdapResponseState::Bind(uat, rmsg)) => {
session.uat = Some(uat);
if let Err(_) = w.send(rmsg).await {
break;
}
}
Some(LdapResponseState::Respond(rmsg)) => {
if let Err(_) = w.send(rmsg).await {
break;
}
}
Some(LdapResponseState::MultiPartResponse(v)) => {
for rmsg in v.into_iter() {
if let Err(_) = w.send(rmsg).await {
break;
}
}
}
Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
session.uat = Some(uat);
for rmsg in v.into_iter() {
if let Err(_) = w.send(rmsg).await {
break;
}
}
}
None => {
error!("Internal server error");
break;
}
};
}
// We now are leaving, so any cleanup done here.
}
#[derive(Message)]
#[rtype(result = "()")]
struct TcpConnect(pub TcpStream, pub net::SocketAddr);
impl Handler<TcpConnect> for LdapServer {
type Result = ();
fn handle(&mut self, msg: TcpConnect, _: &mut Context<Self>) {
LdapSession::create(move |ctx| {
let (r, w) = tokio::io::split(msg.0);
LdapSession::add_stream(FramedRead::new(r, LdapCodec), ctx);
LdapSession::new(
actix::io::FramedWrite::new(w, LdapCodec, ctx),
self.qe_r.clone(),
)
});
async fn tls_acceptor(
mut listener: TcpListener,
tls_parms: SslAcceptor,
qe_r_ref: &'static QueryServerReadV1,
) {
// Do we need to do the silly ssl leak?
loop {
match listener.accept().await {
Ok((tcpstream, paddr)) => {
let res = tokio_openssl::accept(&tls_parms, tcpstream).await;
let tlsstream = match res {
Ok(ts) => ts,
Err(e) => {
error!("tls handshake error, continuing -> {:?}", e);
continue;
}
};
let (r, w) = tokio::io::split(tlsstream);
let r = FramedRead::new(r, LdapCodec);
let w = FramedWrite::new(w, LdapCodec);
tokio::spawn(client_process(r, w, paddr, qe_r_ref));
}
Err(e) => {
error!("acceptor error, continuing -> {:?}", e);
}
}
}
}
#[derive(Message)]
#[rtype(result = "Result<(), ()>")]
struct TlsConnect(pub &'static SslAcceptor, pub TcpStream, pub net::SocketAddr);
impl Handler<TlsConnect> for LdapServer {
type Result = ResponseActFuture<Self, Result<(), ()>>;
fn handle(&mut self, msg: TlsConnect, _: &mut Context<Self>) -> Self::Result {
let qsf = tokio_openssl::accept(msg.0, msg.1);
let qsf = actix::fut::wrap_future::<_, Self>(qsf);
let f = qsf.map(|result, actor, _ctx| {
result
.map(|tlsstream| {
LdapSession::create(move |ctx| {
let (r, w) = tokio::io::split(tlsstream);
LdapSession::add_stream(FramedRead::new(r, LdapCodec), ctx);
LdapSession::new(
actix::io::FramedWrite::new(w, LdapCodec, ctx),
actor.qe_r.clone(),
)
});
})
.map_err(|_| {
eprintln!("invalid tls handshake");
})
});
Box::new(f)
async fn acceptor(mut listener: TcpListener, qe_r_ref: &'static QueryServerReadV1) {
loop {
match listener.accept().await {
Ok((tcpstream, paddr)) => {
let (r, w) = tokio::io::split(tcpstream);
let r = FramedRead::new(r, LdapCodec);
let w = FramedWrite::new(w, LdapCodec);
// Let it rip.
tokio::spawn(client_process(r, w, paddr, qe_r_ref));
}
Err(e) => {
error!("acceptor error, continuing -> {:?}", e);
}
}
}
}
pub(crate) async fn create_ldap_server(
address: &str,
opt_tls_params: Option<SslAcceptorBuilder>,
qe_r: Addr<QueryServerReadV1>,
qe_r_ref: &'static QueryServerReadV1,
) -> Result<(), ()> {
let addr = net::SocketAddr::from_str(address).map_err(|e| {
eprintln!("Could not parse ldap server address {} -> {:?}", address, e);
})?;
let listener = Box::new(TcpListener::bind(&addr).await.map_err(|e| {
let listener = TcpListener::bind(&addr).await.map_err(|e| {
eprintln!(
"Could not bind to ldap server address {} -> {:?}",
address, e
);
})?);
})?;
match opt_tls_params {
Some(tls_params) => {
info!("Starting LDAPS interface ldaps://{} ...", address);
LdapServer::create(move |ctx| {
let acceptor = Box::new(tls_params.build());
let lacceptor = Box::leak(acceptor) as &'static _;
ctx.add_message_stream(Box::leak(listener).incoming().map(move |st| {
#[allow(clippy::expect_used)]
let st = st.expect("Failed to access TCP stream");
#[allow(clippy::expect_used)]
let addr = st.peer_addr().expect("Failed to access peer address");
TlsConnect(lacceptor, st, addr)
}));
LdapServer { qe_r }
});
let tls_parms = tls_params.build();
tokio::spawn(tls_acceptor(listener, tls_parms, qe_r_ref));
}
None => {
info!("Starting LDAP interface ldap://{} ...", address);
LdapServer::create(move |ctx| {
ctx.add_message_stream(Box::leak(listener).incoming().map(|st| {
#[allow(clippy::expect_used)]
let st = st.expect("Failed to access TCP stream");
#[allow(clippy::expect_used)]
let addr = st.peer_addr().expect("Failed to access peer address");
TcpConnect(st, addr)
}));
LdapServer { qe_r }
});
tokio::spawn(acceptor(listener, qe_r_ref));
}
}

File diff suppressed because it is too large.


@@ -23,7 +23,6 @@ use crate::actors::v1_write::{CreateMessage, DeleteMessage, ModifyMessage};
// Bring in schematransaction trait for validate
// use crate::schema::SchemaTransaction;
use actix::prelude::*;
use ldap3_server::simple::LdapFilter;
use std::collections::BTreeSet;
use uuid::Uuid;
@@ -1085,10 +1084,6 @@ pub struct PurgeTombstoneEvent {
pub eventid: Uuid,
}
impl Message for PurgeTombstoneEvent {
type Result = ();
}
impl PurgeTombstoneEvent {
pub fn new() -> Self {
PurgeTombstoneEvent {
@@ -1104,10 +1099,6 @@ pub struct PurgeRecycledEvent {
pub eventid: Uuid,
}
impl Message for PurgeRecycledEvent {
type Result = ();
}
impl PurgeRecycledEvent {
pub fn new() -> Self {
PurgeRecycledEvent {
@@ -1128,10 +1119,6 @@ pub struct ReviveRecycledEvent {
// It will be duplicated into the modify event as it exists.
}
impl Message for ReviveRecycledEvent {
type Result = ();
}
impl ReviveRecycledEvent {
pub fn from_parts(
audit: &mut AuditScope,


@@ -210,16 +210,12 @@ impl Account {
None => {
match &self.primary {
// Check the cred's associated pw.
Some(ref primary) => {
primary.password.as_ref()
.ok_or(OperationError::InvalidState)
.and_then(|pw| {
pw.verify(cleartext)
})
}
None => {
Err(OperationError::InvalidState)
}
Some(ref primary) => primary
.password
.as_ref()
.ok_or(OperationError::InvalidState)
.and_then(|pw| pw.verify(cleartext)),
None => Err(OperationError::InvalidState),
}
} // no appid
}


@@ -83,9 +83,8 @@ impl CredHandler {
pw: &Password,
who: Uuid,
cleartext: &str,
async_tx: &Sender<DelayedAction>
)
{
async_tx: &Sender<DelayedAction>,
) {
if pw.requires_upgrade() {
if let Err(_e) = async_tx.send(DelayedAction::PwUpgrade(PasswordUpgrade {
target_uuid: who,
@@ -97,10 +96,7 @@ }
}
}
fn validate_anonymous(
au: &mut AuditScope,
creds: &[AuthCredential],
) -> CredState {
fn validate_anonymous(au: &mut AuditScope, creds: &[AuthCredential]) -> CredState {
creds.iter().fold(
CredState::Continue(vec![AuthAllowed::Anonymous]),
|acc, cred| {
@@ -295,8 +291,12 @@ impl CredHandler {
CredState::Denied("authentication denied")
}
CredHandler::Anonymous => Self::validate_anonymous(au, creds),
CredHandler::Password(ref mut pw) => Self::validate_password(au, creds, pw, who, async_tx),
CredHandler::TOTPPassword(ref mut pw_totp) => Self::validate_totp_password(au, creds, ts, pw_totp, who, async_tx),
CredHandler::Password(ref mut pw) => {
Self::validate_password(au, creds, pw, who, async_tx)
}
CredHandler::TOTPPassword(ref mut pw_totp) => {
Self::validate_totp_password(au, creds, ts, pw_totp, who, async_tx)
}
}
}
@@ -409,7 +409,10 @@ impl AuthSession {
return Ok(AuthState::Denied(BAD_CREDENTIALS.to_string()));
}
match self.handler.validate(au, creds, time, self.account.uuid, async_tx) {
match self
.handler
.validate(au, creds, time, self.account.uuid, async_tx)
{
CredState::Success(claims) => {
lsecurity!(au, "Successful cred handling");
self.finished = true;
@@ -631,7 +634,12 @@ mod tests {
// check send anon (fail)
{
let mut session = AuthSession::new(&mut audit, account.clone(), None);
match session.validate_creds(&mut audit, &vec![AuthCredential::Anonymous], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::Anonymous],
&ts,
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_AUTH_TYPE_MSG),
_ => panic!(),
};
@@ -647,12 +655,17 @@ &mut audit,
&mut audit,
&vec![AuthCredential::Password(pw_bad.to_string())],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Continue(cont)) => assert!(cont == vec![AuthAllowed::TOTP]),
_ => panic!(),
};
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_good)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_good)],
&ts,
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_PASSWORD_MSG),
_ => panic!(),
};
@@ -665,12 +678,17 @@ &mut audit,
&mut audit,
&vec![AuthCredential::Password(pw_bad.to_string())],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Continue(cont)) => assert!(cont == vec![AuthAllowed::TOTP]),
_ => panic!(),
};
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_bad)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_bad)],
&ts,
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_TOTP_MSG),
_ => panic!(),
};
@@ -684,12 +702,17 @@ &mut audit,
&mut audit,
&vec![AuthCredential::Password(pw_good.to_string())],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Continue(cont)) => assert!(cont == vec![AuthAllowed::TOTP]),
_ => panic!(),
};
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_good)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_good)],
&ts,
&async_tx,
) {
Ok(AuthState::Success(_)) => {}
_ => panic!(),
};
@@ -703,12 +726,17 @@ &mut audit,
&mut audit,
&vec![AuthCredential::Password(pw_good.to_string())],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Continue(cont)) => assert!(cont == vec![AuthAllowed::TOTP]),
_ => panic!(),
};
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_bad)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_bad)],
&ts,
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_TOTP_MSG),
_ => panic!(),
};
@@ -717,7 +745,12 @@ mod tests {
// check send bad totp, should fail immediate
{
let mut session = AuthSession::new(&mut audit, account.clone(), None);
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_bad)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_bad)],
&ts,
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_TOTP_MSG),
_ => panic!(),
};
@@ -727,7 +760,12 @@ // then bad pw, fail pw
// then bad pw, fail pw
{
let mut session = AuthSession::new(&mut audit, account.clone(), None);
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_good)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_good)],
&ts,
&async_tx,
) {
Ok(AuthState::Continue(cont)) => assert!(cont == vec![AuthAllowed::Password]),
_ => panic!(),
};
@@ -735,7 +773,7 @@ &mut audit,
&mut audit,
&vec![AuthCredential::Password(pw_bad.to_string())],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_PASSWORD_MSG),
_ => panic!(),
@@ -746,7 +784,12 @@ // then good pw, success
// then good pw, success
{
let mut session = AuthSession::new(&mut audit, account.clone(), None);
match session.validate_creds(&mut audit, &vec![AuthCredential::TOTP(totp_good)], &ts, &async_tx) {
match session.validate_creds(
&mut audit,
&vec![AuthCredential::TOTP(totp_good)],
&ts,
&async_tx,
) {
Ok(AuthState::Continue(cont)) => assert!(cont == vec![AuthAllowed::Password]),
_ => panic!(),
};
@ -754,7 +797,7 @@ mod tests {
&mut audit,
&vec![AuthCredential::Password(pw_good.to_string())],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Success(_)) => {}
_ => panic!(),
@ -773,7 +816,7 @@ mod tests {
AuthCredential::TOTP(totp_bad),
],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_TOTP_MSG),
_ => panic!(),
@ -789,7 +832,7 @@ mod tests {
AuthCredential::Password(pw_bad.to_string()),
],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_PASSWORD_MSG),
_ => panic!(),
@ -805,7 +848,7 @@ mod tests {
AuthCredential::Password(pw_good.to_string()),
],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Denied(msg)) => assert!(msg == BAD_TOTP_MSG),
_ => panic!(),
@ -821,7 +864,7 @@ mod tests {
AuthCredential::Password(pw_good.to_string()),
],
&ts,
&async_tx
&async_tx,
) {
Ok(AuthState::Success(_)) => {}
_ => panic!(),

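These test hunks mostly thread the new &async_tx argument through validate_creds: credential checks can now queue follow-up work (password re-hashing and the like) on a channel instead of doing it inline. A minimal sketch of the wiring a test needs, assuming tokio 0.2's unbounded channel; the generic T stands in for the crate's DelayedAction type:

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

// Generic stand-in: T plays the role of DelayedAction in the tests above.
fn drain_after_auth<T>(run_auth: impl FnOnce(&UnboundedSender<T>)) -> Vec<T> {
    let (async_tx, mut async_rx) = unbounded_channel::<T>();
    // The auth step borrows the sender, exactly like &async_tx above.
    run_auth(&async_tx);
    // Anything the credential check queued can now be asserted on.
    let mut queued = Vec::new();
    while let Ok(item) = async_rx.try_recv() {
        queued.push(item);
    }
    queued
}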
View file

@ -1,7 +1,7 @@
pub(crate) mod delayed;
pub(crate) mod account;
pub(crate) mod authsession;
pub(crate) mod claim;
pub(crate) mod delayed;
pub(crate) mod event;
pub(crate) mod group;
pub(crate) mod mfareg;

View file

@ -19,6 +19,7 @@ use crate::server::{QueryServer, QueryServerTransaction, QueryServerWriteTransac
use crate::utils::{password_from_random, readable_password_from_random, uuid_from_duration, SID};
use crate::value::PartialValue;
use crate::actors::v1_write::QueryServerWriteV1;
use crate::idm::delayed::{DelayedAction, PasswordUpgrade, UnixPasswordUpgrade};
use kanidm_proto::v1::AuthState;
@ -29,13 +30,20 @@ use kanidm_proto::v1::SetCredentialResponse;
use kanidm_proto::v1::UnixGroupToken;
use kanidm_proto::v1::UnixUserToken;
use std::sync::Arc;
// use crossbeam::channel::{unbounded, Sender, Receiver, TryRecvError};
use tokio::sync::mpsc::{unbounded_channel as unbounded, UnboundedSender as Sender, UnboundedReceiver as Receiver};
#[cfg(test)]
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{
unbounded_channel as unbounded, UnboundedReceiver as Receiver, UnboundedSender as Sender,
};
use tokio::sync::{Semaphore, SemaphorePermit};
#[cfg(test)]
use async_std::task;
use concread::collections::bptree::*;
use concread::bptree::{BptreeMap, BptreeMapWriteTxn};
use rand::prelude::*;
use std::time::Duration;
use uuid::Uuid;
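The import block aliases tokio's unbounded channel to the names the retired crossbeam channel exported, so existing call sites compile untouched. A small sketch of the resulting surface, assuming tokio 0.2:

use tokio::sync::mpsc::{
    unbounded_channel as unbounded, UnboundedReceiver as Receiver, UnboundedSender as Sender,
};

fn channel_shape() {
    // Same constructor shape crossbeam offered: a (Sender, Receiver) pair.
    let (tx, mut rx): (Sender<u32>, Receiver<u32>) = unbounded();
    // An unbounded send is synchronous; it only errors once the receiver
    // side has been dropped.
    tx.send(7).expect("receiver still alive");
    assert_eq!(rx.try_recv().ok(), Some(7));
}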
@ -45,6 +53,7 @@ pub struct IdmServer {
// means that limits to sessions can be easily applied and checked for
// various accounts, and we have a good idea of how to structure the
// in memory caches related to locking.
session_ticket: Arc<Semaphore>,
sessions: BptreeMap<Uuid, AuthSession>,
// Keep a set of inprogress mfa registrations
mfareg_sessions: BptreeMap<Uuid, MfaRegSession>,
@ -60,6 +69,7 @@ pub struct IdmServerWriteTransaction<'a> {
// Contains methods that require writes, but in the context of writing to
// the idm in memory structures (maybe the query server too). This is
// things like authentication
_session_ticket: SemaphorePermit<'a>,
sessions: BptreeMapWriteTxn<'a, Uuid, AuthSession>,
pub qs_read: QueryServerReadTransaction<'a>,
// thread/server id
@ -100,60 +110,84 @@ impl IdmServer {
// improves.
let crypto_policy = CryptoPolicy::time_target(Duration::from_millis(1));
let (async_tx, async_rx) = unbounded();
(IdmServer {
sessions: BptreeMap::new(),
mfareg_sessions: BptreeMap::new(),
qs,
crypto_policy,
async_tx
}, IdmServerDelayed {
async_rx
})
(
IdmServer {
session_ticket: Arc::new(Semaphore::new(1)),
sessions: BptreeMap::new(),
mfareg_sessions: BptreeMap::new(),
qs,
crypto_policy,
async_tx,
},
IdmServerDelayed { async_rx },
)
}
#[cfg(test)]
pub fn write(&self) -> IdmServerWriteTransaction {
task::block_on(self.write_async())
}
pub async fn write_async<'a>(&'a self) -> IdmServerWriteTransaction<'a> {
let mut sid = [0; 4];
let mut rng = StdRng::from_entropy();
rng.fill(&mut sid);
let session_ticket = self.session_ticket.acquire().await;
let qs_read = self.qs.read_async().await;
IdmServerWriteTransaction {
_session_ticket: session_ticket,
sessions: self.sessions.write(),
// qs: &self.qs,
qs_read: self.qs.read(),
qs_read,
sid,
async_tx: self.async_tx.clone(),
}
}
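write_async is where the actor model actually gets replaced: a single-permit Semaphore is acquired before the session tree is opened for writing, and the permit rides inside the returned transaction, so writers stay serialized for exactly as long as the transaction lives. A reduced sketch of that shape, assuming tokio 0.2's Semaphore (whose acquire() returns the permit directly):

use std::sync::Arc;
use tokio::sync::{Semaphore, SemaphorePermit};

struct WriteTxn<'a> {
    // Held, never read: dropping the transaction releases the permit.
    _ticket: SemaphorePermit<'a>,
}

struct Server {
    ticket: Arc<Semaphore>,
}

impl Server {
    fn new() -> Self {
        // One permit means at most one live WriteTxn at a time.
        Server { ticket: Arc::new(Semaphore::new(1)) }
    }

    async fn write(&self) -> WriteTxn<'_> {
        // A second writer parks here until the first WriteTxn drops.
        WriteTxn { _ticket: self.ticket.acquire().await }
    }
}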
pub fn proxy_read(&self) -> IdmServerProxyReadTransaction {
#[cfg(test)]
pub fn proxy_read<'a>(&'a self) -> IdmServerProxyReadTransaction<'a> {
task::block_on(self.proxy_read_async())
}
pub async fn proxy_read_async<'a>(&'a self) -> IdmServerProxyReadTransaction<'a> {
IdmServerProxyReadTransaction {
qs_read: self.qs.read(),
qs_read: self.qs.read_async().await,
}
}
#[cfg(test)]
pub fn proxy_write(&self, ts: Duration) -> IdmServerProxyWriteTransaction {
task::block_on(self.proxy_write_async(ts))
}
pub async fn proxy_write_async<'a>(
&'a self,
ts: Duration,
) -> IdmServerProxyWriteTransaction<'a> {
let mut sid = [0; 4];
let mut rng = StdRng::from_entropy();
rng.fill(&mut sid);
let qs_write = self.qs.write_async(ts).await;
IdmServerProxyWriteTransaction {
mfareg_sessions: self.mfareg_sessions.write(),
qs_write: self.qs.write(ts),
qs_write,
sid,
crypto_policy: &self.crypto_policy,
}
}
pub(crate) fn delayed_action(&self,
#[cfg(test)]
pub(crate) async fn delayed_action(
&self,
au: &mut AuditScope,
ts: Duration,
da: DelayedAction,
) -> Result<bool, OperationError> {
let mut pw = self.proxy_write(ts);
let mut pw = self.proxy_write_async(ts).await;
pw.process_delayedaction(au, da)
.and_then(|_| {
pw.commit(au)
})
.and_then(|_| pw.commit(au))
.map(|()| true)
}
}
@ -166,19 +200,17 @@ impl IdmServerDelayed {
#[cfg(test)]
pub(crate) fn try_recv(&mut self) -> Result<DelayedAction, OperationError> {
self.async_rx.try_recv().map_err(|e|
match e {
TryRecvError::Empty => OperationError::InvalidState,
TryRecvError::Closed => OperationError::QueueDisconnected,
}
)
self.async_rx.try_recv().map_err(|e| match e {
TryRecvError::Empty => OperationError::InvalidState,
TryRecvError::Closed => OperationError::QueueDisconnected,
})
}
pub(crate) async fn temp_drop_all(&mut self) {
pub(crate) async fn process_all(&mut self, server: &'static QueryServerWriteV1) {
loop {
match self.async_rx.recv().await {
// Drop it
Some(_) => {},
// process it.
Some(da) => server.handle_delayedaction(da).await,
// Channel has closed
None => return,
}
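process_all replaces the old temp_drop_all: rather than discarding queued work, it awaits each DelayedAction and hands it to the write actor, and it exits cleanly once every sender is gone. The drain loop, reduced to its shape for any tokio unbounded receiver:

use tokio::sync::mpsc::UnboundedReceiver;

async fn drain<T>(mut rx: UnboundedReceiver<T>, mut handle: impl FnMut(T)) {
    loop {
        match rx.recv().await {
            // Process each queued item in arrival order.
            Some(item) => handle(item),
            // recv() yields None once all senders have dropped.
            None => return,
        }
    }
}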
@ -873,7 +905,11 @@ impl<'a> IdmServerProxyWriteTransaction<'a> {
// if yes, gen the pw mod and apply.
if same {
let modlist = account
.gen_password_mod(pwu.existing_password.as_str(), &pwu.appid, self.crypto_policy)
.gen_password_mod(
pwu.existing_password.as_str(),
&pwu.appid,
self.crypto_policy,
)
.map_err(|e| {
ladmin_error!(au, "Unable to generate password mod {:?}", e);
e
@ -882,7 +918,8 @@ impl<'a> IdmServerProxyWriteTransaction<'a> {
self.qs_write.internal_modify(
au,
&filter_all!(f_eq("uuid", PartialValue::new_uuidr(&pwu.target_uuid))),
&modlist)
&modlist,
)
} else {
// No action needed, it's probably been changed/updated already.
Ok(())
@ -918,22 +955,21 @@ impl<'a> IdmServerProxyWriteTransaction<'a> {
self.qs_write.internal_modify(
au,
&filter_all!(f_eq("uuid", PartialValue::new_uuidr(&pwu.target_uuid))),
&modlist)
&modlist,
)
} else {
Ok(())
}
}
fn process_delayedaction(
pub(crate) fn process_delayedaction(
&mut self,
au: &mut AuditScope,
da: DelayedAction,
) -> Result<(), OperationError> {
match da {
DelayedAction::PwUpgrade(pwu) =>
self.process_pwupgrade(au, pwu),
DelayedAction::UnixPwUpgrade(upwu) =>
self.process_unixpwupgrade(au, upwu),
DelayedAction::PwUpgrade(pwu) => self.process_pwupgrade(au, pwu),
DelayedAction::UnixPwUpgrade(upwu) => self.process_unixpwupgrade(au, upwu),
}
}
@ -975,8 +1011,9 @@ mod tests {
// , IdmServerDelayed;
use crate::server::QueryServer;
use crate::utils::duration_from_epoch_now;
use std::time::Duration;
use async_std::task;
use std::convert::TryFrom;
use std::time::Duration;
use uuid::Uuid;
const TEST_PASSWORD: &'static str = "ntaoeuntnaoeuhraohuercahu😍";
@ -986,8 +1023,10 @@ mod tests {
#[test]
fn test_idm_anonymous_auth() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let sid = {
// Start and test anonymous auth.
let mut idms_write = idms.write();
@ -1073,7 +1112,10 @@ mod tests {
// Test sending anonymous but with no session init.
#[test]
fn test_idm_anonymous_auth_invalid_states() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
{
let mut idms_write = idms.write();
let sid = Uuid::new_v4();
@ -1144,47 +1186,48 @@ mod tests {
sessionid
}
fn check_admin_password(
idms: &IdmServer, au: &mut AuditScope, pw: &str
) {
let sid = init_admin_authsession_sid(idms, au);
fn check_admin_password(idms: &IdmServer, au: &mut AuditScope, pw: &str) {
let sid = init_admin_authsession_sid(idms, au);
let mut idms_write = idms.write();
let anon_step = AuthEvent::cred_step_password(sid, pw);
let mut idms_write = idms.write();
let anon_step = AuthEvent::cred_step_password(sid, pw);
// Expect success
let r2 = idms_write.auth(au, &anon_step, Duration::from_secs(TEST_CURRENT_TIME));
debug!("r2 ==> {:?}", r2);
// Expect success
let r2 = idms_write.auth(au, &anon_step, Duration::from_secs(TEST_CURRENT_TIME));
debug!("r2 ==> {:?}", r2);
match r2 {
Ok(ar) => {
let AuthResult {
sessionid: _,
state,
} = ar;
match state {
AuthState::Success(_uat) => {
// Check the uat.
}
_ => {
error!("A critical error has occured! We have a non-succcess result!");
panic!();
}
match r2 {
Ok(ar) => {
let AuthResult {
sessionid: _,
state,
} = ar;
match state {
AuthState::Success(_uat) => {
// Check the uat.
}
_ => {
error!("A critical error has occured! We have a non-succcess result!");
panic!();
}
}
Err(e) => {
error!("A critical error has occured! {:?}", e);
// Should not occur!
panic!();
}
};
}
Err(e) => {
error!("A critical error has occured! {:?}", e);
// Should not occur!
panic!();
}
};
idms_write.commit(au).expect("Must not fail");
idms_write.commit(au).expect("Must not fail");
}
#[test]
fn test_idm_simple_password_auth() {
run_idm_test!(|qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
init_admin_w_password(au, qs, TEST_PASSWORD).expect("Failed to setup admin account");
check_admin_password(idms, au, TEST_PASSWORD);
})
@ -1192,7 +1235,10 @@ mod tests {
#[test]
fn test_idm_simple_password_spn_auth() {
run_idm_test!(|qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
init_admin_w_password(au, qs, TEST_PASSWORD).expect("Failed to setup admin account");
let mut idms_write = idms.write();
let admin_init = AuthEvent::named_init("admin@example.com");
@ -1249,7 +1295,10 @@ mod tests {
#[test]
fn test_idm_simple_password_invalid() {
run_idm_test!(|qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
init_admin_w_password(au, qs, TEST_PASSWORD).expect("Failed to setup admin account");
let sid = init_admin_authsession_sid(idms, au);
let mut idms_write = idms.write();
@ -1288,7 +1337,10 @@ mod tests {
#[test]
fn test_idm_simple_password_reset() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let pce = PasswordChangeEvent::new_internal(&UUID_ADMIN, TEST_PASSWORD, None);
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
@ -1300,7 +1352,10 @@ mod tests {
#[test]
fn test_idm_anonymous_set_password_denied() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let pce = PasswordChangeEvent::new_internal(&UUID_ANONYMOUS, TEST_PASSWORD, None);
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
@ -1311,7 +1366,10 @@ mod tests {
#[test]
fn test_idm_session_expire() {
run_idm_test!(|qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
init_admin_w_password(au, qs, TEST_PASSWORD).expect("Failed to setup admin account");
let sid = init_admin_authsession_sid(idms, au);
let mut idms_write = idms.write();
@ -1330,7 +1388,10 @@ mod tests {
#[test]
fn test_idm_regenerate_radius_secret() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
let rrse = RegenerateRadiusSecretEvent::new_internal(UUID_ADMIN.clone());
@ -1348,7 +1409,10 @@ mod tests {
#[test]
fn test_idm_radiusauthtoken() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
let rrse = RegenerateRadiusSecretEvent::new_internal(UUID_ADMIN.clone());
let r1 = idms_prox_write
@ -1369,7 +1433,10 @@ mod tests {
#[test]
fn test_idm_simple_password_reject_weak() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
// len check
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
@ -1402,7 +1469,10 @@ mod tests {
#[test]
fn test_idm_unixusertoken() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let idms_prox_write = idms.proxy_write(duration_from_epoch_now());
// Modify admin to have posixaccount
let me_posix = unsafe {
@ -1474,7 +1544,10 @@ mod tests {
#[test]
fn test_idm_simple_unix_password_reset() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
// make the admin a valid posix account
let me_posix = unsafe {
@ -1536,7 +1609,10 @@ mod tests {
#[test]
fn test_idm_totp_registration() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let ct = duration_from_epoch_now();
let expire = Duration::from_secs(ct.as_secs() + MFAREG_SESSION_TIMEOUT + 2);
let mut idms_prox_write = idms.proxy_write(ct.clone());
@ -1667,7 +1743,10 @@ mod tests {
#[test]
fn test_idm_simple_password_upgrade() {
run_idm_test!(|qs: &QueryServer, idms: &IdmServer, idms_delayed: &mut IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|qs: &QueryServer,
idms: &IdmServer,
idms_delayed: &mut IdmServerDelayed,
au: &mut AuditScope| {
// Assert the delayed action queue is empty
idms_delayed.is_empty_or_panic();
// Setup the admin with an imported password.
@ -1693,7 +1772,8 @@ mod tests {
check_admin_password(idms, au, "password");
// process it.
let da = idms_delayed.try_recv().expect("invalid");
assert!(Ok(true) == idms.delayed_action(au, duration_from_epoch_now(), da));
let r = task::block_on(idms.delayed_action(au, duration_from_epoch_now(), da));
assert!(Ok(true) == r);
// Check the admin pw still matches
check_admin_password(idms, au, "password");
// No delayed action was queued.
@ -1703,11 +1783,14 @@ mod tests {
#[test]
fn test_idm_unix_password_upgrade() {
run_idm_test!(|qs: &QueryServer, idms: &IdmServer, idms_delayed: &mut IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
idms_delayed: &mut IdmServerDelayed,
au: &mut AuditScope| {
// Assert the delayed action queue is empty
idms_delayed.is_empty_or_panic();
// Setup the admin with an imported unix pw.
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
let idms_prox_write = idms.proxy_write(duration_from_epoch_now());
let im_pw = "{SSHA512}JwrSUHkI7FTAfHRVR6KoFlSN0E3dmaQWARjZ+/UsShYlENOqDtFVU77HJLLrY2MuSp0jve52+pwtdVl2QUAHukQ0XUf5LDtM";
let pw = Password::try_from(im_pw).expect("failed to parse");
@ -1739,7 +1822,7 @@ mod tests {
// The upgrade was queued
// Process it.
let da = idms_delayed.try_recv().expect("invalid");
assert!(Ok(true) == idms.delayed_action(au, duration_from_epoch_now(), da));
let _r = task::block_on(idms.delayed_action(au, duration_from_epoch_now(), da));
// Go again
let mut idms_write = idms.write();
let a2 = idms_write.auth_unix(au, &uuae, Duration::from_secs(TEST_CURRENT_TIME));

View file

@ -175,12 +175,13 @@ impl UnixUserAccount {
// TODO #59: Is the cred locked?
// is the cred some or none?
match &self.cred {
Some(cred) => match &cred.password {
Some(pw) => {
if pw.verify(cleartext)? {
lsecurity!(au, "Successful unix cred handling");
if pw.requires_upgrade() {
async_tx.send(
Some(cred) => {
match &cred.password {
Some(pw) => {
if pw.verify(cleartext)? {
lsecurity!(au, "Successful unix cred handling");
if pw.requires_upgrade() {
async_tx.send(
DelayedAction::UnixPwUpgrade(UnixPasswordUpgrade {
target_uuid: self.uuid,
existing_password: cleartext.to_string(),
@ -189,23 +190,24 @@ impl UnixUserAccount {
ladmin_error!(au, "failed to queue delayed action - unix password upgrade");
OperationError::InvalidState
})?;
}
}
Some(self.to_unixusertoken()).transpose()
} else {
// Failed to auth
lsecurity!(au, "Failed unix cred handling (denied)");
Ok(None)
Some(self.to_unixusertoken()).transpose()
} else {
// Failed to auth
lsecurity!(au, "Failed unix cred handling (denied)");
Ok(None)
}
}
// We have a cred but it's not a password, that's weird
None => {
lsecurity!(au, "Invalid unix cred request");
Err(OperationError::InvalidAccountState(
"non-password cred type?".to_string(),
))
}
}
// We have a cred but it's not a password, that's weird
None => {
lsecurity!(au, "Invalid unix cred request");
Err(OperationError::InvalidAccountState(
"non-password cred type?".to_string(),
))
}
},
}
// They don't have a unix cred, fail the auth.
None => {
lsecurity!(au, "Failed unix cred handling (no cred present)");
@ -214,22 +216,13 @@ impl UnixUserAccount {
}
}
pub(crate) fn check_existing_pw(
&self,
cleartext: &str,
) -> Result<bool, OperationError> {
pub(crate) fn check_existing_pw(&self, cleartext: &str) -> Result<bool, OperationError> {
match &self.cred {
Some(cred) => match &cred.password {
Some(pw) => {
pw.verify(cleartext)
}
None => {
Err(OperationError::InvalidState)
}
}
None => {
Err(OperationError::InvalidState)
}
Some(pw) => pw.verify(cleartext),
None => Err(OperationError::InvalidState),
},
None => Err(OperationError::InvalidState),
}
}
}
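The reindented block above keeps the same logic: verify the cleartext against the stored hash, and if the hash predates the current policy, queue a DelayedAction so the upgrade happens in the write actor rather than on the (read-only) auth path. Condensed to its decision flow, reusing the crate's own types (Password, DelayedAction, UnixPasswordUpgrade, OperationError, the Sender alias), so this is a sketch, not a drop-in:

fn verify_and_maybe_upgrade(
    pw: &Password,
    target_uuid: Uuid,
    cleartext: &str,
    async_tx: &Sender<DelayedAction>,
) -> Result<bool, OperationError> {
    if pw.verify(cleartext)? {
        if pw.requires_upgrade() {
            // Queue the re-hash; the write actor applies it later, so
            // authentication never takes the database write lock.
            async_tx
                .send(DelayedAction::UnixPwUpgrade(UnixPasswordUpgrade {
                    target_uuid,
                    existing_password: cleartext.to_string(),
                }))
                .map_err(|_| OperationError::InvalidState)?;
        }
        Ok(true)
    } else {
        Ok(false)
    }
}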

View file

@ -1,43 +1,24 @@
use actix::prelude::*;
use std::time::Duration;
use crate::actors::v1_write::QueryServerWriteV1;
use crate::constants::PURGE_FREQUENCY;
use crate::event::{PurgeRecycledEvent, PurgeTombstoneEvent};
pub struct IntervalActor {
// Store any addresses we require
server: actix::Addr<QueryServerWriteV1>,
}
use tokio::time::{interval, Duration};
pub struct IntervalActor;
impl IntervalActor {
pub fn new(server: actix::Addr<QueryServerWriteV1>) -> Self {
IntervalActor { server }
}
// Define new events here
fn purge_tombstones(&mut self) {
// Make a purge request ...
let pe = PurgeTombstoneEvent::new();
self.server.do_send(pe)
}
fn purge_recycled(&mut self) {
let pe = PurgeRecycledEvent::new();
self.server.do_send(pe)
}
}
impl Actor for IntervalActor {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// TODO #65: This timeout could be configurable from config?
ctx.run_interval(Duration::from_secs(PURGE_FREQUENCY), move |act, _ctx| {
act.purge_recycled();
});
ctx.run_interval(Duration::from_secs(PURGE_FREQUENCY), move |act, _ctx| {
act.purge_tombstones();
pub fn start(server: &'static QueryServerWriteV1) {
tokio::spawn(async move {
let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
loop {
inter.tick().await;
server
.handle_purgetombstoneevent(PurgeTombstoneEvent::new())
.await;
server
.handle_purgerecycledevent(PurgeRecycledEvent::new())
.await;
}
});
}
}
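The purge actor becomes a plain spawned task. Worth noting: tokio's interval yields its first tick immediately, so the first purge pass runs at startup and then every PURGE_FREQUENCY seconds after that. The same loop reduced to a standalone sketch (the 60s period and the println! body are placeholders):

use tokio::time::{interval, Duration};

// Must be called from inside a tokio runtime (e.g. under #[tokio::main]).
fn start_periodic() {
    tokio::spawn(async move {
        let mut inter = interval(Duration::from_secs(60));
        loop {
            // First tick completes immediately; later ticks keep the period.
            inter.tick().await;
            println!("periodic maintenance pass");
        }
    });
}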

View file

@ -4,15 +4,15 @@ use crate::event::SearchEvent;
use crate::idm::event::LdapAuthEvent;
use crate::idm::server::IdmServer;
use crate::server::QueryServerTransaction;
use async_std::task;
use kanidm_proto::v1::{OperationError, UserAuthToken};
use ldap3_server::simple::*;
use regex::Regex;
use std::collections::BTreeSet;
use std::iter;
use std::time::SystemTime;
use uuid::Uuid;
use regex::Regex;
// Clippy doesn't like Bind here. But proto needs unboxed ldapmsg,
// and ldapboundtoken is moved. Really, it's not too bad, every message here is pretty sucky.
#[allow(clippy::large_enum_variant)]
@ -42,7 +42,7 @@ pub struct LdapServer {
impl LdapServer {
pub fn new(au: &mut AuditScope, idms: &IdmServer) -> Result<Self, OperationError> {
let idms_prox_read = idms.proxy_read();
let idms_prox_read = task::block_on(idms.proxy_read_async());
// This is the rootdse path.
// get the domain_info item
let domain_entry = idms_prox_read
@ -100,7 +100,7 @@ impl LdapServer {
})
}
fn do_search(
async fn do_search(
&self,
au: &mut AuditScope,
idms: &IdmServer,
@ -202,9 +202,9 @@ impl LdapServer {
ladmin_info!(au, "LDAP Search Request Attrs -> {:?}", attrs);
let idm_read = idms.proxy_read_async().await;
lperf_segment!(au, "ldap::do_search<core>", || {
// Now start the txn - we need it for resolving filter components.
let idm_read = idms.proxy_read();
// join the filter, with ext_filter
let lfilter = match ext_filter {
@ -288,7 +288,7 @@ impl LdapServer {
}
}
fn do_bind(
async fn do_bind(
&self,
au: &mut AuditScope,
idms: &IdmServer,
@ -300,7 +300,7 @@ impl LdapServer {
"Attempt LDAP Bind for {}",
if dn == "" { "anonymous" } else { dn }
);
let mut idm_write = idms.write();
let mut idm_write = idms.write_async().await;
let target_uuid: Uuid = if dn == "" {
if pw == "" {
@ -357,7 +357,7 @@ impl LdapServer {
})
}
pub fn do_op(
pub async fn do_op(
&self,
au: &mut AuditScope,
idms: &IdmServer,
@ -368,6 +368,7 @@ impl LdapServer {
match server_op {
ServerOps::SimpleBind(sbr) => self
.do_bind(au, idms, sbr.dn.as_str(), sbr.pw.as_str())
.await
.map(|r| match r {
Some(lbt) => LdapResponseState::Bind(lbt, sbr.gen_success()),
None => LdapResponseState::Respond(sbr.gen_invalid_cred()),
@ -379,6 +380,7 @@ impl LdapServer {
ServerOps::Search(sr) => match uat {
Some(u) => self
.do_search(au, idms, &sr, &u)
.await
.map(LdapResponseState::MultiPartResponse)
.or_else(|e| {
let (rc, msg) = operationerr_to_ldapresultcode(e);
@ -386,7 +388,7 @@ impl LdapServer {
}),
None => {
// Search can occur without a bind, so bind first.
let lbt = match self.do_bind(au, idms, "", "") {
let lbt = match self.do_bind(au, idms, "", "").await {
Ok(Some(lbt)) => lbt,
Ok(None) => {
return Ok(LdapResponseState::Respond(
@ -400,6 +402,7 @@ impl LdapServer {
};
// If okay, do the search.
self.do_search(au, idms, &sr, &lbt)
.await
.map(|r| LdapResponseState::BindMultiPartResponse(lbt, r))
.or_else(|e| {
let (rc, msg) = operationerr_to_ldapresultcode(e);
@ -485,12 +488,16 @@ mod tests {
// use crate::utils::duration_from_epoch_now;
// use uuid::Uuid;
use crate::ldap::LdapServer;
use async_std::task;
const TEST_PASSWORD: &'static str = "ntaoeuntnaoeuhraohuercahu😍";
#[test]
fn test_ldap_simple_bind() {
run_idm_test!(|_qs: &QueryServer, idms: &IdmServer, _idms_delayed: &IdmServerDelayed, au: &mut AuditScope| {
run_idm_test!(|_qs: &QueryServer,
idms: &IdmServer,
_idms_delayed: &IdmServerDelayed,
au: &mut AuditScope| {
let ldaps = LdapServer::new(au, idms).expect("failed to start ldap");
let mut idms_prox_write = idms.proxy_write(duration_from_epoch_now());
@ -511,127 +518,129 @@ mod tests {
assert!(idms_prox_write.set_unix_account_password(au, &pce).is_ok());
assert!(idms_prox_write.commit(au).is_ok());
let anon_t = ldaps.do_bind(au, idms, "", "").unwrap().unwrap();
let anon_t = task::block_on(ldaps.do_bind(au, idms, "", ""))
.unwrap()
.unwrap();
assert!(anon_t.uuid == *UUID_ANONYMOUS);
assert!(ldaps.do_bind(au, idms, "", "test").unwrap().is_none());
assert!(task::block_on(ldaps.do_bind(au, idms, "", "test"))
.unwrap()
.is_none());
// Bad password
assert!(ldaps.do_bind(au, idms, "admin", "test").unwrap().is_none());
assert!(task::block_on(ldaps.do_bind(au, idms, "admin", "test"))
.unwrap()
.is_none());
// Now test the admin and various DN's
let admin_t = ldaps
.do_bind(au, idms, "admin", TEST_PASSWORD)
let admin_t = task::block_on(ldaps.do_bind(au, idms, "admin", TEST_PASSWORD))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(au, idms, "admin@example.com", TEST_PASSWORD)
let admin_t =
task::block_on(ldaps.do_bind(au, idms, "admin@example.com", TEST_PASSWORD))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = task::block_on(ldaps.do_bind(au, idms, STR_UUID_ADMIN, TEST_PASSWORD))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(au, idms, STR_UUID_ADMIN, TEST_PASSWORD)
.unwrap()
.unwrap();
let admin_t = task::block_on(ldaps.do_bind(
au,
idms,
"name=admin,dc=example,dc=com",
TEST_PASSWORD,
))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(au, idms, "name=admin,dc=example,dc=com", TEST_PASSWORD)
.unwrap()
.unwrap();
let admin_t = task::block_on(ldaps.do_bind(
au,
idms,
"spn=admin@example.com,dc=example,dc=com",
TEST_PASSWORD,
))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(
au,
idms,
"spn=admin@example.com,dc=example,dc=com",
TEST_PASSWORD,
)
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(
au,
idms,
format!("uuid={},dc=example,dc=com", STR_UUID_ADMIN).as_str(),
TEST_PASSWORD,
)
.unwrap()
.unwrap();
let admin_t = task::block_on(ldaps.do_bind(
au,
idms,
format!("uuid={},dc=example,dc=com", STR_UUID_ADMIN).as_str(),
TEST_PASSWORD,
))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(au, idms, "name=admin", TEST_PASSWORD)
let admin_t = task::block_on(ldaps.do_bind(au, idms, "name=admin", TEST_PASSWORD))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(au, idms, "spn=admin@example.com", TEST_PASSWORD)
.unwrap()
.unwrap();
let admin_t =
task::block_on(ldaps.do_bind(au, idms, "spn=admin@example.com", TEST_PASSWORD))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(
au,
idms,
format!("uuid={}", STR_UUID_ADMIN).as_str(),
TEST_PASSWORD,
)
.unwrap()
.unwrap();
let admin_t = task::block_on(ldaps.do_bind(
au,
idms,
format!("uuid={}", STR_UUID_ADMIN).as_str(),
TEST_PASSWORD,
))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(au, idms, "admin,dc=example,dc=com", TEST_PASSWORD)
.unwrap()
.unwrap();
let admin_t =
task::block_on(ldaps.do_bind(au, idms, "admin,dc=example,dc=com", TEST_PASSWORD))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(
au,
idms,
"admin@example.com,dc=example,dc=com",
TEST_PASSWORD,
)
.unwrap()
.unwrap();
let admin_t = task::block_on(ldaps.do_bind(
au,
idms,
"admin@example.com,dc=example,dc=com",
TEST_PASSWORD,
))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
let admin_t = ldaps
.do_bind(
au,
idms,
format!("{},dc=example,dc=com", STR_UUID_ADMIN).as_str(),
TEST_PASSWORD,
)
.unwrap()
.unwrap();
let admin_t = task::block_on(ldaps.do_bind(
au,
idms,
format!("{},dc=example,dc=com", STR_UUID_ADMIN).as_str(),
TEST_PASSWORD,
))
.unwrap()
.unwrap();
assert!(admin_t.uuid == *UUID_ADMIN);
// Non-existent and invalid DNs
assert!(ldaps
.do_bind(
au,
idms,
"spn=admin@example.com,dc=clownshoes,dc=example,dc=com",
TEST_PASSWORD
)
.is_err());
assert!(ldaps
.do_bind(
au,
idms,
"spn=claire@example.com,dc=example,dc=com",
TEST_PASSWORD
)
.is_err());
assert!(ldaps
.do_bind(au, idms, ",dc=example,dc=com", TEST_PASSWORD)
.is_err());
assert!(ldaps
.do_bind(au, idms, "dc=example,dc=com", TEST_PASSWORD)
.is_err());
assert!(task::block_on(ldaps.do_bind(
au,
idms,
"spn=admin@example.com,dc=clownshoes,dc=example,dc=com",
TEST_PASSWORD
))
.is_err());
assert!(task::block_on(ldaps.do_bind(
au,
idms,
"spn=claire@example.com,dc=example,dc=com",
TEST_PASSWORD
))
.is_err());
assert!(
task::block_on(ldaps.do_bind(au, idms, ",dc=example,dc=com", TEST_PASSWORD))
.is_err()
);
assert!(
task::block_on(ldaps.do_bind(au, idms, "dc=example,dc=com", TEST_PASSWORD))
.is_err()
);
assert!(ldaps.do_bind(au, idms, "claire", "test").is_err());
assert!(task::block_on(ldaps.do_bind(au, idms, "claire", "test")).is_err());
})
}
}
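All of these tests follow the one bridging pattern this commit leans on: #[test] functions stay synchronous and drive the now-async server calls through async_std's block_on. Reduced to its shape:

use async_std::task;

#[test]
fn block_on_bridge() {
    // Any awaitable server call goes inside the block_on, e.g.
    // ldaps.do_bind(au, idms, "admin", pw) in the tests above.
    let out = task::block_on(async { 1 + 1 });
    assert_eq!(out, 2);
}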

View file

@ -1,4 +1,4 @@
// #![deny(warnings)]
#![deny(warnings)]
#![warn(unused_extern_crates)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::expect_used)]

View file

@ -19,7 +19,7 @@ macro_rules! run_test_no_init {
let schema_outer = Schema::new(&mut audit).expect("Failed to init schema");
let idxmeta = {
let schema_txn = schema_outer.write();
let schema_txn = schema_outer.write_blocking();
schema_txn.reload_idxmeta()
};
let be = match Backend::new(&mut audit, "", 1, FsType::Generic, idxmeta) {
@ -63,7 +63,7 @@ macro_rules! run_test {
let schema_outer = Schema::new(&mut audit).expect("Failed to init schema");
let idxmeta = {
let schema_txn = schema_outer.write();
let schema_txn = schema_outer.write_blocking();
schema_txn.reload_idxmeta()
};
let be = match Backend::new(&mut audit, "", 1, FsType::Generic, idxmeta) {
@ -134,7 +134,7 @@ macro_rules! run_idm_test {
let schema_outer = Schema::new(&mut audit).expect("Failed to init schema");
let idxmeta = {
let schema_txn = schema_outer.write();
let schema_txn = schema_outer.write_blocking();
schema_txn.reload_idxmeta()
};
let be =
@ -147,7 +147,12 @@ macro_rules! run_idm_test {
let (test_idm_server, mut idms_delayed) = IdmServer::new(test_server.clone());
$test_fn(&test_server, &test_idm_server, &mut idms_delayed, &mut audit);
$test_fn(
&test_server,
&test_idm_server,
&mut idms_delayed,
&mut audit,
);
// Any needed teardown?
// Make sure there are no errors.
assert!(test_server.verify(&mut audit).len() == 0);

View file

@ -16,7 +16,7 @@ macro_rules! setup_test {
// Create an in memory BE
let schema_outer = Schema::new($au).expect("Failed to init schema");
let idxmeta = {
let schema_txn = schema_outer.write();
let schema_txn = schema_outer.write_blocking();
schema_txn.reload_idxmeta()
};
let be = Backend::new($au, "", 1, FsType::Generic, idxmeta).expect("Failed to init BE");

View file

@ -29,6 +29,7 @@ use std::borrow::Borrow;
use std::collections::BTreeSet;
use uuid::Uuid;
// use concread::cowcell::asynch::*;
use concread::cowcell::*;
// representations of schema that confines object types, classes
@ -1409,6 +1410,7 @@ impl Schema {
unique_cache: CowCell::new(Vec::new()),
ref_cache: CowCell::new(HashMap::with_capacity(64)),
};
// let mut sw = task::block_on(s.write());
let mut sw = s.write();
let r1 = sw.generate_in_memory(audit);
debug_assert!(r1.is_ok());
@ -1427,7 +1429,7 @@ impl Schema {
}
}
pub fn write(&self) -> SchemaWriteTransaction {
pub fn write<'a>(&'a self) -> SchemaWriteTransaction<'a> {
SchemaWriteTransaction {
classes: self.classes.write(),
attributes: self.attributes.write(),
@ -1435,6 +1437,27 @@ impl Schema {
ref_cache: self.ref_cache.write(),
}
}
#[cfg(test)]
pub fn write_blocking<'a>(&'a self) -> SchemaWriteTransaction<'a> {
self.write()
}
/*
pub async fn write<'a>(&'a self) -> SchemaWriteTransaction<'a> {
SchemaWriteTransaction {
classes: self.classes.write().await,
attributes: self.attributes.write().await,
unique_cache: self.unique_cache.write().await,
ref_cache: self.ref_cache.write().await,
}
}
#[cfg(test)]
pub fn write_blocking<'a>(&'a self) -> SchemaWriteTransaction<'a> {
task::block_on(self.write())
}
*/
}
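Schema::write stays synchronous for now, with a #[cfg(test)] write_blocking alias so test call sites already use the name that will survive the planned move to concread's async cowcell (the commented block above). The shim pattern, reduced to its shape with a stand-in Store type:

struct Store;
struct StoreWriteTxn<'a>(&'a mut Store);

impl Store {
    pub fn write(&mut self) -> StoreWriteTxn<'_> {
        StoreWriteTxn(self)
    }

    // Test-only alias: when write() becomes async, only this shim needs
    // to grow a task::block_on and every test keeps compiling.
    #[cfg(test)]
    pub fn write_blocking(&mut self) -> StoreWriteTxn<'_> {
        self.write()
    }
}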
#[cfg(test)]
@ -2016,7 +2039,7 @@ mod tests {
// Check that entries can be normalised and validated sanely
let mut audit = AuditScope::new("test_schema_entry_validate", uuid::Uuid::new_v4(), None);
let schema_outer = Schema::new(&mut audit).expect("failed to create schema");
let schema = schema_outer.write();
let schema = schema_outer.write_blocking();
// Check syntax to upper
// check index to upper
@ -2197,7 +2220,7 @@ mod tests {
None,
);
let schema_outer = Schema::new(&mut audit).expect("failed to create schema");
let mut schema = schema_outer.write();
let mut schema = schema_outer.write_blocking();
assert!(schema.validate(&mut audit).len() == 0);

View file

@ -3,12 +3,13 @@
// This is really only used for long lived, high level types that need clone
// that otherwise can't be cloned. Think Mutex.
// use actix::prelude::*;
use async_std::task;
use hashbrown::HashMap;
use std::cell::Cell;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Semaphore, SemaphorePermit};
use uuid::Uuid;
use crate::audit::AuditScope;
@ -56,6 +57,42 @@ lazy_static! {
static ref PVACP_ENABLE_FALSE: PartialValue = PartialValue::new_bool(false);
}
#[derive(Clone)]
pub struct QueryServer {
s_uuid: Uuid,
d_uuid: Uuid,
be: Backend,
schema: Arc<Schema>,
accesscontrols: Arc<AccessControls>,
db_tickets: Arc<Semaphore>,
write_ticket: Arc<Semaphore>,
}
pub struct QueryServerReadTransaction<'a> {
be_txn: BackendReadTransaction<'a>,
// Anything else? In the future, we'll need to have a schema transaction
// type, maybe others?
schema: SchemaReadTransaction,
accesscontrols: AccessControlsReadTransaction,
_db_ticket: SemaphorePermit<'a>,
}
pub struct QueryServerWriteTransaction<'a> {
committed: bool,
d_uuid: Uuid,
cid: Cid,
be_txn: BackendWriteTransaction<'a>,
schema: SchemaWriteTransaction<'a>,
accesscontrols: AccessControlsWriteTransaction<'a>,
// We store a set of flags that indicate we need a reload of
// schema or acp, which is tested by checking the classes of the
// changing content.
changed_schema: Cell<bool>,
changed_acp: Cell<bool>,
_db_ticket: SemaphorePermit<'a>,
_write_ticket: SemaphorePermit<'a>,
}
// This is the core of the server. It implements all
// the search and modify actions, applies access controls
// and get's everything ready to push back to the fe code
@ -589,14 +626,6 @@ pub trait QueryServerTransaction {
}
}
pub struct QueryServerReadTransaction<'a> {
be_txn: BackendReadTransaction<'a>,
// Anything else? In the future, we'll need to have a schema transaction
// type, maybe others?
schema: SchemaReadTransaction,
accesscontrols: AccessControlsReadTransaction,
}
// Actually conduct a search request
// This is the core of the server, as it processes the entire event
// applies all parts required in order and more.
@ -661,20 +690,6 @@ impl<'a> QueryServerReadTransaction<'a> {
}
}
pub struct QueryServerWriteTransaction<'a> {
committed: bool,
d_uuid: Uuid,
cid: Cid,
be_txn: BackendWriteTransaction<'a>,
schema: SchemaWriteTransaction<'a>,
accesscontrols: AccessControlsWriteTransaction<'a>,
// We store a set of flags that indicate we need a reload of
// schema or acp, which is tested by checking the classes of the
// changing content.
changed_schema: Cell<bool>,
changed_acp: Cell<bool>,
}
impl<'a> QueryServerTransaction for QueryServerWriteTransaction<'a> {
type BackendTransactionType = BackendWriteTransaction<'a>;
@ -700,16 +715,6 @@ struct QueryServerMeta {
pub max_cid: Cid,
}
#[derive(Clone)]
pub struct QueryServer {
// log: actix::Addr<EventLog>,
s_uuid: Uuid,
d_uuid: Uuid,
be: Backend,
schema: Arc<Schema>,
accesscontrols: Arc<AccessControls>,
}
impl QueryServer {
pub fn new(be: Backend, schema: Schema) -> Self {
let (s_uuid, d_uuid) = {
@ -717,8 +722,11 @@ impl QueryServer {
(wr.get_db_s_uuid(), wr.get_db_d_uuid())
};
let pool_size = be.get_pool_size();
info!("Server ID -> {:?}", s_uuid);
info!("Domain ID -> {:?}", d_uuid);
info!("DB tickets -> {:?}", pool_size);
// log_event!(log, "Starting query worker ...");
QueryServer {
s_uuid,
@ -726,19 +734,41 @@ impl QueryServer {
be,
schema: Arc::new(schema),
accesscontrols: Arc::new(AccessControls::new()),
db_tickets: Arc::new(Semaphore::new(pool_size)),
write_ticket: Arc::new(Semaphore::new(1)),
}
}
#[cfg(test)]
pub fn read(&self) -> QueryServerReadTransaction {
task::block_on(self.read_async())
}
pub async fn read_async<'a>(&'a self) -> QueryServerReadTransaction<'a> {
// We need to ensure a db conn will be available
let db_ticket = self.db_tickets.acquire().await;
QueryServerReadTransaction {
be_txn: self.be.read(),
schema: self.schema.read(),
accesscontrols: self.accesscontrols.read(),
_db_ticket: db_ticket,
}
}
#[cfg(test)]
pub fn write(&self, ts: Duration) -> QueryServerWriteTransaction {
// Feed the current schema index metadata to the be write transaction.
task::block_on(self.write_async(ts))
}
pub async fn write_async<'a>(&'a self, ts: Duration) -> QueryServerWriteTransaction<'a> {
// We need to ensure a db conn will be available
let db_ticket = self.db_tickets.acquire().await;
// Guarantee we are the only writer on the thread pool
let write_ticket = self.write_ticket.acquire().await;
// let schema_write = self.schema.write().await;
let schema_write = self.schema.write();
let be_txn = self.be.write();
@ -761,6 +791,8 @@ impl QueryServer {
accesscontrols: self.accesscontrols.write(),
changed_schema: Cell::new(false),
changed_acp: Cell::new(false),
_db_ticket: db_ticket,
_write_ticket: write_ticket,
}
}
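QueryServer now rations its transactions with two semaphores: db_tickets, sized to the sqlite connection pool, bounds total concurrency, while the single-permit write_ticket serializes writers on top of that. A minimal sketch of the pattern, assuming tokio 0.2:

use std::sync::Arc;
use tokio::sync::Semaphore;

struct Pool {
    db_tickets: Arc<Semaphore>,   // one permit per database connection
    write_ticket: Arc<Semaphore>, // one permit total: a single writer
}

impl Pool {
    fn new(pool_size: usize) -> Self {
        Pool {
            db_tickets: Arc::new(Semaphore::new(pool_size)),
            write_ticket: Arc::new(Semaphore::new(1)),
        }
    }

    async fn read(&self) {
        // Up to pool_size readers proceed concurrently.
        let _db = self.db_tickets.acquire().await;
        // ... open a read transaction while _db is held ...
    }

    async fn write(&self) {
        // A writer consumes a connection and the writer slot, in the
        // same order as QueryServer::write_async above.
        let _db = self.db_tickets.acquire().await;
        let _w = self.write_ticket.acquire().await;
        // ... open the write transaction ...
    }
}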
@ -780,7 +812,7 @@ impl QueryServer {
// reloading to occur, which causes the idxmeta to update, and allows validation
// of the schema in the subsequent steps as we proceed.
let reindex_write_1 = self.write(ts);
let reindex_write_1 = task::block_on(self.write_async(ts));
reindex_write_1
.upgrade_reindex(audit, SYSTEM_INDEX_VERSION)
.and_then(|_| reindex_write_1.commit(audit))?;
@ -794,19 +826,19 @@ impl QueryServer {
// the schema to tell us what's indexed), but because we have the in
// mem schema that defines how schema is structured, and this is all
// marked "system", then we won't have an issue here.
let ts_write_1 = self.write(ts);
let ts_write_1 = task::block_on(self.write_async(ts));
ts_write_1
.initialise_schema_core(audit)
.and_then(|_| ts_write_1.commit(audit))?;
let ts_write_2 = self.write(ts);
let ts_write_2 = task::block_on(self.write_async(ts));
ts_write_2
.initialise_schema_idm(audit)
.and_then(|_| ts_write_2.commit(audit))?;
// reindex and set to version + 1, this way when we bump the version
// we are essentially pushing this version id back up to step write_1
let reindex_write_2 = self.write(ts);
let reindex_write_2 = task::block_on(self.write_async(ts));
reindex_write_2
.upgrade_reindex(audit, SYSTEM_INDEX_VERSION + 1)
.and_then(|_| reindex_write_2.commit(audit))?;
@ -819,7 +851,7 @@ impl QueryServer {
// the indexing subsystem is schema/value agnostic - the fact the values still let their keys
// be extracted, means that the pres indexes will be valid even though the entries are pending
// migration. We must be sure to NOT use EQ/SUB indexes in the migration code however!
let migrate_txn = self.write(ts);
let migrate_txn = task::block_on(self.write_async(ts));
// If we are "in the process of being setup" this is 0, and the migrations will have no
// effect as ... there is nothing to migrate! It allows reset of the version to 0 to force
// db migrations to take place.
@ -837,7 +869,7 @@ impl QueryServer {
migrate_txn.commit(audit)?;
// Migrations complete. Init idm will now set the version as needed.
let ts_write_3 = self.write(ts);
let ts_write_3 = task::block_on(self.write_async(ts));
ts_write_3
.initialise_idm(audit)
.and_then(|_| ts_write_3.commit(audit))?;
@ -847,7 +879,7 @@ impl QueryServer {
}
pub fn verify(&self, au: &mut AuditScope) -> Vec<Result<(), ConsistencyError>> {
let r_txn = self.read();
let r_txn = task::block_on(self.read_async());
r_txn.verify(au)
}
}

View file

@ -1,44 +1,31 @@
use crate::audit::AuditScope;
use actix::prelude::*;
use crossbeam::channel::Sender;
use tokio::sync::mpsc::UnboundedSender as Sender;
use uuid::Uuid;
pub struct StatusActor {
log_tx: Sender<Option<AuditScope>>,
log_level: Option<u32>,
}
impl StatusActor {
pub fn start(
log_tx: Sender<Option<AuditScope>>,
log_level: Option<u32>,
) -> actix::Addr<StatusActor> {
SyncArbiter::start(1, move || StatusActor {
log_tx: log_tx.clone(),
log_level,
})
}
}
impl Actor for StatusActor {
type Context = SyncContext<Self>;
}
pub struct StatusRequestEvent {
pub eventid: Uuid,
}
impl Message for StatusRequestEvent {
type Result = bool;
pub struct StatusActor {
log_tx: Sender<AuditScope>,
log_level: Option<u32>,
}
impl Handler<StatusRequestEvent> for StatusActor {
type Result = bool;
impl StatusActor {
pub fn start(log_tx: Sender<AuditScope>, log_level: Option<u32>) -> &'static Self {
let x = Box::new(StatusActor {
log_tx: log_tx.clone(),
log_level,
});
fn handle(&mut self, event: StatusRequestEvent, _ctx: &mut SyncContext<Self>) -> Self::Result {
let x_ptr = Box::into_raw(x);
unsafe { &(*x_ptr) }
}
pub async fn handle_request(&self, event: StatusRequestEvent) -> bool {
let mut audit = AuditScope::new("status_handler", event.eventid, self.log_level);
ladmin_info!(&mut audit, "status handler");
self.log_tx.send(Some(audit)).unwrap_or_else(|_| {
ladmin_info!(&mut audit, "status handler complete");
self.log_tx.send(audit).unwrap_or_else(|_| {
error!("CRITICAL: UNABLE TO COMMIT LOGS");
});
true

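StatusActor::start obtains its &'static Self by leaking a Box through into_raw plus an unsafe re-borrow. Box::leak is the safe spelling of the same move, for anyone reading this pattern:

// Equivalent to the into_raw/deref pair above: leak one heap allocation
// so tasks spawned onto the runtime can hold a 'static reference.
fn leak_static<T>(value: T) -> &'static T {
    Box::leak(Box::new(value))
}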
View file

@ -153,7 +153,7 @@ fn read_file_metadata(path: &PathBuf) -> Metadata {
}
}
#[actix_rt::main]
#[tokio::main]
async fn main() {
// Get info about who we are.
let cuid = get_current_uid();
@ -295,16 +295,23 @@ async fn main() {
Opt::Server(_sopt) => {
eprintln!("Running in server mode ...");
/*
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
*/
let sctx = create_server_core(config).await;
match sctx {
Ok(sctx) => match tokio::signal::ctrl_c().await {
Ok(_sctx) => match tokio::signal::ctrl_c().await {
Ok(_) => {
eprintln!("Ctrl-C received, shutting down");
sctx.stop()
// sctx.stop(true).await;
}
Err(_) => {
eprintln!("Invalid signal received, shutting down as a precaution ...");
sctx.stop()
// sctx.stop(true).await;
}
},
Err(_) => {