From bc6122560016aa71fa88cdf0a3212dcfe3ba34ba Mon Sep 17 00:00:00 2001 From: James Hodgkinson Date: Tue, 3 Dec 2024 13:58:16 +1000 Subject: [PATCH] Check DNS on replication loop start not at task start (#3243) --- server/core/src/repl/mod.rs | 59 +++++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/server/core/src/repl/mod.rs b/server/core/src/repl/mod.rs index 745995ae2..3d4ff8fe6 100644 --- a/server/core/src/repl/mod.rs +++ b/server/core/src/repl/mod.rs @@ -400,14 +400,6 @@ async fn repl_task( } }; - let socket_addrs = match origin.socket_addrs(|| Some(443)) { - Ok(sa) => sa, - Err(err) => { - error!(?err, "Replica origin could not resolve to ip:port"); - return; - } - }; - // Setup our tls connector. let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) { Ok(sb) => sb, @@ -465,20 +457,57 @@ async fn repl_task( // we keep track of the "last known good" socketaddr so we can try that first next time. let mut last_working_address: Option = None; - // Okay, all the parameters are setup. Now we wait on our interval. + // Okay, all the parameters are set up. Now we replicate on our interval. loop { - // if the target address worked last time, then let's use it this time! + // we resolve the DNS entry to the ip:port each time we attempt a connection to avoid stale + // DNS issues, ref #3188. If we are unable to resolve the address, we backoff and try again + // as in something like docker the address may change frequently. + // + // Note, if DNS isn't available, we can proceed with the last used working address too. This + // prevents DNS (or lack thereof) from causing a replication outage. let mut sorted_socket_addrs = vec![]; + // If the target address worked last time, then let's use it this time! if let Some(addr) = last_working_address { + debug!(?last_working_address); sorted_socket_addrs.push(addr); }; - // this is O(2^n) but we *should* be talking about a small number of addresses for a given hostname - socket_addrs.iter().for_each(|addr| { - if !sorted_socket_addrs.contains(addr) { - sorted_socket_addrs.push(addr.to_owned()); + + // Default to port 443 if not set in the origin + match origin.socket_addrs(|| Some(443)) { + Ok(mut socket_addrs) => { + // Make every address unique. + socket_addrs.sort_unstable(); + socket_addrs.dedup(); + + // The only possible conflict is with the last working address, + // so lets just check that. + socket_addrs.into_iter().for_each(|addr| { + if Some(&addr) != last_working_address.as_ref() { + // Not already present, append + sorted_socket_addrs.push(addr); + } + }); } - }); + Err(err) => { + if let Some(addr) = last_working_address { + warn!( + ?err, + "Unable to resolve '{origin}' to ip:port, using last known working address '{addr}'" + ); + } else { + warn!(?err, "Unable to resolve '{origin}' to ip:port."); + } + } + }; + + if sorted_socket_addrs.is_empty() { + warn!( + "No replication addresses available, delaying replication operation for '{origin}'" + ); + repl_interval.tick().await; + continue; + } tokio::select! { Ok(task) = task_rx.recv() => {