aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_threading.py')
-rw-r--r--python/gevent/_threading.py168
1 files changed, 0 insertions, 168 deletions
diff --git a/python/gevent/_threading.py b/python/gevent/_threading.py
deleted file mode 100644
index 9258dfb..0000000
--- a/python/gevent/_threading.py
+++ /dev/null
@@ -1,168 +0,0 @@
-"""A clone of threading module (version 2.7.2) that always
-targets real OS threads. (Unlike 'threading' which flips between
-green and OS threads based on whether the monkey patching is in effect
-or not).
-
-This module is missing 'Thread' class, but includes 'Queue'.
-"""
-from __future__ import absolute_import
-
-from collections import deque
-
-from gevent import monkey
-from gevent._compat import thread_mod_name
-
-
-__all__ = [
- 'Lock',
- 'Queue',
-]
-
-
-start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name, [
- 'start_new_thread', 'allocate_lock', 'get_ident',
-])
-
-
-# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return
-# pylint:disable=assignment-from-no-return
-
-
-class _Condition(object):
- # pylint:disable=method-hidden
-
- def __init__(self, lock):
- self.__lock = lock
- self.__waiters = []
-
- # If the lock defines _release_save() and/or _acquire_restore(),
- # these override the default implementations (which just call
- # release() and acquire() on the lock). Ditto for _is_owned().
- try:
- self._release_save = lock._release_save
- except AttributeError:
- pass
- try:
- self._acquire_restore = lock._acquire_restore
- except AttributeError:
- pass
- try:
- self._is_owned = lock._is_owned
- except AttributeError:
- pass
-
- def __enter__(self):
- return self.__lock.__enter__()
-
- def __exit__(self, t, v, tb):
- return self.__lock.__exit__(t, v, tb)
-
- def __repr__(self):
- return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
-
- def _release_save(self):
- self.__lock.release() # No state to save
-
- def _acquire_restore(self, x): # pylint:disable=unused-argument
- self.__lock.acquire() # Ignore saved state
-
- def _is_owned(self):
- # Return True if lock is owned by current_thread.
- # This method is called only if __lock doesn't have _is_owned().
- if self.__lock.acquire(0):
- self.__lock.release()
- return False
- return True
-
- def wait(self):
- # The condition MUST be owned, but we don't check that.
- waiter = Lock()
- waiter.acquire()
- self.__waiters.append(waiter)
- saved_state = self._release_save()
- try: # restore state no matter what (e.g., KeyboardInterrupt)
- waiter.acquire() # Block on the native lock
- finally:
- self._acquire_restore(saved_state)
-
- def notify_one(self):
- # The condition MUST be owned, but we don't check that.
- try:
- waiter = self.__waiters.pop()
- except IndexError:
- # Nobody around
- pass
- else:
- waiter.release()
-
-
-class Queue(object):
- """Create a queue object.
-
- The queue is always infinite size.
- """
-
- __slots__ = ('_queue', '_mutex', '_not_empty', 'unfinished_tasks')
-
- def __init__(self):
- self._queue = deque()
- # mutex must be held whenever the queue is mutating. All methods
- # that acquire mutex must release it before returning. mutex
- # is shared between the three conditions, so acquiring and
- # releasing the conditions also acquires and releases mutex.
- self._mutex = Lock()
- # Notify not_empty whenever an item is added to the queue; a
- # thread waiting to get is notified then.
- self._not_empty = _Condition(self._mutex)
-
- self.unfinished_tasks = 0
-
- def task_done(self):
- """Indicate that a formerly enqueued task is complete.
-
- Used by Queue consumer threads. For each get() used to fetch a task,
- a subsequent call to task_done() tells the queue that the processing
- on the task is complete.
-
- If a join() is currently blocking, it will resume when all items
- have been processed (meaning that a task_done() call was received
- for every item that had been put() into the queue).
-
- Raises a ValueError if called more times than there were items
- placed in the queue.
- """
- with self._mutex:
- unfinished = self.unfinished_tasks - 1
- if unfinished <= 0:
- if unfinished < 0:
- raise ValueError('task_done() called too many times')
- self.unfinished_tasks = unfinished
-
- def qsize(self, len=len):
- """Return the approximate size of the queue (not reliable!)."""
- return len(self._queue)
-
- def empty(self):
- """Return True if the queue is empty, False otherwise (not reliable!)."""
- return not self.qsize()
-
- def full(self):
- """Return True if the queue is full, False otherwise (not reliable!)."""
- return False
-
- def put(self, item):
- """Put an item into the queue.
- """
- with self._not_empty:
- self._queue.append(item)
- self.unfinished_tasks += 1
- self._not_empty.notify_one()
-
- def get(self):
- """Remove and return an item from the queue.
- """
- with self._not_empty:
- while not self._queue:
- self._not_empty.wait()
- item = self._queue.popleft()
- return item