#![deny(warnings)]
#![warn(unused_extern_crates)]
#![deny(clippy::todo)]
#![deny(clippy::unimplemented)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
#![deny(clippy::unreachable)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]
// We allow expect since it forces good error messages at the least.
#![allow(clippy::expect_used)]

mod config;
mod error;

use crate::config::{Config, EntryConfig};
use crate::error::SyncError;
use chrono::Utc;
use clap::Parser;
use cron::Schedule;
use std::fs::metadata;
use std::fs::File;
use std::io::Read;
#[cfg(target_family = "unix")]
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::runtime;
use tokio::sync::broadcast;
use tokio::time::sleep;

use tracing::{debug, error, info, warn};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

use kanidm_client::KanidmClientBuilder;
use kanidm_proto::scim_v1::{
    MultiValueAttr, ScimEntry, ScimExternalMember, ScimSshPubKey, ScimSyncGroup, ScimSyncPerson,
    ScimSyncRequest, ScimSyncRetentionMode, ScimSyncState,
};
use kanidmd_lib::utils::file_permissions_readonly;

#[cfg(target_family = "unix")]
use users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};

use ldap3_client::{proto, LdapClientBuilder, LdapSyncRepl, LdapSyncReplEntry, LdapSyncStateValue};

include!("./opt.rs");

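// Entry point of the sync driver. Loads the sync profile and kanidm client
// configuration, then either performs a single sync pass, or, when
// opt.schedule is set, loops on the configured cron schedule (with an
// optional TCP status listener) until a shutdown signal is received.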
async fn driver_main(opt: Opt) {
    debug!("Starting kanidm ldap sync driver.");

    let mut f = match File::open(&opt.ldap_sync_config) {
        Ok(f) => f,
        Err(e) => {
            error!("Unable to open profile file [{:?}] 🥺", e);
            return;
        }
    };

    let mut contents = String::new();
    if let Err(e) = f.read_to_string(&mut contents) {
        error!("unable to read profile contents {:?}", e);
        return;
    };

    let sync_config: Config = match toml::from_str(contents.as_str()) {
        Ok(c) => c,
        Err(e) => {
            eprintln!("unable to parse config {:?}", e);
            return;
        }
    };

    debug!(?sync_config);

    let cb = match KanidmClientBuilder::new().read_options_from_optional_config(&opt.client_config)
    {
        Ok(v) => v,
        Err(_) => {
            error!("Failed to parse {}", opt.client_config.to_string_lossy());
            return;
        }
    };

    // Default schedule: at second 0 of every 5th minute (the cron crate uses a
    // seconds-resolution expression).
    let expression = sync_config.schedule.as_deref().unwrap_or("0 */5 * * * * *");

    let schedule = match Schedule::from_str(expression) {
        Ok(s) => s,
        Err(_) => {
            error!("Failed to parse cron schedule expression");
            return;
        }
    };

    if opt.schedule {
        let last_op_status = Arc::new(AtomicBool::new(true));
        let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

        let last_op_status_c = last_op_status.clone();

        // Can we setup the socket for status?
        let status_handle = if let Some(sb) = sync_config.status_bind.as_deref() {
            // Can we bind?
            let listener = match TcpListener::bind(sb).await {
                Ok(l) => l,
                Err(e) => {
                    error!(?e, "Failed to bind status socket");
                    return;
                }
            };

            info!("Status listener is started on {:?}", sb);
            // Detach a status listener.
            let status_rx = broadcast_tx.subscribe();
            Some(tokio::spawn(async move {
                status_task(listener, status_rx, last_op_status_c).await
            }))
        } else {
            warn!("No status listener configured, this will prevent you from monitoring the sync tool");
            None
        };

        // main driver loop
        let driver_handle = tokio::spawn(async move {
            loop {
                let now = Utc::now();
                let next_time = match schedule.after(&now).next() {
                    Some(v) => v,
                    None => {
                        error!("Failed to access any future scheduled events, terminating.");
                        break;
                    }
                };

                // If we don't do 1 + here we can trigger the event multiple times
                // rapidly since we are in the same second.
                let wait_seconds = 1 + (next_time - now).num_seconds() as u64;
                info!("next sync on {}, wait_time = {}s", next_time, wait_seconds);

                tokio::select! {
                    _ = broadcast_rx.recv() => {
                        // stop the event loop!
                        break;
                    }
                    _ = sleep(Duration::from_secs(wait_seconds)) => {
                        info!("starting sync ...");
                        match run_sync(cb.clone(), &sync_config, &opt).await {
                            Ok(_) => last_op_status.store(true, Ordering::Relaxed),
                            Err(e) => {
                                error!(?e, "sync completed with error");
                                last_op_status.store(false, Ordering::Relaxed)
                            }
                        };
                    }
                }
            }
            info!("Stopped sync driver");
        });

        // TODO: this loop/handler should be generic across the various crates
        // Block on signals now.
        loop {
            #[cfg(target_family = "unix")]
            {
                tokio::select! {
                    Ok(()) = tokio::signal::ctrl_c() => {
                        break
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::terminate();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        break
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::alarm();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::hangup();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::user_defined1();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::user_defined2();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                }
            }
            #[cfg(target_family = "windows")]
            {
                tokio::select! {
                    Ok(()) = tokio::signal::ctrl_c() => {
                        break
                    }
                }
            }
        }

        broadcast_tx
            .send(true)
            .expect("Failed to trigger a clean shutdown!");

        let _ = driver_handle.await;
        if let Some(sh) = status_handle {
            let _ = sh.await;
        }
    } else if let Err(e) = run_sync(cb, &sync_config, &opt).await {
        error!(?e, "Sync completed with error");
    }
}

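// Performs one complete sync pass: binds to the ldap server, retrieves the
// current sync state (cookie) from kanidm, issues a RefreshOnly syncrepl,
// converts the returned entries to SCIM, then either dumps the protocol
// messages (opt.proto_dump), stops after building the request (opt.dry_run),
// or submits the ScimSyncRequest to kanidmd.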
async fn run_sync(
    cb: KanidmClientBuilder,
    sync_config: &Config,
    opt: &Opt,
) -> Result<(), SyncError> {
    let rsclient = match cb.build() {
        Ok(rsc) => rsc,
        Err(_e) => {
            error!("Failed to build async client");
            return Err(SyncError::ClientConfig);
        }
    };

    rsclient.set_token(sync_config.sync_token.clone()).await;

    // Preflight check.
    // * can we connect to ldap?
    let mut ldap_client = match LdapClientBuilder::new(&sync_config.ldap_uri)
        .add_tls_ca(&sync_config.ldap_ca)
        .build()
        .await
    {
        Ok(lc) => lc,
        Err(e) => {
            error!(?e, "Failed to connect to ldap");
            return Err(SyncError::LdapConn);
        }
    };

    match ldap_client
        .bind(
            sync_config.ldap_sync_dn.clone(),
            sync_config.ldap_sync_pw.clone(),
        )
        .await
    {
        Ok(()) => {
            debug!(ldap_sync_dn = ?sync_config.ldap_sync_dn, ldap_uri = %sync_config.ldap_uri);
        }
        Err(e) => {
            error!(?e, "Failed to bind (authenticate) to the ldap server");
            return Err(SyncError::LdapAuth);
        }
    };

    // * can we connect to kanidm?
    // - get the current sync cookie from kanidm.
    let scim_sync_status = match rsclient.scim_v1_sync_status().await {
        Ok(s) => s,
        Err(e) => {
            error!(?e, "Failed to access scim sync status");
            return Err(SyncError::SyncStatus);
        }
    };

    debug!(state=?scim_sync_status);

    // === Everything is connected! ===

    let mode = proto::SyncRequestMode::RefreshOnly;

    let cookie = match &scim_sync_status {
        ScimSyncState::Refresh => None,
        ScimSyncState::Active { cookie } => Some(cookie.0.clone()),
    };

    let filter = sync_config.ldap_filter.clone();

    debug!(ldap_sync_base_dn = ?sync_config.ldap_sync_base_dn, ?cookie, ?mode, ?filter);
    let sync_result = match ldap_client
        .syncrepl(sync_config.ldap_sync_base_dn.clone(), filter, cookie, mode)
        .await
    {
        Ok(results) => results,
        Err(e) => {
            error!(?e, "Failed to perform syncrepl from ldap");
            return Err(SyncError::LdapSyncrepl);
        }
    };

    if opt.proto_dump {
        let stdout = std::io::stdout();
        if let Err(e) = serde_json::to_writer_pretty(stdout, &sync_result) {
            error!(?e, "Failed to serialise ldap sync response");
        }
    }

    let scim_sync_request = match sync_result {
        LdapSyncRepl::Success {
            cookie,
            refresh_deletes: _,
            entries,
            delete_uuids,
            present_uuids,
        } => {
            // refresh_deletes is true only on the first refresh from openldap, implying
            // that anything not marked as present should be deleted. In other words,
            // refresh_deletes means to assert the content as it exists on the ldap server
            // in the openldap case. For our purpose, we can use this to mean "present phase" since
            // that will imply that all non-present entries are purged.

            let to_state = if let Some(cookie) = cookie {
                // Only update the cookie if it's present - openldap omits it!
                ScimSyncState::Active { cookie }
            } else {
                info!("no changes required");
                return Ok(());
            };

            let retain = match (delete_uuids, present_uuids) {
                (None, None) => {
                    // if delete_phase == false && present_phase == false
                    // Only update entries if they are present in the *add* state.
                    // Generally also means do nothing with other entries, no updates for example.
                    //
                    // This is the state of 389-ds with no deletes *and* entries are updated *only*.
                    // This is the state for openldap and 389-ds when there are no changes to apply.
                    ScimSyncRetentionMode::Ignore
                }
                (Some(d_uuids), None) => {
                    // update entries that are in Add state, delete from delete uuids.
                    //
                    // This only occurs on 389-ds, which sends a list of deleted uuids as required.
                    ScimSyncRetentionMode::Delete(d_uuids)
                }
                (None, Some(p_uuids)) => {
                    // update entries in Add state, assert entry is live from present_uuids
                    // NOTE! Even if an entry is updated, it will also be in the present phase set. This
                    // means we can use present_set > 0 as an indicator too.
                    //
                    // This occurs only on openldap, where the present phase lists all uuids in the filter set
                    // *and* includes all entries that are updated at the same time.
                    ScimSyncRetentionMode::Retain(p_uuids)
                }
                (Some(_), Some(_)) => {
                    // error! No ldap server emits this!
                    error!("Ldap server provided an invalid sync repl response - unable to have both delete and present phases.");
                    return Err(SyncError::LdapStateInvalid);
                }
            };

            let entries = match process_ldap_sync_result(entries, sync_config).await {
                Ok(ssr) => ssr,
                Err(()) => {
                    error!("Failed to process ldap entries to SCIM");
                    return Err(SyncError::Preprocess);
                }
            };

            ScimSyncRequest {
                from_state: scim_sync_status,
                to_state,
                entries,
                retain,
            }
        }
        LdapSyncRepl::RefreshRequired => {
            let to_state = ScimSyncState::Refresh;

            ScimSyncRequest {
                from_state: scim_sync_status,
                to_state,
                entries: Vec::new(),
                retain: ScimSyncRetentionMode::Ignore,
            }
        }
    };

    if opt.proto_dump {
        let stdout = std::io::stdout();
        // write it out.
        if let Err(e) = serde_json::to_writer_pretty(stdout, &scim_sync_request) {
            error!(?e, "Failed to serialise scim sync request");
        };
        Ok(())
    } else if opt.dry_run {
        info!("dry-run complete");
        info!("Success!");
        Ok(())
    } else if let Err(e) = rsclient.scim_v1_sync_update(&scim_sync_request).await {
        error!(
            ?e,
            "Failed to submit scim sync update - see the kanidmd server log for more details."
        );
        Err(SyncError::SyncUpdate)
    } else {
        info!("Success!");
        Ok(())
    }
    // done!
}

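// Maps each ldap syncrepl entry to a SCIM entry, applying any per-entry
// EntryConfig overrides from the sync profile. Entries mapped to None are
// skipped; a single conversion failure aborts the whole batch.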
async fn process_ldap_sync_result(
    ldap_entries: Vec<LdapSyncReplEntry>,
    sync_config: &Config,
) -> Result<Vec<ScimEntry>, ()> {
    // Future - make this par-map
    ldap_entries
        .into_iter()
        .filter_map(|lentry| {
            let e_config = sync_config
                .entry_map
                .get(&lentry.entry_uuid)
                .cloned()
                .unwrap_or_default();

            match ldap_to_scim_entry(lentry, &e_config, sync_config) {
                Ok(Some(e)) => Some(Ok(e)),
                Ok(None) => None,
                Err(()) => Some(Err(())),
            }
        })
        .collect::<Result<Vec<_>, _>>()
}

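// Converts a single ldap entry into a SCIM person or group based on its
// objectclass, honouring the per-entry exclude, map_uuid, map_name and
// map_gidnumber overrides. Returns Ok(None) for entries that should be
// ignored.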
fn ldap_to_scim_entry(
    sync_entry: LdapSyncReplEntry,
    entry_config: &EntryConfig,
    sync_config: &Config,
) -> Result<Option<ScimEntry>, ()> {
    debug!("{:#?}", sync_entry);

    // check the sync_entry state?
    #[allow(clippy::unimplemented)]
    if sync_entry.state != LdapSyncStateValue::Add {
        unimplemented!();
    }

    let dn = sync_entry.entry.dn.clone();

    // Is this an entry we need to observe/look at?
    if entry_config.exclude {
        info!("entry_config excludes {}", dn);
        return Ok(None);
    }

    let oc = sync_entry.entry.attrs.get("objectclass").ok_or_else(|| {
        error!("Invalid entry - no object class {}", dn);
    })?;

    if oc.contains(&sync_config.person_objectclass) {
        let LdapSyncReplEntry {
            entry_uuid,
            state: _,
            mut entry,
        } = sync_entry;

        let id = if let Some(map_uuid) = &entry_config.map_uuid {
            *map_uuid
        } else {
            entry_uuid
        };

        let user_name = if let Some(name) = entry_config.map_name.clone() {
            name
        } else {
            entry
                .remove_ava_single(&sync_config.person_attr_user_name)
                .ok_or_else(|| {
                    error!(
                        "Missing required attribute {} (person_attr_user_name)",
                        sync_config.person_attr_user_name
                    );
                })?
        };

        let display_name = entry
            .remove_ava_single(&sync_config.person_attr_display_name)
            .ok_or_else(|| {
                error!(
                    "Missing required attribute {} (person_attr_display_name)",
                    sync_config.person_attr_display_name
                );
            })?;

        let gidnumber = if let Some(number) = entry_config.map_gidnumber {
            Some(number)
        } else {
            entry
                .remove_ava_single(&sync_config.person_attr_gidnumber)
                .map(|gid| {
                    u32::from_str(&gid).map_err(|_| {
                        error!(
                            "Invalid gidnumber - {} is not a u32 (person_attr_gidnumber)",
                            sync_config.person_attr_gidnumber
                        );
                    })
                })
                .transpose()?
        };

        let password_import = entry.remove_ava_single(&sync_config.person_attr_password);

        let password_import = if let Some(pw_prefix) = sync_config.person_password_prefix.as_ref() {
            password_import.map(|s| format!("{}{}", pw_prefix, s))
        } else {
            password_import
        };

        let mail: Vec<_> = entry
            .remove_ava(&sync_config.person_attr_mail)
            .map(|set| {
                set.into_iter()
                    .map(|addr| MultiValueAttr {
                        type_: None,
                        primary: None,
                        display: None,
                        ref_: None,
                        value: addr,
                    })
                    .collect()
            })
            .unwrap_or_default();

        let totp_import = Vec::default();

        let ssh_publickey = entry
            .remove_ava(&sync_config.person_attr_ssh_public_key)
            .map(|set| {
                set.into_iter()
                    .enumerate()
                    .map(|(i, value)| ScimSshPubKey {
                        label: format!("sshpublickey-{}", i),
                        value,
                    })
                    .collect()
            })
            .unwrap_or_default();

        let login_shell = entry.remove_ava_single(&sync_config.person_attr_login_shell);
        let external_id = Some(entry.dn);

        Ok(Some(
            ScimSyncPerson {
                id,
                external_id,
                user_name,
                display_name,
                gidnumber,
                password_import,
                totp_import,
                login_shell,
                mail,
                ssh_publickey,
            }
            .into(),
        ))
    } else if oc.contains(&sync_config.group_objectclass) {
        let LdapSyncReplEntry {
            entry_uuid,
            state: _,
            mut entry,
        } = sync_entry;

        let id = entry_uuid;

        let name = entry
            .remove_ava_single(&sync_config.group_attr_name)
            .ok_or_else(|| {
                error!(
                    "Missing required attribute {} (group_attr_name)",
                    sync_config.group_attr_name
                );
            })?;

        let description = entry.remove_ava_single(&sync_config.group_attr_description);

        let gidnumber = entry
            .remove_ava_single(&sync_config.group_attr_gidnumber)
            .map(|gid| {
                u32::from_str(&gid).map_err(|_| {
                    error!(
                        "Invalid gidnumber - {} is not a u32 (group_attr_gidnumber)",
                        sync_config.group_attr_gidnumber
                    );
                })
            })
            .transpose()?;

        let members: Vec<_> = entry
            .remove_ava(&sync_config.group_attr_member)
            .map(|set| {
                set.into_iter()
                    .map(|external_id| ScimExternalMember { external_id })
                    .collect()
            })
            .unwrap_or_default();

        let external_id = Some(entry.dn);

        Ok(Some(
            ScimSyncGroup {
                id,
                external_id,
                name,
                description,
                gidnumber,
                members,
            }
            .into(),
        ))
    } else {
        debug!("Skipping entry {} with oc {:?}", dn, oc);
        Ok(None)
    }
}

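// Minimal health endpoint: answers every TCP connection on the status socket
// with "Ok" or "Err" depending on the result of the most recent sync pass,
// and exits when the shutdown broadcast fires.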
async fn status_task(
    listener: TcpListener,
    mut status_rx: broadcast::Receiver<bool>,
    last_op_status: Arc<AtomicBool>,
) {
    loop {
        tokio::select! {
            _ = status_rx.recv() => {
                break;
            }
            maybe_sock = listener.accept() => {
                let mut stream = match maybe_sock {
                    Ok((sock, addr)) => {
                        debug!("accept from {:?}", addr);
                        sock
                    }
                    Err(e) => {
                        error!(?e, "Failed to accept status connection");
                        continue;
                    }
                };

                let sr = if last_op_status.load(Ordering::Relaxed) {
                    stream.write_all(b"Ok\n").await
                } else {
                    stream.write_all(b"Err\n").await
                };
                if let Err(e) = sr {
                    error!(?e, "Failed to send status");
                }
            }
        }
    }
    info!("Stopped status task");
}

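// Best-effort checks that a configuration file exists, is read-only to the
// running uid, and is not owned by the running user (which would allow its
// permissions to be changed). Only a missing file is fatal; the other
// findings are logged as warnings.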
fn config_security_checks(cfg_path: &Path) -> bool {
    let cfg_path_str = cfg_path.to_string_lossy();

    if !cfg_path.exists() {
        // there's no point trying to start up if we can't read a usable config!
        error!(
            "Config missing from {} - cannot start up. Quitting.",
            cfg_path_str
        );
        false
    } else {
        let cfg_meta = match metadata(cfg_path) {
            Ok(v) => v,
            Err(e) => {
                error!(
                    "Unable to read metadata for '{}' during security checks - {:?}",
                    cfg_path_str, e
                );
                return false;
            }
        };
        if !file_permissions_readonly(&cfg_meta) {
            warn!("permissions on {} may not be secure. Should be readonly to running uid. This could be a security risk ...",
                cfg_path_str
            );
        }

        #[cfg(target_family = "unix")]
        if cfg_meta.uid() == get_current_uid() || cfg_meta.uid() == get_effective_uid() {
            warn!("WARNING: {} owned by the current uid, which may allow file permission changes. This could be a security risk ...",
                cfg_path_str
            );
        }

        true
    }
}

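// Parses the CLI options, sets up tracing to stderr, refuses to run as root
// (unless opt.skip_root_check is set), validates the permissions of both
// configuration files, then drives driver_main on a current-thread tokio
// runtime.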
fn main() {
    let opt = Opt::parse();

    let fmt_layer = fmt::layer().with_writer(std::io::stderr);

    let filter_layer = if opt.debug {
        match EnvFilter::try_new("kanidm_client=debug,kanidm_ldap_sync=debug,ldap3_client=debug") {
            Ok(f) => f,
            Err(e) => {
                eprintln!("ERROR! Unable to start tracing {:?}", e);
                return;
            }
        }
    } else {
        match EnvFilter::try_from_default_env() {
            Ok(f) => f,
            Err(_) => EnvFilter::new("kanidm_client=warn,kanidm_ldap_sync=info,ldap3_client=warn"),
        }
    };

    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();

    // Startup sanity checks.
    // TODO: put this in the junk drawer
    #[cfg(target_family = "unix")]
    if opt.skip_root_check {
        warn!("Skipping root user check, if you're running this for testing, ensure you clean up temporary files.")
    } else if get_current_uid() == 0
        || get_effective_uid() == 0
        || get_current_gid() == 0
        || get_effective_gid() == 0
    {
        error!("Refusing to run - this process must not operate as root.");
        return;
    };

    if !config_security_checks(&opt.client_config) || !config_security_checks(&opt.ldap_sync_config)
    {
        return;
    }

    let par_count = thread::available_parallelism()
        .expect("Failed to determine available parallelism")
        .get();

    let rt = runtime::Builder::new_current_thread()
        // We configure this as we use parallel workers at some points.
        .max_blocking_threads(par_count)
        .enable_all()
        .build()
        .expect("Failed to initialise tokio runtime!");

    tracing::debug!("Using {} worker threads", par_count);

    rt.block_on(async move { driver_main(opt).await });
}