From 48979b8e1a50c191492b570c71afb20de967611a Mon Sep 17 00:00:00 2001 From: James Hodgkinson Date: Sat, 7 Oct 2023 13:09:42 +1000 Subject: [PATCH] Replication tweaks - try the most recent successful one and error less (#2189) * made an error less error-y and also found a way to try the last-most-working repl peer --- book/src/authentication.md | 11 ++- server/core/src/https/mod.rs | 2 +- server/core/src/repl/mod.rs | 159 ++++++++++++++++++++--------------- server/lib/src/filter.rs | 4 +- 4 files changed, 101 insertions(+), 75 deletions(-) diff --git a/book/src/authentication.md b/book/src/authentication.md index 516f6bd33..984370736 100644 --- a/book/src/authentication.md +++ b/book/src/authentication.md @@ -115,8 +115,15 @@ kanidm login --name demo_user kanidm self whoami --name demo_user ``` -{{#template templates/kani-warning.md imagepath=images title=Warning! text=Don't use the direct -credential reset to lock or invalidate an account. You should expire the account instead. }} + + +{{#template templates/kani-warning.md +imagepath=images +title=Warning! +text=Don't use the direct credential reset to lock or invalidate an account. You should expire the account instead. 
+}} + + ## Reauthentication / Privilege Access Mode diff --git a/server/core/src/https/mod.rs b/server/core/src/https/mod.rs index 4a2987777..7c0871745 100644 --- a/server/core/src/https/mod.rs +++ b/server/core/src/https/mod.rs @@ -392,7 +392,7 @@ pub(crate) async fn handle_conn( .serve_connection(tls_stream, svc) .await .map_err(|e| { - error!("Failed to complete connection: {:?}", e); + debug!("Failed to complete connection: {:?}", e); std::io::Error::from(ErrorKind::ConnectionAborted) }) } diff --git a/server/core/src/repl/mod.rs b/server/core/src/repl/mod.rs index aa9257019..eeff12ea3 100644 --- a/server/core/src/repl/mod.rs +++ b/server/core/src/repl/mod.rs @@ -8,12 +8,15 @@ use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::Mutex; use tokio::time::{interval, sleep, timeout}; +use tokio::{ + net::{TcpListener, TcpStream}, + task::JoinHandle, +}; use tokio_openssl::SslStream; use tokio_util::codec::{Framed, FramedRead, FramedWrite}; use tracing::{error, Instrument}; @@ -77,7 +80,7 @@ pub(crate) async fn create_repl_server( "Starting replication interface https://{} ...", repl_config.bindaddress ); - let repl_handle = tokio::spawn(repl_acceptor( + let repl_handle: JoinHandle<()> = tokio::spawn(repl_acceptor( listener, idms, repl_config.clone(), @@ -90,15 +93,22 @@ pub(crate) async fn create_repl_server( } #[instrument(level = "info", skip_all)] +/// This returns the remote address that worked, so you can try that first next time async fn repl_consumer_connect_supplier( domain: &str, sock_addrs: &[SocketAddr], tls_connector: &SslConnector, consumer_conn_settings: &ConsumerConnSettings, -) -> Option, codec::ConsumerCodec>> { +) -> Option<( + SocketAddr, + Framed, codec::ConsumerCodec>, +)> { // This is pretty gnarly, but we need to loop to try out each socket addr. 
for sock_addr in sock_addrs { - debug!("Connecting to {} replica via {}", domain, sock_addr); + debug!( + "Attempting to connect to {} replica via {}", + domain, sock_addr + ); let tcpstream = match timeout( consumer_conn_settings.replica_connect_timeout, @@ -108,29 +118,29 @@ async fn repl_consumer_connect_supplier( { Ok(Ok(tc)) => tc, Ok(Err(err)) => { - error!(?err, "Failed to connect to {}", sock_addr); + debug!(?err, "Failed to connect to {}", sock_addr); continue; } Err(_) => { - error!("Timeout connecting to {}", sock_addr); + debug!("Timeout connecting to {}", sock_addr); continue; } }; - trace!("connection established"); + trace!("Connection established to peer on {:?}", sock_addr); let mut tlsstream = match Ssl::new(tls_connector.context()) .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream)) { Ok(ta) => ta, Err(e) => { - error!("replication client TLS setup error, continuing -> {:?}", e); + error!("Replication client TLS setup error, continuing -> {:?}", e); continue; } }; if let Err(e) = SslStream::connect(Pin::new(&mut tlsstream)).await { - error!("replication client TLS accept error, continuing -> {:?}", e); + error!("Replication client TLS accept error, continuing -> {:?}", e); continue; }; @@ -138,14 +148,19 @@ async fn repl_consumer_connect_supplier( tlsstream, codec::ConsumerCodec::new(consumer_conn_settings.max_frame_bytes), ); - - return Some(supplier_conn); + // "hey this one worked, try it first next time!" 
+ return Some((sock_addr.to_owned(), supplier_conn)); } - error!("Unable to connect to supplier."); + error!( + "Unable to connect to supplier, tried to connect to {:?}", + sock_addrs + ); None } +/// This returns the socket address that worked, so you can try that first next time +#[instrument(level="info", skip(refresh_coord, tls_connector, idms), fields(uuid=Uuid::new_v4().to_string()))] async fn repl_run_consumer_refresh( refresh_coord: Arc)>>, domain: &str, @@ -153,7 +168,7 @@ async fn repl_run_consumer_refresh( tls_connector: &SslConnector, idms: &IdmServer, consumer_conn_settings: &ConsumerConnSettings, -) { +) -> Result, ()> { // Take the refresh lock. Note that every replication consumer *should* end up here // behind this lock, but only one can proceed. This is what we want! @@ -161,44 +176,37 @@ async fn repl_run_consumer_refresh( // Simple case - task is already done. if refresh_coord_guard.0 { - trace!("refresh already completed by another task, return."); - return; + trace!("Refresh already completed by another task, return."); + return Ok(None); } // okay, we need to proceed. - let Some(mut supplier_conn) = + let (addr, mut supplier_conn) = repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings) .await - else { - return; - }; + .ok_or(())?; - // If we fail at anypoint, just RETURN because this leaves the next task to attempt, or + // If we fail at any point, just RETURN because this leaves the next task to attempt, or // the channel drops and that tells the caller this failed. 
- - if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await { - error!(?err, "consumer encode error, unable to continue."); - return; - } + supplier_conn + .send(ConsumerRequest::Refresh) + .await + .map_err(|err| error!(?err, "consumer encode error, unable to continue."))?; let refresh = if let Some(codec_msg) = supplier_conn.next().await { - match codec_msg { - Ok(SupplierResponse::Refresh(changes)) => { + match codec_msg.map_err(|err| error!(?err, "Consumer decode error, unable to continue."))? { + SupplierResponse::Refresh(changes) => { // Success - return to bypass the error message. changes } - Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => { + SupplierResponse::Pong | SupplierResponse::Incremental(_) => { error!("Supplier Response contains invalid State"); - return; - } - Err(err) => { - error!(?err, "consumer decode error, unable to continue."); - return; + return Err(()); } } } else { error!("Connection closed"); - return; + return Err(()); }; // Now apply the refresh if possible @@ -206,14 +214,11 @@ async fn repl_run_consumer_refresh( // Scope the transaction. let ct = duration_from_epoch_now(); let mut write_txn = idms.proxy_write(ct).await; - if let Err(err) = write_txn + write_txn .qs_write .consumer_apply_refresh(&refresh) .and_then(|cs| write_txn.commit().map(|()| cs)) - { - error!(?err, "consumer was not able to apply refresh."); - return; - } + .map_err(|err| error!(?err, "Consumer was not able to apply refresh."))?; } // Now mark the refresh as complete AND indicate it to the channel. @@ -224,9 +229,11 @@ async fn repl_run_consumer_refresh( // Here the coord guard will drop and every other task proceeds. 
- warn!("Replication refresh was successful."); + info!("Replication refresh was successful."); + Ok(Some(addr)) } +#[instrument(level="info", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))] async fn repl_run_consumer( domain: &str, sock_addrs: &[SocketAddr], @@ -234,12 +241,12 @@ async fn repl_run_consumer( automatic_refresh: bool, idms: &IdmServer, consumer_conn_settings: &ConsumerConnSettings, -) { - let Some(mut supplier_conn) = +) -> Option { + let Some((socket_addr, mut supplier_conn)) = repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings) .await else { - return; + return None; }; // Perform incremental. @@ -252,7 +259,7 @@ async fn repl_run_consumer( ?err, "consumer ruv range could not be accessed, unable to continue." ); - return; + return None; } } }; @@ -262,7 +269,7 @@ async fn repl_run_consumer( .await { error!(?err, "consumer encode error, unable to continue."); - return; + return None; } let changes = if let Some(codec_msg) = supplier_conn.next().await { @@ -273,16 +280,16 @@ async fn repl_run_consumer( } Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => { error!("Supplier Response contains invalid State"); - return; + return None; } Err(err) => { error!(?err, "consumer decode error, unable to continue."); - return; + return None; } } } else { error!("Connection closed"); - return; + return None; }; // Now apply the changes if possible @@ -297,7 +304,7 @@ async fn repl_run_consumer( Ok(state) => state, Err(err) => { error!(?err, "consumer was not able to apply changes."); - return; + return None; } } }; @@ -306,21 +313,21 @@ async fn repl_run_consumer( ConsumerState::Ok => { info!("Incremental Replication Success"); // return to bypass the failure message. - return; + return Some(socket_addr); } ConsumerState::RefreshRequired => { if automatic_refresh { warn!("Consumer is out of date and must be refreshed. 
This will happen *now*."); } else { error!("Consumer is out of date and must be refreshed. You must manually resolve this situation."); - return; + return None; }; } } if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await { error!(?err, "consumer encode error, unable to continue."); - return; + return None; } let refresh = if let Some(codec_msg) = supplier_conn.next().await { @@ -331,16 +338,16 @@ async fn repl_run_consumer( } Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => { error!("Supplier Response contains invalid State"); - return; + return None; } Err(err) => { error!(?err, "consumer decode error, unable to continue."); - return; + return None; } } } else { error!("Connection closed"); - return; + return None; }; // Now apply the refresh if possible @@ -352,10 +359,11 @@ async fn repl_run_consumer( .and_then(|cs| write_txn.commit().map(|()| cs)) { error!(?err, "consumer was not able to apply refresh."); - return; + return None; } warn!("Replication refresh was successful."); + Some(socket_addr) } #[derive(Debug, Clone)] @@ -450,43 +458,54 @@ async fn repl_task( info!("Replica task for {} has started.", origin); + // we keep track of the "last known good" socketaddr so we can try that first next time. + let mut last_working_address: Option = None; + // Okay, all the parameters are setup. Now we wait on our interval. loop { + // if the target address worked last time, then let's use it this time! + let mut sorted_socket_addrs = vec![]; + + if let Some(addr) = last_working_address { + sorted_socket_addrs.push(addr); + }; + // this is O(n^2) but we *should* be talking about a small number of addresses for a given hostname + socket_addrs.iter().for_each(|addr| { + if !sorted_socket_addrs.contains(addr) { + sorted_socket_addrs.push(addr.to_owned()); + } + }); + tokio::select! 
{ Ok(task) = task_rx.recv() => { match task { ReplConsumerCtrl::Stop => break, ReplConsumerCtrl::Refresh ( refresh_coord ) => { - let eventid = Uuid::new_v4(); - let span = info_span!("replication_run_consumer_refresh", uuid = ?eventid); - // let _enter = span.enter(); - repl_run_consumer_refresh( + last_working_address = match repl_run_consumer_refresh( refresh_coord, domain, - &socket_addrs, + &sorted_socket_addrs, &tls_connector, &idms, &consumer_conn_settings ) - .instrument(span) - .await + .await { + Ok(val) => val, + Err(_) => None + }; } } } _ = repl_interval.tick() => { // Interval passed, attempt a replication run. - let eventid = Uuid::new_v4(); - let span = info_span!("replication_run_consumer", uuid = ?eventid); - // let _enter = span.enter(); repl_run_consumer( domain, - &socket_addrs, + &sorted_socket_addrs, &tls_connector, automatic_refresh, &idms, &consumer_conn_settings ) - .instrument(span) .await; } } @@ -697,7 +716,7 @@ async fn repl_acceptor( } => { let task_rx = task_tx.subscribe(); - let handle = tokio::spawn(repl_task( + let handle: JoinHandle<()> = tokio::spawn(repl_task( origin.clone(), server_key.clone(), server_cert.clone(), @@ -879,7 +898,7 @@ async fn repl_acceptor( let _ = task_tx.send(ReplConsumerCtrl::Stop); for task_handle in task_handles.drain(..) { // Let each task join. - let res: Result<(), _> = task_handle.await; + let res: Result<(), _> = task_handle.await.map(|_| ()); if res.is_err() { warn!("Failed to join replication task, continuing ..."); } diff --git a/server/lib/src/filter.rs b/server/lib/src/filter.rs index 5847e8f83..aca54e35b 100644 --- a/server/lib/src/filter.rs +++ b/server/lib/src/filter.rs @@ -661,7 +661,7 @@ impl FilterComp { // * If all filters are okay, return Ok(Filter::Or()) // * Any filter is invalid, return the error. 
// * An empty "or" is a valid filter in mathematical terms, but we throw an - // error to warn the user because it's super unlikey they want that + // error to warn the user because it's super unlikely they want that if filters.is_empty() { return Err(SchemaError::EmptyFilter); }; @@ -676,7 +676,7 @@ impl FilterComp { // * If all filters are okay, return Ok(Filter::Or()) // * Any filter is invalid, return the error. // * An empty "and" is a valid filter in mathematical terms, but we throw an - // error to warn the user because it's super unlikey they want that + // error to warn the user because it's super unlikely they want that if filters.is_empty() { return Err(SchemaError::EmptyFilter); };