mirror of
https://github.com/kanidm/kanidm.git
synced 2025-06-12 02:57:46 +02:00
Compare commits
2 commits
7d37bf7f1d
...
268839ea14
Author | SHA1 | Date | |
---|---|---|---|
|
268839ea14 | ||
|
8daeddb9e7 |
server
|
@ -92,7 +92,7 @@ pub(crate) async fn create_repl_server(
|
|||
Ok((repl_handle, ctrl_tx))
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all)]
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
/// This returns the remote address that worked, so you can try that first next time
|
||||
async fn repl_consumer_connect_supplier(
|
||||
domain: &str,
|
||||
|
@ -116,7 +116,10 @@ async fn repl_consumer_connect_supplier(
|
|||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(tc)) => tc,
|
||||
Ok(Ok(tc)) => {
|
||||
trace!("Connection established to peer on {:?}", sock_addr);
|
||||
tc
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
debug!(?err, "Failed to connect to {}", sock_addr);
|
||||
continue;
|
||||
|
@ -127,8 +130,6 @@ async fn repl_consumer_connect_supplier(
|
|||
}
|
||||
};
|
||||
|
||||
trace!("Connection established to peer on {:?}", sock_addr);
|
||||
|
||||
let mut tlsstream = match Ssl::new(tls_connector.context())
|
||||
.and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
|
||||
{
|
||||
|
@ -236,7 +237,7 @@ async fn repl_run_consumer_refresh(
|
|||
Ok(Some(addr))
|
||||
}
|
||||
|
||||
#[instrument(level="info", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
|
||||
#[instrument(level="debug", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
|
||||
async fn repl_run_consumer(
|
||||
domain: &str,
|
||||
sock_addrs: &[SocketAddr],
|
||||
|
@ -282,11 +283,11 @@ async fn repl_run_consumer(
|
|||
changes
|
||||
}
|
||||
Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => {
|
||||
error!("Supplier Response contains invalid State");
|
||||
error!("Supplier Response contains invalid state");
|
||||
return None;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(?err, "consumer decode error, unable to continue.");
|
||||
error!(?err, "Consumer decode error, unable to continue.");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
@ -306,7 +307,7 @@ async fn repl_run_consumer(
|
|||
}) {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
error!(?err, "consumer was not able to apply changes.");
|
||||
error!(?err, "Consumer was not able to apply changes.");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
@ -365,7 +366,7 @@ async fn repl_run_consumer(
|
|||
return None;
|
||||
}
|
||||
|
||||
warn!("Replication refresh was successful.");
|
||||
info!("Replication refresh was successful.");
|
||||
Some(socket_addr)
|
||||
}
|
||||
|
||||
|
@ -544,7 +545,7 @@ async fn repl_task(
|
|||
info!("Replica task for {} has stopped.", origin);
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all)]
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
async fn handle_repl_conn(
|
||||
max_frame_bytes: usize,
|
||||
tcpstream: TcpStream,
|
||||
|
@ -626,6 +627,7 @@ async fn handle_repl_conn(
|
|||
debug!(?client_address, "replication client disconnected 🛬");
|
||||
}
|
||||
|
||||
/// This is the main acceptor for the replication server.
|
||||
async fn repl_acceptor(
|
||||
listener: TcpListener,
|
||||
idms: Arc<IdmServer>,
|
||||
|
|
|
@ -343,7 +343,7 @@ impl QueryServerWriteTransaction<'_> {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all)]
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
fn consumer_apply_changes_v1(
|
||||
&mut self,
|
||||
ctx_domain_version: DomainVersion,
|
||||
|
@ -394,7 +394,7 @@ impl QueryServerWriteTransaction<'_> {
|
|||
})?;
|
||||
|
||||
// == ⚠️ Below this point we begin to make changes! ==
|
||||
info!(
|
||||
debug!(
|
||||
"Proceeding to apply incremental from domain {:?} at level {}",
|
||||
ctx_domain_uuid, ctx_domain_version
|
||||
);
|
||||
|
|
|
@ -148,14 +148,14 @@ impl QueryServerReadTransaction<'_> {
|
|||
RangeDiffStatus::Ok(ranges) => ranges,
|
||||
RangeDiffStatus::Refresh { lag_range } => {
|
||||
error!("Replication - Consumer is lagging and must be refreshed.");
|
||||
info!(?lag_range);
|
||||
debug!(?lag_range);
|
||||
debug!(consumer_ranges = ?ctx_ranges);
|
||||
debug!(supplier_ranges = ?our_ranges);
|
||||
return Ok(ReplIncrementalContext::RefreshRequired);
|
||||
}
|
||||
RangeDiffStatus::Unwilling { adv_range } => {
|
||||
error!("Replication - Supplier is lagging and must be investigated.");
|
||||
info!(?adv_range);
|
||||
debug!(?adv_range);
|
||||
debug!(consumer_ranges = ?ctx_ranges);
|
||||
debug!(supplier_ranges = ?our_ranges);
|
||||
return Ok(ReplIncrementalContext::UnwillingToSupply);
|
||||
|
@ -164,9 +164,7 @@ impl QueryServerReadTransaction<'_> {
|
|||
lag_range,
|
||||
adv_range,
|
||||
} => {
|
||||
error!("Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
|
||||
info!(?lag_range);
|
||||
info!(?adv_range);
|
||||
error!(?adv_range, ?lag_range, "Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
|
||||
debug!(consumer_ranges = ?ctx_ranges);
|
||||
debug!(supplier_ranges = ?our_ranges);
|
||||
return Ok(ReplIncrementalContext::UnwillingToSupply);
|
||||
|
|
Loading…
Reference in a new issue