Replication tweaks - try the most recent successful one and error less (#2189)

* Downgraded noisy connection failures from error to debug level, and replication consumers now remember the last peer that worked and try it first (sketched below).
James Hodgkinson authored on 2023-10-07 13:09:42 +10:00, committed by GitHub
parent 0adc3e0dd9
commit 48979b8e1a
4 changed files with 101 additions and 75 deletions
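
The core of the change is that each replica task remembers the last peer address that produced a successful connection and tries it first on the next run. Below is a minimal standalone sketch of that ordering idea; the function name, variable names, addresses and port are illustrative only, not the code in this commit (the real change is in the `repl_task` diff further down).

```rust
use std::net::SocketAddr;

/// Order candidate peers so the address that worked last time is tried first.
fn prioritised_addrs(
    last_working: Option<SocketAddr>,
    all_addrs: &[SocketAddr],
) -> Vec<SocketAddr> {
    let mut sorted = Vec::with_capacity(all_addrs.len() + 1);
    if let Some(addr) = last_working {
        sorted.push(addr);
    }
    for addr in all_addrs {
        // Skip the address we already placed at the front.
        if !sorted.contains(addr) {
            sorted.push(*addr);
        }
    }
    sorted
}

fn main() {
    // Made-up peer addresses for illustration.
    let peers: Vec<SocketAddr> = vec![
        "10.0.0.1:8444".parse().unwrap(),
        "10.0.0.2:8444".parse().unwrap(),
    ];
    let last_working: Option<SocketAddr> = Some("10.0.0.2:8444".parse().unwrap());
    // Prints the previously successful peer first: [10.0.0.2:8444, 10.0.0.1:8444]
    println!("{:?}", prioritised_addrs(last_working, &peers));
}
```

Because the previously successful address is only moved to the front and duplicates are skipped, the full candidate list is still walked if that peer has since gone away.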

View file

@@ -115,8 +115,15 @@ kanidm login --name demo_user
 kanidm self whoami --name demo_user
 ```

-{{#template templates/kani-warning.md imagepath=images title=Warning! text=Don't use the direct
-credential reset to lock or invalidate an account. You should expire the account instead. }}
+<!-- deno-fmt-ignore-start -->
+{{#template templates/kani-warning.md
+imagepath=images
+title=Warning!
+text=Don't use the direct credential reset to lock or invalidate an account. You should expire the account instead.
+}}
+<!-- deno-fmt-ignore-end -->

 ## Reauthentication / Privilege Access Mode

View file

@@ -392,7 +392,7 @@ pub(crate) async fn handle_conn(
         .serve_connection(tls_stream, svc)
         .await
         .map_err(|e| {
-            error!("Failed to complete connection: {:?}", e);
+            debug!("Failed to complete connection: {:?}", e);
             std::io::Error::from(ErrorKind::ConnectionAborted)
         })
 }

View file

@@ -8,12 +8,15 @@ use std::net::SocketAddr;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::net::{TcpListener, TcpStream};
 use tokio::sync::broadcast;
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
 use tokio::sync::Mutex;
 use tokio::time::{interval, sleep, timeout};
+use tokio::{
+    net::{TcpListener, TcpStream},
+    task::JoinHandle,
+};
 use tokio_openssl::SslStream;
 use tokio_util::codec::{Framed, FramedRead, FramedWrite};
 use tracing::{error, Instrument};
@@ -77,7 +80,7 @@ pub(crate) async fn create_repl_server(
         "Starting replication interface https://{} ...",
         repl_config.bindaddress
     );
-    let repl_handle = tokio::spawn(repl_acceptor(
+    let repl_handle: JoinHandle<()> = tokio::spawn(repl_acceptor(
         listener,
         idms,
         repl_config.clone(),
@@ -90,15 +93,22 @@ create_repl_server(
 }

 #[instrument(level = "info", skip_all)]
+/// This returns the remote address that worked, so you can try that first next time
 async fn repl_consumer_connect_supplier(
     domain: &str,
     sock_addrs: &[SocketAddr],
     tls_connector: &SslConnector,
     consumer_conn_settings: &ConsumerConnSettings,
-) -> Option<Framed<SslStream<TcpStream>, codec::ConsumerCodec>> {
+) -> Option<(
+    SocketAddr,
+    Framed<SslStream<TcpStream>, codec::ConsumerCodec>,
+)> {
     // This is pretty gnarly, but we need to loop to try out each socket addr.
     for sock_addr in sock_addrs {
-        debug!("Connecting to {} replica via {}", domain, sock_addr);
+        debug!(
+            "Attempting to connect to {} replica via {}",
+            domain, sock_addr
+        );

         let tcpstream = match timeout(
             consumer_conn_settings.replica_connect_timeout,
@@ -108,29 +118,29 @@ async fn repl_consumer_connect_supplier(
         {
             Ok(Ok(tc)) => tc,
             Ok(Err(err)) => {
-                error!(?err, "Failed to connect to {}", sock_addr);
+                debug!(?err, "Failed to connect to {}", sock_addr);
                 continue;
             }
             Err(_) => {
-                error!("Timeout connecting to {}", sock_addr);
+                debug!("Timeout connecting to {}", sock_addr);
                 continue;
             }
         };

-        trace!("connection established");
+        trace!("Connection established to peer on {:?}", sock_addr);

         let mut tlsstream = match Ssl::new(tls_connector.context())
             .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
         {
             Ok(ta) => ta,
             Err(e) => {
-                error!("replication client TLS setup error, continuing -> {:?}", e);
+                error!("Replication client TLS setup error, continuing -> {:?}", e);
                 continue;
             }
         };

         if let Err(e) = SslStream::connect(Pin::new(&mut tlsstream)).await {
-            error!("replication client TLS accept error, continuing -> {:?}", e);
+            error!("Replication client TLS accept error, continuing -> {:?}", e);
             continue;
         };
@@ -138,14 +148,19 @@ async fn repl_consumer_connect_supplier(
             tlsstream,
             codec::ConsumerCodec::new(consumer_conn_settings.max_frame_bytes),
         );
-        return Some(supplier_conn);
+        // "hey this one worked, try it first next time!"
+        return Some((sock_addr.to_owned(), supplier_conn));
     }
-    error!("Unable to connect to supplier.");
+    error!(
+        "Unable to connect to supplier, tried to connect to {:?}",
+        sock_addrs
+    );
     None
 }

+/// This returns the socket address that worked, so you can try that first next time
+#[instrument(level="info", skip(refresh_coord, tls_connector, idms), fields(uuid=Uuid::new_v4().to_string()))]
 async fn repl_run_consumer_refresh(
     refresh_coord: Arc<Mutex<(bool, mpsc::Sender<()>)>>,
     domain: &str,
@@ -153,7 +168,7 @@ async fn repl_run_consumer_refresh(
     tls_connector: &SslConnector,
     idms: &IdmServer,
     consumer_conn_settings: &ConsumerConnSettings,
-) {
+) -> Result<Option<SocketAddr>, ()> {
     // Take the refresh lock. Note that every replication consumer *should* end up here
     // behind this lock, but only one can proceed. This is what we want!
@@ -161,44 +176,37 @@ async fn repl_run_consumer_refresh(
     // Simple case - task is already done.
     if refresh_coord_guard.0 {
-        trace!("refresh already completed by another task, return.");
-        return;
+        trace!("Refresh already completed by another task, return.");
+        return Ok(None);
     }

     // okay, we need to proceed.
-    let Some(mut supplier_conn) =
+    let (addr, mut supplier_conn) =
         repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
             .await
-    else {
-        return;
-    };
+            .ok_or(())?;

     // If we fail at any point, just RETURN because this leaves the next task to attempt, or
     // the channel drops and that tells the caller this failed.
-    if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await {
-        error!(?err, "consumer encode error, unable to continue.");
-        return;
-    }
+    supplier_conn
+        .send(ConsumerRequest::Refresh)
+        .await
+        .map_err(|err| error!(?err, "consumer encode error, unable to continue."))?;

     let refresh = if let Some(codec_msg) = supplier_conn.next().await {
-        match codec_msg {
-            Ok(SupplierResponse::Refresh(changes)) => {
+        match codec_msg.map_err(|err| error!(?err, "Consumer decode error, unable to continue."))? {
+            SupplierResponse::Refresh(changes) => {
                 // Success - return to bypass the error message.
                 changes
             }
-            Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => {
+            SupplierResponse::Pong | SupplierResponse::Incremental(_) => {
                 error!("Supplier Response contains invalid State");
-                return;
-            }
-            Err(err) => {
-                error!(?err, "consumer decode error, unable to continue.");
-                return;
+                return Err(());
             }
         }
     } else {
         error!("Connection closed");
-        return;
+        return Err(());
     };

     // Now apply the refresh if possible
@@ -206,14 +214,11 @@ async fn repl_run_consumer_refresh(
         // Scope the transaction.
         let ct = duration_from_epoch_now();
         let mut write_txn = idms.proxy_write(ct).await;
-        if let Err(err) = write_txn
+        write_txn
             .qs_write
             .consumer_apply_refresh(&refresh)
             .and_then(|cs| write_txn.commit().map(|()| cs))
-        {
-            error!(?err, "consumer was not able to apply refresh.");
-            return;
-        }
+            .map_err(|err| error!(?err, "Consumer was not able to apply refresh."))?;
     }

     // Now mark the refresh as complete AND indicate it to the channel.
@@ -224,9 +229,11 @@ async fn repl_run_consumer_refresh(
     // Here the coord guard will drop and every other task proceeds.
-    warn!("Replication refresh was successful.");
+    info!("Replication refresh was successful.");
+    Ok(Some(addr))
 }

+#[instrument(level="info", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
 async fn repl_run_consumer(
     domain: &str,
     sock_addrs: &[SocketAddr],
@@ -234,12 +241,12 @@ async fn repl_run_consumer(
     automatic_refresh: bool,
     idms: &IdmServer,
     consumer_conn_settings: &ConsumerConnSettings,
-) {
-    let Some(mut supplier_conn) =
+) -> Option<SocketAddr> {
+    let Some((socket_addr, mut supplier_conn)) =
         repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
             .await
     else {
-        return;
+        return None;
     };

     // Perform incremental.
@@ -252,7 +259,7 @@ async fn repl_run_consumer(
                     ?err,
                     "consumer ruv range could not be accessed, unable to continue."
                 );
-                return;
+                return None;
             }
         }
     };
@@ -262,7 +269,7 @@
         .await
     {
         error!(?err, "consumer encode error, unable to continue.");
-        return;
+        return None;
     }

     let changes = if let Some(codec_msg) = supplier_conn.next().await {
@@ -273,16 +280,16 @@
             }
             Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => {
                 error!("Supplier Response contains invalid State");
-                return;
+                return None;
             }
             Err(err) => {
                 error!(?err, "consumer decode error, unable to continue.");
-                return;
+                return None;
             }
         }
     } else {
         error!("Connection closed");
-        return;
+        return None;
     };

     // Now apply the changes if possible
@@ -297,7 +304,7 @@
             Ok(state) => state,
             Err(err) => {
                 error!(?err, "consumer was not able to apply changes.");
-                return;
+                return None;
             }
         }
     };
@@ -306,21 +313,21 @@
         ConsumerState::Ok => {
             info!("Incremental Replication Success");
             // return to bypass the failure message.
-            return;
+            return Some(socket_addr);
         }
         ConsumerState::RefreshRequired => {
             if automatic_refresh {
                 warn!("Consumer is out of date and must be refreshed. This will happen *now*.");
             } else {
                 error!("Consumer is out of date and must be refreshed. You must manually resolve this situation.");
-                return;
+                return None;
             };
         }
     }

     if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await {
         error!(?err, "consumer encode error, unable to continue.");
-        return;
+        return None;
     }

     let refresh = if let Some(codec_msg) = supplier_conn.next().await {
@@ -331,16 +338,16 @@
             }
             Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => {
                 error!("Supplier Response contains invalid State");
-                return;
+                return None;
             }
             Err(err) => {
                 error!(?err, "consumer decode error, unable to continue.");
-                return;
+                return None;
             }
         }
     } else {
         error!("Connection closed");
-        return;
+        return None;
     };

     // Now apply the refresh if possible
@@ -352,10 +359,11 @@
         .and_then(|cs| write_txn.commit().map(|()| cs))
     {
         error!(?err, "consumer was not able to apply refresh.");
-        return;
+        return None;
     }

     warn!("Replication refresh was successful.");
+    Some(socket_addr)
 }

 #[derive(Debug, Clone)]
@@ -450,43 +458,54 @@ async fn repl_task(
     info!("Replica task for {} has started.", origin);

+    // we keep track of the "last known good" socketaddr so we can try that first next time.
+    let mut last_working_address: Option<SocketAddr> = None;
+
     // Okay, all the parameters are setup. Now we wait on our interval.
     loop {
+        // if the target address worked last time, then let's use it this time!
+        let mut sorted_socket_addrs = vec![];
+        if let Some(addr) = last_working_address {
+            sorted_socket_addrs.push(addr);
+        };
+        // this is O(2^n) but we *should* be talking about a small number of addresses for a given hostname
+        socket_addrs.iter().for_each(|addr| {
+            if !sorted_socket_addrs.contains(addr) {
+                sorted_socket_addrs.push(addr.to_owned());
+            }
+        });
+
         tokio::select! {
             Ok(task) = task_rx.recv() => {
                 match task {
                     ReplConsumerCtrl::Stop => break,
                     ReplConsumerCtrl::Refresh ( refresh_coord ) => {
-                        let eventid = Uuid::new_v4();
-                        let span = info_span!("replication_run_consumer_refresh", uuid = ?eventid);
-                        // let _enter = span.enter();
-                        repl_run_consumer_refresh(
+                        last_working_address = match repl_run_consumer_refresh(
                             refresh_coord,
                             domain,
-                            &socket_addrs,
+                            &sorted_socket_addrs,
                             &tls_connector,
                             &idms,
                             &consumer_conn_settings
                         )
-                        .instrument(span)
-                        .await
+                        .await {
+                            Ok(val) => val,
+                            Err(_) => None
+                        };
                     }
                 }
             }
             _ = repl_interval.tick() => {
                 // Interval passed, attempt a replication run.
-                let eventid = Uuid::new_v4();
-                let span = info_span!("replication_run_consumer", uuid = ?eventid);
-                // let _enter = span.enter();
                 repl_run_consumer(
                     domain,
-                    &socket_addrs,
+                    &sorted_socket_addrs,
                     &tls_connector,
                     automatic_refresh,
                     &idms,
                     &consumer_conn_settings
                 )
-                .instrument(span)
                 .await;
             }
         }
@@ -697,7 +716,7 @@ async fn repl_acceptor(
                 } => {
                     let task_rx = task_tx.subscribe();

-                    let handle = tokio::spawn(repl_task(
+                    let handle: JoinHandle<()> = tokio::spawn(repl_task(
                         origin.clone(),
                         server_key.clone(),
                         server_cert.clone(),
@@ -879,7 +898,7 @@ async fn repl_acceptor(
     let _ = task_tx.send(ReplConsumerCtrl::Stop);
     for task_handle in task_handles.drain(..) {
         // Let each task join.
-        let res: Result<(), _> = task_handle.await;
+        let res: Result<(), _> = task_handle.await.map(|_| ());
         if res.is_err() {
             warn!("Failed to join replication task, continuing ...");
         }
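
The `repl_run_consumer_refresh` rewrite above replaces `if let Err(...) { error!(...); return; }` blocks with `.map_err(|err| error!(...))?`, so each failure is logged exactly where it happens and then bubbled up as a bare `Err(())`, while success carries the working address back to the caller. A small self-contained sketch of that pattern, with made-up function names that are not part of this commit:

```rust
/// Log at the point of failure, then bubble a unit error up with `?`.
/// The caller only learns "it failed, and it was already logged".
fn parse_port(raw: &str) -> Result<u16, ()> {
    raw.parse::<u16>()
        .map_err(|err| eprintln!("invalid port {:?}: {}", raw, err))
}

/// Success carries a value back (here a port; in the diff, the working SocketAddr).
fn connect_sketch(raw_port: &str) -> Result<Option<u16>, ()> {
    let port = parse_port(raw_port)?; // failure was already logged above
    Ok(Some(port))
}

fn main() {
    assert_eq!(connect_sketch("8444"), Ok(Some(8444)));
    assert!(connect_sketch("not-a-port").is_err());
}
```

This mirrors how the refresh path now returns `Result<Option<SocketAddr>, ()>` and how `repl_task` collapses it with `Ok(val) => val, Err(_) => None` to update `last_working_address`.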

View file

@@ -661,7 +661,7 @@ impl FilterComp {
         // * If all filters are okay, return Ok(Filter::Or())
         // * Any filter is invalid, return the error.
         // * An empty "or" is a valid filter in mathematical terms, but we throw an
-        //   error to warn the user because it's super unlikey they want that
+        //   error to warn the user because it's super unlikely they want that
         if filters.is_empty() {
             return Err(SchemaError::EmptyFilter);
         };
@@ -676,7 +676,7 @@ impl FilterComp {
         // * If all filters are okay, return Ok(Filter::Or())
         // * Any filter is invalid, return the error.
         // * An empty "and" is a valid filter in mathematical terms, but we throw an
-        //   error to warn the user because it's super unlikey they want that
+        //   error to warn the user because it's super unlikely they want that
         if filters.is_empty() {
             return Err(SchemaError::EmptyFilter);
         };