2020-02-15 01:27:25 +01:00
|
|
|
use std::error::Error;
|
2022-10-01 08:08:51 +02:00
|
|
|
use std::io::{Error as IoError, ErrorKind};
|
|
|
|
|
|
|
|
use bytes::{BufMut, BytesMut};
|
|
|
|
use futures::{SinkExt, StreamExt};
|
2020-02-15 01:27:25 +01:00
|
|
|
use tokio::net::UnixStream;
|
2021-03-13 03:33:15 +01:00
|
|
|
// use tokio::runtime::Builder;
|
2023-07-28 02:48:56 +02:00
|
|
|
use tokio::time::{self, Duration};
|
2020-02-15 01:27:25 +01:00
|
|
|
use tokio_util::codec::Framed;
|
|
|
|
use tokio_util::codec::{Decoder, Encoder};
|
|
|
|
|
|
|
|
use crate::unix_proto::{ClientRequest, ClientResponse};
|
|
|
|
|
|
|
|
struct ClientCodec;
|
|
|
|
|
|
|
|
impl Decoder for ClientCodec {
|
|
|
|
type Error = IoError;
|
2022-10-01 08:08:51 +02:00
|
|
|
type Item = ClientResponse;
|
2020-02-15 01:27:25 +01:00
|
|
|
|
|
|
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
2023-01-25 07:09:54 +01:00
|
|
|
match serde_json::from_slice::<ClientResponse>(src) {
|
2020-02-15 01:27:25 +01:00
|
|
|
Ok(msg) => {
|
|
|
|
// Clear the buffer for the next message.
|
|
|
|
src.clear();
|
|
|
|
Ok(Some(msg))
|
|
|
|
}
|
|
|
|
_ => Ok(None),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-11 02:32:56 +02:00
|
|
|
impl Encoder<ClientRequest> for ClientCodec {
|
2020-02-15 01:27:25 +01:00
|
|
|
type Error = IoError;
|
|
|
|
|
|
|
|
fn encode(&mut self, msg: ClientRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
2022-05-24 02:49:34 +02:00
|
|
|
let data = serde_json::to_vec(&msg).map_err(|e| {
|
2020-02-15 01:27:25 +01:00
|
|
|
error!("socket encoding error -> {:?}", e);
|
2022-05-24 02:49:34 +02:00
|
|
|
IoError::new(ErrorKind::Other, "JSON encode error")
|
2020-02-15 01:27:25 +01:00
|
|
|
})?;
|
|
|
|
debug!("Attempting to send request -> {:?} ...", data);
|
|
|
|
dst.put(data.as_slice());
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ClientCodec {
|
|
|
|
fn new() -> Self {
|
|
|
|
ClientCodec
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-28 02:48:56 +02:00
|
|
|
async fn call_daemon_inner(
|
|
|
|
path: &str,
|
|
|
|
req: ClientRequest,
|
|
|
|
) -> Result<ClientResponse, Box<dyn Error>> {
|
2023-06-19 07:02:09 +02:00
|
|
|
trace!(?path, ?req);
|
2020-02-15 01:27:25 +01:00
|
|
|
let stream = UnixStream::connect(path).await?;
|
2023-06-19 07:02:09 +02:00
|
|
|
trace!("connected");
|
2020-02-15 01:27:25 +01:00
|
|
|
|
|
|
|
let mut reqs = Framed::new(stream, ClientCodec::new());
|
|
|
|
|
|
|
|
reqs.send(req).await?;
|
|
|
|
reqs.flush().await?;
|
2023-06-19 07:02:09 +02:00
|
|
|
trace!("flushed, waiting ...");
|
2020-02-15 01:27:25 +01:00
|
|
|
|
|
|
|
match reqs.next().await {
|
|
|
|
Some(Ok(res)) => {
|
|
|
|
debug!("Response -> {:?}", res);
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
_ => {
|
2021-08-08 01:54:21 +02:00
|
|
|
error!("Error making request to kanidm_unixd");
|
2020-02-15 01:27:25 +01:00
|
|
|
Err(Box::new(IoError::new(ErrorKind::Other, "oh no!")))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-07-28 02:48:56 +02:00
|
|
|
|
|
|
|
/// Makes a call to kanidm_unixd via a unix socket at `path`
|
|
|
|
pub async fn call_daemon(
|
|
|
|
path: &str,
|
|
|
|
req: ClientRequest,
|
|
|
|
timeout: u64,
|
|
|
|
) -> Result<ClientResponse, Box<dyn Error>> {
|
2023-08-21 06:14:32 +02:00
|
|
|
let sleep = time::sleep(Duration::from_secs(timeout));
|
2023-07-28 02:48:56 +02:00
|
|
|
tokio::pin!(sleep);
|
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
_ = &mut sleep => {
|
2023-08-21 06:14:32 +02:00
|
|
|
error!(?timeout, "Timed out making request to kanidm_unixd");
|
2023-07-28 02:48:56 +02:00
|
|
|
Err(Box::new(IoError::new(ErrorKind::Other, "timeout")))
|
|
|
|
}
|
|
|
|
res = call_daemon_inner(path, req) => {
|
|
|
|
res
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|