From cf35a7e667550c4e500d7f63080dbefa8e38e47f Mon Sep 17 00:00:00 2001 From: James Hodgkinson Date: Thu, 2 Nov 2023 12:07:53 +1000 Subject: [PATCH] Feature: configurable replication poll interval (#2283) * Feature: configurable replication poll interval (#2282) * Updating log messages because REPL != LDAP --- server/core/src/config.rs | 14 ++++++++++++++ server/core/src/repl/mod.rs | 7 +++---- server/testkit/tests/mtls_test.rs | 2 +- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/server/core/src/config.rs b/server/core/src/config.rs index 83dde87ef..7e0aa4df0 100644 --- a/server/core/src/config.rs +++ b/server/core/src/config.rs @@ -97,11 +97,25 @@ pub enum RepNodeConfig { pub struct ReplicationConfiguration { pub origin: Url, pub bindaddress: SocketAddr, + /// Number of seconds between running a replication event + pub task_poll_interval: Option, #[serde(flatten)] pub manual: BTreeMap, } +const DEFAULT_REPL_TASK_POLL_INTERVAL: u64 = 15; + +impl ReplicationConfiguration { + /// Get the task poll interval, or the default if not set. + pub(crate) fn get_task_poll_interval(&self) -> core::time::Duration { + core::time::Duration::from_secs( + self.task_poll_interval + .unwrap_or(DEFAULT_REPL_TASK_POLL_INTERVAL), + ) + } +} + /// This is the Server Configuration as read from `server.toml`. /// /// NOTE: not all flags or values from the internal [Configuration] object are exposed via this structure diff --git a/server/core/src/repl/mod.rs b/server/core/src/repl/mod.rs index da2498030..883f4ec60 100644 --- a/server/core/src/repl/mod.rs +++ b/server/core/src/repl/mod.rs @@ -530,12 +530,12 @@ async fn handle_repl_conn( { Ok(ta) => ta, Err(err) => { - error!(?err, "LDAP TLS setup error, disconnecting client"); + error!(?err, "Replication TLS setup error, disconnecting client"); return; } }; if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await { - error!(?err, "LDAP TLS accept error, disconnecting client"); + error!(?err, "Replication TLS accept error, disconnecting client"); return; }; let (r, w) = tokio::io::split(tlsstream); @@ -606,14 +606,13 @@ async fn repl_acceptor( info!("Starting Replication Acceptor ..."); // Persistent parts // These all probably need changes later ... - let task_poll_interval = Duration::from_secs(10); let replica_connect_timeout = Duration::from_secs(2); let retry_timeout = Duration::from_secs(60); let max_frame_bytes = 268435456; let consumer_conn_settings = ConsumerConnSettings { max_frame_bytes, - task_poll_interval, + task_poll_interval: repl_config.get_task_poll_interval(), replica_connect_timeout, }; diff --git a/server/testkit/tests/mtls_test.rs b/server/testkit/tests/mtls_test.rs index bff41071f..f8888a3f8 100644 --- a/server/testkit/tests/mtls_test.rs +++ b/server/testkit/tests/mtls_test.rs @@ -113,7 +113,7 @@ async fn setup_mtls_test( }; if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await { - error!("LDAP TLS accept error, continuing -> {:?}", err); + error!("Replication TLS accept error, continuing -> {:?}", err); let ossl_err = err.ssl_error().and_then(|e| e.errors().last()).unwrap();