Feature: configurable replication poll interval (#2283)

* Feature: configurable replication poll interval (#2282)
* Updating log messages because REPL != LDAP
This commit is contained in:
James Hodgkinson 2023-11-02 12:07:53 +10:00 committed by GitHub
parent 9e5449a644
commit cf35a7e667
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 5 deletions

View file

@ -97,11 +97,25 @@ pub enum RepNodeConfig {
pub struct ReplicationConfiguration { pub struct ReplicationConfiguration {
pub origin: Url, pub origin: Url,
pub bindaddress: SocketAddr, pub bindaddress: SocketAddr,
/// Number of seconds between running a replication event
pub task_poll_interval: Option<u64>,
#[serde(flatten)] #[serde(flatten)]
pub manual: BTreeMap<Url, RepNodeConfig>, pub manual: BTreeMap<Url, RepNodeConfig>,
} }
const DEFAULT_REPL_TASK_POLL_INTERVAL: u64 = 15;
impl ReplicationConfiguration {
/// Get the task poll interval, or the default if not set.
pub(crate) fn get_task_poll_interval(&self) -> core::time::Duration {
core::time::Duration::from_secs(
self.task_poll_interval
.unwrap_or(DEFAULT_REPL_TASK_POLL_INTERVAL),
)
}
}
/// This is the Server Configuration as read from `server.toml`. /// This is the Server Configuration as read from `server.toml`.
/// ///
/// NOTE: not all flags or values from the internal [Configuration] object are exposed via this structure /// NOTE: not all flags or values from the internal [Configuration] object are exposed via this structure

View file

@ -530,12 +530,12 @@ async fn handle_repl_conn(
{ {
Ok(ta) => ta, Ok(ta) => ta,
Err(err) => { Err(err) => {
error!(?err, "LDAP TLS setup error, disconnecting client"); error!(?err, "Replication TLS setup error, disconnecting client");
return; return;
} }
}; };
if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await { if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
error!(?err, "LDAP TLS accept error, disconnecting client"); error!(?err, "Replication TLS accept error, disconnecting client");
return; return;
}; };
let (r, w) = tokio::io::split(tlsstream); let (r, w) = tokio::io::split(tlsstream);
@ -606,14 +606,13 @@ async fn repl_acceptor(
info!("Starting Replication Acceptor ..."); info!("Starting Replication Acceptor ...");
// Persistent parts // Persistent parts
// These all probably need changes later ... // These all probably need changes later ...
let task_poll_interval = Duration::from_secs(10);
let replica_connect_timeout = Duration::from_secs(2); let replica_connect_timeout = Duration::from_secs(2);
let retry_timeout = Duration::from_secs(60); let retry_timeout = Duration::from_secs(60);
let max_frame_bytes = 268435456; let max_frame_bytes = 268435456;
let consumer_conn_settings = ConsumerConnSettings { let consumer_conn_settings = ConsumerConnSettings {
max_frame_bytes, max_frame_bytes,
task_poll_interval, task_poll_interval: repl_config.get_task_poll_interval(),
replica_connect_timeout, replica_connect_timeout,
}; };

View file

@ -113,7 +113,7 @@ async fn setup_mtls_test(
}; };
if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await { if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
error!("LDAP TLS accept error, continuing -> {:?}", err); error!("Replication TLS accept error, continuing -> {:?}", err);
let ossl_err = err.ssl_error().and_then(|e| e.errors().last()).unwrap(); let ossl_err = err.ssl_error().and_then(|e| e.errors().last()).unwrap();