Diffstat (limited to 'python/gevent/threadpool.py')
-rw-r--r-- | python/gevent/threadpool.py | 498
1 files changed, 498 insertions, 0 deletions
diff --git a/python/gevent/threadpool.py b/python/gevent/threadpool.py
new file mode 100644
index 0000000..1d0e98d
--- /dev/null
+++ b/python/gevent/threadpool.py
@@ -0,0 +1,498 @@
+# Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
+from __future__ import absolute_import
+import sys
+import os
+from gevent._compat import integer_types
+from gevent.hub import get_hub, getcurrent, sleep, _get_hub
+from gevent.event import AsyncResult
+from gevent.greenlet import Greenlet
+from gevent.pool import GroupMappingMixin
+from gevent.lock import Semaphore
+from gevent._threading import Lock, Queue, start_new_thread
+
+
+__all__ = ['ThreadPool',
+           'ThreadResult']
+
+
+class ThreadPool(GroupMappingMixin):
+    """
+    .. note:: The method :meth:`apply_async` will always return a new
+       greenlet, bypassing the threadpool entirely.
+    """
+
+    def __init__(self, maxsize, hub=None):
+        if hub is None:
+            hub = get_hub()
+        self.hub = hub
+        self._maxsize = 0
+        self.manager = None
+        self.pid = os.getpid()
+        self.fork_watcher = hub.loop.fork(ref=False)
+        self._init(maxsize)
+
+    def _set_maxsize(self, maxsize):
+        if not isinstance(maxsize, integer_types):
+            raise TypeError('maxsize must be integer: %r' % (maxsize, ))
+        if maxsize < 0:
+            raise ValueError('maxsize must not be negative: %r' % (maxsize, ))
+        difference = maxsize - self._maxsize
+        self._semaphore.counter += difference
+        self._maxsize = maxsize
+        self.adjust()
+        # make sure all currently blocking spawn() start unlocking if maxsize increased
+        self._semaphore._start_notify()
+
+    def _get_maxsize(self):
+        return self._maxsize
+
+    maxsize = property(_get_maxsize, _set_maxsize)
+
+    def __repr__(self):
+        return '<%s at 0x%x %s/%s/%s>' % (self.__class__.__name__, id(self), len(self), self.size, self.maxsize)
+
+    def __len__(self):
+        # XXX just do unfinished_tasks property
+        return self.task_queue.unfinished_tasks
+
+    def _get_size(self):
+        return self._size
+
+    def _set_size(self, size):
+        if size < 0:
+            raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
+        if size > self._maxsize:
+            raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
+        if self.manager:
+            self.manager.kill()
+        while self._size < size:
+            self._add_thread()
+        delay = 0.0001
+        while self._size > size:
+            while self._size - size > self.task_queue.unfinished_tasks:
+                self.task_queue.put(None)
+            if getcurrent() is self.hub:
+                break
+            sleep(delay)
+            delay = min(delay * 2, .05)
+        if self._size:
+            self.fork_watcher.start(self._on_fork)
+        else:
+            self.fork_watcher.stop()
+
+    size = property(_get_size, _set_size)
+
+    def _init(self, maxsize):
+        self._size = 0
+        self._semaphore = Semaphore(1)
+        self._lock = Lock()
+        self.task_queue = Queue()
+        self._set_maxsize(maxsize)
+
+    def _on_fork(self):
+        # fork() only leaves one thread; it also screws up locks;
+        # let's re-create locks and threads.
+        # NOTE: See comment in gevent.hub.reinit.
+        pid = os.getpid()
+        if pid != self.pid:
+            self.pid = pid
+            # Do not mix fork() and threads; since fork() only copies one thread,
+            # all objects referenced by other threads have refcounts that will never
+            # go down to 0.
+            self._init(self._maxsize)
+
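For orientation: maxsize and size are live properties. Raising maxsize credits the internal semaphore so that blocked spawn() calls wake up, and lowering size feeds None sentinels to the workers. A minimal sketch of driving them from caller code, assuming gevent is installed; the numbers are illustrative:

    from gevent.threadpool import ThreadPool

    pool = ThreadPool(3)      # up to three worker threads
    print(pool.maxsize)       # -> 3
    pool.maxsize = 5          # raises the cap; wakes any spawn() blocked on a slot
    pool.size = 5             # eagerly start five worker threads
    pool.size = 0             # enqueue None sentinels until all workers exit
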
+    def join(self):
+        """Waits until all outstanding tasks have been completed."""
+        delay = 0.0005
+        while self.task_queue.unfinished_tasks > 0:
+            sleep(delay)
+            delay = min(delay * 2, .05)
+
+    def kill(self):
+        self.size = 0
+
+    def _adjust_step(self):
+        # if there is a possibility & necessity for adding a thread, do it
+        while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
+            self._add_thread()
+        # while the number of threads is more than maxsize, kill one
+        # we do not check what's already in task_queue - it could be all Nones
+        while self._size - self._maxsize > self.task_queue.unfinished_tasks:
+            self.task_queue.put(None)
+        if self._size:
+            self.fork_watcher.start(self._on_fork)
+        else:
+            self.fork_watcher.stop()
+
+    def _adjust_wait(self):
+        delay = 0.0001
+        while True:
+            self._adjust_step()
+            if self._size <= self._maxsize:
+                return
+            sleep(delay)
+            delay = min(delay * 2, .05)
+
+    def adjust(self):
+        self._adjust_step()
+        if not self.manager and self._size > self._maxsize:
+            # might need to feed more Nones into the pool
+            self.manager = Greenlet.spawn(self._adjust_wait)
+
+    def _add_thread(self):
+        with self._lock:
+            self._size += 1
+        try:
+            start_new_thread(self._worker, ())
+        except:
+            with self._lock:
+                self._size -= 1
+            raise
+
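The shrink path above relies on a queue-sentinel protocol: _adjust_step() enqueues None, and any worker that dequeues it exits (see _worker further down). The same pattern in miniature, using only the standard library:

    import queue
    import threading

    q = queue.Queue()

    def worker():
        while True:
            task = q.get()
            try:
                if task is None:   # sentinel: this worker should exit
                    return
                task()             # run the submitted callable
            finally:
                q.task_done()

    t = threading.Thread(target=worker)
    t.start()
    q.put(lambda: print("hello from a worker"))
    q.put(None)                    # ask the worker to shut down
    t.join()
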
+    def spawn(self, func, *args, **kwargs):
+        """
+        Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
+
+        Waits until a slot is available. Creates a new thread if necessary.
+
+        :return: A :class:`gevent.event.AsyncResult`.
+        """
+        while True:
+            semaphore = self._semaphore
+            semaphore.acquire()
+            if semaphore is self._semaphore:
+                break
+
+        thread_result = None
+        try:
+            task_queue = self.task_queue
+            result = AsyncResult()
+            # XXX We're calling the semaphore release function in the hub, otherwise
+            # we get LoopExit (why?). Previously it was done with a rawlink on the
+            # AsyncResult and the comment that it is "competing for order with get(); this is not
+            # good, just make ThreadResult release the semaphore before doing anything else"
+            thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
+            task_queue.put((func, args, kwargs, thread_result))
+            self.adjust()
+        except:
+            if thread_result is not None:
+                thread_result.destroy()
+            semaphore.release()
+            raise
+        return result
+
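A usage sketch for spawn(), assuming gevent is installed; fib is an arbitrary CPU-bound stand-in:

    from gevent.threadpool import ThreadPool

    def fib(n):
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    pool = ThreadPool(2)
    result = pool.spawn(fib, 25)   # returns an AsyncResult immediately
    print(result.get())            # blocks this greenlet, not the event loop: 75025
    pool.join()                    # wait for any remaining tasks to finish
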
+    def _decrease_size(self):
+        if sys is None:
+            return
+        _lock = getattr(self, '_lock', None)
+        if _lock is not None:
+            with _lock:
+                self._size -= 1
+
+    _destroy_worker_hub = False
+
+    def _worker(self):
+        # pylint:disable=too-many-branches
+        need_decrease = True
+        try:
+            while True:
+                task_queue = self.task_queue
+                task = task_queue.get()
+                try:
+                    if task is None:
+                        need_decrease = False
+                        self._decrease_size()
+                        # we want first to decrease size, then decrease unfinished_tasks
+                        # otherwise, _adjust might think there's one more idle thread that
+                        # needs to be killed
+                        return
+                    func, args, kwargs, thread_result = task
+                    try:
+                        value = func(*args, **kwargs)
+                    except: # pylint:disable=bare-except
+                        exc_info = getattr(sys, 'exc_info', None)
+                        if exc_info is None:
+                            return
+                        thread_result.handle_error((self, func), exc_info())
+                    else:
+                        if sys is None:
+                            return
+                        thread_result.set(value)
+                        del value
+                    finally:
+                        del func, args, kwargs, thread_result, task
+                finally:
+                    if sys is None:
+                        return # pylint:disable=lost-exception
+                    task_queue.task_done()
+        finally:
+            if need_decrease:
+                self._decrease_size()
+            if sys is not None and self._destroy_worker_hub:
+                hub = _get_hub()
+                if hub is not None:
+                    hub.destroy(True)
+                del hub
+
+    def apply_e(self, expected_errors, function, args=None, kwargs=None):
+        """
+        .. deprecated:: 1.1a2
+           Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
+        """
+        # pylint:disable=unused-argument
+        # Deprecated but never documented. In the past, before
+        # self.apply() allowed all errors to be raised to the caller,
+        # expected_errors allowed a caller to specify a set of errors
+        # they wanted to be raised, through the wrap_errors function.
+        # In practice, it always took the value Exception or
+        # BaseException.
+        return self.apply(function, args, kwargs)
+
+    def _apply_immediately(self):
+        # If we're being called from a different thread than the one that
+        # created us, e.g., because a worker task is trying to use apply()
+        # recursively, we have no choice but to run the task immediately;
+        # if we try to AsyncResult.get() in the worker thread, it's likely to have
+        # nothing to switch to and lead to a LoopExit.
+        return get_hub() is not self.hub
+
+    def _apply_async_cb_spawn(self, callback, result):
+        callback(result)
+
+    def _apply_async_use_greenlet(self):
+        # Always go to Greenlet because our self.spawn uses threads
+        return True
+
+
+class ThreadResult(object):
+
+    # Using slots here helps to debug reference cycles/leaks
+    __slots__ = ('exc_info', 'async', '_call_when_ready', 'value',
+                 'context', 'hub', 'receiver')
+
+    def __init__(self, receiver, hub=None, call_when_ready=None):
+        if hub is None:
+            hub = get_hub()
+        self.receiver = receiver
+        self.hub = hub
+        self.context = None
+        self.value = None
+        self.exc_info = ()
+        self.async = hub.loop.async()
+        self._call_when_ready = call_when_ready
+        self.async.start(self._on_async)
+
+    @property
+    def exception(self):
+        return self.exc_info[1] if self.exc_info else None
+
+    def _on_async(self):
+        self.async.stop()
+        if self._call_when_ready:
+            # Typically this is pool.semaphore.release and we have to
+            # call this in the Hub; if we don't we get the dreaded
+            # LoopExit (XXX: Why?)
+            self._call_when_ready()
+        try:
+            if self.exc_info:
+                self.hub.handle_error(self.context, *self.exc_info)
+            self.context = None
+            self.async = None
+            self.hub = None
+            self._call_when_ready = None
+            if self.receiver is not None:
+                self.receiver(self)
+        finally:
+            self.receiver = None
+            self.value = None
+            if self.exc_info:
+                self.exc_info = (self.exc_info[0], self.exc_info[1], None)
+
+    def destroy(self):
+        if self.async is not None:
+            self.async.stop()
+        self.async = None
+        self.context = None
+        self.hub = None
+        self._call_when_ready = None
+        self.receiver = None
+
+    def _ready(self):
+        if self.async is not None:
+            self.async.send()
+
+    def set(self, value):
+        self.value = value
+        self._ready()
+
+    def handle_error(self, context, exc_info):
+        self.context = context
+        self.exc_info = exc_info
+        self._ready()
+
+    # link protocol:
+    def successful(self):
+        return self.exception is None
+
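ThreadResult is the worker-to-hub bridge: set() and handle_error() run on the worker thread, the async watcher wakes the hub, and _on_async() releases the semaphore and notifies the receiver (the AsyncResult that spawn() returned). From the caller's side that means ordinary AsyncResult links fire once the value crosses over; a hedged sketch, assuming gevent is installed:

    from gevent.threadpool import ThreadPool

    pool = ThreadPool(1)
    result = pool.spawn(pow, 2, 10)
    # The callback runs after ThreadResult delivers the value to the hub.
    result.rawlink(lambda r: print("done:", r.get()))
    print(result.get())            # -> 1024 (the linked callback fires as well)
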
+
+def wrap_errors(errors, function, args, kwargs):
+    """
+    .. deprecated:: 1.1a2
+       Previously used by ThreadPool.apply_e.
+    """
+    try:
+        return True, function(*args, **kwargs)
+    except errors as ex:
+        return False, ex
+
+try:
+    import concurrent.futures
+except ImportError:
+    pass
+else:
+    __all__.append("ThreadPoolExecutor")
+
+    from gevent.timeout import Timeout as GTimeout
+    from gevent._util import Lazy
+    from concurrent.futures import _base as cfb
+
+    def _wrap_error(future, fn):
+        def cbwrap(_):
+            del _
+            # we're called with the async result, but
+            # be sure to pass in ourself. Also automatically
+            # unlink ourself so that we don't get called multiple
+            # times.
+            try:
+                fn(future)
+            except Exception: # pylint: disable=broad-except
+                future.hub.print_exception((fn, future), *sys.exc_info())
+        cbwrap.auto_unlink = True
+        return cbwrap
+
+    def _wrap(future, fn):
+        def f(_):
+            fn(future)
+        f.auto_unlink = True
+        return f
+
+    class _FutureProxy(object):
+        def __init__(self, asyncresult):
+            self.asyncresult = asyncresult
+
+        # Internal implementation details of a c.f.Future
+
+        @Lazy
+        def _condition(self):
+            from gevent import monkey
+            if monkey.is_module_patched('threading') or self.done():
+                import threading
+                return threading.Condition()
+            # We can only properly work with conditions
+            # when we've been monkey-patched. This is necessary
+            # for the wait/as_completed module functions.
+            raise AttributeError("_condition")
+
+        @Lazy
+        def _waiters(self):
+            self.asyncresult.rawlink(self.__when_done)
+            return []
+
+        def __when_done(self, _):
+            # We should only be called when _waiters has
+            # already been accessed.
+            waiters = getattr(self, '_waiters')
+            for w in waiters: # pylint:disable=not-an-iterable
+                if self.successful():
+                    w.add_result(self)
+                else:
+                    w.add_exception(self)
+
+        __when_done.auto_unlink = True
+
+        @property
+        def _state(self):
+            if self.done():
+                return cfb.FINISHED
+            return cfb.RUNNING
+
+        def set_running_or_notify_cancel(self):
+            # Does nothing, not even any consistency checks. It's
+            # meant to be internal to the executor and we don't use it.
+            return
+
+        def result(self, timeout=None):
+            try:
+                return self.asyncresult.result(timeout=timeout)
+            except GTimeout:
+                # XXX: Theoretically this could be a completely
+                # unrelated timeout instance. Do we care about that?
+                raise concurrent.futures.TimeoutError()
+
+        def exception(self, timeout=None):
+            try:
+                self.asyncresult.get(timeout=timeout)
+                return self.asyncresult.exception
+            except GTimeout:
+                raise concurrent.futures.TimeoutError()
+
+        def add_done_callback(self, fn):
+            if self.done():
+                fn(self)
+            else:
+                self.asyncresult.rawlink(_wrap_error(self, fn))
+
+        def rawlink(self, fn):
+            self.asyncresult.rawlink(_wrap(self, fn))
+
+        def __str__(self):
+            return str(self.asyncresult)
+
+        def __getattr__(self, name):
+            return getattr(self.asyncresult, name)
+
+    class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
+        """
+        A version of :class:`concurrent.futures.ThreadPoolExecutor` that
+        always uses native threads, even when threading is monkey-patched.
+
+        The ``Future`` objects returned from this object can be used
+        with gevent waiting primitives like :func:`gevent.wait`.
+
+        .. caution:: If threading is *not* monkey-patched, then the ``Future``
+           objects returned by this object are not guaranteed to work with
+           :func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
+           The individual blocking methods like :meth:`~concurrent.futures.Future.result`
+           and :meth:`~concurrent.futures.Future.exception` will always work.
+
+        .. versionadded:: 1.2a1
+           This is a provisional API.
+        """
+
+        def __init__(self, max_workers):
+            super(ThreadPoolExecutor, self).__init__(max_workers)
+            self._threadpool = ThreadPool(max_workers)
+            self._threadpool._destroy_worker_hub = True
+
+        def submit(self, fn, *args, **kwargs):
+            with self._shutdown_lock: # pylint:disable=not-context-manager
+                if self._shutdown:
+                    raise RuntimeError('cannot schedule new futures after shutdown')
+
+                future = self._threadpool.spawn(fn, *args, **kwargs)
+                return _FutureProxy(future)
+
+        def shutdown(self, wait=True):
+            super(ThreadPoolExecutor, self).shutdown(wait)
+            # XXX: We don't implement wait properly
+            kill = getattr(self._threadpool, 'kill', None)
+            if kill: # pylint:disable=using-constant-test
+                self._threadpool.kill()
+            self._threadpool = None
+
+        kill = shutdown # greentest compat
+
+        def _adjust_thread_count(self):
+            # Does nothing. We don't want to spawn any "threads",
+            # let the threadpool handle that.
+            pass
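Finally, a usage sketch for the executor, assuming gevent with concurrent.futures available (Python 3, or Python 2 with the futures backport):

    import gevent
    from gevent.threadpool import ThreadPoolExecutor

    executor = ThreadPoolExecutor(2)
    future = executor.submit(pow, 2, 16)
    gevent.wait([future])          # cooperates with gevent, unlike concurrent.futures.wait
    print(future.result())         # -> 65536
    executor.shutdown()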