This commit is contained in:
William Brown 2025-03-13 16:25:58 +10:00
parent 67a20ad697
commit c91ef3ec25
6 changed files with 286 additions and 222 deletions

View file

@ -7,6 +7,13 @@ use std::io::Read;
use std::path::Path;
use std::str::FromStr;
#[derive(Serialize, Deserialize, Debug)]
pub struct EtcDb {
pub users: Vec<EtcUser>,
pub shadow: Vec<EtcShadow>,
pub groups: Vec<EtcGroup>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct EtcUser {
pub name: String,
@ -39,7 +46,7 @@ pub fn read_etc_passwd_file<P: AsRef<Path>>(path: P) -> Result<Vec<EtcUser>, Uni
parse_etc_passwd(contents.as_slice()).map_err(|_| UnixIntegrationError)
}
#[derive(Debug, PartialEq, Default)]
#[derive(PartialEq, Default)]
pub enum CryptPw {
Sha256(String),
Sha512(String),
@ -56,6 +63,16 @@ impl fmt::Display for CryptPw {
}
}
impl fmt::Debug for CryptPw {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CryptPw::Invalid => write!(f, "x"),
CryptPw::Sha256(_s) => write!(f, "crypt sha256"),
CryptPw::Sha512(_s) => write!(f, "crypt sha512"),
}
}
}
impl FromStr for CryptPw {
type Err = &'static str;

View file

@ -1,4 +1,4 @@
use crate::unix_passwd::{EtcGroup, EtcUser};
use crate::unix_passwd::{EtcDb, EtcGroup, EtcUser};
use kanidm_proto::internal::OperationError;
use serde::{Deserialize, Serialize};
@ -215,6 +215,7 @@ pub enum TaskRequest {
pub enum TaskResponse {
Success(u64),
Error(String),
NotifyShadowChange(EtcDb),
}
#[test]

View file

@ -18,7 +18,7 @@ use kanidm_hsm_crypto::{soft::SoftTpm, AuthValue, BoxedDynTpm, Tpm};
use kanidm_proto::constants::DEFAULT_CLIENT_CONFIG_PATH;
use kanidm_proto::internal::OperationError;
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow};
use kanidm_unix_common::unix_passwd::EtcDb;
use kanidm_unix_common::unix_proto::{
ClientRequest, ClientResponse, TaskRequest, TaskRequestFrame, TaskResponse,
};
@ -30,7 +30,6 @@ use kanidm_unix_resolver::resolver::Resolver;
use kanidm_unix_resolver::unix_config::{HsmType, UnixdConfig};
use kanidm_utils_users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};
use libc::umask;
use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebouncedEvent};
use sketching::tracing::span;
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
@ -150,7 +149,7 @@ fn rm_if_exist(p: &str) {
async fn handle_task_client(
stream: UnixStream,
_task_channel_tx: &Sender<AsyncTaskRequest>,
notify_shadow_change_tx: &Sender<EtcDb>,
task_channel_rx: &mut Receiver<AsyncTaskRequest>,
broadcast_rx: &mut broadcast::Receiver<bool>,
) -> Result<(), Box<dyn Error>> {
@ -209,8 +208,11 @@ async fn handle_task_client(
}
// If the ID was unregistered, ignore.
}
Some(Ok(TaskResponse::NotifyShadowChange(etc_db))) => {
let _ = notify_shadow_change_tx.send(etc_db).await;
}
// Other things ....
// Some(Ok(TaskResponse::
// Some(Ok(TaskResponse::ReloadSystemIds))
other => {
error!("Error -> {:?}", other);
@ -455,40 +457,6 @@ async fn handle_client(
Ok(())
}
async fn process_etc_passwd_group(
cachelayer: &Resolver,
shadow_is_accessible: bool,
) -> Result<(), Box<dyn Error>> {
let mut file = File::open("/etc/passwd").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let users = parse_etc_passwd(contents.as_slice()).map_err(|_| "Invalid passwd content")?;
let maybe_shadow = if shadow_is_accessible {
let mut file = File::open("/etc/shadow").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let shadow = parse_etc_shadow(contents.as_slice()).map_err(|_| "Invalid passwd content")?;
Some(shadow)
} else {
None
};
let mut file = File::open("/etc/group").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let groups = parse_etc_group(contents.as_slice()).map_err(|_| "Invalid group content")?;
cachelayer
.reload_system_identities(users, maybe_shadow, groups)
.await;
Ok(())
}
async fn read_hsm_pin(hsm_pin_path: &str) -> Result<Vec<u8>, Box<dyn Error>> {
if !PathBuf::from_str(hsm_pin_path)?.exists() {
return Err(std::io::Error::new(
@ -1044,23 +1012,6 @@ async fn main() -> ExitCode {
// Undo umask changes.
let _ = unsafe { umask(before) };
// We pre-check if we can read /etc/shadow, and we flag that for the process so that
// we don't attempt to read it again as we proceed.
let shadow_is_accessible = {
if let Err(err) = File::open("/etc/shadow").await {
warn!(?err, "Unable to read /etc/shadow, some features will be disabled.");
false
} else {
true
}
};
// Pre-process /etc/passwd and /etc/group for nxset
if let Err(err) = process_etc_passwd_group(&cachelayer, shadow_is_accessible).await {
error!(?err, "Failed to process system id providers");
return ExitCode::FAILURE
}
// Setup the tasks socket first.
let (task_channel_tx, mut task_channel_rx) = channel(16);
let task_channel_tx = Arc::new(task_channel_tx);
@ -1072,6 +1023,10 @@ async fn main() -> ExitCode {
let mut c_broadcast_rx = broadcast_tx.subscribe();
let mut d_broadcast_rx = broadcast_tx.subscribe();
// This channel allowss
let (notify_shadow_channel_tx, mut notify_shadow_channel_rx) = channel(16);
let notify_shadow_channel_tx = Arc::new(notify_shadow_channel_tx);
let task_b = tokio::spawn(async move {
loop {
tokio::select! {
@ -1099,7 +1054,7 @@ async fn main() -> ExitCode {
// client.
// We have to check for signals here else this tasks waits forever.
if let Err(err) = handle_task_client(socket, &task_channel_tx, &mut task_channel_rx, &mut d_broadcast_rx).await {
if let Err(err) = handle_task_client(socket, &notify_shadow_channel_tx, &mut task_channel_rx, &mut d_broadcast_rx).await {
error!(?err, "Task client error occurred");
}
// If they disconnect we go back to accept.
@ -1115,80 +1070,32 @@ async fn main() -> ExitCode {
info!("Stopped task connector");
});
// TODO: Setup a task that handles pre-fetching here.
// ====== setup an inotify watcher to reload on changes to system files ======
let (inotify_tx, mut inotify_rx) = channel(4);
let watcher = new_debouncer(Duration::from_secs(1), None, move |event: Result<Vec<DebouncedEvent>, _>| {
let array_of_events = match event {
Ok(events) => events,
Err(array_errors) => {
for err in array_errors {
error!(?err, "inotify debounce error");
}
return
}
};
let mut path_of_interest_was_changed = false;
for inode_event in array_of_events.iter() {
if !inode_event.kind.is_access() && inode_event.paths.iter().any(|path| {
path == Path::new("/etc/group") ||
path == Path::new("/etc/passwd") ||
(shadow_is_accessible && path == Path::new("/etc/shadow"))
}) {
// if shadow_is_accessible.
debug!(?inode_event, "Handling inotify modification event");
path_of_interest_was_changed = true
}
}
if path_of_interest_was_changed {
let _ = inotify_tx.try_send(true);
} else {
debug!(?array_of_events, "IGNORED");
}
})
.and_then(|mut debouncer| {
debouncer.watch(Path::new("/etc"), RecursiveMode::Recursive)
.map(|()| debouncer)
});
let watcher =
match watcher {
Ok(watcher) => {
watcher
}
Err(e) => {
error!("Failed to setup inotify {:?}", e);
return ExitCode::FAILURE
}
};
// ====== Listen for shadow change notification from tasks ======
let shadow_notify_cachelayer = cachelayer.clone();
let mut c_broadcast_rx = broadcast_tx.subscribe();
let inotify_cachelayer = cachelayer.clone();
let task_c = tokio::spawn(async move {
debug!("Spawned inotify task handler");
debug!("Spawned shadow reload task handler");
loop {
tokio::select! {
_ = c_broadcast_rx.recv() => {
break;
}
_ = inotify_rx.recv() => {
if let Err(err) = process_etc_passwd_group(&inotify_cachelayer, shadow_is_accessible).await {
error!(?err, "Failed to process system id providers");
}
Some(EtcDb {
users, shadow, groups
}) = notify_shadow_channel_rx.recv() => {
shadow_notify_cachelayer
.reload_system_identities(users, shadow, groups)
.await;
}
}
}
info!("Stopped inotify task handler");
info!("Stopped shadow reload task handler");
});
// TODO: Setup a task that handles pre-fetching here.
// Set the umask while we open the path for most clients.
let before = unsafe { umask(0) };
let listener = match UnixListener::bind(cfg.sock_path.as_str()) {
@ -1288,9 +1195,6 @@ async fn main() -> ExitCode {
error!("Unable to shutdown workers {:?}", e);
}
debug!("Dropping inotify watcher ...");
drop(watcher);
let _ = task_a.await;
let _ = task_b.await;
let _ = task_c.await;

View file

@ -10,6 +10,23 @@
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]
use bytes::{BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow, EtcDb};
use kanidm_unix_common::unix_proto::{
HomeDirectoryInfo, TaskRequest, TaskRequestFrame, TaskResponse,
};
use kanidm_unix_resolver::unix_config::UnixdConfig;
use kanidm_utils_users::{get_effective_gid, get_effective_uid};
use libc::{lchown, umask};
use notify_debouncer_full::notify::RecommendedWatcher;
use notify_debouncer_full::Debouncer;
use notify_debouncer_full::FileIdMap;
use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebouncedEvent};
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
use sketching::tracing_forest::{self};
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::symlink;
@ -17,19 +34,8 @@ use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::time::Duration;
use std::{fs, io};
use bytes::{BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_proto::{
HomeDirectoryInfo, TaskRequest, TaskRequestFrame, TaskResponse,
};
use kanidm_unix_resolver::unix_config::UnixdConfig;
use kanidm_utils_users::{get_effective_gid, get_effective_uid};
use libc::{lchown, umask};
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
use sketching::tracing_forest::{self};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::UnixStream;
use tokio::sync::broadcast;
use tokio::time;
@ -271,41 +277,173 @@ fn create_home_directory(
Ok(())
}
async fn handle_tasks(stream: UnixStream, cfg: &UnixdConfig) {
async fn handle_tasks(
stream: UnixStream,
ctl_broadcast_rx: &mut broadcast::Receiver<bool>,
shadow_broadcast_rx: &mut broadcast::Receiver<bool>,
cfg: &UnixdConfig,
) {
let mut reqs = Framed::new(stream, TaskCodec::new());
loop {
match reqs.next().await {
Some(Ok(TaskRequestFrame {
id,
req: TaskRequest::HomeDirectory(info),
})) => {
debug!("Received task -> HomeDirectory({:?})", info);
let resp = match create_home_directory(
&info,
cfg.home_prefix.as_ref(),
cfg.home_mount_prefix.as_ref(),
cfg.use_etc_skel,
cfg.selinux,
) {
Ok(()) => TaskResponse::Success(id),
Err(msg) => TaskResponse::Error(msg),
};
// Now send a result.
if let Err(e) = reqs.send(resp).await {
error!("Error -> {:?}", e);
return;
}
// All good, loop.
tokio::select! {
_ = ctl_broadcast_rx.recv() => {
break;
}
other => {
error!("Error -> {:?}", other);
return;
request = reqs.next() => {
match request {
Some(Ok(TaskRequestFrame {
id,
req: TaskRequest::HomeDirectory(info),
})) => {
debug!("Received task -> HomeDirectory({:?})", info);
let resp = match create_home_directory(
&info,
cfg.home_prefix.as_ref(),
cfg.home_mount_prefix.as_ref(),
cfg.use_etc_skel,
cfg.selinux,
) {
Ok(()) => TaskResponse::Success(id),
Err(msg) => TaskResponse::Error(msg),
};
// Now send a result.
if let Err(err) = reqs.send(resp).await {
error!(?err, "Unable to communicate to kanidm unixd");
break;
}
// All good, loop.
}
other => {
error!("Error -> {:?}", other);
break;
}
}
}
_ = shadow_broadcast_rx.recv() => {
// process etc shadow and send it here.
match process_etc_passwd_group().await {
Ok(etc_db) => {
let resp = TaskResponse::NotifyShadowChange(etc_db);
if let Err(err) = reqs.send(resp).await {
error!(?err, "Unable to communicate to kanidm unixd");
break;
}
}
Err(()) => {
error!("Unable to process etc db");
continue
}
}
}
}
}
info!("Disconnected from kanidm_unixd ...");
}
async fn process_etc_passwd_group() -> Result<EtcDb, ()> {
let mut file = File::open("/etc/passwd").await.map_err(|err| {
error!(?err);
})?;
let mut contents = vec![];
file.read_to_end(&mut contents).await.map_err(|err| {
error!(?err);
})?;
let users = parse_etc_passwd(contents.as_slice())
.map_err(|_| "Invalid passwd content")
.map_err(|err| {
error!(?err);
})?;
let mut file = File::open("/etc/shadow").await.map_err(|err| {
error!(?err);
})?;
let mut contents = vec![];
file.read_to_end(&mut contents).await.map_err(|err| {
error!(?err);
})?;
let shadow = parse_etc_shadow(contents.as_slice())
.map_err(|_| "Invalid passwd content")
.map_err(|err| {
error!(?err);
})?;
let mut file = File::open("/etc/group").await.map_err(|err| {
error!(?err);
})?;
let mut contents = vec![];
file.read_to_end(&mut contents).await.map_err(|err| {
error!(?err);
})?;
let groups = parse_etc_group(contents.as_slice())
.map_err(|_| "Invalid group content")
.map_err(|err| {
error!(?err);
})?;
Ok(EtcDb {
users,
shadow,
groups,
})
}
fn setup_shadow_inotify_watcher(
shadow_broadcast_tx: broadcast::Sender<bool>,
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, ExitCode> {
let watcher = new_debouncer(
Duration::from_secs(1),
None,
move |event: Result<Vec<DebouncedEvent>, _>| {
let array_of_events = match event {
Ok(events) => events,
Err(array_errors) => {
for err in array_errors {
error!(?err, "inotify debounce error");
}
return;
}
};
let mut path_of_interest_was_changed = false;
for inode_event in array_of_events.iter() {
if !inode_event.kind.is_access()
&& inode_event.paths.iter().any(|path| {
path == Path::new("/etc/group")
|| path == Path::new("/etc/passwd")
|| path == Path::new("/etc/shadow")
})
{
debug!(?inode_event, "Handling inotify modification event");
path_of_interest_was_changed = true
}
}
if path_of_interest_was_changed {
let _ = shadow_broadcast_tx.send(true);
} else {
debug!(?array_of_events, "IGNORED");
}
},
)
.and_then(|mut debouncer| {
debouncer
.watch(Path::new("/etc"), RecursiveMode::Recursive)
.map(|()| debouncer)
});
watcher.map_err(|err| {
error!(?err, "Failed to setup inotify");
ExitCode::FAILURE
})
}
#[tokio::main(flavor = "current_thread")]
@ -374,9 +512,19 @@ async fn main() -> ExitCode {
let task_sock_path = cfg.task_sock_path.clone();
debug!("Attempting to use {} ...", task_sock_path);
// This is the startup/shutdown control channel
let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
let mut d_broadcast_rx = broadcast_tx.subscribe();
// This is to broadcast when we need to reload the shadow
// files.
let (shadow_broadcast_tx, mut shadow_broadcast_rx) = broadcast::channel(4);
let watcher = match setup_shadow_inotify_watcher(shadow_broadcast_tx.clone()) {
Ok(w) => w,
Err(exit) => return exit,
};
let server = tokio::spawn(async move {
loop {
info!("Attempting to connect to kanidm_unixd ...");
@ -389,16 +537,14 @@ async fn main() -> ExitCode {
match connect_res {
Ok(stream) => {
info!("Found kanidm_unixd, waiting for tasks ...");
// Immediately trigger that we should reload the shadow files
let _ = shadow_broadcast_tx.send(true);
// Yep! Now let the main handler do it's job.
// If it returns (dc, etc, then we loop and try again).
tokio::select! {
_ = d_broadcast_rx.recv() => {
break;
}
_ = handle_tasks(stream, &cfg) => {
continue;
}
}
handle_tasks(stream, &mut d_broadcast_rx, &mut shadow_broadcast_rx, &cfg).await;
continue;
}
Err(e) => {
debug!("\\---> {:?}", e);
@ -408,8 +554,8 @@ async fn main() -> ExitCode {
}
}
}
}
}
} // select
} // loop
});
info!("Server started ...");
@ -467,6 +613,9 @@ async fn main() -> ExitCode {
error!("Unable to shutdown workers {:?}", e);
}
debug!("Dropping inotify watcher ...");
drop(watcher);
let _ = server.await;
ExitCode::SUCCESS
})

View file

@ -147,12 +147,7 @@ impl SystemProvider {
})
}
pub async fn reload(
&self,
users: Vec<EtcUser>,
shadow: Option<Vec<EtcShadow>>,
groups: Vec<EtcGroup>,
) {
pub async fn reload(&self, users: Vec<EtcUser>, shadow: Vec<EtcShadow>, groups: Vec<EtcGroup>) {
let mut system_ids_txn = self.inner.lock().await;
system_ids_txn.users.clear();
system_ids_txn.user_list.clear();
@ -160,52 +155,50 @@ impl SystemProvider {
system_ids_txn.group_list.clear();
system_ids_txn.shadow.clear();
system_ids_txn.shadow_enabled = shadow.is_some();
system_ids_txn.shadow_enabled = !shadow.is_empty();
if let Some(shadow) = shadow {
let s_iter = shadow.into_iter().filter_map(|shadow_entry| {
let EtcShadow {
let s_iter = shadow.into_iter().filter_map(|shadow_entry| {
let EtcShadow {
name,
password,
epoch_change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
epoch_expire_date,
flag_reserved: _,
} = shadow_entry;
if password.is_valid() {
let aging_policy = epoch_change_days.map(|change_days| {
AgingPolicy::new(
change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
)
});
let expiration_date = epoch_expire_date
.map(|expire| OffsetDateTime::UNIX_EPOCH + time::Duration::days(expire));
Some((
name,
password,
epoch_change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
epoch_expire_date,
flag_reserved: _,
} = shadow_entry;
Arc::new(Shadow {
crypt_pw: password,
aging_policy,
expiration_date,
}),
))
} else {
// Invalid password, skip the account
None
}
});
if password.is_valid() {
let aging_policy = epoch_change_days.map(|change_days| {
AgingPolicy::new(
change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
)
});
let expiration_date = epoch_expire_date
.map(|expire| OffsetDateTime::UNIX_EPOCH + time::Duration::days(expire));
Some((
name,
Arc::new(Shadow {
crypt_pw: password,
aging_policy,
expiration_date,
}),
))
} else {
// Invalid password, skip the account
None
}
});
system_ids_txn.shadow.extend(s_iter)
};
system_ids_txn.shadow.extend(s_iter);
for group in groups {
let name = Id::Name(group.name.clone());

View file

@ -212,7 +212,7 @@ impl Resolver {
pub async fn reload_system_identities(
&self,
users: Vec<EtcUser>,
shadow: Option<Vec<EtcShadow>>,
shadow: Vec<EtcShadow>,
groups: Vec<EtcGroup>,
) {
self.system_provider.reload(users, shadow, groups).await