1#![allow(non_camel_case_types, dead_code)]
2
3use std::io;
4use std::os::unix::io::RawFd;
5use std::slice;
6use std::time::Duration;
7
8use nix::libc::c_int;
9use nix::poll::{PollFd, PollFlags};
10#[cfg(target_os = "linux")]
11use nix::sys::signal::SigSet;
12#[cfg(any(target_os = "linux", test))]
13use nix::sys::time::TimeSpec;
14
15pub fn wait_read_fd(fd: RawFd, timeout: Duration) -> io::Result<()> {
16 wait_fd(fd, PollFlags::POLLIN, timeout)
17}
18
19pub fn wait_write_fd(fd: RawFd, timeout: Duration) -> io::Result<()> {
20 wait_fd(fd, PollFlags::POLLOUT, timeout)
21}
22
23fn wait_fd(fd: RawFd, events: PollFlags, timeout: Duration) -> io::Result<()> {
24 use nix::errno::Errno::{EIO, EPIPE};
25
26 let mut fd = PollFd::new(fd, events);
27
28 let wait = match poll_clamped(&mut fd, timeout) {
29 Ok(r) => r,
30 Err(e) => return Err(io::Error::from(crate::Error::from(e))),
31 };
32 if wait != 1 {
35 return Err(io::Error::new(
36 io::ErrorKind::TimedOut,
37 "Operation timed out",
38 ));
39 }
40
41 match fd.revents() {
43 Some(e) if e == events => return Ok(()),
44 Some(e) if e.contains(PollFlags::POLLHUP) || e.contains(PollFlags::POLLNVAL) => {
46 return Err(io::Error::new(io::ErrorKind::BrokenPipe, EPIPE.desc()));
47 }
48 Some(_) | None => (),
49 }
50
51 Err(io::Error::new(io::ErrorKind::Other, EIO.desc()))
52}
53
54#[cfg(target_os = "linux")]
57fn poll_clamped(fd: &mut PollFd, timeout: Duration) -> nix::Result<c_int> {
58 let spec = clamped_time_spec(timeout);
59 nix::poll::ppoll(slice::from_mut(fd), Some(spec), Some(SigSet::empty()))
60}
61
62#[cfg(any(target_os = "linux", test))]
63#[cfg_attr(target_env = "musl", allow(deprecated))]
69fn clamped_time_spec(duration: Duration) -> TimeSpec {
70 use nix::libc::c_long;
71 use nix::sys::time::time_t;
72
73 let secs_limit = time_t::MAX as u64;
77 let secs = duration.as_secs();
78 if secs <= secs_limit {
79 TimeSpec::new(secs as time_t, duration.subsec_nanos() as c_long)
80 } else {
81 TimeSpec::new(time_t::MAX, 999_999_999)
82 }
83}
84
/// Polls the single descriptor `fd` with `poll`, using `timeout` converted to clamped
/// milliseconds by `clamped_millis_c_int`.
///
/// The value returned by `poll` is handed back unchanged.
#[cfg(not(target_os = "linux"))]
fn poll_clamped(fd: &mut PollFd, timeout: Duration) -> nix::Result<c_int> {
    nix::poll::poll(slice::from_mut(fd), clamped_millis_c_int(timeout))
}
92
/// Converts `duration` into whole milliseconds as a `c_int`, saturating at `c_int::MAX`.
///
/// Sub-millisecond precision is truncated. The result is never negative and is
/// monotonic in `duration`.
#[cfg(any(not(target_os = "linux"), test))]
fn clamped_millis_c_int(duration: Duration) -> c_int {
    // Clamp in `u128` (the type returned by `Duration::as_millis`) *before* narrowing.
    // Computing `secs * 1000 + subsec_millis` directly in `c_int` overflows for durations
    // between `c_int::MAX` ms and the next full second: a panic in debug builds, and a
    // negative timeout in release builds, which poll(2) treats as "wait forever".
    let limit = c_int::MAX as u128;

    // Lossless cast: the value has just been bounded by `c_int::MAX`.
    duration.as_millis().min(limit) as c_int
}
104
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::timeout::MONOTONIC_DURATIONS;

    /// Walking `MONOTONIC_DURATIONS` in order must never make the clamped millisecond
    /// value decrease, starting from the value for a zero duration.
    #[test]
    fn clamped_millis_c_int_is_monotonic() {
        MONOTONIC_DURATIONS.iter().enumerate().fold(
            clamped_millis_c_int(Duration::ZERO),
            |last, (i, d)| {
                let next = clamped_millis_c_int(*d);
                assert!(
                    next >= last,
                    "{next} >= {last} failed for {d:?} at index {i}"
                );
                next
            },
        );
    }

    /// A zero duration maps to a zero millisecond timeout.
    #[test]
    fn clamped_millis_c_int_zero_is_zero() {
        let millis = clamped_millis_c_int(Duration::ZERO);
        assert_eq!(0, millis);
    }

    /// Walking `MONOTONIC_DURATIONS` in order must never make the clamped `TimeSpec`
    /// decrease, starting from the value for a zero duration.
    #[test]
    fn clamped_time_spec_is_monotonic() {
        MONOTONIC_DURATIONS.iter().enumerate().fold(
            clamped_time_spec(Duration::ZERO),
            |last, (i, d)| {
                let next = clamped_time_spec(*d);
                assert!(
                    next >= last,
                    "{next} >= {last} failed for {d:?} at index {i}"
                );
                next
            },
        );
    }

    /// A zero duration maps to a `TimeSpec` with zero seconds and zero nanoseconds.
    #[test]
    fn clamped_time_spec_zero_is_zero() {
        let zero = clamped_time_spec(Duration::ZERO);
        let (secs, nanos) = (zero.tv_sec(), zero.tv_nsec());
        assert_eq!(0, secs);
        assert_eq!(0, nanos);
    }
}