Add signal trapping for a variety of signals ()

This commit is contained in:
Firstyear 2022-11-23 20:10:43 +10:00 committed by GitHub
parent 98766661a3
commit a9f5a219be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 288 additions and 116 deletions
kanidmd
core/src
daemon/src
testkit-macros/src
testkit/src

View file

@ -28,6 +28,9 @@ use crate::actors::v1_read::QueryServerReadV1;
use crate::actors::v1_write::QueryServerWriteV1;
use crate::config::{ServerRole, TlsConfiguration};
use crate::CoreAction;
use tokio::sync::broadcast;
#[derive(Clone)]
pub struct JavaScriptFile {
// Relative to the pkg/ dir
@ -326,7 +329,8 @@ pub fn create_https_server(
status_ref: &'static StatusActor,
qe_w_ref: &'static QueryServerWriteV1,
qe_r_ref: &'static QueryServerReadV1,
) -> Result<(), ()> {
mut rx: broadcast::Receiver<CoreAction>,
) -> Result<tokio::task::JoinHandle<()>, ()> {
let jws_validator = jws_signer.get_validator().map_err(|e| {
error!(?e, "Failed to get jws validator");
})?;
@ -847,7 +851,7 @@ pub fn create_https_server(
// === End routes
// Create listener?
match opt_tls_params {
let handle = match opt_tls_params {
Some(tls_param) => {
let tlsl = TlsListener::build()
.addrs(&address)
@ -864,25 +868,45 @@ pub fn create_https_server(
*/
tokio::spawn(async move {
if let Err(e) = tserver.listen(tlsl).await {
error!(
"Failed to start server listener on address {:?} -> {:?}",
&address, e
);
}
});
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => {},
}
}
server_result = tserver.listen(tlsl) => {
if let Err(e) = server_result {
error!(
"Failed to start server listener on address {:?} -> {:?}",
&address, e
);
}
}
};
info!("Stopped HTTPSAcceptorActor");
})
}
None => {
// Create without https
tokio::spawn(async move {
if let Err(e) = tserver.listen(&address).await {
error!(
"Failed to start server listener on address {:?} -> {:?}",
&address, e,
);
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => {},
}
}
server_result = tserver.listen(&address) => {
if let Err(e) = server_result {
error!(
"Failed to start server listener on address {:?} -> {:?}",
&address, e
);
}
}
}
});
info!("Stopped HTTPAcceptorActor");
})
}
};
Ok(())
Ok(handle)
}

View file

@ -7,9 +7,11 @@ use std::path::Path;
use chrono::Utc;
use saffron::parse::{CronExpr, English};
use saffron::Cron;
use tokio::sync::broadcast;
use tokio::time::{interval, sleep, Duration};
use crate::config::OnlineBackup;
use crate::CoreAction;
use crate::actors::v1_read::QueryServerReadV1;
use crate::actors::v1_write::QueryServerWriteV1;
@ -19,19 +21,33 @@ use kanidmd_lib::event::{OnlineBackupEvent, PurgeRecycledEvent, PurgeTombstoneEv
pub struct IntervalActor;
impl IntervalActor {
pub fn start(server: &'static QueryServerWriteV1) {
pub fn start(
server: &'static QueryServerWriteV1,
mut rx: broadcast::Receiver<CoreAction>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
loop {
inter.tick().await;
server
.handle_purgetombstoneevent(PurgeTombstoneEvent::new())
.await;
server
.handle_purgerecycledevent(PurgeRecycledEvent::new())
.await;
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => break,
}
}
_ = inter.tick() => {
server
.handle_purgetombstoneevent(PurgeTombstoneEvent::new())
.await;
server
.handle_purgerecycledevent(PurgeRecycledEvent::new())
.await;
}
}
}
});
info!("Stopped IntervalActor");
})
}
// Allow this because result is the only way to map and ? to bubble up, but we aren't
@ -40,7 +56,8 @@ impl IntervalActor {
pub fn start_online_backup(
server: &'static QueryServerReadV1,
cfg: &OnlineBackup,
) -> Result<(), ()> {
mut rx: broadcast::Receiver<CoreAction>,
) -> Result<tokio::task::JoinHandle<()>, ()> {
let outpath = cfg.path.to_owned();
let schedule = cfg.schedule.to_owned();
let versions = cfg.versions;
@ -86,7 +103,7 @@ impl IntervalActor {
return Err(());
}
tokio::spawn(async move {
let handle = tokio::spawn(async move {
let ct = Utc::now();
let cron = Cron::new(cron_expr.clone());
@ -100,20 +117,29 @@ impl IntervalActor {
next_time, wait_seconds
);
sleep(Duration::from_secs(wait_seconds)).await;
if let Err(e) = server
.handle_online_backup(
OnlineBackupEvent::new(),
outpath.clone().as_str(),
versions,
)
.await
{
error!(?e, "An online backup error occured.");
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => break,
}
}
_ = sleep(Duration::from_secs(wait_seconds)) => {
if let Err(e) = server
.handle_online_backup(
OnlineBackupEvent::new(),
outpath.clone().as_str(),
versions,
)
.await
{
error!(?e, "An online backup error occured.");
}
}
}
}
info!("Stopped OnlineBackupActor");
});
Ok(())
Ok(handle)
}
}

View file

@ -1,6 +1,5 @@
use std::marker::Unpin;
use std::net;
use std::pin::Pin;
use std::str::FromStr;
use crate::actors::v1_read::QueryServerReadV1;
@ -16,6 +15,9 @@ use tokio::net::TcpListener;
use tokio_openssl::SslStream;
use tokio_util::codec::{FramedRead, FramedWrite};
use crate::CoreAction;
use tokio::sync::broadcast;
struct LdapSession {
uat: Option<LdapBoundToken>,
}
@ -108,61 +110,54 @@ async fn tls_acceptor(
listener: TcpListener,
tls_parms: SslAcceptor,
qe_r_ref: &'static QueryServerReadV1,
mut rx: broadcast::Receiver<CoreAction>,
) {
loop {
match listener.accept().await {
Ok((tcpstream, client_socket_addr)) => {
// Start the event
// From the parms we need to create an SslContext.
let mut tlsstream = match Ssl::new(tls_parms.context())
.and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
{
Ok(ta) => ta,
Err(e) => {
error!("LDAP TLS setup error, continuing -> {:?}", e);
continue;
}
};
if let Err(e) = SslStream::accept(Pin::new(&mut tlsstream)).await {
error!("LDAP TLS accept error, continuing -> {:?}", e);
continue;
};
let (r, w) = tokio::io::split(tlsstream);
let r = FramedRead::new(r, LdapCodec);
let w = FramedWrite::new(w, LdapCodec);
tokio::spawn(client_process(r, w, client_socket_addr, qe_r_ref));
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => break,
}
}
Err(e) => {
error!("LDAP acceptor error, continuing -> {:?}", e);
accept_result = listener.accept() => {
match accept_result {
Ok((tcpstream, client_socket_addr)) => {
// Start the event
// From the parms we need to create an SslContext.
let mut tlsstream = match Ssl::new(tls_parms.context())
.and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
{
Ok(ta) => ta,
Err(e) => {
error!("LDAP TLS setup error, continuing -> {:?}", e);
continue;
}
};
if let Err(e) = SslStream::accept(Pin::new(&mut tlsstream)).await {
error!("LDAP TLS accept error, continuing -> {:?}", e);
continue;
};
let (r, w) = tokio::io::split(tlsstream);
let r = FramedRead::new(r, LdapCodec);
let w = FramedWrite::new(w, LdapCodec);
tokio::spawn(client_process(r, w, client_socket_addr, qe_r_ref));
}
Err(e) => {
error!("LDAP acceptor error, continuing -> {:?}", e);
}
}
}
}
}
info!("Stopped LdapAcceptorActor");
}
/// Plain TCP LDAP Listener, hands off to [client_process]
// async fn acceptor(listener: TcpListener, qe_r_ref: &'static QueryServerReadV1) {
// loop {
// match listener.accept().await {
// Ok((tcpstream, client_socket_addr)) => {
// // Start the event
// let (r, w) = tokio::io::split(tcpstream);
// let r = FramedRead::new(r, LdapCodec);
// let w = FramedWrite::new(w, LdapCodec);
// // Let it rip.
// tokio::spawn(client_process(r, w, client_socket_addr, qe_r_ref));
// }
// Err(e) => {
// error!("LDAP acceptor error, continuing -> {:?}", e);
// }
// }
// }
// }
pub(crate) async fn create_ldap_server(
address: &str,
opt_tls_params: Option<SslAcceptorBuilder>,
qe_r_ref: &'static QueryServerReadV1,
) -> Result<(), ()> {
rx: broadcast::Receiver<CoreAction>,
) -> Result<tokio::task::JoinHandle<()>, ()> {
if address.starts_with(":::") {
// takes :::xxxx to xxxx
let port = address.replacen(":::", "", 1);
@ -180,18 +175,18 @@ pub(crate) async fn create_ldap_server(
);
})?;
match opt_tls_params {
let ldap_acceptor_handle = match opt_tls_params {
Some(tls_params) => {
info!("Starting LDAPS interface ldaps://{} ...", address);
let tls_parms = tls_params.build();
tokio::spawn(tls_acceptor(listener, tls_parms, qe_r_ref));
tokio::spawn(tls_acceptor(listener, tls_parms, qe_r_ref, rx))
}
None => {
error!("The server won't run without TLS!");
return Err(());
}
}
};
info!("Created LDAP interface");
Ok(())
Ok(ldap_acceptor_handle)
}

View file

@ -47,6 +47,8 @@ use kanidmd_lib::utils::{duration_from_epoch_now, touch_file_or_quit};
#[cfg(not(target_family = "windows"))]
use libc::umask;
use tokio::sync::broadcast;
use crate::actors::v1_read::QueryServerReadV1;
use crate::actors::v1_write::QueryServerWriteV1;
use crate::config::Configuration;
@ -551,8 +553,53 @@ pub async fn recover_account_core(config: &Configuration, name: &str) {
);
}
pub async fn create_server_core(config: Configuration, config_test: bool) -> Result<(), ()> {
#[derive(Clone, Debug)]
pub enum CoreAction {
Shutdown,
}
pub struct CoreHandle {
clean_shutdown: bool,
tx: broadcast::Sender<CoreAction>,
handles: Vec<tokio::task::JoinHandle<()>>,
// interval_handle: tokio::task::JoinHandle<()>,
}
impl CoreHandle {
pub async fn shutdown(&mut self) {
if let Err(_) = self.tx.send(CoreAction::Shutdown) {
eprintln!("No receivers acked shutdown request. Treating as unclean.");
return;
}
// Wait on the handles.
while let Some(handle) = self.handles.pop() {
if let Err(_) = handle.await {
eprintln!("A task failed to join");
}
}
self.clean_shutdown = true;
}
}
impl Drop for CoreHandle {
fn drop(&mut self) {
if !self.clean_shutdown {
eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURED ⚠️ ");
}
// Can't enable yet until we clean up unix_int cache layer test
// debug_assert!(self.clean_shutdown);
}
}
pub async fn create_server_core(
config: Configuration,
config_test: bool,
) -> Result<CoreHandle, ()> {
// Until this point, we probably want to write to the log macro fns.
let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
if config.integration_test_config.is_some() {
warn!("RUNNING IN INTEGRATION TEST MODE.");
@ -666,30 +713,43 @@ pub async fn create_server_core(config: Configuration, config_test: bool) -> Res
// Create the server async write entry point.
let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
tokio::spawn(async move {
let delayed_handle = tokio::spawn(async move {
loop {
match idms_delayed.next().await {
Some(da) => server_write_ref.handle_delayedaction(da).await,
// Channel has closed, stop the task.
None => return,
tokio::select! {
Ok(action) = broadcast_rx.recv() => {
match action {
CoreAction::Shutdown => break,
}
}
delayed = idms_delayed.next() => {
match delayed {
Some(da) => server_write_ref.handle_delayedaction(da).await,
// Channel has closed, stop the task.
None => break,
}
}
}
}
info!("Stopped DelayedActionActor");
});
// Setup timed events associated to the write thread
IntervalActor::start(server_write_ref);
let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
// Setup timed events associated to the read thread
match &config.online_backup {
let maybe_backup_handle = match &config.online_backup {
Some(cfg) => {
IntervalActor::start_online_backup(server_read_ref, cfg)?;
let handle =
IntervalActor::start_online_backup(server_read_ref, cfg, broadcast_tx.subscribe())?;
Some(handle)
}
None => {
debug!("Online backup not requested, skipping");
None
}
};
// If we have been requested to init LDAP, configure it now.
match &config.ldapaddress {
let maybe_ldap_acceptor_handle = match &config.ldapaddress {
Some(la) => {
let opt_ldap_tls_params = match setup_tls(&config) {
Ok(t) => t,
@ -700,14 +760,23 @@ pub async fn create_server_core(config: Configuration, config_test: bool) -> Res
};
if !config_test {
// ⚠️ only start the sockets and listeners in non-config-test modes.
ldaps::create_ldap_server(la.as_str(), opt_ldap_tls_params, server_read_ref)
.await?;
let h = ldaps::create_ldap_server(
la.as_str(),
opt_ldap_tls_params,
server_read_ref,
broadcast_tx.subscribe(),
)
.await?;
Some(h)
} else {
None
}
}
None => {
debug!("LDAP not requested, skipping");
None
}
}
};
// TODO: Remove these when we go to auth bearer!
// Copy the max size
@ -715,11 +784,12 @@ pub async fn create_server_core(config: Configuration, config_test: bool) -> Res
// domain will come from the qs now!
let cookie_key: [u8; 32] = config.cookie_key;
if config_test {
let maybe_http_acceptor_handle = if config_test {
admin_info!("this config rocks! 🪨 ");
None
} else {
// ⚠️ only start the sockets and listeners in non-config-test modes.
self::https::create_https_server(
let h = self::https::create_https_server(
config.address,
// opt_tls_params,
config.tls_config.as_ref(),
@ -730,10 +800,30 @@ pub async fn create_server_core(config: Configuration, config_test: bool) -> Res
status_ref,
server_write_ref,
server_read_ref,
broadcast_tx.subscribe(),
)?;
admin_info!("ready to rock! 🪨 ");
Some(h)
};
let mut handles = vec![interval_handle, delayed_handle];
if let Some(backup_handle) = maybe_backup_handle {
handles.push(backup_handle)
}
Ok(())
if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
handles.push(ldap_handle)
}
if let Some(http_handle) = maybe_http_acceptor_handle {
handles.push(http_handle)
}
Ok(CoreHandle {
clean_shutdown: false,
tx: broadcast_tx,
handles,
})
}

View file

@ -332,14 +332,48 @@ async fn main() {
let sctx = create_server_core(config, config_test).await;
if !config_test {
match sctx {
Ok(_sctx) => match tokio::signal::ctrl_c().await {
Ok(_) => {
eprintln!("Ctrl-C received, shutting down");
Ok(mut sctx) => {
loop {
tokio::select! {
Ok(()) = tokio::signal::ctrl_c() => {
break
}
Some(()) = async move {
let sigterm = tokio::signal::unix::SignalKind::terminate();
tokio::signal::unix::signal(sigterm).unwrap().recv().await
} => {
break
}
Some(()) = async move {
let sigterm = tokio::signal::unix::SignalKind::alarm();
tokio::signal::unix::signal(sigterm).unwrap().recv().await
} => {
// Ignore
}
Some(()) = async move {
let sigterm = tokio::signal::unix::SignalKind::hangup();
tokio::signal::unix::signal(sigterm).unwrap().recv().await
} => {
// Ignore
}
Some(()) = async move {
let sigterm = tokio::signal::unix::SignalKind::user_defined1();
tokio::signal::unix::signal(sigterm).unwrap().recv().await
} => {
// Ignore
}
Some(()) = async move {
let sigterm = tokio::signal::unix::SignalKind::user_defined2();
tokio::signal::unix::signal(sigterm).unwrap().recv().await
} => {
// Ignore
}
}
}
Err(_) => {
eprintln!("Invalid signal received, shutting down as a precaution ...");
}
},
eprintln!("Signal received, shutting down");
// Send a broadcast that we are done.
sctx.shutdown().await;
}
Err(_) => {
eprintln!("Failed to start server core!");
// We may need to return an exit code here, but that may take some re-architecting
@ -347,8 +381,10 @@ async fn main() {
return;
}
}
eprintln!("stopped 🛑 ");
eprintln!("Stopped 🛑 ");
}
}
KanidmdOpt::Database {
commands: DbCommands::Backup(bopt),

View file

@ -43,8 +43,9 @@ fn parse_knobs(input: &syn::ItemFn) -> TokenStream {
#header
fn #test_driver() {
let body = async {
let rsclient = kanidmd_testkit::setup_async_test().await;
#fn_name(rsclient).await
let (rsclient, mut core_handle) = kanidmd_testkit::setup_async_test().await;
#fn_name(rsclient).await;
core_handle.shutdown().await;
};
#[allow(clippy::expect_used, clippy::diverging_sub_expression)]
{

View file

@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicU16, Ordering};
use kanidm_client::{KanidmClient, KanidmClientBuilder};
use kanidmd_core::config::{Configuration, IntegrationTestConfig, ServerRole};
use kanidmd_core::create_server_core;
use kanidmd_core::{create_server_core, CoreHandle};
use tokio::task;
pub const ADMIN_TEST_USER: &str = "admin";
@ -36,7 +36,7 @@ pub fn is_free_port(port: u16) -> bool {
// allowed because the use of this function is behind a test gate
#[allow(dead_code)]
pub async fn setup_async_test() -> KanidmClient {
pub async fn setup_async_test() -> (KanidmClient, CoreHandle) {
let _ = sketching::test_init();
let mut counter = 0;
@ -71,7 +71,7 @@ pub async fn setup_async_test() -> KanidmClient {
// config.log_level = Some(LogLevel::FullTrace as u32);
config.threads = 1;
create_server_core(config, false)
let core_handle = create_server_core(config, false)
.await
.expect("failed to start server core");
// We have to yield now to guarantee that the tide elements are setup.
@ -85,5 +85,5 @@ pub async fn setup_async_test() -> KanidmClient {
tracing::info!("Testkit server setup complete - {}", addr);
rsclient
(rsclient, core_handle)
}