20250313 unixd system cache ()

The implementation of the unixd cache relies on inotify to detect changes to files in /etc so that we know when to reload the data for nss/passwd. However, the way that groupadd/del and other tools work is they copy the file, change it, and then move it into place. It turns out that william of the past didn't realise that inotify works on inodes not paths like other tools do (auditctl for example).

As a result, when something modified /etc/group or another related file, the removal was seen, but this breaks notifications on any future change until you reload unixd.

To resolve this we need to recursively watch /etc with inotify - yep, that's correct. We have to watch everything in /etc for changes because it's the only way to pick up on the add/remove of files. But because we have to watch everything, we need permissions to watch everything.

This forces us to move the parsing of the etc passwd/group/shadow files to the unixd tasks daemon - arguably, this is the correct place to read these anyway since that is a high priv (and locked down) daemon. Because of this, we actually end up solving the missing "shadow" group on debian issue, and probably similar on the BSD's in future.

In order to make my life easier while testing I also threw in a makefile that symlinks the files to needed locations for testing. It has plenty of warnings as it should.

Fixes 
Fixes 
Fixes 
This commit is contained in:
Firstyear 2025-03-14 13:46:26 +10:00 committed by GitHub
parent e3243ce6b0
commit b88b6923eb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 816 additions and 564 deletions

687
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -206,7 +206,7 @@ lru = "^0.12.5"
mathru = "^0.13.0"
md-5 = "0.10.6"
mimalloc = "0.1.43"
notify-debouncer-full = { version = "0.1" }
notify-debouncer-full = { version = "0.5" }
num_enum = "^0.5.11"
oauth2_ext = { version = "^4.4.2", package = "oauth2", default-features = false }
openssl-sys = "^0.9"

View file

@ -12,7 +12,7 @@ Conflicts=nscd.service
[Service]
DynamicUser=yes
SupplementaryGroups=tss shadow
SupplementaryGroups=tss
UMask=0027
CacheDirectory=kanidm-unixd
RuntimeDirectory=kanidm-unixd

19
unix_integration/Makefile Normal file
View file

@ -0,0 +1,19 @@
current_dir = $(shell pwd)
dev_install:
@ echo "WARNING: THIS WILL BREAK EXISTING UNIXD INSTALLS"
@ echo "ctrl-c now if this is not what you want"
@ read
@ echo "LAST CHANCE"
@ sleep 5
ln -s -f $(current_dir)/../platform/opensuse/kanidm-unixd.service /etc/systemd/system/kanidm-unixd.service
ln -s -f $(current_dir)/../platform/opensuse/kanidm-unixd-tasks.service /etc/systemd/system/kanidm-unixd-tasks.service
ln -s -f $(current_dir)/../target/debug/kanidm-unix /usr/sbin/kanidm-unix
ln -s -f $(current_dir)/../target/debug/kanidm_ssh_authorizedkeys /usr/sbin/kanidm_ssh_authorizedkeys
ln -s -f $(current_dir)/../target/debug/kanidm_unixd_tasks /usr/sbin/kanidm_unixd_tasks
ln -s -f $(current_dir)/../target/debug/kanidm_unixd /usr/sbin/kanidm_unixd
ln -s -f $(current_dir)/../target/debug/libpam_kanidm.so /lib64/security/pam_kanidm.so
ln -s -f $(current_dir)/../target/debug/libnss_kanidm.so /usr/lib64/libnss_kanidm.so.2

View file

@ -7,6 +7,13 @@ use std::io::Read;
use std::path::Path;
use std::str::FromStr;
#[derive(Serialize, Deserialize, Debug)]
pub struct EtcDb {
pub users: Vec<EtcUser>,
pub shadow: Vec<EtcShadow>,
pub groups: Vec<EtcGroup>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct EtcUser {
pub name: String,
@ -39,7 +46,7 @@ pub fn read_etc_passwd_file<P: AsRef<Path>>(path: P) -> Result<Vec<EtcUser>, Uni
parse_etc_passwd(contents.as_slice()).map_err(|_| UnixIntegrationError)
}
#[derive(Debug, PartialEq, Default)]
#[derive(PartialEq, Default)]
pub enum CryptPw {
Sha256(String),
Sha512(String),
@ -56,6 +63,16 @@ impl fmt::Display for CryptPw {
}
}
impl fmt::Debug for CryptPw {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CryptPw::Invalid => write!(f, "x"),
CryptPw::Sha256(_s) => write!(f, "crypt sha256"),
CryptPw::Sha512(_s) => write!(f, "crypt sha512"),
}
}
}
impl FromStr for CryptPw {
type Err = &'static str;

View file

@ -1,4 +1,4 @@
use crate::unix_passwd::{EtcGroup, EtcUser};
use crate::unix_passwd::{EtcDb, EtcGroup, EtcUser};
use kanidm_proto::internal::OperationError;
use serde::{Deserialize, Serialize};
@ -200,6 +200,12 @@ pub struct HomeDirectoryInfo {
pub aliases: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskRequestFrame {
pub id: u64,
pub req: TaskRequest,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TaskRequest {
HomeDirectory(HomeDirectoryInfo),
@ -207,8 +213,9 @@ pub enum TaskRequest {
#[derive(Serialize, Deserialize, Debug)]
pub enum TaskResponse {
Success,
Success(u64),
Error(String),
NotifyShadowChange(EtcDb),
}
#[test]

View file

@ -10,6 +10,31 @@
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]
use bytes::{BufMut, BytesMut};
use clap::{Arg, ArgAction, Command};
use futures::{SinkExt, StreamExt};
use kanidm_client::KanidmClientBuilder;
use kanidm_hsm_crypto::{soft::SoftTpm, AuthValue, BoxedDynTpm, Tpm};
use kanidm_proto::constants::DEFAULT_CLIENT_CONFIG_PATH;
use kanidm_proto::internal::OperationError;
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_passwd::EtcDb;
use kanidm_unix_common::unix_proto::{
ClientRequest, ClientResponse, TaskRequest, TaskRequestFrame, TaskResponse,
};
use kanidm_unix_resolver::db::{Cache, Db};
use kanidm_unix_resolver::idprovider::interface::IdProvider;
use kanidm_unix_resolver::idprovider::kanidm::KanidmProvider;
use kanidm_unix_resolver::idprovider::system::SystemProvider;
use kanidm_unix_resolver::resolver::Resolver;
use kanidm_unix_resolver::unix_config::{HsmType, UnixdConfig};
use kanidm_utils_users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};
use libc::umask;
use sketching::tracing::span;
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
use sketching::tracing_forest::{self};
use std::collections::BTreeMap;
use std::error::Error;
use std::fs::metadata;
use std::io;
@ -20,29 +45,6 @@ use std::process::ExitCode;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use bytes::{BufMut, BytesMut};
use clap::{Arg, ArgAction, Command};
use futures::{SinkExt, StreamExt};
use kanidm_client::KanidmClientBuilder;
use kanidm_proto::constants::DEFAULT_CLIENT_CONFIG_PATH;
use kanidm_proto::internal::OperationError;
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow};
use kanidm_unix_common::unix_proto::{ClientRequest, ClientResponse, TaskRequest, TaskResponse};
use kanidm_unix_resolver::db::{Cache, Db};
use kanidm_unix_resolver::idprovider::interface::IdProvider;
use kanidm_unix_resolver::idprovider::kanidm::KanidmProvider;
use kanidm_unix_resolver::idprovider::system::SystemProvider;
use kanidm_unix_resolver::resolver::Resolver;
use kanidm_unix_resolver::unix_config::{HsmType, UnixdConfig};
use kanidm_utils_users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};
use libc::umask;
use sketching::tracing::span;
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
use sketching::tracing_forest::{self};
use time::OffsetDateTime;
use tokio::fs::File;
use tokio::io::AsyncReadExt; // for read_to_end()
@ -52,17 +54,16 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio_util::codec::{Decoder, Encoder, Framed};
use kanidm_hsm_crypto::{soft::SoftTpm, AuthValue, BoxedDynTpm, Tpm};
use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, notify::Watcher};
#[cfg(not(target_os = "illumos"))]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
//=== the codec
type AsyncTaskRequest = (TaskRequest, oneshot::Sender<()>);
struct AsyncTaskRequest {
task_req: TaskRequest,
task_chan: oneshot::Sender<()>,
}
#[derive(Default)]
struct ClientCodec;
@ -117,11 +118,11 @@ impl Decoder for TaskCodec {
}
}
impl Encoder<TaskRequest> for TaskCodec {
impl Encoder<TaskRequestFrame> for TaskCodec {
type Error = io::Error;
fn encode(&mut self, msg: TaskRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
debug!("Attempting to send request -> {:?} ...", msg);
fn encode(&mut self, msg: TaskRequestFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
debug!("Attempting to send request -> {:?} ...", msg.id);
let data = serde_json::to_vec(&msg).map_err(|e| {
error!("socket encoding error -> {:?}", e);
io::Error::new(io::ErrorKind::Other, "JSON encode error")
@ -148,46 +149,79 @@ fn rm_if_exist(p: &str) {
async fn handle_task_client(
stream: UnixStream,
task_channel_tx: &Sender<AsyncTaskRequest>,
notify_shadow_change_tx: &Sender<EtcDb>,
task_channel_rx: &mut Receiver<AsyncTaskRequest>,
broadcast_rx: &mut broadcast::Receiver<bool>,
) -> Result<(), Box<dyn Error>> {
// setup the codec
let mut reqs = Framed::new(stream, TaskCodec);
// setup the codec, this is to the unix socket which the task daemon
// connected to us with.
let mut last_task_id: u64 = 0;
let mut task_handles = BTreeMap::new();
let mut framed_stream = Framed::new(stream, TaskCodec);
loop {
// TODO wait on the channel OR the task handler, so we know
// when it closes.
let v = match task_channel_rx.recv().await {
Some(v) => v,
None => return Ok(()),
};
debug!("Sending Task -> {:?}", v.0);
// Write the req to the socket.
if let Err(_e) = reqs.send(v.0.clone()).await {
// re-queue the event if not timed out.
// This is indicated by the one shot being dropped.
if !v.1.is_closed() {
let _ = task_channel_tx
.send_timeout(v, Duration::from_millis(100))
.await;
tokio::select! {
// We have been commanded to stop operation.
_ = broadcast_rx.recv() => {
return Ok(())
}
// now return the error.
return Err(Box::new(IoError::new(ErrorKind::Other, "oh no!")));
}
task_request = task_channel_rx.recv() => {
let Some(AsyncTaskRequest {
task_req,
task_chan
}) = task_request else {
// Task channel has died, cease operation.
return Ok(())
};
match reqs.next().await {
Some(Ok(TaskResponse::Success)) => {
debug!("Task was acknowledged and completed.");
// Send a result back via the one-shot
// Ignore if it fails.
let _ = v.1.send(());
debug!("Sending Task -> {:?}", task_req);
last_task_id += 1;
let task_id = last_task_id;
// Setup the task handle so we know who to get back to.
task_handles.insert(task_id, task_chan);
let task_frame = TaskRequestFrame {
id: task_id,
req: task_req,
};
if let Err(err) = framed_stream.send(task_frame).await {
warn!("Unable to queue task for completion");
return Err(Box::new(err));
}
// Task sent
}
other => {
error!("Error -> {:?}", other);
return Err(Box::new(IoError::new(ErrorKind::Other, "oh no!")));
response = framed_stream.next() => {
// Process incoming messages. They may be out of order.
match response {
Some(Ok(TaskResponse::Success(task_id))) => {
debug!("Task was acknowledged and completed.");
if let Some(handle) = task_handles.remove(&task_id) {
// Send a result back via the one-shot
// Ignore if it fails.
let _ = handle.send(());
}
// If the ID was unregistered, ignore.
}
Some(Ok(TaskResponse::NotifyShadowChange(etc_db))) => {
let _ = notify_shadow_change_tx.send(etc_db).await;
}
// Other things ....
// Some(Ok(TaskResponse::ReloadSystemIds))
other => {
error!("Error -> {:?}", other);
return Err(Box::new(IoError::new(ErrorKind::Other, "oh no!")));
}
}
}
}
}
}
@ -341,7 +375,10 @@ async fn handle_client(
match task_channel_tx
.send_timeout(
(TaskRequest::HomeDirectory(info), tx),
AsyncTaskRequest {
task_req: TaskRequest::HomeDirectory(info),
task_chan: tx,
},
Duration::from_millis(100),
)
.await
@ -420,40 +457,6 @@ async fn handle_client(
Ok(())
}
async fn process_etc_passwd_group(
cachelayer: &Resolver,
shadow_is_accessible: bool,
) -> Result<(), Box<dyn Error>> {
let mut file = File::open("/etc/passwd").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let users = parse_etc_passwd(contents.as_slice()).map_err(|_| "Invalid passwd content")?;
let maybe_shadow = if shadow_is_accessible {
let mut file = File::open("/etc/shadow").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let shadow = parse_etc_shadow(contents.as_slice()).map_err(|_| "Invalid passwd content")?;
Some(shadow)
} else {
None
};
let mut file = File::open("/etc/group").await?;
let mut contents = vec![];
file.read_to_end(&mut contents).await?;
let groups = parse_etc_group(contents.as_slice()).map_err(|_| "Invalid group content")?;
cachelayer
.reload_system_identities(users, maybe_shadow, groups)
.await;
Ok(())
}
async fn read_hsm_pin(hsm_pin_path: &str) -> Result<Vec<u8>, Box<dyn Error>> {
if !PathBuf::from_str(hsm_pin_path)?.exists() {
return Err(std::io::Error::new(
@ -1009,23 +1012,6 @@ async fn main() -> ExitCode {
// Undo umask changes.
let _ = unsafe { umask(before) };
// We pre-check if we can read /etc/shadow, and we flag that for the process so that
// we don't attempt to read it again as we proceed.
let shadow_is_accessible = {
if let Err(err) = File::open("/etc/shadow").await {
warn!(?err, "Unable to read /etc/shadow, some features will be disabled.");
false
} else {
true
}
};
// Pre-process /etc/passwd and /etc/group for nxset
if let Err(err) = process_etc_passwd_group(&cachelayer, shadow_is_accessible).await {
error!(?err, "Failed to process system id providers");
return ExitCode::FAILURE
}
// Setup the tasks socket first.
let (task_channel_tx, mut task_channel_rx) = channel(16);
let task_channel_tx = Arc::new(task_channel_tx);
@ -1037,9 +1023,14 @@ async fn main() -> ExitCode {
let mut c_broadcast_rx = broadcast_tx.subscribe();
let mut d_broadcast_rx = broadcast_tx.subscribe();
// This channel allowss
let (notify_shadow_channel_tx, mut notify_shadow_channel_rx) = channel(16);
let notify_shadow_channel_tx = Arc::new(notify_shadow_channel_tx);
let task_b = tokio::spawn(async move {
loop {
tokio::select! {
// Wait on the broadcast to see if we need to close down.
_ = c_broadcast_rx.recv() => {
break;
}
@ -1062,16 +1053,11 @@ async fn main() -> ExitCode {
// It did? Great, now we can wait and spin on that one
// client.
tokio::select! {
_ = d_broadcast_rx.recv() => {
break;
}
// We have to check for signals here else this tasks waits forever.
Err(e) = handle_task_client(socket, &task_channel_tx, &mut task_channel_rx) => {
error!("Task client error occurred; error = {:?}", e);
}
// We have to check for signals here else this tasks waits forever.
if let Err(err) = handle_task_client(socket, &notify_shadow_channel_tx, &mut task_channel_rx, &mut d_broadcast_rx).await {
error!(?err, "Task client error occurred");
}
// If they DC we go back to accept.
// If they disconnect we go back to accept.
}
Err(err) => {
error!("Task Accept error -> {:?}", err);
@ -1084,57 +1070,32 @@ async fn main() -> ExitCode {
info!("Stopped task connector");
});
// TODO: Setup a task that handles pre-fetching here.
let (inotify_tx, mut inotify_rx) = channel(4);
let watcher = new_debouncer(Duration::from_secs(2), None, move |_event| {
let _ = inotify_tx.try_send(true);
})
.and_then(|mut debouncer| {
debouncer.watcher().watch(Path::new("/etc/passwd"), RecursiveMode::NonRecursive)
.map(|()| debouncer)
})
.and_then(|mut debouncer| debouncer.watcher().watch(Path::new("/etc/group"), RecursiveMode::NonRecursive)
.map(|()| debouncer)
)
.and_then(|mut debouncer| if shadow_is_accessible {
debouncer.watcher().watch(Path::new("/etc/shadow"), RecursiveMode::NonRecursive)
.map(|()| debouncer)
} else {
Ok(debouncer)
}
);
let watcher =
match watcher {
Ok(watcher) => {
watcher
}
Err(e) => {
error!("Failed to setup inotify {:?}", e);
return ExitCode::FAILURE
}
};
// ====== Listen for shadow change notification from tasks ======
let shadow_notify_cachelayer = cachelayer.clone();
let mut c_broadcast_rx = broadcast_tx.subscribe();
let inotify_cachelayer = cachelayer.clone();
let task_c = tokio::spawn(async move {
debug!("Spawned shadow reload task handler");
loop {
tokio::select! {
_ = c_broadcast_rx.recv() => {
break;
}
_ = inotify_rx.recv() => {
if let Err(err) = process_etc_passwd_group(&inotify_cachelayer, shadow_is_accessible).await {
error!(?err, "Failed to process system id providers");
}
Some(EtcDb {
users, shadow, groups
}) = notify_shadow_channel_rx.recv() => {
shadow_notify_cachelayer
.reload_system_identities(users, shadow, groups)
.await;
}
}
}
info!("Stopped inotify watcher");
info!("Stopped shadow reload task handler");
});
// TODO: Setup a task that handles pre-fetching here.
// Set the umask while we open the path for most clients.
let before = unsafe { umask(0) };
let listener = match UnixListener::bind(cfg.sock_path.as_str()) {
@ -1234,8 +1195,6 @@ async fn main() -> ExitCode {
error!("Unable to shutdown workers {:?}", e);
}
drop(watcher);
let _ = task_a.await;
let _ = task_b.await;
let _ = task_c.await;

View file

@ -10,6 +10,23 @@
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]
use bytes::{BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow, EtcDb};
use kanidm_unix_common::unix_proto::{
HomeDirectoryInfo, TaskRequest, TaskRequestFrame, TaskResponse,
};
use kanidm_unix_resolver::unix_config::UnixdConfig;
use kanidm_utils_users::{get_effective_gid, get_effective_uid};
use libc::{lchown, umask};
use notify_debouncer_full::notify::RecommendedWatcher;
use notify_debouncer_full::Debouncer;
use notify_debouncer_full::RecommendedCache;
use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebouncedEvent};
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
use sketching::tracing_forest::{self};
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::symlink;
@ -17,17 +34,8 @@ use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::time::Duration;
use std::{fs, io};
use bytes::{BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
use kanidm_unix_common::unix_proto::{HomeDirectoryInfo, TaskRequest, TaskResponse};
use kanidm_unix_resolver::unix_config::UnixdConfig;
use kanidm_utils_users::{get_effective_gid, get_effective_uid};
use libc::{lchown, umask};
use sketching::tracing_forest::traits::*;
use sketching::tracing_forest::util::*;
use sketching::tracing_forest::{self};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::UnixStream;
use tokio::sync::broadcast;
use tokio::time;
@ -41,10 +49,10 @@ struct TaskCodec;
impl Decoder for TaskCodec {
type Error = io::Error;
type Item = TaskRequest;
type Item = TaskRequestFrame;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match serde_json::from_slice::<TaskRequest>(src) {
match serde_json::from_slice::<TaskRequestFrame>(src) {
Ok(msg) => {
// Clear the buffer for the next message.
src.clear();
@ -269,38 +277,173 @@ fn create_home_directory(
Ok(())
}
async fn handle_tasks(stream: UnixStream, cfg: &UnixdConfig) {
async fn handle_tasks(
stream: UnixStream,
ctl_broadcast_rx: &mut broadcast::Receiver<bool>,
shadow_broadcast_rx: &mut broadcast::Receiver<bool>,
cfg: &UnixdConfig,
) {
let mut reqs = Framed::new(stream, TaskCodec::new());
loop {
match reqs.next().await {
Some(Ok(TaskRequest::HomeDirectory(info))) => {
debug!("Received task -> HomeDirectory({:?})", info);
let resp = match create_home_directory(
&info,
cfg.home_prefix.as_ref(),
cfg.home_mount_prefix.as_ref(),
cfg.use_etc_skel,
cfg.selinux,
) {
Ok(()) => TaskResponse::Success,
Err(msg) => TaskResponse::Error(msg),
};
// Now send a result.
if let Err(e) = reqs.send(resp).await {
error!("Error -> {:?}", e);
return;
}
// All good, loop.
tokio::select! {
_ = ctl_broadcast_rx.recv() => {
break;
}
other => {
error!("Error -> {:?}", other);
return;
request = reqs.next() => {
match request {
Some(Ok(TaskRequestFrame {
id,
req: TaskRequest::HomeDirectory(info),
})) => {
debug!("Received task -> HomeDirectory({:?})", info);
let resp = match create_home_directory(
&info,
cfg.home_prefix.as_ref(),
cfg.home_mount_prefix.as_ref(),
cfg.use_etc_skel,
cfg.selinux,
) {
Ok(()) => TaskResponse::Success(id),
Err(msg) => TaskResponse::Error(msg),
};
// Now send a result.
if let Err(err) = reqs.send(resp).await {
error!(?err, "Unable to communicate to kanidm unixd");
break;
}
// All good, loop.
}
other => {
error!("Error -> {:?}", other);
break;
}
}
}
_ = shadow_broadcast_rx.recv() => {
// process etc shadow and send it here.
match process_etc_passwd_group().await {
Ok(etc_db) => {
let resp = TaskResponse::NotifyShadowChange(etc_db);
if let Err(err) = reqs.send(resp).await {
error!(?err, "Unable to communicate to kanidm unixd");
break;
}
}
Err(()) => {
error!("Unable to process etc db");
continue
}
}
}
}
}
info!("Disconnected from kanidm_unixd ...");
}
async fn process_etc_passwd_group() -> Result<EtcDb, ()> {
let mut file = File::open("/etc/passwd").await.map_err(|err| {
error!(?err);
})?;
let mut contents = vec![];
file.read_to_end(&mut contents).await.map_err(|err| {
error!(?err);
})?;
let users = parse_etc_passwd(contents.as_slice())
.map_err(|_| "Invalid passwd content")
.map_err(|err| {
error!(?err);
})?;
let mut file = File::open("/etc/shadow").await.map_err(|err| {
error!(?err);
})?;
let mut contents = vec![];
file.read_to_end(&mut contents).await.map_err(|err| {
error!(?err);
})?;
let shadow = parse_etc_shadow(contents.as_slice())
.map_err(|_| "Invalid passwd content")
.map_err(|err| {
error!(?err);
})?;
let mut file = File::open("/etc/group").await.map_err(|err| {
error!(?err);
})?;
let mut contents = vec![];
file.read_to_end(&mut contents).await.map_err(|err| {
error!(?err);
})?;
let groups = parse_etc_group(contents.as_slice())
.map_err(|_| "Invalid group content")
.map_err(|err| {
error!(?err);
})?;
Ok(EtcDb {
users,
shadow,
groups,
})
}
fn setup_shadow_inotify_watcher(
shadow_broadcast_tx: broadcast::Sender<bool>,
) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, ExitCode> {
let watcher = new_debouncer(
Duration::from_secs(1),
None,
move |event: Result<Vec<DebouncedEvent>, _>| {
let array_of_events = match event {
Ok(events) => events,
Err(array_errors) => {
for err in array_errors {
error!(?err, "inotify debounce error");
}
return;
}
};
let mut path_of_interest_was_changed = false;
for inode_event in array_of_events.iter() {
if !inode_event.kind.is_access()
&& inode_event.paths.iter().any(|path| {
path == Path::new("/etc/group")
|| path == Path::new("/etc/passwd")
|| path == Path::new("/etc/shadow")
})
{
debug!(?inode_event, "Handling inotify modification event");
path_of_interest_was_changed = true
}
}
if path_of_interest_was_changed {
let _ = shadow_broadcast_tx.send(true);
} else {
debug!(?array_of_events, "IGNORED");
}
},
)
.and_then(|mut debouncer| {
debouncer
.watch(Path::new("/etc"), RecursiveMode::Recursive)
.map(|()| debouncer)
});
watcher.map_err(|err| {
error!(?err, "Failed to setup inotify");
ExitCode::FAILURE
})
}
#[tokio::main(flavor = "current_thread")]
@ -369,9 +512,19 @@ async fn main() -> ExitCode {
let task_sock_path = cfg.task_sock_path.clone();
debug!("Attempting to use {} ...", task_sock_path);
// This is the startup/shutdown control channel
let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
let mut d_broadcast_rx = broadcast_tx.subscribe();
// This is to broadcast when we need to reload the shadow
// files.
let (shadow_broadcast_tx, mut shadow_broadcast_rx) = broadcast::channel(4);
let watcher = match setup_shadow_inotify_watcher(shadow_broadcast_tx.clone()) {
Ok(w) => w,
Err(exit) => return exit,
};
let server = tokio::spawn(async move {
loop {
info!("Attempting to connect to kanidm_unixd ...");
@ -384,16 +537,14 @@ async fn main() -> ExitCode {
match connect_res {
Ok(stream) => {
info!("Found kanidm_unixd, waiting for tasks ...");
// Immediately trigger that we should reload the shadow files
let _ = shadow_broadcast_tx.send(true);
// Yep! Now let the main handler do it's job.
// If it returns (dc, etc, then we loop and try again).
tokio::select! {
_ = d_broadcast_rx.recv() => {
break;
}
_ = handle_tasks(stream, &cfg) => {
continue;
}
}
handle_tasks(stream, &mut d_broadcast_rx, &mut shadow_broadcast_rx, &cfg).await;
continue;
}
Err(e) => {
debug!("\\---> {:?}", e);
@ -403,8 +554,8 @@ async fn main() -> ExitCode {
}
}
}
}
}
} // select
} // loop
});
info!("Server started ...");
@ -462,6 +613,9 @@ async fn main() -> ExitCode {
error!("Unable to shutdown workers {:?}", e);
}
debug!("Dropping inotify watcher ...");
drop(watcher);
let _ = server.await;
ExitCode::SUCCESS
})

View file

@ -147,12 +147,7 @@ impl SystemProvider {
})
}
pub async fn reload(
&self,
users: Vec<EtcUser>,
shadow: Option<Vec<EtcShadow>>,
groups: Vec<EtcGroup>,
) {
pub async fn reload(&self, users: Vec<EtcUser>, shadow: Vec<EtcShadow>, groups: Vec<EtcGroup>) {
let mut system_ids_txn = self.inner.lock().await;
system_ids_txn.users.clear();
system_ids_txn.user_list.clear();
@ -160,52 +155,51 @@ impl SystemProvider {
system_ids_txn.group_list.clear();
system_ids_txn.shadow.clear();
system_ids_txn.shadow_enabled = shadow.is_some();
system_ids_txn.shadow_enabled = !shadow.is_empty();
if let Some(shadow) = shadow {
let s_iter = shadow.into_iter().filter_map(|shadow_entry| {
let EtcShadow {
let s_iter = shadow.into_iter().filter_map(|shadow_entry| {
let EtcShadow {
name,
password,
epoch_change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
epoch_expire_date,
flag_reserved: _,
} = shadow_entry;
if password.is_valid() {
let aging_policy = epoch_change_days.map(|change_days| {
AgingPolicy::new(
change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
)
});
let expiration_date = epoch_expire_date
.map(|expire| OffsetDateTime::UNIX_EPOCH + time::Duration::days(expire));
Some((
name,
password,
epoch_change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
epoch_expire_date,
flag_reserved: _,
} = shadow_entry;
Arc::new(Shadow {
crypt_pw: password,
aging_policy,
expiration_date,
}),
))
} else {
// Invalid password, skip the account
debug!(?name, "account password is invalid.");
None
}
});
if password.is_valid() {
let aging_policy = epoch_change_days.map(|change_days| {
AgingPolicy::new(
change_days,
days_min_password_age,
days_max_password_age,
days_warning_period,
days_inactivity_period,
)
});
let expiration_date = epoch_expire_date
.map(|expire| OffsetDateTime::UNIX_EPOCH + time::Duration::days(expire));
Some((
name,
Arc::new(Shadow {
crypt_pw: password,
aging_policy,
expiration_date,
}),
))
} else {
// Invalid password, skip the account
None
}
});
system_ids_txn.shadow.extend(s_iter)
};
system_ids_txn.shadow.extend(s_iter);
for group in groups {
let name = Id::Name(group.name.clone());

View file

@ -208,10 +208,11 @@ impl Resolver {
nxcache_txn.get(id).copied()
}
#[instrument(level = "info", skip_all)]
pub async fn reload_system_identities(
&self,
users: Vec<EtcUser>,
shadow: Option<Vec<EtcShadow>>,
shadow: Vec<EtcShadow>,
groups: Vec<EtcGroup>,
) {
self.system_provider.reload(users, shadow, groups).await

View file

@ -776,7 +776,7 @@ async fn test_cache_nxset_account() {
homedir: Default::default(),
shell: Default::default(),
}],
None,
vec![],
vec![],
)
.await;
@ -832,7 +832,7 @@ async fn test_cache_nxset_group() {
cachelayer
.reload_system_identities(
vec![],
None,
vec![],
vec![EtcGroup {
name: "testgroup1".to_string(),
// Important! We set the GID to differ from what kanidm stores so we can
@ -940,7 +940,7 @@ async fn test_cache_authenticate_system_account() {
shell: Default::default(),
}
],
Some(vec![
vec![
EtcShadow {
name: "testaccount1".to_string(),
// The very secure password, "a".
@ -965,7 +965,7 @@ async fn test_cache_authenticate_system_account() {
epoch_expire_date: Some(380),
flag_reserved: None
},
]),
],
vec![],
)
.await;
@ -1111,7 +1111,7 @@ async fn test_cache_extend_group_members() {
homedir: Default::default(),
shell: Default::default(),
}],
None,
vec![],
vec![EtcGroup {
// This group is configured to allow extension from
// the group "testgroup1"