cpython: 01d1fd775d16 (original) (raw)

Mercurial > cpython

changeset 60598:01d1fd775d16

Issue #7316: the acquire() method of lock objects in the :mod:`threading` module now takes an optional timeout argument in seconds. Timeout support relies on the system threading library, so as to avoid a semi-busy wait loop. [#7316]

Antoine Pitrou solipsis@pitrou.net
date Wed, 14 Apr 2010 15:44:10 +0000
parents 62d11785204f
children ca11c830f5cc
files Doc/library/_thread.rst Doc/library/threading.rst Include/pythread.h Lib/_dummy_thread.py Lib/multiprocessing/pool.py Lib/test/lock_tests.py Lib/threading.py Misc/NEWS Modules/_threadmodule.c Python/thread_nt.h Python/thread_pthread.h
diffstat 11 files changed, 327 insertions(+), 78 deletions(-)[+] [-] Doc/library/_thread.rst 31 Doc/library/threading.rst 28 Include/pythread.h 35 Lib/_dummy_thread.py 8 Lib/multiprocessing/pool.py 4 Lib/test/lock_tests.py 44 Lib/threading.py 25 Misc/NEWS 5 Modules/_threadmodule.c 89 Python/thread_nt.h 35 Python/thread_pthread.h 101

line wrap: on

line diff

--- a/Doc/library/_thread.rst +++ b/Doc/library/_thread.rst @@ -28,7 +28,7 @@ implementation. For systems lacking the :mod:_dummy_thread module is available. It duplicates this module's interface and can be used as a drop-in replacement. -It defines the following constant and functions: +It defines the following constants and functions: .. exception:: error @@ -103,19 +103,34 @@ It defines the following constant and fu Availability: Windows, systems with POSIX threads. +.. data:: TIMEOUT_MAX +

+ Lock objects have the following methods: -.. method:: lock.acquire([waitflag]) +.. method:: lock.acquire(waitflag=1, timeout=-1)

.. method:: lock.release()

--- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -155,6 +155,16 @@ This module defines the following functi Availability: Windows, systems with POSIX threads. +This module also defines the following constant: + +.. data:: TIMEOUT_MAX +

+ Detailed interfaces for the objects are documented below. The design of this module is loosely based on Java's threading model. However, @@ -349,7 +359,7 @@ and may vary across implementations. All methods are executed atomically. -.. method:: Lock.acquire(blocking=True) +.. method:: Lock.acquire(blocking=True, timeout=-1) Acquire a lock, blocking or non-blocking. @@ -363,6 +373,15 @@ All methods are executed atomically. without an argument would block, return false immediately; otherwise, do the same thing as when called without arguments, and return true.

.. method:: Lock.release() @@ -396,7 +415,7 @@ pair) resets the lock to unlocked and al :meth:acquire to proceed. -.. method:: RLock.acquire(blocking=True) +.. method:: RLock.acquire(blocking=True, timeout=-1) Acquire a lock, blocking or non-blocking. @@ -415,6 +434,11 @@ pair) resets the lock to unlocked and al without an argument would block, return false immediately; otherwise, do the same thing as when called without arguments, and return true.

.. method:: RLock.release()

--- a/Include/pythread.h +++ b/Include/pythread.h @@ -19,6 +19,41 @@ PyAPI_FUNC(void) PyThread_free_lock(PyTh PyAPI_FUNC(int) PyThread_acquire_lock(PyThread_type_lock, int); #define WAIT_LOCK 1 #define NOWAIT_LOCK 0 + +/* PY_TIMEOUT_T is the integral type used to specify timeouts when waiting

+/* In the NT API, the timeout is a DWORD and is expressed in milliseconds / +#if defined (NT_THREADS) +#if (0xFFFFFFFFLL * 1000 < PY_TIMEOUT_MAX) +#undef PY_TIMEOUT_MAX +#define PY_TIMEOUT_MAX (0xFFFFFFFFLL * 1000) +#endif +#endif + +/ If microseconds == 0, the call is non-blocking: it returns immediately

--- a/Lib/_dummy_thread.py +++ b/Lib/_dummy_thread.py @@ -17,6 +17,10 @@ Suggested usage is:: 'interrupt_main', 'LockType'] import traceback as _traceback +import time + +# A dummy value +TIMEOUT_MAX = 2**31 class error(Exception): """Dummy implementation of _thread.error.""" @@ -92,7 +96,7 @@ class LockType(object): def init(self): self.locked_status = False

For blocking calls, self.locked_status is automatically set to @@ -111,6 +115,8 @@ class LockType(object): self.locked_status = True return True else:

enter = acquire

--- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -440,10 +440,10 @@ class Pool(object): p.terminate() debug('joining task handler')

debug('joining result handler')

if pool and hasattr(pool[0], 'terminate'): debug('joining pool workers')

--- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -4,7 +4,7 @@ Various tests for synchronization primit import sys import time -from _thread import start_new_thread, get_ident +from _thread import start_new_thread, get_ident, TIMEOUT_MAX import threading import unittest @@ -62,6 +62,14 @@ class BaseTestCase(unittest.TestCase): support.threading_cleanup(*self._threads) support.reap_children()

+ class BaseLockTests(BaseTestCase): """ @@ -143,6 +151,32 @@ class BaseLockTests(BaseTestCase): Bunch(f, 15).wait_for_finished() self.assertEqual(n, len(threading.enumerate()))

+ class LockTests(BaseLockTests): """ @@ -284,14 +318,14 @@ class EventTests(BaseTestCase): def f(): results1.append(evt.wait(0.0)) t1 = time.time()

@@ -397,14 +431,14 @@ class ConditionTests(BaseTestCase): def f(): cond.acquire() t1 = time.time()

class BaseSemaphoreTests(BaseTestCase):

--- a/Lib/threading.py +++ b/Lib/threading.py @@ -31,6 +31,7 @@ try: _CRLock = _thread.RLock except AttributeError: _CRLock = None +TIMEOUT_MAX = _thread.TIMEOUT_MAX del _thread @@ -107,14 +108,14 @@ class _RLock(_Verbose): return "<%s owner=%r count=%d>" % ( self.class.name, owner, self._count)

@@ -234,22 +235,10 @@ class _Condition(_Verbose): if debug: self._note("%s.wait(): got it", self) else:

--- a/Misc/NEWS +++ b/Misc/NEWS @@ -312,6 +312,11 @@ C-API Library ------- +- Issue #7316: the acquire() method of lock objects in the :mod:threading

--- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -40,18 +40,47 @@ lock_dealloc(lockobject *self) } static PyObject * -lock_PyThread_acquire_lock(lockobject *self, PyObject *args) +lock_PyThread_acquire_lock(lockobject *self, PyObject *args, PyObject *kwds) { - int i = 1; + char kwlist[] = {"blocking", "timeout", NULL}; + int blocking = 1; + double timeout = -1; + PY_TIMEOUT_T microseconds; + int r; - if (!PyArg_ParseTuple(args, "|i:acquire", &i)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|id:acquire", kwlist, + &blocking, &timeout)) return NULL; + if (!blocking && timeout != -1) { + PyErr_SetString(PyExc_ValueError, "can't specify a timeout " + "for a non-blocking call"); + return NULL; + } + if (timeout < 0 && timeout != -1) { + PyErr_SetString(PyExc_ValueError, "timeout value must be " + "strictly positive"); + return NULL; + } + if (!blocking) + microseconds = 0; + else if (timeout == -1) + microseconds = -1; + else { + timeout *= 1e6; + if (timeout >= (double) PY_TIMEOUT_MAX) { + PyErr_SetString(PyExc_OverflowError, + "timeout value is too large"); + return NULL; + } + microseconds = (PY_TIMEOUT_T) timeout; + } + Py_BEGIN_ALLOW_THREADS - i = PyThread_acquire_lock(self->lock_lock, i); + r = PyThread_acquire_lock_timed(self->lock_lock, microseconds); Py_END_ALLOW_THREADS - return PyBool_FromLong((long)i); + return PyBool_FromLong(r); } PyDoc_STRVAR(acquire_doc, @@ -106,9 +135,9 @@ Return whether the lock is in the locked static PyMethodDef lock_methods[] = { {"acquire_lock", (PyCFunction)lock_PyThread_acquire_lock, - METH_VARARGS, acquire_doc}, + METH_VARARGS | METH_KEYWORDS, acquire_doc}, {"acquire", (PyCFunction)lock_PyThread_acquire_lock, - METH_VARARGS, acquire_doc}, + METH_VARARGS | METH_KEYWORDS, acquire_doc}, {"release_lock", (PyCFunction)lock_PyThread_release_lock, METH_NOARGS, release_doc}, {"release", (PyCFunction)lock_PyThread_release_lock, @@ -118,7 +147,7 @@ static PyMethodDef lock_methods[] = { {"locked", (PyCFunction)lock_locked_lock, METH_NOARGS, locked_doc}, {"enter", (PyCFunction)lock_PyThread_acquire_lock, - METH_VARARGS, acquire_doc}, + METH_VARARGS | METH_KEYWORDS, acquire_doc}, {"exit", (PyCFunction)lock_PyThread_release_lock, METH_VARARGS, release_doc}, {NULL, NULL} / sentinel */ @@ -183,15 +212,41 @@ rlock_dealloc(rlockobject *self) static PyObject * rlock_acquire(rlockobject *self, PyObject *args, PyObject *kwds) { - char *kwlist[] = {"blocking", NULL}; + char *kwlist[] = {"blocking", "timeout", NULL}; int blocking = 1; + double timeout = -1; + PY_TIMEOUT_T microseconds; long tid; int r = 1; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "|i:acquire", kwlist, - &blocking)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|id:acquire", kwlist, + &blocking, &timeout)) return NULL; + if (!blocking && timeout != -1) { + PyErr_SetString(PyExc_ValueError, "can't specify a timeout " + "for a non-blocking call"); + return NULL; + } + if (timeout < 0 && timeout != -1) { + PyErr_SetString(PyExc_ValueError, "timeout value must be " + "strictly positive"); + return NULL; + } + if (!blocking) + microseconds = 0; + else if (timeout == -1) + microseconds = -1; + else { + timeout *= 1e6; + if (timeout >= (double) PY_TIMEOUT_MAX) { + PyErr_SetString(PyExc_OverflowError, + "timeout value is too large"); + return NULL; + } + microseconds = (PY_TIMEOUT_T) timeout; + } + tid = PyThread_get_thread_ident(); if (self->rlock_count > 0 && tid == self->rlock_owner) { unsigned long count = self->rlock_count + 1; @@ -206,11 +261,11 @@ rlock_acquire(rlockobject *self, PyObjec if (self->rlock_count > 0 || !PyThread_acquire_lock(self->rlock_lock, 0)) { - if (!blocking) { + if (microseconds == 0) { Py_RETURN_FALSE; } Py_BEGIN_ALLOW_THREADS - r = PyThread_acquire_lock(self->rlock_lock, blocking); + r = PyThread_acquire_lock_timed(self->rlock_lock, microseconds); Py_END_ALLOW_THREADS } if (r) { @@ -1005,7 +1060,7 @@ static struct PyModuleDef threadmodule = PyMODINIT_FUNC PyInit__thread(void) { - PyObject *m, *d; + PyObject *m, *d, timeout_max; / Initialize types: / if (PyType_Ready(&localtype) < 0) @@ -1020,6 +1075,12 @@ PyInit__thread(void) if (m == NULL) return NULL; + timeout_max = PyFloat_FromDouble(PY_TIMEOUT_MAX / 1000000); + if (!timeout_max) + return NULL; + if (PyModule_AddObject(m, "TIMEOUT_MAX", timeout_max) < 0) + return NULL; + / Add a symbolic constant */ d = PyModule_GetDict(m); ThreadError = PyErr_NewException("_thread.error", NULL, NULL);

--- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -34,13 +34,13 @@ DeleteNonRecursiveMutex(PNRMUTEX mutex) } DWORD -EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait) +EnterNonRecursiveMutex(PNRMUTEX mutex, DWORD milliseconds) { /* Assume that the thread waits successfully / DWORD ret ; / InterlockedIncrement(&mutex->owned) == 0 means that no thread currently owns the mutex / - if (!wait) + if (milliseconds == 0) { if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1) return WAIT_TIMEOUT ; @@ -49,7 +49,7 @@ EnterNonRecursiveMutex(PNRMUTEX mutex, B else ret = InterlockedIncrement(&mutex->owned) ? / Some thread owns the mutex, let's wait... / - WaitForSingleObject(mutex->hevent, INFINITE) : WAIT_OBJECT_0 ; + WaitForSingleObject(mutex->hevent, milliseconds) : WAIT_OBJECT_0 ; mutex->thread_id = GetCurrentThreadId() ; / We own it */ return ret ; @@ -239,18 +239,37 @@ PyThread_free_lock(PyThread_type_lock aL

+ + success = aLock && EnterNonRecursiveMutex((PNRMUTEX) aLock, (DWORD) milliseconds) == WAIT_OBJECT_0 ; + + dprintf(("%ld: PyThread_acquire_lock(%p, %lld) -> %d\n", + PyThread_get_thread_ident(), aLock, microseconds, success)); return success; } +int +PyThread_acquire_lock(PyThread_type_lock aLock, int waitflag) +{ + return PyThread_acquire_lock_timed(aLock, waitflag ? -1 : 0); +} void PyThread_release_lock(PyThread_type_lock aLock)

--- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -83,6 +83,26 @@ #endif +/* We assume all modern POSIX systems have gettimeofday() */ +#ifdef GETTIMEOFDAY_NO_TZ +#define GETTIMEOFDAY(ptv) gettimeofday(ptv) +#else +#define GETTIMEOFDAY(ptv) gettimeofday(ptv, (struct timezone )NULL) +#endif + +#define MICROSECONDS_TO_TIMESPEC(microseconds, ts) [](#l11.14) +do { [](#l11.15) + struct timeval tv; [](#l11.16) + GETTIMEOFDAY(&tv); [](#l11.17) + tv.tv_usec += microseconds % 1000000; [](#l11.18) + tv.tv_sec += microseconds / 1000000; [](#l11.19) + tv.tv_sec += tv.tv_usec / 1000000; [](#l11.20) + tv.tv_usec %= 1000000; [](#l11.21) + ts.tv_sec = tv.tv_sec; [](#l11.22) + ts.tv_nsec = tv.tv_usec * 1000; [](#l11.23) +} while(0) + + / A pthread mutex isn't sufficient to model the Python lock type

-int -PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) +int +PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds) { int success; sem_t *thelock = (sem_t )lock; int status, error = 0; + struct timespec ts; - dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld) called\n", + lock, microseconds)); + if (microseconds > 0) + MICROSECONDS_TO_TIMESPEC(microseconds, ts); do { - if (waitflag) + if (microseconds > 0) + status = fix_status(sem_timedwait(thelock, &ts)); + else if (microseconds == 0) + status = fix_status(sem_trywait(thelock)); + else status = fix_status(sem_wait(thelock)); - else - status = fix_status(sem_trywait(thelock)); } while (status == EINTR); / Retry if interrupted by a signal */ - if (waitflag) { + if (microseconds > 0) { + if (status != ETIMEDOUT) + CHECK_STATUS("sem_timedwait"); + } + else if (microseconds == 0) { + if (status != EAGAIN) + CHECK_STATUS("sem_trywait"); + } + else { CHECK_STATUS("sem_wait"); - } else if (status != EAGAIN) { - CHECK_STATUS("sem_trywait"); } success = (status == 0) ? 1 : 0; - dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld) -> %d\n", + lock, microseconds, success)); return success; } +int +PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) +{ + return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0); +} + void PyThread_release_lock(PyThread_type_lock lock) { @@ -390,40 +429,62 @@ PyThread_free_lock(PyThread_type_lock lo free((void *)thelock); } -int -PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) +int +PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds) { int success; pthread_lock *thelock = (pthread_lock )lock; int status, error = 0; - dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld) called\n", + lock, microseconds)); status = pthread_mutex_lock( &thelock->mut ); CHECK_STATUS("pthread_mutex_lock[1]"); success = thelock->locked == 0; - if ( !success && waitflag ) { + if (!success && microseconds != 0) { + struct timespec ts; + if (microseconds > 0) + MICROSECONDS_TO_TIMESPEC(microseconds, ts); / continue trying until we get the lock / / mut must be locked by me -- part of the condition * protocol */ - while ( thelock->locked ) { - status = pthread_cond_wait(&thelock->lock_released, - &thelock->mut); - CHECK_STATUS("pthread_cond_wait"); + while (thelock->locked) { + if (microseconds > 0) { + status = pthread_cond_timedwait( + &thelock->lock_released, + &thelock->mut, &ts); + if (status == ETIMEDOUT) + break; + CHECK_STATUS("pthread_cond_timed_wait"); + } + else { + status = pthread_cond_wait( + &thelock->lock_released, + &thelock->mut); + CHECK_STATUS("pthread_cond_wait"); + } } - success = 1; + success = (status == 0); } if (success) thelock->locked = 1; status = pthread_mutex_unlock( &thelock->mut ); CHECK_STATUS("pthread_mutex_unlock[1]"); if (error) success = 0; - dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld) -> %d\n", + lock, microseconds, success)); return success; } +int +PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) +{ + return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0); +} + void PyThread_release_lock(PyThread_type_lock lock) {