1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use std::error::Error;
use std::io::{Error as IoError, ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;
use std::time::{Duration, SystemTime};

use crate::unix_proto::{ClientRequest, ClientResponse};

pub fn call_daemon_blocking(
    path: &str,
    req: &ClientRequest,
    timeout: u64,
) -> Result<ClientResponse, Box<dyn Error>> {
    let timeout = Duration::from_secs(timeout);

    let mut stream = UnixStream::connect(path)
        .and_then(|socket| socket.set_read_timeout(Some(timeout)).map(|_| socket))
        .and_then(|socket| socket.set_write_timeout(Some(timeout)).map(|_| socket))
        .map_err(|e| {
            error!("stream setup error -> {:?}", e);
            e
        })
        .map_err(Box::new)?;

    let data = serde_json::to_vec(&req).map_err(|e| {
        error!("socket encoding error -> {:?}", e);
        Box::new(IoError::new(ErrorKind::Other, "JSON encode error"))
    })?;
    //  .map_err(Box::new)?;

    stream
        .write_all(data.as_slice())
        .and_then(|_| stream.flush())
        .map_err(|e| {
            error!("stream write error -> {:?}", e);
            e
        })
        .map_err(Box::new)?;

    // Now wait on the response.
    let start = SystemTime::now();
    let mut read_started = false;
    let mut data = Vec::with_capacity(1024);
    let mut counter = 0;

    loop {
        let mut buffer = [0; 1024];
        let durr = SystemTime::now().duration_since(start).map_err(Box::new)?;
        if durr > timeout {
            error!("Socket timeout");
            // timed out, not enough activity.
            break;
        }
        // Would be a lot easier if we had peek ...
        // https://github.com/rust-lang/rust/issues/76923
        match stream.read(&mut buffer) {
            Ok(0) => {
                if read_started {
                    debug!("read_started true, we have completed");
                    // We're done, no more bytes.
                    break;
                } else {
                    debug!("Waiting ...");
                    // Still can wait ...
                    continue;
                }
            }
            Ok(count) => {
                data.extend_from_slice(&buffer);
                counter += count;
                if count == 1024 {
                    debug!("Filled 1024 bytes, looping ...");
                    // We have filled the buffer, we need to copy and loop again.
                    read_started = true;
                    continue;
                } else {
                    debug!("Filled {} bytes, complete", count);
                    // We have a partial read, so we are complete.
                    break;
                }
            }
            Err(e) => {
                error!("Steam read failure -> {:?}", e);
                // Failure!
                return Err(Box::new(e));
            }
        }
    }

    // Extend from slice fills with 0's, so we need to truncate now.
    data.truncate(counter);

    // Now attempt to decode.
    let cr = serde_json::from_slice::<ClientResponse>(data.as_slice()).map_err(|e| {
        error!("socket encoding error -> {:?}", e);
        Box::new(IoError::new(ErrorKind::Other, "JSON decode error"))
    })?;

    Ok(cr)
}