Diffstat (limited to 'python/gevent/threadpool.py')
-rw-r--r--  python/gevent/threadpool.py  498
1 file changed, 498 insertions, 0 deletions
diff --git a/python/gevent/threadpool.py b/python/gevent/threadpool.py
new file mode 100644
index 0000000..1d0e98d
--- /dev/null
+++ b/python/gevent/threadpool.py
@@ -0,0 +1,498 @@
+# Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
+from __future__ import absolute_import
+import sys
+import os
+from gevent._compat import integer_types
+from gevent.hub import get_hub, getcurrent, sleep, _get_hub
+from gevent.event import AsyncResult
+from gevent.greenlet import Greenlet
+from gevent.pool import GroupMappingMixin
+from gevent.lock import Semaphore
+from gevent._threading import Lock, Queue, start_new_thread
+
+
+__all__ = ['ThreadPool',
+ 'ThreadResult']
+
+
+class ThreadPool(GroupMappingMixin):
+ """
+ .. note:: The method :meth:`apply_async` will always return a new
+ greenlet, bypassing the threadpool entirely.
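+
+    A minimal usage sketch (``pow`` stands in for any callable)::
+
+        from gevent.threadpool import ThreadPool
+
+        pool = ThreadPool(3)             # at most 3 native worker threads
+        result = pool.spawn(pow, 2, 10)  # returns an AsyncResult
+        assert result.get() == 1024     # blocks only the calling greenlet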
+ """
+
+ def __init__(self, maxsize, hub=None):
+ if hub is None:
+ hub = get_hub()
+ self.hub = hub
+ self._maxsize = 0
+ self.manager = None
+ self.pid = os.getpid()
+ self.fork_watcher = hub.loop.fork(ref=False)
+ self._init(maxsize)
+
+ def _set_maxsize(self, maxsize):
+ if not isinstance(maxsize, integer_types):
+ raise TypeError('maxsize must be integer: %r' % (maxsize, ))
+ if maxsize < 0:
+ raise ValueError('maxsize must not be negative: %r' % (maxsize, ))
+ difference = maxsize - self._maxsize
+ self._semaphore.counter += difference
+ self._maxsize = maxsize
+ self.adjust()
+        # make sure all currently blocked spawn() calls start unblocking if maxsize increased
+ self._semaphore._start_notify()
+
+ def _get_maxsize(self):
+ return self._maxsize
+
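+    # Raising maxsize at runtime immediately unblocks greenlets waiting in
+    # spawn(); lowering it makes adjust() feed ``None`` stop markers to
+    # surplus worker threads.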
+ maxsize = property(_get_maxsize, _set_maxsize)
+
+ def __repr__(self):
+ return '<%s at 0x%x %s/%s/%s>' % (self.__class__.__name__, id(self), len(self), self.size, self.maxsize)
+
+ def __len__(self):
+ # XXX just do unfinished_tasks property
+ return self.task_queue.unfinished_tasks
+
+ def _get_size(self):
+ return self._size
+
+ def _set_size(self, size):
+ if size < 0:
+ raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
+ if size > self._maxsize:
+ raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
+ if self.manager:
+ self.manager.kill()
+ while self._size < size:
+ self._add_thread()
+ delay = 0.0001
+ while self._size > size:
+ while self._size - size > self.task_queue.unfinished_tasks:
+ self.task_queue.put(None)
+ if getcurrent() is self.hub:
+ break
+ sleep(delay)
+ delay = min(delay * 2, .05)
+ if self._size:
+ self.fork_watcher.start(self._on_fork)
+ else:
+ self.fork_watcher.stop()
+
+ size = property(_get_size, _set_size)
+
+ def _init(self, maxsize):
+ self._size = 0
+ self._semaphore = Semaphore(1)
+ self._lock = Lock()
+ self.task_queue = Queue()
+ self._set_maxsize(maxsize)
+
+ def _on_fork(self):
+ # fork() only leaves one thread; also screws up locks;
+ # let's re-create locks and threads.
+ # NOTE: See comment in gevent.hub.reinit.
+ pid = os.getpid()
+ if pid != self.pid:
+ self.pid = pid
+            # Do not mix fork() and threads; since fork() only copies one thread,
+            # all objects referenced by other threads have refcounts that will
+            # never go down to 0.
+ self._init(self._maxsize)
+
+ def join(self):
+ """Waits until all outstanding tasks have been completed."""
+ delay = 0.0005
+ while self.task_queue.unfinished_tasks > 0:
+ sleep(delay)
+ delay = min(delay * 2, .05)
+
+ def kill(self):
+ self.size = 0
+
+ def _adjust_step(self):
+        # if it is both possible and necessary to add a thread, do it
+ while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
+ self._add_thread()
+ # while the number of threads is more than maxsize, kill one
+ # we do not check what's already in task_queue - it could be all Nones
+ while self._size - self._maxsize > self.task_queue.unfinished_tasks:
+ self.task_queue.put(None)
+ if self._size:
+ self.fork_watcher.start(self._on_fork)
+ else:
+ self.fork_watcher.stop()
+
+ def _adjust_wait(self):
+ delay = 0.0001
+ while True:
+ self._adjust_step()
+ if self._size <= self._maxsize:
+ return
+ sleep(delay)
+ delay = min(delay * 2, .05)
+
+ def adjust(self):
+ self._adjust_step()
+ if not self.manager and self._size > self._maxsize:
+ # might need to feed more Nones into the pool
+ self.manager = Greenlet.spawn(self._adjust_wait)
+
+ def _add_thread(self):
+ with self._lock:
+ self._size += 1
+ try:
+ start_new_thread(self._worker, ())
+ except:
+ with self._lock:
+ self._size -= 1
+ raise
+
+ def spawn(self, func, *args, **kwargs):
+ """
+ Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
+
+ Waits until a slot is available. Creates a new thread if necessary.
+
+ :return: A :class:`gevent.event.AsyncResult`.
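+
+        A minimal sketch; calling ``get()`` on the returned result re-raises
+        in the calling greenlet any exception raised by ``func``::
+
+            result = pool.spawn(divmod, 7, 3)
+            print(result.get())  # (2, 1)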
+ """
+ while True:
+ semaphore = self._semaphore
+ semaphore.acquire()
+ if semaphore is self._semaphore:
+ break
+
+ thread_result = None
+ try:
+ task_queue = self.task_queue
+ result = AsyncResult()
+            # XXX We're calling the semaphore release function in the hub; otherwise
+            # we get LoopExit (why?). Previously it was done with a rawlink on the
+            # AsyncResult, with the comment that it is "competing for order with get(); this is not
+            # good, just make ThreadResult release the semaphore before doing anything else"
+ thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
+ task_queue.put((func, args, kwargs, thread_result))
+ self.adjust()
+ except:
+ if thread_result is not None:
+ thread_result.destroy()
+ semaphore.release()
+ raise
+ return result
+
+ def _decrease_size(self):
+ if sys is None:
+ return
+ _lock = getattr(self, '_lock', None)
+ if _lock is not None:
+ with _lock:
+ self._size -= 1
+
+ _destroy_worker_hub = False
+
+ def _worker(self):
+ # pylint:disable=too-many-branches
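+        # Runs in a native thread. Blocks on task_queue.get(); a None task
+        # is the "stop" sentinel, anything else is a
+        # (func, args, kwargs, ThreadResult) tuple.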
+ need_decrease = True
+ try:
+ while True:
+ task_queue = self.task_queue
+ task = task_queue.get()
+ try:
+ if task is None:
+ need_decrease = False
+ self._decrease_size()
+                        # we want to decrease the size first, then decrease
+                        # unfinished_tasks; otherwise _adjust_step might think
+                        # there's one more idle thread that needs to be killed
+ return
+ func, args, kwargs, thread_result = task
+ try:
+ value = func(*args, **kwargs)
+ except: # pylint:disable=bare-except
+ exc_info = getattr(sys, 'exc_info', None)
+ if exc_info is None:
+ return
+ thread_result.handle_error((self, func), exc_info())
+ else:
+ if sys is None:
+ return
+ thread_result.set(value)
+ del value
+ finally:
+ del func, args, kwargs, thread_result, task
+ finally:
+ if sys is None:
+ return # pylint:disable=lost-exception
+ task_queue.task_done()
+ finally:
+ if need_decrease:
+ self._decrease_size()
+ if sys is not None and self._destroy_worker_hub:
+ hub = _get_hub()
+ if hub is not None:
+ hub.destroy(True)
+ del hub
+
+ def apply_e(self, expected_errors, function, args=None, kwargs=None):
+ """
+ .. deprecated:: 1.1a2
+ Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
+ """
+ # pylint:disable=unused-argument
+ # Deprecated but never documented. In the past, before
+ # self.apply() allowed all errors to be raised to the caller,
+ # expected_errors allowed a caller to specify a set of errors
+ # they wanted to be raised, through the wrap_errors function.
+ # In practice, it always took the value Exception or
+ # BaseException.
+ return self.apply(function, args, kwargs)
+
+ def _apply_immediately(self):
+ # If we're being called from a different thread than the one that
+ # created us, e.g., because a worker task is trying to use apply()
+ # recursively, we have no choice but to run the task immediately;
+ # if we try to AsyncResult.get() in the worker thread, it's likely to have
+ # nothing to switch to and lead to a LoopExit.
+ return get_hub() is not self.hub
+
+ def _apply_async_cb_spawn(self, callback, result):
+ callback(result)
+
+ def _apply_async_use_greenlet(self):
+ # Always go to Greenlet because our self.spawn uses threads
+ return True
+
+
+class ThreadResult(object):
+
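+    # Delivers a value or an exception produced in a worker thread to the
+    # hub's thread: the worker calls set() or handle_error(), which fires
+    # the loop's async watcher; _on_async() then runs in the hub and
+    # passes this object to ``receiver`` (typically an AsyncResult).
+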
+ # Using slots here helps to debug reference cycles/leaks
+ __slots__ = ('exc_info', 'async', '_call_when_ready', 'value',
+ 'context', 'hub', 'receiver')
+
+ def __init__(self, receiver, hub=None, call_when_ready=None):
+ if hub is None:
+ hub = get_hub()
+ self.receiver = receiver
+ self.hub = hub
+ self.context = None
+ self.value = None
+ self.exc_info = ()
+ self.async = hub.loop.async()
+ self._call_when_ready = call_when_ready
+ self.async.start(self._on_async)
+
+ @property
+ def exception(self):
+ return self.exc_info[1] if self.exc_info else None
+
+ def _on_async(self):
+ self.async.stop()
+ if self._call_when_ready:
+ # Typically this is pool.semaphore.release and we have to
+ # call this in the Hub; if we don't we get the dreaded
+ # LoopExit (XXX: Why?)
+ self._call_when_ready()
+ try:
+ if self.exc_info:
+ self.hub.handle_error(self.context, *self.exc_info)
+ self.context = None
+ self.async = None
+ self.hub = None
+ self._call_when_ready = None
+ if self.receiver is not None:
+ self.receiver(self)
+ finally:
+ self.receiver = None
+ self.value = None
+ if self.exc_info:
+ self.exc_info = (self.exc_info[0], self.exc_info[1], None)
+
+ def destroy(self):
+ if self.async is not None:
+ self.async.stop()
+ self.async = None
+ self.context = None
+ self.hub = None
+ self._call_when_ready = None
+ self.receiver = None
+
+ def _ready(self):
+ if self.async is not None:
+ self.async.send()
+
+ def set(self, value):
+ self.value = value
+ self._ready()
+
+ def handle_error(self, context, exc_info):
+ self.context = context
+ self.exc_info = exc_info
+ self._ready()
+
+ # link protocol:
+ def successful(self):
+ return self.exception is None
+
+
+def wrap_errors(errors, function, args, kwargs):
+ """
+ .. deprecated:: 1.1a2
+ Previously used by ThreadPool.apply_e.
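+
+    Calls ``function(*args, **kwargs)`` and returns an ``(ok, value)`` pair:
+    ``(True, result)`` on success, or ``(False, exc)`` if one of *errors*
+    was raised.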
+ """
+ try:
+ return True, function(*args, **kwargs)
+ except errors as ex:
+ return False, ex
+
+try:
+ import concurrent.futures
+except ImportError:
+ pass
+else:
+ __all__.append("ThreadPoolExecutor")
+
+ from gevent.timeout import Timeout as GTimeout
+ from gevent._util import Lazy
+ from concurrent.futures import _base as cfb
+
+ def _wrap_error(future, fn):
+ def cbwrap(_):
+ del _
+            # We're called with the AsyncResult, but we pass the future
+            # itself to the callback. The auto_unlink flag below makes the
+            # rawlink remove us, so we don't get called multiple times.
+ try:
+ fn(future)
+ except Exception: # pylint: disable=broad-except
+ future.hub.print_exception((fn, future), *sys.exc_info())
+ cbwrap.auto_unlink = True
+ return cbwrap
+
+ def _wrap(future, fn):
+ def f(_):
+ fn(future)
+ f.auto_unlink = True
+ return f
+
+ class _FutureProxy(object):
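+        # Duck-types concurrent.futures.Future: implements just enough of
+        # its internals (_condition, _waiters, _state) for wait() and
+        # as_completed() to work, and delegates everything else to the
+        # wrapped AsyncResult via __getattr__.
+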
+ def __init__(self, asyncresult):
+ self.asyncresult = asyncresult
+
+ # Internal implementation details of a c.f.Future
+
+ @Lazy
+ def _condition(self):
+ from gevent import monkey
+ if monkey.is_module_patched('threading') or self.done():
+ import threading
+ return threading.Condition()
+ # We can only properly work with conditions
+ # when we've been monkey-patched. This is necessary
+ # for the wait/as_completed module functions.
+ raise AttributeError("_condition")
+
+ @Lazy
+ def _waiters(self):
+ self.asyncresult.rawlink(self.__when_done)
+ return []
+
+ def __when_done(self, _):
+ # We should only be called when _waiters has
+ # already been accessed.
+ waiters = getattr(self, '_waiters')
+ for w in waiters: # pylint:disable=not-an-iterable
+ if self.successful():
+ w.add_result(self)
+ else:
+ w.add_exception(self)
+
+ __when_done.auto_unlink = True
+
+ @property
+ def _state(self):
+ if self.done():
+ return cfb.FINISHED
+ return cfb.RUNNING
+
+ def set_running_or_notify_cancel(self):
+ # Does nothing, not even any consistency checks. It's
+ # meant to be internal to the executor and we don't use it.
+ return
+
+ def result(self, timeout=None):
+ try:
+ return self.asyncresult.result(timeout=timeout)
+ except GTimeout:
+ # XXX: Theoretically this could be a completely
+ # unrelated timeout instance. Do we care about that?
+ raise concurrent.futures.TimeoutError()
+
+ def exception(self, timeout=None):
+ try:
+ self.asyncresult.get(timeout=timeout)
+ return self.asyncresult.exception
+ except GTimeout:
+ raise concurrent.futures.TimeoutError()
+
+ def add_done_callback(self, fn):
+ if self.done():
+ fn(self)
+ else:
+ self.asyncresult.rawlink(_wrap_error(self, fn))
+
+ def rawlink(self, fn):
+ self.asyncresult.rawlink(_wrap(self, fn))
+
+ def __str__(self):
+ return str(self.asyncresult)
+
+ def __getattr__(self, name):
+ return getattr(self.asyncresult, name)
+
+ class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
+ """
+ A version of :class:`concurrent.futures.ThreadPoolExecutor` that
+ always uses native threads, even when threading is monkey-patched.
+
+ The ``Future`` objects returned from this object can be used
+ with gevent waiting primitives like :func:`gevent.wait`.
+
+ .. caution:: If threading is *not* monkey-patched, then the ``Future``
+ objects returned by this object are not guaranteed to work with
+ :func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
+ The individual blocking methods like :meth:`~concurrent.futures.Future.result`
+ and :meth:`~concurrent.futures.Future.exception` will always work.
+
+ .. versionadded:: 1.2a1
+ This is a provisional API.
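+
+        A minimal usage sketch::
+
+            with ThreadPoolExecutor(max_workers=2) as executor:
+                future = executor.submit(pow, 2, 10)
+                assert future.result() == 1024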
+ """
+
+ def __init__(self, max_workers):
+ super(ThreadPoolExecutor, self).__init__(max_workers)
+ self._threadpool = ThreadPool(max_workers)
+ self._threadpool._destroy_worker_hub = True
+
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock: # pylint:disable=not-context-manager
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ future = self._threadpool.spawn(fn, *args, **kwargs)
+ return _FutureProxy(future)
+
+ def shutdown(self, wait=True):
+ super(ThreadPoolExecutor, self).shutdown(wait)
+ # XXX: We don't implement wait properly
+ kill = getattr(self._threadpool, 'kill', None)
+ if kill: # pylint:disable=using-constant-test
+ self._threadpool.kill()
+ self._threadpool = None
+
+ kill = shutdown # greentest compat
+
+ def _adjust_thread_count(self):
+        # Does nothing. We don't want to spawn any "threads";
+        # let the threadpool handle that.
+ pass