Auto merge of #140165 - ChrisDenton:rollup-on2dpr5, r=ChrisDenton · rust-lang/rust@1a5bf12 (original) (raw)
`@@ -21,16 +21,26 @@ impl<'a> DeadlineQueue<'a> {
`
21
21
`Self { queue: VecDeque::with_capacity(capacity) }
`
22
22
`}
`
23
23
``
``
24
`` +
/// All calls to [Instant::now
] go through this wrapper method.
``
``
25
`+
/// This makes it easier to find all places that read the current time.
`
``
26
`+
fn now(&self) -> Instant {
`
``
27
`+
Instant::now()
`
``
28
`+
}
`
``
29
+
24
30
`pub(crate) fn push(&mut self, id: TestId, test: &'a CollectedTest) {
`
25
``
`-
let deadline = Instant::now() + Duration::from_secs(TEST_WARN_TIMEOUT_S);
`
``
31
`+
let deadline = self.now() + Duration::from_secs(TEST_WARN_TIMEOUT_S);
`
``
32
`+
if let Some(back) = self.queue.back() {
`
``
33
`+
assert!(back.deadline <= deadline);
`
``
34
`+
}
`
26
35
`self.queue.push_back(DeadlineEntry { id, test, deadline });
`
27
36
`}
`
28
37
``
29
``
`` -
/// Equivalent to rx.read()
, except that if any test exceeds its deadline
``
``
38
`` +
/// Equivalent to rx.recv()
, except that if a test exceeds its deadline
``
30
39
`/// during the wait, the given callback will also be called for that test.
`
31
40
`pub(crate) fn read_channel_while_checking_deadlines(
`
32
41
`&mut self,
`
33
42
`rx: &mpsc::Receiver,
`
``
43
`+
is_running: impl Fn(TestId) -> bool,
`
34
44
`mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
`
35
45
`) -> Result<T, RecvError> {
`
36
46
`loop {
`
`@@ -39,18 +49,18 @@ impl<'a> DeadlineQueue<'a> {
`
39
49
`// deadline, so do a normal receive.
`
40
50
`return rx.recv();
`
41
51
`};
`
42
``
`-
let wait_duration = next_deadline.saturating_duration_since(Instant::now());
`
``
52
`+
let next_deadline_timeout = next_deadline.saturating_duration_since(self.now());
`
``
53
+
``
54
`+
let recv_result = rx.recv_timeout(next_deadline_timeout);
`
``
55
`+
// Process deadlines after every receive attempt, regardless of
`
``
56
`+
// outcome, so that we don't build up an unbounded backlog of stale
`
``
57
`+
// entries due to a constant stream of tests finishing.
`
``
58
`+
self.for_each_entry_past_deadline(&is_running, &mut on_deadline_passed);
`
43
59
``
44
``
`-
let recv_result = rx.recv_timeout(wait_duration);
`
45
60
`match recv_result {
`
46
61
`Ok(value) => return Ok(value),
`
47
``
`-
Err(RecvTimeoutError::Timeout) => {
`
48
``
`-
// Notify the callback of tests that have exceeded their
`
49
``
`-
// deadline, then loop and do annother channel read.
`
50
``
`-
for DeadlineEntry { id, test, .. } in self.remove_tests_past_deadline() {
`
51
``
`-
on_deadline_passed(id, test);
`
52
``
`-
}
`
53
``
`-
}
`
``
62
`+
// Deadlines have already been processed, so loop and do another receive.
`
``
63
`+
Err(RecvTimeoutError::Timeout) => {}
`
54
64
`Err(RecvTimeoutError::Disconnected) => return Err(RecvError),
`
55
65
`}
`
56
66
`}
`
`@@ -60,14 +70,28 @@ impl<'a> DeadlineQueue<'a> {
`
60
70
`Some(self.queue.front()?.deadline)
`
61
71
`}
`
62
72
``
63
``
`-
fn remove_tests_past_deadline(&mut self) -> Vec<DeadlineEntry<'a>> {
`
64
``
`-
let now = Instant::now();
`
65
``
`-
let mut timed_out = vec![];
`
66
``
`-
while let Some(deadline_entry) = pop_front_if(&mut self.queue, |entry| now < entry.deadline)
`
67
``
`-
{
`
68
``
`-
timed_out.push(deadline_entry);
`
``
73
`+
fn for_each_entry_past_deadline(
`
``
74
`+
&mut self,
`
``
75
`+
is_running: impl Fn(TestId) -> bool,
`
``
76
`+
mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
`
``
77
`+
) {
`
``
78
`+
let now = self.now();
`
``
79
+
``
80
`+
// Clear out entries that are past their deadline, but only invoke the
`
``
81
`+
// callback for tests that are still considered running.
`
``
82
`+
while let Some(entry) = pop_front_if(&mut self.queue, |entry| entry.deadline <= now) {
`
``
83
`+
if is_running(entry.id) {
`
``
84
`+
on_deadline_passed(entry.id, entry.test);
`
``
85
`+
}
`
``
86
`+
}
`
``
87
+
``
88
`+
// Also clear out any leading entries that are no longer running, even
`
``
89
`+
// if their deadline hasn't been reached.
`
``
90
`+
while let Some(_) = pop_front_if(&mut self.queue, |entry| !is_running(entry.id)) {}
`
``
91
+
``
92
`+
if let Some(front) = self.queue.front() {
`
``
93
`+
assert!(now < front.deadline);
`
69
94
`}
`
70
``
`-
timed_out
`
71
95
`}
`
72
96
`}
`
73
97
``