(original) (raw)

diff -r 402a227564f5 Lib/pdb.py --- a/Lib/pdb.py Sun Dec 11 14:48:44 2016 -0800 +++ b/Lib/pdb.py Thu Dec 15 03:00:37 2016 +0800 @@ -158,6 +158,7 @@ pass self.allow_kbdint = False self.nosigint = nosigint + self.sigint_hook_type = None # Read $HOME/.pdbrc and ./.pdbrc self.rcLines = [] @@ -188,9 +189,31 @@ def sigint_handler(self, signum, frame): if self.allow_kbdint: raise KeyboardInterrupt - self.message("\nProgram interrupted. (Use 'cont' to resume).") - self.set_step() - self.set_trace(frame) + try: + self.message("\nProgram interrupted.") + + if self.sigint_hook_type == 'continue': + self.set_step() + else: + self.set_continue() + + self.set_trace(frame) + except Exception as e: + # e.g. 'print' is not safe in signal handlers, check issue24283 + raise bdb.BdbQuit('Fail to resume Pdb when receiving SIGINT (^C).') + + def register_sigint_handler(self, hook_type=''): + if not self.nosigint: + self.sigint_hook_type = hook_type + try: + Pdb._previous_sigint_handler = \ + signal.signal(signal.SIGINT, self.sigint_handler) + except ValueError: + # ValueError happens when any do_xxx() is invoked from + # a non-main thread in which case we just continue without + # SIGINT set. Would printing a message here (once) make + # sense? + pass def reset(self): bdb.Bdb.reset(self) @@ -977,6 +1000,7 @@ or equal to that is reached. In both cases, also stop when the current frame returns. """ + self.register_sigint_handler() if arg: try: lineno = int(arg) @@ -1008,6 +1032,7 @@ Continue execution until the next line in the current function is reached or it returns. """ + self.register_sigint_handler() self.set_next(self.curframe) return 1 do_n = do_next @@ -1033,6 +1058,7 @@ """r(eturn) Continue execution until the current function returns. """ + self.register_sigint_handler() self.set_return(self.curframe) return 1 do_r = do_return @@ -1041,16 +1067,7 @@ """c(ont(inue)) Continue execution, only stop when a breakpoint is encountered. """ - if not self.nosigint: - try: - Pdb._previous_sigint_handler = \ - signal.signal(signal.SIGINT, self.sigint_handler) - except ValueError: - # ValueError happens when do_continue() is invoked from - # a non-main thread in which case we just continue without - # SIGINT set. Would printing a message here (once) make - # sense? - pass + self.register_sigint_handler(hook_type='continue') self.set_continue() return 1 do_c = do_cont = do_continue diff -r 402a227564f5 Lib/test/test_pdb.py --- a/Lib/test/test_pdb.py Sun Dec 11 14:48:44 2016 -0800 +++ b/Lib/test/test_pdb.py Thu Dec 15 03:00:37 2016 +0800 @@ -8,6 +8,8 @@ import unittest import subprocess import textwrap +import signal +import time from test import support # This little helper class is essential for testing pdb under doctest. @@ -955,6 +957,34 @@ stderr = stderr and bytes.decode(stderr) return stdout, stderr + def run_pdb_interactively(self, script): + """Run 'script' lines with pdb in a subprocess. + Use '_send_cmd' method to send pdb 'commands' to the subprocess.""" + with open(support.TESTFN, 'wb') as f: + f.write(textwrap.dedent(script).encode('ascii')) + proc = subprocess.Popen([sys.executable, '-u', support.TESTFN], + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=sys.stdout.buffer, + bufsize=1, + universal_newlines=True) + return proc + + def _send_cmd(self, proc, cmd, pre_delay=0, post_delay=0): + if pre_delay: + time.sleep(pre_delay) + proc.stdin.write(cmd+'\n') + if post_delay: + time.sleep(post_delay) + + def _wait_for_result(self, proc, timeout=2): + # Kill the Pdb process when read() hangs too long + timer = support.threading.Timer(timeout, proc.terminate) + timer.start() + result = proc.stdout.read() + timer.cancel() + return result + def _assert_find_function(self, file_content, func_name, expected): file_content = textwrap.dedent(file_content) @@ -1110,6 +1140,59 @@ if save_home is not None: os.environ['HOME'] = save_home + @unittest.skipIf(sys.platform == 'win32', 'Not valid on Windows') + @unittest.skipIf(not support.threading, 'Require threading module') + def test_break_during_interactive_input(self): + script = """ + import time + import pdb + pdb.Pdb(readrc=False).set_trace() + """ + start = time.time() + proc = self.run_pdb_interactively(script) + with proc: + # Ctrl-C should work when the user evals a long-wait expression + self._send_cmd(proc, 'time.sleep(2)', post_delay=0.5) + os.kill(proc.pid, signal.SIGINT) + self._send_cmd(proc, "p 'Resume Pdb'", pre_delay=0.5) + self._send_cmd(proc, 'continue') + + result = self._wait_for_result(proc) + self.assertIn('KeyboardInterrupt', result) + self.assertIn('Resume Pdb', result) + + # Make sure that Ctrl-C did break time.sleep + end = time.time() + self.assertTrue(2 > (end-start)) + + @unittest.skipIf(sys.platform == 'win32', 'Not valid on Windows') + @unittest.skipIf(not support.threading, 'Require threading module') + def test_resume_from_sigint(self): + cmds = ['continue', 'return', 'until 99', 'next'] + script = """ + import time + import pdb + + def delay(): + loc = 'in delay()' + time.sleep(1) + + def main(): + pdb.Pdb(readrc=False).set_trace() + delay() + + main() + """ + for cmd in cmds: + proc = self.run_pdb_interactively(script) + with proc: + self._send_cmd(proc, cmd, post_delay=0.5) + os.kill(proc.pid, signal.SIGINT) + self._send_cmd(proc, "p 'Resume Pdb ' + loc", pre_delay=0.5) + self._send_cmd(proc, 'continue') + result = self._wait_for_result(proc) + self.assertIn("Resume Pdb in delay()", result) + def tearDown(self): support.unlink(support.TESTFN)