//! Blocking client for sending a request to the daemon over a Unix domain
//! socket and reading back its CBOR-encoded response.

use std::error::Error;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;

use std::time::{Duration, SystemTime};

use crate::unix_proto::{ClientRequest, ClientResponse};

/// Timeout in milliseconds. Used as the socket read timeout, the socket write
/// timeout, and the overall deadline while waiting for the daemon's response.
const TIMEOUT: u64 = 2000;

pub fn call_daemon_blocking(
    path: &str,
    req: &ClientRequest,
) -> Result<ClientResponse, Box<dyn Error>> {
    let mut stream = UnixStream::connect(path)
        .and_then(|socket| {
            socket
                .set_read_timeout(Some(Duration::from_millis(TIMEOUT)))
                .map(|_| socket)
        })
        .and_then(|socket| {
            socket
                .set_write_timeout(Some(Duration::from_millis(TIMEOUT)))
                .map(|_| socket)
        })
        .map_err(|e| {
            error!("stream setup error -> {:?}", e);
            e
        })
        .map_err(Box::new)?;

    let data = serde_cbor::to_vec(&req).map_err(|e| {
        error!("socket encoding error -> {:?}", e);
        Box::new(IoError::new(ErrorKind::Other, "CBOR encode error"))
    })?;
    //  .map_err(Box::new)?;

    stream
        .write_all(data.as_slice())
        .and_then(|_| stream.flush())
        .map_err(|e| {
            error!("stream write error -> {:?}", e);
            e
        })
        .map_err(Box::new)?;

    // Now wait on the response.
    let start = SystemTime::now();
    let timeout = Duration::from_millis(TIMEOUT);
    let mut read_started = false;
    let mut data = Vec::with_capacity(1024);
    let mut counter = 0;

    loop {
        let mut buffer = [0; 1024];
        let durr = SystemTime::now().duration_since(start).map_err(Box::new)?;
        if durr > timeout {
            error!("Socket timeout");
            // timed out, not enough activity.
            break;
        }
        // Would be a lot easier if we had peek ...
        // https://github.com/rust-lang/rust/issues/76923
        match stream.read(&mut buffer) {
            Ok(0) => {
                if read_started {
                    debug!("read_started true, we have completed");
                    // We're done, no more bytes.
                    break;
                } else {
                    debug!("Waiting ...");
                    // Still can wait ...
                    continue;
                }
            }
            Ok(count) => {
                data.extend_from_slice(&buffer);
                counter += count;
                if count == 1024 {
                    debug!("Filled 1024 bytes, looping ...");
                    // We have filled the buffer, we need to copy and loop again.
                    read_started = true;
                    continue;
                } else {
                    debug!("Filled {} bytes, complete", count);
                    // We have a partial read, so we are complete.
                    break;
                }
            }
            Err(e) => {
                error!("Steam read failure -> {:?}", e);
                // Failure!
                return Err(Box::new(e));
            }
        }
    }

    // Extend from slice fills with 0's, so we need to truncate now.
    data.truncate(counter);

    // Now attempt to decode.
    let cr = serde_cbor::from_slice::<ClientResponse>(data.as_slice()).map_err(|e| {
        error!("socket encoding error -> {:?}", e);
        Box::new(IoError::new(ErrorKind::Other, "CBOR decode error"))
    })?;

    Ok(cr)
}