Check DNS on replication loop start not at task start (#3243)

This commit is contained in:
James Hodgkinson 2024-12-03 13:58:16 +10:00 committed by GitHub
parent 64fcb61d5e
commit 388ed679a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 45 additions and 16 deletions

View file

@ -400,14 +400,6 @@ async fn repl_task(
} }
}; };
let socket_addrs = match origin.socket_addrs(|| Some(443)) {
Ok(sa) => sa,
Err(err) => {
error!(?err, "Replica origin could not resolve to ip:port");
return;
}
};
// Setup our tls connector. // Setup our tls connector.
let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) { let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) {
Ok(sb) => sb, Ok(sb) => sb,
@ -465,20 +457,57 @@ async fn repl_task(
// we keep track of the "last known good" socketaddr so we can try that first next time. // we keep track of the "last known good" socketaddr so we can try that first next time.
let mut last_working_address: Option<SocketAddr> = None; let mut last_working_address: Option<SocketAddr> = None;
// Okay, all the parameters are setup. Now we wait on our interval. // Okay, all the parameters are set up. Now we replicate on our interval.
loop { loop {
// if the target address worked last time, then let's use it this time! // we resolve the DNS entry to the ip:port each time we attempt a connection to avoid stale
// DNS issues, ref #3188. If we are unable to resolve the address, we back off and try again
// as in something like docker the address may change frequently.
//
// Note, if DNS isn't available, we can proceed with the last used working address too. This
// prevents DNS (or lack thereof) from causing a replication outage.
let mut sorted_socket_addrs = vec![]; let mut sorted_socket_addrs = vec![];
// If the target address worked last time, then let's use it this time!
if let Some(addr) = last_working_address { if let Some(addr) = last_working_address {
debug!(?last_working_address);
sorted_socket_addrs.push(addr); sorted_socket_addrs.push(addr);
}; };
// this is O(2^n) but we *should* be talking about a small number of addresses for a given hostname
socket_addrs.iter().for_each(|addr| { // Default to port 443 if not set in the origin
if !sorted_socket_addrs.contains(addr) { match origin.socket_addrs(|| Some(443)) {
sorted_socket_addrs.push(addr.to_owned()); Ok(mut socket_addrs) => {
// Make every address unique.
socket_addrs.sort_unstable();
socket_addrs.dedup();
// The only possible conflict is with the last working address,
// so let's just check that.
socket_addrs.into_iter().for_each(|addr| {
if Some(&addr) != last_working_address.as_ref() {
// Not already present, append
sorted_socket_addrs.push(addr);
} }
}); });
}
Err(err) => {
if let Some(addr) = last_working_address {
warn!(
?err,
"Unable to resolve '{origin}' to ip:port, using last known working address '{addr}'"
);
} else {
warn!(?err, "Unable to resolve '{origin}' to ip:port.");
}
}
};
if sorted_socket_addrs.is_empty() {
warn!(
"No replication addresses available, delaying replication operation for '{origin}'"
);
repl_interval.tick().await;
continue;
}
tokio::select! { tokio::select! {
Ok(task) = task_rx.recv() => { Ok(task) = task_rx.recv() => {

View file

@ -41,7 +41,7 @@ impl ScimEntryPutEvent {
} }
} }
impl<'a> QueryServerWriteTransaction<'a> { impl QueryServerWriteTransaction<'_> {
/// SCIM PUT is the handler where a single entry is updated. In a SCIM PUT request /// SCIM PUT is the handler where a single entry is updated. In a SCIM PUT request
/// the request defines the state of an attribute in entirety for the update. This /// the request defines the state of an attribute in entirety for the update. This
/// means if the caller wants to add one email address, they must PUT all existing /// means if the caller wants to add one email address, they must PUT all existing