aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/threadpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/threadpool.py')
-rw-r--r--python/gevent/threadpool.py162
1 files changed, 122 insertions, 40 deletions
diff --git a/python/gevent/threadpool.py b/python/gevent/threadpool.py
index 1d0e98d..c983390 100644
--- a/python/gevent/threadpool.py
+++ b/python/gevent/threadpool.py
@@ -2,23 +2,60 @@
from __future__ import absolute_import
import sys
import os
+
+from weakref import ref as wref
+
+from greenlet import greenlet as RawGreenlet
+
from gevent._compat import integer_types
-from gevent.hub import get_hub, getcurrent, sleep, _get_hub
+from gevent.hub import _get_hub_noargs as get_hub
+from gevent.hub import getcurrent
+from gevent.hub import sleep
+from gevent.hub import _get_hub
from gevent.event import AsyncResult
from gevent.greenlet import Greenlet
from gevent.pool import GroupMappingMixin
from gevent.lock import Semaphore
-from gevent._threading import Lock, Queue, start_new_thread
+
+from gevent._threading import Lock
+from gevent._threading import Queue
+from gevent._threading import start_new_thread
+from gevent._threading import get_thread_ident
-__all__ = ['ThreadPool',
- 'ThreadResult']
+__all__ = [
+ 'ThreadPool',
+ 'ThreadResult',
+]
+class _WorkerGreenlet(RawGreenlet):
+ # Exists to produce a more useful repr for worker pool
+ # threads/greenlets.
+
+ def __init__(self, threadpool):
+ RawGreenlet.__init__(self, threadpool._worker)
+ self.thread_ident = get_thread_ident()
+ self._threadpool_wref = wref(threadpool)
+
+ # Inform the gevent.util.GreenletTree that this should be
+ # considered the root (for printing purposes) and to
+ # ignore the parent attribute. (We can't set parent to None.)
+ self.greenlet_tree_is_root = True
+ self.parent.greenlet_tree_is_ignored = True
+
+ def __repr__(self):
+ return "<ThreadPoolWorker at 0x%x thread_ident=0x%x %s>" % (
+ id(self),
+ self.thread_ident,
+ self._threadpool_wref())
+
class ThreadPool(GroupMappingMixin):
"""
.. note:: The method :meth:`apply_async` will always return a new
greenlet, bypassing the threadpool entirely.
+ .. caution:: Instances of this class are only true if they have
+ unfinished tasks.
"""
def __init__(self, maxsize, hub=None):
@@ -29,7 +66,11 @@ class ThreadPool(GroupMappingMixin):
self.manager = None
self.pid = os.getpid()
self.fork_watcher = hub.loop.fork(ref=False)
- self._init(maxsize)
+ try:
+ self._init(maxsize)
+ except:
+ self.fork_watcher.close()
+ raise
def _set_maxsize(self, maxsize):
if not isinstance(maxsize, integer_types):
@@ -49,10 +90,16 @@ class ThreadPool(GroupMappingMixin):
maxsize = property(_get_maxsize, _set_maxsize)
def __repr__(self):
- return '<%s at 0x%x %s/%s/%s>' % (self.__class__.__name__, id(self), len(self), self.size, self.maxsize)
+ return '<%s at 0x%x %s/%s/%s hub=<%s at 0x%x thread_ident=0x%s>>' % (
+ self.__class__.__name__,
+ id(self),
+ len(self), self.size, self.maxsize,
+ self.hub.__class__.__name__, id(self.hub), self.hub.thread_ident)
def __len__(self):
# XXX just do unfinished_tasks property
+ # Note that this becomes the boolean value of this class,
+ # that's probably not what we want!
return self.task_queue.unfinished_tasks
def _get_size(self):
@@ -67,7 +114,7 @@ class ThreadPool(GroupMappingMixin):
self.manager.kill()
while self._size < size:
self._add_thread()
- delay = 0.0001
+ delay = getattr(self.hub.loop, 'min_sleep_time', 0.0001) # For libuv
while self._size > size:
while self._size - size > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
@@ -110,6 +157,7 @@ class ThreadPool(GroupMappingMixin):
def kill(self):
self.size = 0
+ self.fork_watcher.close()
def _adjust_step(self):
# if there is a possibility & necessity for adding a thread, do it
@@ -143,7 +191,7 @@ class ThreadPool(GroupMappingMixin):
with self._lock:
self._size += 1
try:
- start_new_thread(self._worker, ())
+ start_new_thread(self.__trampoline, ())
except:
with self._lock:
self._size -= 1
@@ -157,7 +205,7 @@ class ThreadPool(GroupMappingMixin):
:return: A :class:`gevent.event.AsyncResult`.
"""
- while True:
+ while 1:
semaphore = self._semaphore
semaphore.acquire()
if semaphore is self._semaphore:
@@ -171,7 +219,7 @@ class ThreadPool(GroupMappingMixin):
# we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else"
- thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
+ thread_result = ThreadResult(result, self.hub, semaphore.release)
task_queue.put((func, args, kwargs, thread_result))
self.adjust()
except:
@@ -189,14 +237,36 @@ class ThreadPool(GroupMappingMixin):
with _lock:
self._size -= 1
- _destroy_worker_hub = False
+ # XXX: This used to be false by default. It really seems like
+ # it should be true to avoid leaking resources.
+ _destroy_worker_hub = True
+
+
+ def __ignore_current_greenlet_blocking(self, hub):
+ if hub is not None and hub.periodic_monitoring_thread is not None:
+ hub.periodic_monitoring_thread.ignore_current_greenlet_blocking()
+
+ def __trampoline(self):
+ # The target that we create new threads with. It exists
+ # solely to create the _WorkerGreenlet and switch to it.
+ # (the __class__ of a raw greenlet cannot be changed.)
+ g = _WorkerGreenlet(self)
+ g.switch()
def _worker(self):
# pylint:disable=too-many-branches
need_decrease = True
try:
- while True:
+ while 1: # tiny bit faster than True on Py2
+ h = _get_hub()
+ if h is not None:
+ h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue
+ # While we block, don't let the monitoring thread, if any,
+ # report us as blocked. Indeed, so long as we never
+ # try to switch greenlets, don't report us as blocked---
+ # the threadpool is *meant* to run blocking tasks
+ self.__ignore_current_greenlet_blocking(h)
task = task_queue.get()
try:
if task is None:
@@ -263,72 +333,84 @@ class ThreadPool(GroupMappingMixin):
# Always go to Greenlet because our self.spawn uses threads
return True
+class _FakeAsync(object):
+
+ def send(self):
+ pass
+ close = stop = send
+
+ def __call_(self, result):
+ "fake out for 'receiver'"
+
+ def __bool__(self):
+ return False
+
+ __nonzero__ = __bool__
+
+_FakeAsync = _FakeAsync()
class ThreadResult(object):
# Using slots here helps to debug reference cycles/leaks
- __slots__ = ('exc_info', 'async', '_call_when_ready', 'value',
+ __slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value',
'context', 'hub', 'receiver')
- def __init__(self, receiver, hub=None, call_when_ready=None):
- if hub is None:
- hub = get_hub()
+ def __init__(self, receiver, hub, call_when_ready):
self.receiver = receiver
self.hub = hub
self.context = None
self.value = None
self.exc_info = ()
- self.async = hub.loop.async()
+ self.async_watcher = hub.loop.async_()
self._call_when_ready = call_when_ready
- self.async.start(self._on_async)
+ self.async_watcher.start(self._on_async)
@property
def exception(self):
return self.exc_info[1] if self.exc_info else None
def _on_async(self):
- self.async.stop()
- if self._call_when_ready:
- # Typically this is pool.semaphore.release and we have to
- # call this in the Hub; if we don't we get the dreaded
- # LoopExit (XXX: Why?)
- self._call_when_ready()
+ self.async_watcher.stop()
+ self.async_watcher.close()
+
+ # Typically this is pool.semaphore.release and we have to
+ # call this in the Hub; if we don't we get the dreaded
+ # LoopExit (XXX: Why?)
+ self._call_when_ready()
+
try:
if self.exc_info:
self.hub.handle_error(self.context, *self.exc_info)
self.context = None
- self.async = None
+ self.async_watcher = _FakeAsync
self.hub = None
- self._call_when_ready = None
- if self.receiver is not None:
- self.receiver(self)
+ self._call_when_ready = _FakeAsync
+
+ self.receiver(self)
finally:
- self.receiver = None
+ self.receiver = _FakeAsync
self.value = None
if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None)
def destroy(self):
- if self.async is not None:
- self.async.stop()
- self.async = None
+ self.async_watcher.stop()
+ self.async_watcher.close()
+ self.async_watcher = _FakeAsync
+
self.context = None
self.hub = None
- self._call_when_ready = None
- self.receiver = None
-
- def _ready(self):
- if self.async is not None:
- self.async.send()
+ self._call_when_ready = _FakeAsync
+ self.receiver = _FakeAsync
def set(self, value):
self.value = value
- self._ready()
+ self.async_watcher.send()
def handle_error(self, context, exc_info):
self.context = context
self.exc_info = exc_info
- self._ready()
+ self.async_watcher.send()
# link protocol:
def successful(self):