gh-115258: Fix failed tests on threading queue shutdown by YvesDup · Pull Request #115940 · python/cpython (original) (raw)
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 9dc7f62999..d223c358e0 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -320,58 +320,90 @@ def test_shutdown_immediate_all_methods_in_one_thread(self): def _write_msg_thread(self, q, n, results, i_when_exec_shutdown, event_shutdown, barrier_start):
time.sleep(random.random() / 10.0) # All `write_msg_threads` # put several items into the queue. for i in range(0, i_when_exec_shutdown//2):
time.sleep(random.random() / 10.0) q.put((i, 'LOYD'))
time.sleep(random.random() / 10.0) # Wait for the barrier to be complete. barrier_start.wait()
time.sleep(random.random() / 10.0) for i in range(i, n):
time.sleep(random.random() / 10.0) try: q.put((i, "YDLO")) except self.queue.ShutDown:
time.sleep(random.random() / 10.0) results.append(False)
time.sleep(random.random() / 10.0) break
time.sleep(random.random() / 10.0) # Trigger queue shutdown. if i == i_when_exec_shutdown:
time.sleep(random.random() / 10.0) # Only once thread do it. if not event_shutdown.is_set():
time.sleep(random.random() / 10.0) event_shutdown.set()
time.sleep(random.random() / 10.0) results.append(True)
time.sleep(random.random() / 10.0) q.join()
def _read_msg_thread(self, q, results, barrier_start): # Wait for the barrier to be complete.time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) barrier_start.wait()
time.sleep(random.random() / 10.0) while True:
time.sleep(random.random() / 10.0) try: q.get(False)
time.sleep(random.random() / 10.0) q.task_done() except self.queue.ShutDown:
time.sleep(random.random() / 10.0) results.append(True)
time.sleep(random.random() / 10.0) break except self.queue.Empty: pass
time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) q.join()
def _shutdown_thread(self, q, results, event_end, immediate):time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) event_end.wait()
time.sleep(random.random() / 10.0) q.shutdown(immediate)
time.sleep(random.random() / 10.0) results.append(q.qsize() == 0)
time.sleep(random.random() / 10.0) q.join()
def _join_thread(self, q, barrier_start):time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) # Wait for the barrier to be complete. barrier_start.wait()
time.sleep(random.random() / 10.0) q.join()
def _shutdown_all_methods_in_many_threads(self, immediate): # Run a 'multi-producers/consumers queue' use case, # with enough items into the queue. # When shutdown, all running threads will be concerned.time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) q = self.type2test()
time.sleep(random.random() / 10.0) ps = [] res_puts = [] res_gets = []
@@ -382,11 +414,14 @@ def _shutdown_all_methods_in_many_threads(self, immediate): nb_msgs = 1024*64 nb_msgs_w = nb_msgs // write_threads when_exec_shutdown = nb_msgs_w // 2
time.sleep(random.random() / 10.0) # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads` # put their part of items into the queue. And trigger the start of # other threads as `_read_msg_thread`and `_join_thread`. barrier_start = threading.Barrier(write_threads+read_threads+join_threads)
time.sleep(random.random() / 10.0) ev_exec_shutdown = threading.Event()
time.sleep(random.random() / 10.0) lprocs = ( (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, when_exec_shutdown, ev_exec_shutdown,
@@ -395,19 +430,34 @@ def _shutdown_all_methods_in_many_threads(self, immediate): (self._join_thread, join_threads, (q, barrier_start)), (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)), )
time.sleep(random.random() / 10.0) # start all threads. for func, n, args in lprocs:
time.sleep(random.random() / 10.0) for i in range(n):
time.sleep(random.random() / 10.0) ps.append(threading.Thread(target=func, args=args))
time.sleep(random.random() / 10.0) ps[-1].start()
time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) for thread in ps:
time.sleep(random.random() / 10.0) thread.join()
time.sleep(random.random() / 10.0)
time.sleep(random.random() / 10.0) self.assertEqual(res_puts.count(True), 1)
time.sleep(random.random() / 10.0) self.assertLessEqual(res_gets.count(True), read_threads)
time.sleep(random.random() / 10.0) if immediate:
time.sleep(random.random() / 10.0) self.assertListEqual(res_shutdown, [True])
time.sleep(random.random() / 10.0) self.assertTrue(q.empty())
def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False)time.sleep(random.random() / 10.0)