diff options
Diffstat (limited to 'python/gevent/threadpool.py')
-rw-r--r-- | python/gevent/threadpool.py | 162 |
1 files changed, 122 insertions, 40 deletions
diff --git a/python/gevent/threadpool.py b/python/gevent/threadpool.py index 1d0e98d..c983390 100644 --- a/python/gevent/threadpool.py +++ b/python/gevent/threadpool.py @@ -2,23 +2,60 @@ from __future__ import absolute_import import sys import os + +from weakref import ref as wref + +from greenlet import greenlet as RawGreenlet + from gevent._compat import integer_types -from gevent.hub import get_hub, getcurrent, sleep, _get_hub +from gevent.hub import _get_hub_noargs as get_hub +from gevent.hub import getcurrent +from gevent.hub import sleep +from gevent.hub import _get_hub from gevent.event import AsyncResult from gevent.greenlet import Greenlet from gevent.pool import GroupMappingMixin from gevent.lock import Semaphore -from gevent._threading import Lock, Queue, start_new_thread + +from gevent._threading import Lock +from gevent._threading import Queue +from gevent._threading import start_new_thread +from gevent._threading import get_thread_ident -__all__ = ['ThreadPool', - 'ThreadResult'] +__all__ = [ + 'ThreadPool', + 'ThreadResult', +] +class _WorkerGreenlet(RawGreenlet): + # Exists to produce a more useful repr for worker pool + # threads/greenlets. + + def __init__(self, threadpool): + RawGreenlet.__init__(self, threadpool._worker) + self.thread_ident = get_thread_ident() + self._threadpool_wref = wref(threadpool) + + # Inform the gevent.util.GreenletTree that this should be + # considered the root (for printing purposes) and to + # ignore the parent attribute. (We can't set parent to None.) + self.greenlet_tree_is_root = True + self.parent.greenlet_tree_is_ignored = True + + def __repr__(self): + return "<ThreadPoolWorker at 0x%x thread_ident=0x%x %s>" % ( + id(self), + self.thread_ident, + self._threadpool_wref()) + class ThreadPool(GroupMappingMixin): """ .. note:: The method :meth:`apply_async` will always return a new greenlet, bypassing the threadpool entirely. + .. caution:: Instances of this class are only true if they have + unfinished tasks. """ def __init__(self, maxsize, hub=None): @@ -29,7 +66,11 @@ class ThreadPool(GroupMappingMixin): self.manager = None self.pid = os.getpid() self.fork_watcher = hub.loop.fork(ref=False) - self._init(maxsize) + try: + self._init(maxsize) + except: + self.fork_watcher.close() + raise def _set_maxsize(self, maxsize): if not isinstance(maxsize, integer_types): @@ -49,10 +90,16 @@ class ThreadPool(GroupMappingMixin): maxsize = property(_get_maxsize, _set_maxsize) def __repr__(self): - return '<%s at 0x%x %s/%s/%s>' % (self.__class__.__name__, id(self), len(self), self.size, self.maxsize) + return '<%s at 0x%x %s/%s/%s hub=<%s at 0x%x thread_ident=0x%s>>' % ( + self.__class__.__name__, + id(self), + len(self), self.size, self.maxsize, + self.hub.__class__.__name__, id(self.hub), self.hub.thread_ident) def __len__(self): # XXX just do unfinished_tasks property + # Note that this becomes the boolean value of this class, + # that's probably not what we want! return self.task_queue.unfinished_tasks def _get_size(self): @@ -67,7 +114,7 @@ class ThreadPool(GroupMappingMixin): self.manager.kill() while self._size < size: self._add_thread() - delay = 0.0001 + delay = getattr(self.hub.loop, 'min_sleep_time', 0.0001) # For libuv while self._size > size: while self._size - size > self.task_queue.unfinished_tasks: self.task_queue.put(None) @@ -110,6 +157,7 @@ class ThreadPool(GroupMappingMixin): def kill(self): self.size = 0 + self.fork_watcher.close() def _adjust_step(self): # if there is a possibility & necessity for adding a thread, do it @@ -143,7 +191,7 @@ class ThreadPool(GroupMappingMixin): with self._lock: self._size += 1 try: - start_new_thread(self._worker, ()) + start_new_thread(self.__trampoline, ()) except: with self._lock: self._size -= 1 @@ -157,7 +205,7 @@ class ThreadPool(GroupMappingMixin): :return: A :class:`gevent.event.AsyncResult`. """ - while True: + while 1: semaphore = self._semaphore semaphore.acquire() if semaphore is self._semaphore: @@ -171,7 +219,7 @@ class ThreadPool(GroupMappingMixin): # we get LoopExit (why?). Previously it was done with a rawlink on the # AsyncResult and the comment that it is "competing for order with get(); this is not # good, just make ThreadResult release the semaphore before doing anything else" - thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release) + thread_result = ThreadResult(result, self.hub, semaphore.release) task_queue.put((func, args, kwargs, thread_result)) self.adjust() except: @@ -189,14 +237,36 @@ class ThreadPool(GroupMappingMixin): with _lock: self._size -= 1 - _destroy_worker_hub = False + # XXX: This used to be false by default. It really seems like + # it should be true to avoid leaking resources. + _destroy_worker_hub = True + + + def __ignore_current_greenlet_blocking(self, hub): + if hub is not None and hub.periodic_monitoring_thread is not None: + hub.periodic_monitoring_thread.ignore_current_greenlet_blocking() + + def __trampoline(self): + # The target that we create new threads with. It exists + # solely to create the _WorkerGreenlet and switch to it. + # (the __class__ of a raw greenlet cannot be changed.) + g = _WorkerGreenlet(self) + g.switch() def _worker(self): # pylint:disable=too-many-branches need_decrease = True try: - while True: + while 1: # tiny bit faster than True on Py2 + h = _get_hub() + if h is not None: + h.name = 'ThreadPool Worker Hub' task_queue = self.task_queue + # While we block, don't let the monitoring thread, if any, + # report us as blocked. Indeed, so long as we never + # try to switch greenlets, don't report us as blocked--- + # the threadpool is *meant* to run blocking tasks + self.__ignore_current_greenlet_blocking(h) task = task_queue.get() try: if task is None: @@ -263,72 +333,84 @@ class ThreadPool(GroupMappingMixin): # Always go to Greenlet because our self.spawn uses threads return True +class _FakeAsync(object): + + def send(self): + pass + close = stop = send + + def __call_(self, result): + "fake out for 'receiver'" + + def __bool__(self): + return False + + __nonzero__ = __bool__ + +_FakeAsync = _FakeAsync() class ThreadResult(object): # Using slots here helps to debug reference cycles/leaks - __slots__ = ('exc_info', 'async', '_call_when_ready', 'value', + __slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value', 'context', 'hub', 'receiver') - def __init__(self, receiver, hub=None, call_when_ready=None): - if hub is None: - hub = get_hub() + def __init__(self, receiver, hub, call_when_ready): self.receiver = receiver self.hub = hub self.context = None self.value = None self.exc_info = () - self.async = hub.loop.async() + self.async_watcher = hub.loop.async_() self._call_when_ready = call_when_ready - self.async.start(self._on_async) + self.async_watcher.start(self._on_async) @property def exception(self): return self.exc_info[1] if self.exc_info else None def _on_async(self): - self.async.stop() - if self._call_when_ready: - # Typically this is pool.semaphore.release and we have to - # call this in the Hub; if we don't we get the dreaded - # LoopExit (XXX: Why?) - self._call_when_ready() + self.async_watcher.stop() + self.async_watcher.close() + + # Typically this is pool.semaphore.release and we have to + # call this in the Hub; if we don't we get the dreaded + # LoopExit (XXX: Why?) + self._call_when_ready() + try: if self.exc_info: self.hub.handle_error(self.context, *self.exc_info) self.context = None - self.async = None + self.async_watcher = _FakeAsync self.hub = None - self._call_when_ready = None - if self.receiver is not None: - self.receiver(self) + self._call_when_ready = _FakeAsync + + self.receiver(self) finally: - self.receiver = None + self.receiver = _FakeAsync self.value = None if self.exc_info: self.exc_info = (self.exc_info[0], self.exc_info[1], None) def destroy(self): - if self.async is not None: - self.async.stop() - self.async = None + self.async_watcher.stop() + self.async_watcher.close() + self.async_watcher = _FakeAsync + self.context = None self.hub = None - self._call_when_ready = None - self.receiver = None - - def _ready(self): - if self.async is not None: - self.async.send() + self._call_when_ready = _FakeAsync + self.receiver = _FakeAsync def set(self, value): self.value = value - self._ready() + self.async_watcher.send() def handle_error(self, context, exc_info): self.context = context self.exc_info = exc_info - self._ready() + self.async_watcher.send() # link protocol: def successful(self): |