aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_threading.py')
-rw-r--r--python/gevent/_threading.py449
1 files changed, 51 insertions, 398 deletions
diff --git a/python/gevent/_threading.py b/python/gevent/_threading.py
index 2f5ffbc..9258dfb 100644
--- a/python/gevent/_threading.py
+++ b/python/gevent/_threading.py
@@ -6,100 +6,35 @@ or not).
This module is missing 'Thread' class, but includes 'Queue'.
"""
from __future__ import absolute_import
-try:
- from Queue import Full, Empty
-except ImportError:
- from queue import Full, Empty # pylint:disable=import-error
+
from collections import deque
-import heapq
-from time import time as _time, sleep as _sleep
from gevent import monkey
-from gevent._compat import PY3
-
-
-__all__ = ['Condition',
- 'Event',
- 'Lock',
- 'RLock',
- 'Semaphore',
- 'BoundedSemaphore',
- 'Queue',
- 'local',
- 'stack_size']
-
-
-thread_name = '_thread' if PY3 else 'thread'
-start_new_thread, Lock, get_ident, local, stack_size = monkey.get_original(thread_name, [
- 'start_new_thread', 'allocate_lock', 'get_ident', '_local', 'stack_size'])
-
+from gevent._compat import thread_mod_name
-class RLock(object):
- def __init__(self):
- self.__block = Lock()
- self.__owner = None
- self.__count = 0
-
- def __repr__(self):
- owner = self.__owner
- return "<%s owner=%r count=%d>" % (
- self.__class__.__name__, owner, self.__count)
-
- def acquire(self, blocking=1):
- me = get_ident()
- if self.__owner == me:
- self.__count = self.__count + 1
- return 1
- rc = self.__block.acquire(blocking)
- if rc:
- self.__owner = me
- self.__count = 1
- return rc
-
- __enter__ = acquire
-
- def release(self):
- if self.__owner != get_ident():
- raise RuntimeError("cannot release un-acquired lock")
- self.__count = count = self.__count - 1
- if not count:
- self.__owner = None
- self.__block.release()
+__all__ = [
+ 'Lock',
+ 'Queue',
+]
- def __exit__(self, t, v, tb):
- self.release()
-
- # Internal methods used by condition variables
- def _acquire_restore(self, count_owner):
- count, owner = count_owner
- self.__block.acquire()
- self.__count = count
- self.__owner = owner
+start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name, [
+ 'start_new_thread', 'allocate_lock', 'get_ident',
+])
- def _release_save(self):
- count = self.__count
- self.__count = 0
- owner = self.__owner
- self.__owner = None
- self.__block.release()
- return (count, owner)
- def _is_owned(self):
- return self.__owner == get_ident()
+# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return
+# pylint:disable=assignment-from-no-return
-class Condition(object):
+class _Condition(object):
# pylint:disable=method-hidden
- def __init__(self, lock=None):
- if lock is None:
- lock = RLock()
+ def __init__(self, lock):
self.__lock = lock
- # Export the lock's acquire() and release() methods
- self.acquire = lock.acquire
- self.release = lock.release
+ self.__waiters = []
+
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
@@ -115,13 +50,12 @@ class Condition(object):
self._is_owned = lock._is_owned
except AttributeError:
pass
- self.__waiters = []
def __enter__(self):
return self.__lock.__enter__()
- def __exit__(self, *args):
- return self.__lock.__exit__(*args)
+ def __exit__(self, t, v, tb):
+ return self.__lock.__exit__(t, v, tb)
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
@@ -140,168 +74,47 @@ class Condition(object):
return False
return True
- def wait(self, timeout=None):
- if not self._is_owned():
- raise RuntimeError("cannot wait on un-acquired lock")
+ def wait(self):
+ # The condition MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
- if timeout is None:
- waiter.acquire()
- else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
- endtime = _time() + timeout
- delay = 0.0005 # 500 us -> initial delay of 1 ms
- while True:
- gotit = waiter.acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, .05)
- _sleep(delay)
- if not gotit:
- try:
- self.__waiters.remove(waiter)
- except ValueError:
- pass
+ waiter.acquire() # Block on the native lock
finally:
self._acquire_restore(saved_state)
- def notify(self, n=1):
- if not self._is_owned():
- raise RuntimeError("cannot notify on un-acquired lock")
- __waiters = self.__waiters
- waiters = __waiters[:n]
- if not waiters:
- return
- for waiter in waiters:
- waiter.release()
- try:
- __waiters.remove(waiter)
- except ValueError:
- pass
-
- def notify_all(self):
- self.notify(len(self.__waiters))
-
-
-class Semaphore(object):
-
- # After Tim Peters' semaphore class, but not quite the same (no maximum)
-
- def __init__(self, value=1):
- if value < 0:
- raise ValueError("semaphore initial value must be >= 0")
- self.__cond = Condition(Lock())
- self.__value = value
-
- def acquire(self, blocking=1):
- rc = False
- self.__cond.acquire()
- while self.__value == 0:
- if not blocking:
- break
- self.__cond.wait()
+ def notify_one(self):
+ # The condition MUST be owned, but we don't check that.
+ try:
+ waiter = self.__waiters.pop()
+ except IndexError:
+ # Nobody around
+ pass
else:
- self.__value = self.__value - 1
- rc = True
- self.__cond.release()
- return rc
-
- __enter__ = acquire
-
- def release(self):
- self.__cond.acquire()
- self.__value = self.__value + 1
- self.__cond.notify()
- self.__cond.release()
-
- def __exit__(self, t, v, tb):
- self.release()
-
-
-class BoundedSemaphore(Semaphore):
- """Semaphore that checks that # releases is <= # acquires"""
- def __init__(self, value=1):
- Semaphore.__init__(self, value)
- self._initial_value = value
+ waiter.release()
- def release(self):
- if self.Semaphore__value >= self._initial_value: # pylint:disable=no-member
- raise ValueError("Semaphore released too many times")
- return Semaphore.release(self)
+class Queue(object):
+ """Create a queue object.
-class Event(object):
+ The queue is always infinite size.
+ """
- # After Tim Peters' event class (without is_posted())
+ __slots__ = ('_queue', '_mutex', '_not_empty', 'unfinished_tasks')
def __init__(self):
- self.__cond = Condition(Lock())
- self.__flag = False
-
- def _reset_internal_locks(self):
- # private! called by Thread._reset_internal_locks by _after_fork()
- self.__cond.__init__()
-
- def is_set(self):
- return self.__flag
-
- def set(self):
- self.__cond.acquire()
- try:
- self.__flag = True
- self.__cond.notify_all()
- finally:
- self.__cond.release()
-
- def clear(self):
- self.__cond.acquire()
- try:
- self.__flag = False
- finally:
- self.__cond.release()
-
- def wait(self, timeout=None):
- self.__cond.acquire()
- try:
- if not self.__flag:
- self.__cond.wait(timeout)
- return self.__flag
- finally:
- self.__cond.release()
-
-
-class Queue: # pylint:disable=old-style-class
- """Create a queue object with a given maximum size.
-
- If maxsize is <= 0, the queue size is infinite.
- """
- def __init__(self, maxsize=0):
- self.maxsize = maxsize
- self._init(maxsize)
+ self._queue = deque()
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
- self.mutex = Lock()
+ self._mutex = Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
- self.not_empty = Condition(self.mutex)
- # Notify not_full whenever an item is removed from the queue;
- # a thread waiting to put is notified then.
- self.not_full = Condition(self.mutex)
- # Notify all_tasks_done whenever the number of unfinished tasks
- # drops to zero; thread waiting to join() is notified to resume
- self.all_tasks_done = Condition(self.mutex)
+ self._not_empty = _Condition(self._mutex)
+
self.unfinished_tasks = 0
def task_done(self):
@@ -318,198 +131,38 @@ class Queue: # pylint:disable=old-style-class
Raises a ValueError if called more times than there were items
placed in the queue.
"""
- self.all_tasks_done.acquire()
- try:
+ with self._mutex:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
- self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
- finally:
- self.all_tasks_done.release()
-
- def join(self):
- """Blocks until all items in the Queue have been gotten and processed.
- The count of unfinished tasks goes up whenever an item is added to the
- queue. The count goes down whenever a consumer thread calls task_done()
- to indicate the item was retrieved and all work on it is complete.
-
- When the count of unfinished tasks drops to zero, join() unblocks.
- """
- self.all_tasks_done.acquire()
- try:
- while self.unfinished_tasks:
- self.all_tasks_done.wait()
- finally:
- self.all_tasks_done.release()
-
- def qsize(self):
+ def qsize(self, len=len):
"""Return the approximate size of the queue (not reliable!)."""
- self.mutex.acquire()
- try:
- return self._qsize()
- finally:
- self.mutex.release()
+ return len(self._queue)
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
- self.mutex.acquire()
- try:
- return not self._qsize()
- finally:
- self.mutex.release()
+ return not self.qsize()
def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
- self.mutex.acquire()
- try:
- if self.maxsize <= 0:
- return False
- if self.maxsize >= self._qsize():
- return True
- finally:
- self.mutex.release()
+ return False
- def put(self, item, block=True, timeout=None):
+ def put(self, item):
"""Put an item into the queue.
-
- If optional args 'block' is true and 'timeout' is None (the default),
- block if necessary until a free slot is available. If 'timeout' is
- a positive number, it blocks at most 'timeout' seconds and raises
- the Full exception if no free slot was available within that time.
- Otherwise ('block' is false), put an item on the queue if a free slot
- is immediately available, else raise the Full exception ('timeout'
- is ignored in that case).
"""
- self.not_full.acquire()
- try:
- if self.maxsize > 0:
- if not block:
- if self._qsize() >= self.maxsize:
- raise Full
- elif timeout is None:
- while self._qsize() >= self.maxsize:
- self.not_full.wait()
- elif timeout < 0:
- raise ValueError("'timeout' must be a positive number")
- else:
- endtime = _time() + timeout
- while self._qsize() >= self.maxsize:
- remaining = endtime - _time()
- if remaining <= 0.0:
- raise Full
- self.not_full.wait(remaining)
- self._put(item)
+ with self._not_empty:
+ self._queue.append(item)
self.unfinished_tasks += 1
- self.not_empty.notify()
- finally:
- self.not_full.release()
-
- def put_nowait(self, item):
- """Put an item into the queue without blocking.
+ self._not_empty.notify_one()
- Only enqueue the item if a free slot is immediately available.
- Otherwise raise the Full exception.
- """
- return self.put(item, False)
-
- def get(self, block=True, timeout=None):
+ def get(self):
"""Remove and return an item from the queue.
-
- If optional args 'block' is true and 'timeout' is None (the default),
- block if necessary until an item is available. If 'timeout' is
- a positive number, it blocks at most 'timeout' seconds and raises
- the Empty exception if no item was available within that time.
- Otherwise ('block' is false), return an item if one is immediately
- available, else raise the Empty exception ('timeout' is ignored
- in that case).
"""
- self.not_empty.acquire()
- try:
- if not block:
- if not self._qsize():
- raise Empty
- elif timeout is None:
- while not self._qsize():
- self.not_empty.wait()
- elif timeout < 0:
- raise ValueError("'timeout' must be a positive number")
- else:
- endtime = _time() + timeout
- while not self._qsize():
- remaining = endtime - _time()
- if remaining <= 0.0:
- raise Empty
- self.not_empty.wait(remaining)
- item = self._get()
- self.not_full.notify()
+ with self._not_empty:
+ while not self._queue:
+ self._not_empty.wait()
+ item = self._queue.popleft()
return item
- finally:
- self.not_empty.release()
-
- def get_nowait(self):
- """Remove and return an item from the queue without blocking.
-
- Only get an item if one is immediately available. Otherwise
- raise the Empty exception.
- """
- return self.get(False)
-
- # Override these methods to implement other queue organizations
- # (e.g. stack or priority queue).
- # These will only be called with appropriate locks held
-
- # Initialize the queue representation
- def _init(self, maxsize):
- # pylint:disable=unused-argument
- self.queue = deque()
-
- def _qsize(self, len=len):
- return len(self.queue)
-
- # Put a new item in the queue
- def _put(self, item):
- self.queue.append(item)
-
- # Get an item from the queue
- def _get(self):
- return self.queue.popleft()
-
-
-class PriorityQueue(Queue):
- '''Variant of Queue that retrieves open entries in priority order (lowest first).
-
- Entries are typically tuples of the form: (priority number, data).
- '''
-
- def _init(self, maxsize):
- self.queue = []
-
- def _qsize(self, len=len):
- return len(self.queue)
-
- def _put(self, item, heappush=heapq.heappush):
- # pylint:disable=arguments-differ
- heappush(self.queue, item)
-
- def _get(self, heappop=heapq.heappop):
- # pylint:disable=arguments-differ
- return heappop(self.queue)
-
-
-class LifoQueue(Queue):
- '''Variant of Queue that retrieves most recently added entries first.'''
-
- def _init(self, maxsize):
- self.queue = []
-
- def _qsize(self, len=len):
- return len(self.queue)
-
- def _put(self, item):
- self.queue.append(item)
-
- def _get(self):
- return self.queue.pop()