diff options
Diffstat (limited to 'python/gevent/_threading.py')
-rw-r--r-- | python/gevent/_threading.py | 449 |
1 files changed, 51 insertions, 398 deletions
diff --git a/python/gevent/_threading.py b/python/gevent/_threading.py index 2f5ffbc..9258dfb 100644 --- a/python/gevent/_threading.py +++ b/python/gevent/_threading.py @@ -6,100 +6,35 @@ or not). This module is missing 'Thread' class, but includes 'Queue'. """ from __future__ import absolute_import -try: - from Queue import Full, Empty -except ImportError: - from queue import Full, Empty # pylint:disable=import-error + from collections import deque -import heapq -from time import time as _time, sleep as _sleep from gevent import monkey -from gevent._compat import PY3 - - -__all__ = ['Condition', - 'Event', - 'Lock', - 'RLock', - 'Semaphore', - 'BoundedSemaphore', - 'Queue', - 'local', - 'stack_size'] - - -thread_name = '_thread' if PY3 else 'thread' -start_new_thread, Lock, get_ident, local, stack_size = monkey.get_original(thread_name, [ - 'start_new_thread', 'allocate_lock', 'get_ident', '_local', 'stack_size']) - +from gevent._compat import thread_mod_name -class RLock(object): - def __init__(self): - self.__block = Lock() - self.__owner = None - self.__count = 0 - - def __repr__(self): - owner = self.__owner - return "<%s owner=%r count=%d>" % ( - self.__class__.__name__, owner, self.__count) - - def acquire(self, blocking=1): - me = get_ident() - if self.__owner == me: - self.__count = self.__count + 1 - return 1 - rc = self.__block.acquire(blocking) - if rc: - self.__owner = me - self.__count = 1 - return rc - - __enter__ = acquire - - def release(self): - if self.__owner != get_ident(): - raise RuntimeError("cannot release un-acquired lock") - self.__count = count = self.__count - 1 - if not count: - self.__owner = None - self.__block.release() +__all__ = [ + 'Lock', + 'Queue', +] - def __exit__(self, t, v, tb): - self.release() - - # Internal methods used by condition variables - def _acquire_restore(self, count_owner): - count, owner = count_owner - self.__block.acquire() - self.__count = count - self.__owner = owner +start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name, [ + 'start_new_thread', 'allocate_lock', 'get_ident', +]) - def _release_save(self): - count = self.__count - self.__count = 0 - owner = self.__owner - self.__owner = None - self.__block.release() - return (count, owner) - def _is_owned(self): - return self.__owner == get_ident() +# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return +# pylint:disable=assignment-from-no-return -class Condition(object): +class _Condition(object): # pylint:disable=method-hidden - def __init__(self, lock=None): - if lock is None: - lock = RLock() + def __init__(self, lock): self.__lock = lock - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release + self.__waiters = [] + # If the lock defines _release_save() and/or _acquire_restore(), # these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). @@ -115,13 +50,12 @@ class Condition(object): self._is_owned = lock._is_owned except AttributeError: pass - self.__waiters = [] def __enter__(self): return self.__lock.__enter__() - def __exit__(self, *args): - return self.__lock.__exit__(*args) + def __exit__(self, t, v, tb): + return self.__lock.__exit__(t, v, tb) def __repr__(self): return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) @@ -140,168 +74,47 @@ class Condition(object): return False return True - def wait(self, timeout=None): - if not self._is_owned(): - raise RuntimeError("cannot wait on un-acquired lock") + def wait(self): + # The condition MUST be owned, but we don't check that. waiter = Lock() waiter.acquire() self.__waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - endtime = _time() + timeout - delay = 0.0005 # 500 us -> initial delay of 1 ms - while True: - gotit = waiter.acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - if not gotit: - try: - self.__waiters.remove(waiter) - except ValueError: - pass + waiter.acquire() # Block on the native lock finally: self._acquire_restore(saved_state) - def notify(self, n=1): - if not self._is_owned(): - raise RuntimeError("cannot notify on un-acquired lock") - __waiters = self.__waiters - waiters = __waiters[:n] - if not waiters: - return - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - - def notify_all(self): - self.notify(len(self.__waiters)) - - -class Semaphore(object): - - # After Tim Peters' semaphore class, but not quite the same (no maximum) - - def __init__(self, value=1): - if value < 0: - raise ValueError("semaphore initial value must be >= 0") - self.__cond = Condition(Lock()) - self.__value = value - - def acquire(self, blocking=1): - rc = False - self.__cond.acquire() - while self.__value == 0: - if not blocking: - break - self.__cond.wait() + def notify_one(self): + # The condition MUST be owned, but we don't check that. + try: + waiter = self.__waiters.pop() + except IndexError: + # Nobody around + pass else: - self.__value = self.__value - 1 - rc = True - self.__cond.release() - return rc - - __enter__ = acquire - - def release(self): - self.__cond.acquire() - self.__value = self.__value + 1 - self.__cond.notify() - self.__cond.release() - - def __exit__(self, t, v, tb): - self.release() - - -class BoundedSemaphore(Semaphore): - """Semaphore that checks that # releases is <= # acquires""" - def __init__(self, value=1): - Semaphore.__init__(self, value) - self._initial_value = value + waiter.release() - def release(self): - if self.Semaphore__value >= self._initial_value: # pylint:disable=no-member - raise ValueError("Semaphore released too many times") - return Semaphore.release(self) +class Queue(object): + """Create a queue object. -class Event(object): + The queue is always infinite size. + """ - # After Tim Peters' event class (without is_posted()) + __slots__ = ('_queue', '_mutex', '_not_empty', 'unfinished_tasks') def __init__(self): - self.__cond = Condition(Lock()) - self.__flag = False - - def _reset_internal_locks(self): - # private! called by Thread._reset_internal_locks by _after_fork() - self.__cond.__init__() - - def is_set(self): - return self.__flag - - def set(self): - self.__cond.acquire() - try: - self.__flag = True - self.__cond.notify_all() - finally: - self.__cond.release() - - def clear(self): - self.__cond.acquire() - try: - self.__flag = False - finally: - self.__cond.release() - - def wait(self, timeout=None): - self.__cond.acquire() - try: - if not self.__flag: - self.__cond.wait(timeout) - return self.__flag - finally: - self.__cond.release() - - -class Queue: # pylint:disable=old-style-class - """Create a queue object with a given maximum size. - - If maxsize is <= 0, the queue size is infinite. - """ - def __init__(self, maxsize=0): - self.maxsize = maxsize - self._init(maxsize) + self._queue = deque() # mutex must be held whenever the queue is mutating. All methods # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. - self.mutex = Lock() + self._mutex = Lock() # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. - self.not_empty = Condition(self.mutex) - # Notify not_full whenever an item is removed from the queue; - # a thread waiting to put is notified then. - self.not_full = Condition(self.mutex) - # Notify all_tasks_done whenever the number of unfinished tasks - # drops to zero; thread waiting to join() is notified to resume - self.all_tasks_done = Condition(self.mutex) + self._not_empty = _Condition(self._mutex) + self.unfinished_tasks = 0 def task_done(self): @@ -318,198 +131,38 @@ class Queue: # pylint:disable=old-style-class Raises a ValueError if called more times than there were items placed in the queue. """ - self.all_tasks_done.acquire() - try: + with self._mutex: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') - self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished - finally: - self.all_tasks_done.release() - - def join(self): - """Blocks until all items in the Queue have been gotten and processed. - The count of unfinished tasks goes up whenever an item is added to the - queue. The count goes down whenever a consumer thread calls task_done() - to indicate the item was retrieved and all work on it is complete. - - When the count of unfinished tasks drops to zero, join() unblocks. - """ - self.all_tasks_done.acquire() - try: - while self.unfinished_tasks: - self.all_tasks_done.wait() - finally: - self.all_tasks_done.release() - - def qsize(self): + def qsize(self, len=len): """Return the approximate size of the queue (not reliable!).""" - self.mutex.acquire() - try: - return self._qsize() - finally: - self.mutex.release() + return len(self._queue) def empty(self): """Return True if the queue is empty, False otherwise (not reliable!).""" - self.mutex.acquire() - try: - return not self._qsize() - finally: - self.mutex.release() + return not self.qsize() def full(self): """Return True if the queue is full, False otherwise (not reliable!).""" - self.mutex.acquire() - try: - if self.maxsize <= 0: - return False - if self.maxsize >= self._qsize(): - return True - finally: - self.mutex.release() + return False - def put(self, item, block=True, timeout=None): + def put(self, item): """Put an item into the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until a free slot is available. If 'timeout' is - a positive number, it blocks at most 'timeout' seconds and raises - the Full exception if no free slot was available within that time. - Otherwise ('block' is false), put an item on the queue if a free slot - is immediately available, else raise the Full exception ('timeout' - is ignored in that case). """ - self.not_full.acquire() - try: - if self.maxsize > 0: - if not block: - if self._qsize() >= self.maxsize: - raise Full - elif timeout is None: - while self._qsize() >= self.maxsize: - self.not_full.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") - else: - endtime = _time() + timeout - while self._qsize() >= self.maxsize: - remaining = endtime - _time() - if remaining <= 0.0: - raise Full - self.not_full.wait(remaining) - self._put(item) + with self._not_empty: + self._queue.append(item) self.unfinished_tasks += 1 - self.not_empty.notify() - finally: - self.not_full.release() - - def put_nowait(self, item): - """Put an item into the queue without blocking. + self._not_empty.notify_one() - Only enqueue the item if a free slot is immediately available. - Otherwise raise the Full exception. - """ - return self.put(item, False) - - def get(self, block=True, timeout=None): + def get(self): """Remove and return an item from the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until an item is available. If 'timeout' is - a positive number, it blocks at most 'timeout' seconds and raises - the Empty exception if no item was available within that time. - Otherwise ('block' is false), return an item if one is immediately - available, else raise the Empty exception ('timeout' is ignored - in that case). """ - self.not_empty.acquire() - try: - if not block: - if not self._qsize(): - raise Empty - elif timeout is None: - while not self._qsize(): - self.not_empty.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") - else: - endtime = _time() + timeout - while not self._qsize(): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) - item = self._get() - self.not_full.notify() + with self._not_empty: + while not self._queue: + self._not_empty.wait() + item = self._queue.popleft() return item - finally: - self.not_empty.release() - - def get_nowait(self): - """Remove and return an item from the queue without blocking. - - Only get an item if one is immediately available. Otherwise - raise the Empty exception. - """ - return self.get(False) - - # Override these methods to implement other queue organizations - # (e.g. stack or priority queue). - # These will only be called with appropriate locks held - - # Initialize the queue representation - def _init(self, maxsize): - # pylint:disable=unused-argument - self.queue = deque() - - def _qsize(self, len=len): - return len(self.queue) - - # Put a new item in the queue - def _put(self, item): - self.queue.append(item) - - # Get an item from the queue - def _get(self): - return self.queue.popleft() - - -class PriorityQueue(Queue): - '''Variant of Queue that retrieves open entries in priority order (lowest first). - - Entries are typically tuples of the form: (priority number, data). - ''' - - def _init(self, maxsize): - self.queue = [] - - def _qsize(self, len=len): - return len(self.queue) - - def _put(self, item, heappush=heapq.heappush): - # pylint:disable=arguments-differ - heappush(self.queue, item) - - def _get(self, heappop=heapq.heappop): - # pylint:disable=arguments-differ - return heappop(self.queue) - - -class LifoQueue(Queue): - '''Variant of Queue that retrieves most recently added entries first.''' - - def _init(self, maxsize): - self.queue = [] - - def _qsize(self, len=len): - return len(self.queue) - - def _put(self, item): - self.queue.append(item) - - def _get(self): - return self.queue.pop() |