Fixes #2601 Fixes #393 - gid numbers can be part of the systemd nspawn range.

Previously we allocated gid numbers based on the fact that uid_t is a u32, so we allowed 65536 through u32::MAX. However, there are two major issues with this that I didn't realise. The first is that anything greater than i32::MAX (2147483647) can confuse the Linux kernel. The second is that systemd allocates 524288 through 1879048191 to itself for nspawn.

This leaves us with only a few usable ranges:

1000 through 60000
60578 through 61183
65520 through 65533
65536 through 524287
1879048192 through 2147483647

The last range, being the largest, is the natural and obvious area we should allocate from. It also happens to fall nicely into the pattern 0x7000_0000 through 0x7fff_ffff, which allows us to take the last 24 bits of the UUID and, by applying a bit mask, ensure that we end up in this range.

There are now two major issues. We have changed our validation code to enforce a tighter range, but we may have already allocated users into the excluded ranges, and external systems like FreeIPA have allocated uid/gid numbers with reckless abandon directly into them. As a result we need to make two concessions.

First, we *secretly* still allow manual allocation of IDs from 65536 through 1879048191, which is the nspawn container range. This happens to be the range that FreeIPA allocates into. We will never generate an ID in this range, but we will allow it to ease imports, since the users of these ranges have already shown they 'don't care' about it. This also affects SCIM imports for longer-term migrations.

Second, IDs may fall outside the valid ranges entirely. In the extremely unlikely event this has occurred, a startup migration has been added to regenerate the id values of affected entries to prevent upgrade issues.

An accidental effect of this is freeing up the range 524288 to 1879048191 for other subuid uses.
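The masking step can be sketched as follows (illustrative only; the constant and function names below are assumed, not taken from the tree): keep the low bits of the UUID's trailing bytes, then OR in the 0x7000_0000 prefix so the result always lands within 0x7000_0000 through 0x7fff_ffff.

use uuid::Uuid;

// Hypothetical names, for illustration only.
const GID_RANGE_PREFIX: u32 = 0x7000_0000;
const GID_RANGE_MASK: u32 = 0x00ff_ffff; // the low 24 bits described above

// Derive a gid number from the trailing bytes of a UUID. Masking and then
// OR-ing in the prefix guarantees a value within 0x7000_0000..=0x7fff_ffff.
fn uuid_to_gid_u32(u: &Uuid) -> u32 {
    let b = u.as_bytes();
    let raw = u32::from_be_bytes([b[12], b[13], b[14], b[15]]);
    (raw & GID_RANGE_MASK) | GID_RANGE_PREFIX
}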
use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
use crate::repl::ReplCtrl;
use crate::CoreAction;
use bytes::{BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use kanidm_lib_crypto::serialise::x509b64;
use kanidm_utils_users::get_current_uid;
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::io;
use std::path::Path;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::codec::{Decoder, Encoder, Framed};
use tracing::{span, Instrument, Level};
use uuid::Uuid;

pub use kanidm_proto::internal::{
    DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
    DomainUpgradeCheckStatus as ProtoDomainUpgradeCheckStatus,
};

#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskRequest {
    RecoverAccount { name: String },
    ShowReplicationCertificate,
    RenewReplicationCertificate,
    RefreshReplicationConsumer,
    DomainShow,
    DomainUpgradeCheck,
    DomainRaise,
    DomainRemigrate { level: Option<u32> },
}

#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskResponse {
    RecoverAccount {
        password: String,
    },
    ShowReplicationCertificate {
        cert: String,
    },
    DomainUpgradeCheck {
        report: ProtoDomainUpgradeCheckReport,
    },
    DomainRaise {
        level: u32,
    },
    DomainShow {
        domain_info: ProtoDomainInfo,
    },
    Success,
    Error,
}

#[derive(Default)]
pub struct ClientCodec;

impl Decoder for ClientCodec {
    type Error = io::Error;
    type Item = AdminTaskResponse;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        trace!("Attempting to decode request ...");
        match serde_json::from_slice::<AdminTaskResponse>(src) {
            Ok(msg) => {
                // Clear the buffer for the next message.
                src.clear();
                Ok(Some(msg))
            }
            _ => Ok(None),
        }
    }
}

impl Encoder<AdminTaskRequest> for ClientCodec {
    type Error = io::Error;

    fn encode(&mut self, msg: AdminTaskRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
        trace!("Attempting to send response -> {:?} ...", msg);
        let data = serde_json::to_vec(&msg).map_err(|e| {
            error!("socket encoding error -> {:?}", e);
            io::Error::new(io::ErrorKind::Other, "JSON encode error")
        })?;
        dst.put(data.as_slice());
        Ok(())
    }
}
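
// A minimal usage sketch (assumed, not part of this file): an admin CLI can
// drive this protocol by framing the unix socket with `ClientCodec`, sending
// an `AdminTaskRequest`, then awaiting the decoded `AdminTaskResponse`. The
// socket path is illustrative only.
//
//   async fn recover_account(name: String) -> Result<AdminTaskResponse, Box<dyn Error>> {
//       let stream = UnixStream::connect("/var/run/kanidmd/admin.sock").await?;
//       let mut framed = Framed::new(stream, ClientCodec);
//       framed.send(AdminTaskRequest::RecoverAccount { name }).await?;
//       framed.next().await.ok_or("no response from server")?.map_err(Into::into)
//   }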

#[derive(Default)]
struct ServerCodec;

impl Decoder for ServerCodec {
    type Error = io::Error;
    type Item = AdminTaskRequest;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        trace!("Attempting to decode request ...");
        match serde_json::from_slice::<AdminTaskRequest>(src) {
            Ok(msg) => {
                // Clear the buffer for the next message.
                src.clear();
                Ok(Some(msg))
            }
            _ => Ok(None),
        }
    }
}

impl Encoder<AdminTaskResponse> for ServerCodec {
    type Error = io::Error;

    fn encode(&mut self, msg: AdminTaskResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
        trace!("Attempting to send response -> {:?} ...", msg);
        let data = serde_json::to_vec(&msg).map_err(|e| {
            error!("socket encoding error -> {:?}", e);
            io::Error::new(io::ErrorKind::Other, "JSON encode error")
        })?;
        dst.put(data.as_slice());
        Ok(())
    }
}

pub(crate) struct AdminActor;

impl AdminActor {
    pub async fn create_admin_sock(
        sock_path: &str,
        server_rw: &'static QueryServerWriteV1,
        server_ro: &'static QueryServerReadV1,
        mut broadcast_rx: broadcast::Receiver<CoreAction>,
        repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
    ) -> Result<tokio::task::JoinHandle<()>, ()> {
        debug!("🧹 Cleaning up sockets from previous invocations");
        rm_if_exist(sock_path);

        // Setup the unix socket.
        let listener = match UnixListener::bind(sock_path) {
            Ok(l) => l,
            Err(e) => {
                error!(err = ?e, "Failed to bind UNIX socket {}", sock_path);
                return Err(());
            }
        };

        // what is the uid we are running as?
        let cuid = get_current_uid();

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Ok(action) = broadcast_rx.recv() => {
                        match action {
                            CoreAction::Shutdown => break,
                        }
                    }
                    accept_res = listener.accept() => {
                        match accept_res {
                            Ok((socket, _addr)) => {
                                // Assert that the incoming connection is from root or
                                // our own uid.
                                // ⚠️  This underpins the security of this socket ⚠️
                                if let Ok(ucred) = socket.peer_cred() {
                                    let incoming_uid = ucred.uid();
                                    if incoming_uid == 0 || incoming_uid == cuid {
                                        // all good!
                                        info!(pid = ?ucred.pid(), "Allowing admin socket access");
                                    } else {
                                        warn!(%incoming_uid, "unauthorised user");
                                        continue;
                                    }
                                } else {
                                    error!("unable to determine peer credentials");
                                    continue;
                                };

                                // spawn the worker.
                                let task_repl_ctrl_tx = repl_ctrl_tx.clone();
                                tokio::spawn(async move {
                                    if let Err(e) = handle_client(socket, server_rw, server_ro, task_repl_ctrl_tx).await {
                                        error!(err = ?e, "admin client error");
                                    }
                                });
                            }
                            Err(e) => {
                                warn!(err = ?e, "admin socket accept error");
                            }
                        }
                    }
                }
            }
            info!("Stopped {}", super::TaskName::AdminSocket);
        });
        Ok(handle)
    }
}

fn rm_if_exist(p: &str) {
    if Path::new(p).exists() {
        debug!("Removing requested file {:?}", p);
        let _ = std::fs::remove_file(p).map_err(|e| {
            error!("Failure while attempting to remove {:?} -> {:?}", p, e);
        });
    } else {
        debug!("Path {:?} doesn't exist, not attempting to remove.", p);
    }
}

async fn show_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
    let (tx, rx) = oneshot::channel();

    if ctrl_tx
        .send(ReplCtrl::GetCertificate { respond: tx })
        .await
        .is_err()
    {
        error!("replication control channel has shutdown");
        return AdminTaskResponse::Error;
    }

    match rx.await {
        Ok(cert) => x509b64::cert_to_string(&cert)
            .map(|cert| AdminTaskResponse::ShowReplicationCertificate { cert })
            .unwrap_or(AdminTaskResponse::Error),
        Err(_) => {
            error!("replication control channel did not respond with certificate.");
            AdminTaskResponse::Error
        }
    }
}

async fn renew_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
    let (tx, rx) = oneshot::channel();

    if ctrl_tx
        .send(ReplCtrl::RenewCertificate { respond: tx })
        .await
        .is_err()
    {
        error!("replication control channel has shutdown");
        return AdminTaskResponse::Error;
    }

    match rx.await {
        Ok(success) => {
            if success {
                show_replication_certificate(ctrl_tx).await
            } else {
                error!("replication control channel indicated that certificate renewal failed.");
                AdminTaskResponse::Error
            }
        }
        Err(_) => {
            error!("replication control channel did not respond with renewal status.");
            AdminTaskResponse::Error
        }
    }
}

async fn replication_consumer_refresh(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
    let (tx, rx) = oneshot::channel();

    if ctrl_tx
        .send(ReplCtrl::RefreshConsumer { respond: tx })
        .await
        .is_err()
    {
        error!("replication control channel has shutdown");
        return AdminTaskResponse::Error;
    }

    match rx.await {
        Ok(mut refresh_rx) => {
            if let Some(()) = refresh_rx.recv().await {
                info!("Replication refresh success");
                AdminTaskResponse::Success
            } else {
                error!("Replication refresh failed. Please inspect the logs.");
                AdminTaskResponse::Error
            }
        }
        Err(_) => {
            error!("replication control channel did not respond with refresh status.");
            AdminTaskResponse::Error
        }
    }
}

async fn handle_client(
    sock: UnixStream,
    server_rw: &'static QueryServerWriteV1,
    server_ro: &'static QueryServerReadV1,
    mut repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
) -> Result<(), Box<dyn Error>> {
    debug!("Accepted admin socket connection");

    let mut reqs = Framed::new(sock, ServerCodec);

    trace!("Waiting for requests ...");
    while let Some(Ok(req)) = reqs.next().await {
        // Setup the logging span
        let eventid = Uuid::new_v4();
        let nspan = span!(Level::INFO, "handle_admin_client_request", uuid = ?eventid);

        let resp = async {
            match req {
                AdminTaskRequest::RecoverAccount { name } => {
                    match server_rw.handle_admin_recover_account(name, eventid).await {
                        Ok(password) => AdminTaskResponse::RecoverAccount { password },
                        Err(e) => {
                            error!(err = ?e, "error during recover-account");
                            AdminTaskResponse::Error
                        }
                    }
                }
                AdminTaskRequest::ShowReplicationCertificate => match repl_ctrl_tx.as_mut() {
                    Some(ctrl_tx) => show_replication_certificate(ctrl_tx).await,
                    None => {
                        error!("replication not configured, unable to display certificate.");
                        AdminTaskResponse::Error
                    }
                },
                AdminTaskRequest::RenewReplicationCertificate => match repl_ctrl_tx.as_mut() {
                    Some(ctrl_tx) => renew_replication_certificate(ctrl_tx).await,
                    None => {
                        error!("replication not configured, unable to renew certificate.");
                        AdminTaskResponse::Error
                    }
                },
                AdminTaskRequest::RefreshReplicationConsumer => match repl_ctrl_tx.as_mut() {
                    Some(ctrl_tx) => replication_consumer_refresh(ctrl_tx).await,
                    None => {
                        error!("replication not configured, unable to refresh consumer.");
                        AdminTaskResponse::Error
                    }
                },

                AdminTaskRequest::DomainShow => match server_ro.handle_domain_show(eventid).await {
                    Ok(domain_info) => AdminTaskResponse::DomainShow { domain_info },
                    Err(e) => {
                        error!(err = ?e, "error during domain show");
                        AdminTaskResponse::Error
                    }
                },
                AdminTaskRequest::DomainUpgradeCheck => {
                    match server_ro.handle_domain_upgrade_check(eventid).await {
                        Ok(report) => AdminTaskResponse::DomainUpgradeCheck { report },
                        Err(e) => {
                            error!(err = ?e, "error during domain upgrade check");
                            AdminTaskResponse::Error
                        }
                    }
                }
                AdminTaskRequest::DomainRaise => match server_rw.handle_domain_raise(eventid).await
                {
                    Ok(level) => AdminTaskResponse::DomainRaise { level },
                    Err(e) => {
                        error!(err = ?e, "error during domain raise");
                        AdminTaskResponse::Error
                    }
                },
                AdminTaskRequest::DomainRemigrate { level } => {
                    match server_rw.handle_domain_remigrate(level, eventid).await {
                        Ok(()) => AdminTaskResponse::Success,
                        Err(e) => {
                            error!(err = ?e, "error during domain remigrate");
                            AdminTaskResponse::Error
                        }
                    }
                }
            }
        }
        .instrument(nspan)
        .await;

        reqs.send(resp).await?;
        reqs.flush().await?;
    }

    debug!("Disconnecting client ...");
    Ok(())
}