author    James Taylor <user234683@users.noreply.github.com> 2019-09-06 16:31:13 -0700
committer James Taylor <user234683@users.noreply.github.com> 2019-09-06 16:31:13 -0700
commit    3d57e14df7ba5f14a634295caf3b2e60da50bfe2 (patch)
tree      4903bcb79a49ad714a1a9129765b9545405c9978 /python/gevent/threadpool.py
parent    ac32b24b2a011292b704a3f27e8fd08a7ae9424b (diff)
Remove windows python distribution from repo and add requirements.txt
Diffstat (limited to 'python/gevent/threadpool.py')
-rw-r--r--  python/gevent/threadpool.py | 580
1 file changed, 0 insertions(+), 580 deletions(-)
diff --git a/python/gevent/threadpool.py b/python/gevent/threadpool.py
deleted file mode 100644
index c983390..0000000
--- a/python/gevent/threadpool.py
+++ /dev/null
@@ -1,580 +0,0 @@
-# Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
-from __future__ import absolute_import
-import sys
-import os
-
-from weakref import ref as wref
-
-from greenlet import greenlet as RawGreenlet
-
-from gevent._compat import integer_types
-from gevent.hub import _get_hub_noargs as get_hub
-from gevent.hub import getcurrent
-from gevent.hub import sleep
-from gevent.hub import _get_hub
-from gevent.event import AsyncResult
-from gevent.greenlet import Greenlet
-from gevent.pool import GroupMappingMixin
-from gevent.lock import Semaphore
-
-from gevent._threading import Lock
-from gevent._threading import Queue
-from gevent._threading import start_new_thread
-from gevent._threading import get_thread_ident
-
-
-__all__ = [
- 'ThreadPool',
- 'ThreadResult',
-]
-
-
-class _WorkerGreenlet(RawGreenlet):
- # Exists to produce a more useful repr for worker pool
- # threads/greenlets.
-
- def __init__(self, threadpool):
- RawGreenlet.__init__(self, threadpool._worker)
- self.thread_ident = get_thread_ident()
- self._threadpool_wref = wref(threadpool)
-
- # Inform the gevent.util.GreenletTree that this should be
- # considered the root (for printing purposes) and to
- # ignore the parent attribute. (We can't set parent to None.)
- self.greenlet_tree_is_root = True
- self.parent.greenlet_tree_is_ignored = True
-
- def __repr__(self):
- return "<ThreadPoolWorker at 0x%x thread_ident=0x%x %s>" % (
- id(self),
- self.thread_ident,
- self._threadpool_wref())
-
-class ThreadPool(GroupMappingMixin):
- """
- .. note:: The method :meth:`apply_async` will always return a new
- greenlet, bypassing the threadpool entirely.
- .. caution:: Instances of this class are only true if they have
- unfinished tasks.
- """
-
- def __init__(self, maxsize, hub=None):
- if hub is None:
- hub = get_hub()
- self.hub = hub
- self._maxsize = 0
- self.manager = None
- self.pid = os.getpid()
- self.fork_watcher = hub.loop.fork(ref=False)
- try:
- self._init(maxsize)
- except:
- self.fork_watcher.close()
- raise
-
- def _set_maxsize(self, maxsize):
- if not isinstance(maxsize, integer_types):
- raise TypeError('maxsize must be integer: %r' % (maxsize, ))
- if maxsize < 0:
- raise ValueError('maxsize must not be negative: %r' % (maxsize, ))
- difference = maxsize - self._maxsize
- self._semaphore.counter += difference
- self._maxsize = maxsize
- self.adjust()
-        # make sure all currently blocked spawn() calls start unblocking if maxsize increased
- self._semaphore._start_notify()
-
- def _get_maxsize(self):
- return self._maxsize
-
- maxsize = property(_get_maxsize, _set_maxsize)
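-
-    # Editor's sketch, not in the original source: growing maxsize at
-    # runtime credits the semaphore, so any greenlet blocked in spawn()
-    # waiting for a slot is released:
-    #
-    #   pool = ThreadPool(1)         # at most one concurrent task
-    #   pool.maxsize = 4             # now up to four; blocked spawn()
-    #                                # callers wake via _start_notify()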
-
- def __repr__(self):
-        return '<%s at 0x%x %s/%s/%s hub=<%s at 0x%x thread_ident=0x%x>>' % (
- self.__class__.__name__,
- id(self),
- len(self), self.size, self.maxsize,
- self.hub.__class__.__name__, id(self.hub), self.hub.thread_ident)
-
- def __len__(self):
- # XXX just do unfinished_tasks property
- # Note that this becomes the boolean value of this class,
- # that's probably not what we want!
- return self.task_queue.unfinished_tasks
-
- def _get_size(self):
- return self._size
-
- def _set_size(self, size):
- if size < 0:
- raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
- if size > self._maxsize:
- raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
- if self.manager:
- self.manager.kill()
- while self._size < size:
- self._add_thread()
- delay = getattr(self.hub.loop, 'min_sleep_time', 0.0001) # For libuv
- while self._size > size:
- while self._size - size > self.task_queue.unfinished_tasks:
- self.task_queue.put(None)
- if getcurrent() is self.hub:
- break
- sleep(delay)
- delay = min(delay * 2, .05)
- if self._size:
- self.fork_watcher.start(self._on_fork)
- else:
- self.fork_watcher.stop()
-
- size = property(_get_size, _set_size)
-
- def _init(self, maxsize):
- self._size = 0
- self._semaphore = Semaphore(1)
- self._lock = Lock()
- self.task_queue = Queue()
- self._set_maxsize(maxsize)
-
- def _on_fork(self):
- # fork() only leaves one thread; also screws up locks;
- # let's re-create locks and threads.
- # NOTE: See comment in gevent.hub.reinit.
- pid = os.getpid()
- if pid != self.pid:
- self.pid = pid
-            # Do not mix fork() and threads; since fork() only copies one thread,
-            # all objects referenced by other threads have refcounts that will never
-            # go down to 0.
- self._init(self._maxsize)
-
- def join(self):
- """Waits until all outstanding tasks have been completed."""
- delay = 0.0005
- while self.task_queue.unfinished_tasks > 0:
- sleep(delay)
- delay = min(delay * 2, .05)
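-
-    # Editor's sketch, not part of the original module: the usual
-    # spawn/join pattern this method supports. Here fetch and urls are
-    # hypothetical stand-ins for any blocking function and its inputs:
-    #
-    #   pool = ThreadPool(10)
-    #   results = [pool.spawn(fetch, url) for url in urls]
-    #   pool.join()                        # block until all tasks finish
-    #   values = [r.get() for r in results]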
-
- def kill(self):
- self.size = 0
- self.fork_watcher.close()
-
- def _adjust_step(self):
-        # add a thread if doing so is both possible and necessary
- while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
- self._add_thread()
- # while the number of threads is more than maxsize, kill one
- # we do not check what's already in task_queue - it could be all Nones
- while self._size - self._maxsize > self.task_queue.unfinished_tasks:
- self.task_queue.put(None)
- if self._size:
- self.fork_watcher.start(self._on_fork)
- else:
- self.fork_watcher.stop()
-
- def _adjust_wait(self):
- delay = 0.0001
- while True:
- self._adjust_step()
- if self._size <= self._maxsize:
- return
- sleep(delay)
- delay = min(delay * 2, .05)
-
- def adjust(self):
- self._adjust_step()
- if not self.manager and self._size > self._maxsize:
- # might need to feed more Nones into the pool
- self.manager = Greenlet.spawn(self._adjust_wait)
-
- def _add_thread(self):
- with self._lock:
- self._size += 1
- try:
- start_new_thread(self.__trampoline, ())
- except:
- with self._lock:
- self._size -= 1
- raise
-
- def spawn(self, func, *args, **kwargs):
- """
- Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
-
- Waits until a slot is available. Creates a new thread if necessary.
-
- :return: A :class:`gevent.event.AsyncResult`.
- """
- while 1:
- semaphore = self._semaphore
- semaphore.acquire()
- if semaphore is self._semaphore:
- break
-
- thread_result = None
- try:
- task_queue = self.task_queue
- result = AsyncResult()
- # XXX We're calling the semaphore release function in the hub, otherwise
- # we get LoopExit (why?). Previously it was done with a rawlink on the
- # AsyncResult and the comment that it is "competing for order with get(); this is not
- # good, just make ThreadResult release the semaphore before doing anything else"
- thread_result = ThreadResult(result, self.hub, semaphore.release)
- task_queue.put((func, args, kwargs, thread_result))
- self.adjust()
- except:
- if thread_result is not None:
- thread_result.destroy()
- semaphore.release()
- raise
- return result
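-
-    # Editor's sketch, not in the original source: get() on the returned
-    # AsyncResult blocks only the calling greenlet while the function
-    # runs in a native thread:
-    #
-    #   pool = ThreadPool(3)
-    #   result = pool.spawn(pow, 2, 10)
-    #   assert result.get() == 1024    # cooperative wait; the hub keeps
-    #                                  # running other greenlets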
-
- def _decrease_size(self):
- if sys is None:
- return
- _lock = getattr(self, '_lock', None)
- if _lock is not None:
- with _lock:
- self._size -= 1
-
- # XXX: This used to be false by default. It really seems like
- # it should be true to avoid leaking resources.
- _destroy_worker_hub = True
-
-
- def __ignore_current_greenlet_blocking(self, hub):
- if hub is not None and hub.periodic_monitoring_thread is not None:
- hub.periodic_monitoring_thread.ignore_current_greenlet_blocking()
-
- def __trampoline(self):
- # The target that we create new threads with. It exists
- # solely to create the _WorkerGreenlet and switch to it.
- # (the __class__ of a raw greenlet cannot be changed.)
- g = _WorkerGreenlet(self)
- g.switch()
-
- def _worker(self):
- # pylint:disable=too-many-branches
- need_decrease = True
- try:
- while 1: # tiny bit faster than True on Py2
- h = _get_hub()
- if h is not None:
- h.name = 'ThreadPool Worker Hub'
- task_queue = self.task_queue
-                # While we block, don't let the monitoring thread, if any,
-                # report us as blocked. So long as we never try to switch
-                # greenlets, we should not be reported as blocked at all:
-                # the threadpool is *meant* to run blocking tasks.
- self.__ignore_current_greenlet_blocking(h)
- task = task_queue.get()
- try:
- if task is None:
- need_decrease = False
- self._decrease_size()
-                        # We want to decrease the size first, then decrease
-                        # unfinished_tasks; otherwise _adjust might think
-                        # there's one more idle thread that needs to be killed.
- return
- func, args, kwargs, thread_result = task
- try:
- value = func(*args, **kwargs)
- except: # pylint:disable=bare-except
- exc_info = getattr(sys, 'exc_info', None)
- if exc_info is None:
- return
- thread_result.handle_error((self, func), exc_info())
- else:
- if sys is None:
- return
- thread_result.set(value)
- del value
- finally:
- del func, args, kwargs, thread_result, task
- finally:
- if sys is None:
- return # pylint:disable=lost-exception
- task_queue.task_done()
- finally:
- if need_decrease:
- self._decrease_size()
- if sys is not None and self._destroy_worker_hub:
- hub = _get_hub()
- if hub is not None:
- hub.destroy(True)
- del hub
-
- def apply_e(self, expected_errors, function, args=None, kwargs=None):
- """
- .. deprecated:: 1.1a2
- Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
- """
- # pylint:disable=unused-argument
- # Deprecated but never documented. In the past, before
- # self.apply() allowed all errors to be raised to the caller,
- # expected_errors allowed a caller to specify a set of errors
- # they wanted to be raised, through the wrap_errors function.
- # In practice, it always took the value Exception or
- # BaseException.
- return self.apply(function, args, kwargs)
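-
-    # Editor's sketch, not part of the original module: apply() itself
-    # comes from GroupMappingMixin; it runs the function in the pool,
-    # waits for it, and re-raises any error in the caller:
-    #
-    #   pool = ThreadPool(2)
-    #   assert pool.apply(pow, (2, 10)) == 1024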
-
- def _apply_immediately(self):
- # If we're being called from a different thread than the one that
- # created us, e.g., because a worker task is trying to use apply()
- # recursively, we have no choice but to run the task immediately;
- # if we try to AsyncResult.get() in the worker thread, it's likely to have
- # nothing to switch to and lead to a LoopExit.
- return get_hub() is not self.hub
-
- def _apply_async_cb_spawn(self, callback, result):
- callback(result)
-
- def _apply_async_use_greenlet(self):
- # Always go to Greenlet because our self.spawn uses threads
- return True
-
-class _FakeAsync(object):
-
- def send(self):
- pass
- close = stop = send
-
-    def __call__(self, result):
- "fake out for 'receiver'"
-
- def __bool__(self):
- return False
-
- __nonzero__ = __bool__
-
-_FakeAsync = _FakeAsync()
-
-class ThreadResult(object):
-
- # Using slots here helps to debug reference cycles/leaks
- __slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value',
- 'context', 'hub', 'receiver')
-
- def __init__(self, receiver, hub, call_when_ready):
- self.receiver = receiver
- self.hub = hub
- self.context = None
- self.value = None
- self.exc_info = ()
- self.async_watcher = hub.loop.async_()
- self._call_when_ready = call_when_ready
- self.async_watcher.start(self._on_async)
-
- @property
- def exception(self):
- return self.exc_info[1] if self.exc_info else None
-
- def _on_async(self):
- self.async_watcher.stop()
- self.async_watcher.close()
-
- # Typically this is pool.semaphore.release and we have to
- # call this in the Hub; if we don't we get the dreaded
- # LoopExit (XXX: Why?)
- self._call_when_ready()
-
- try:
- if self.exc_info:
- self.hub.handle_error(self.context, *self.exc_info)
- self.context = None
- self.async_watcher = _FakeAsync
- self.hub = None
- self._call_when_ready = _FakeAsync
-
- self.receiver(self)
- finally:
- self.receiver = _FakeAsync
- self.value = None
- if self.exc_info:
- self.exc_info = (self.exc_info[0], self.exc_info[1], None)
-
- def destroy(self):
- self.async_watcher.stop()
- self.async_watcher.close()
- self.async_watcher = _FakeAsync
-
- self.context = None
- self.hub = None
- self._call_when_ready = _FakeAsync
- self.receiver = _FakeAsync
-
- def set(self, value):
- self.value = value
- self.async_watcher.send()
-
- def handle_error(self, context, exc_info):
- self.context = context
- self.exc_info = exc_info
- self.async_watcher.send()
-
- # link protocol:
- def successful(self):
- return self.exception is None
-
-
-def wrap_errors(errors, function, args, kwargs):
- """
- .. deprecated:: 1.1a2
- Previously used by ThreadPool.apply_e.
- """
- try:
- return True, function(*args, **kwargs)
- except errors as ex:
- return False, ex
-
-try:
- import concurrent.futures
-except ImportError:
- pass
-else:
- __all__.append("ThreadPoolExecutor")
-
- from gevent.timeout import Timeout as GTimeout
- from gevent._util import Lazy
- from concurrent.futures import _base as cfb
-
- def _wrap_error(future, fn):
- def cbwrap(_):
- del _
- # we're called with the async result, but
- # be sure to pass in ourself. Also automatically
- # unlink ourself so that we don't get called multiple
- # times.
- try:
- fn(future)
- except Exception: # pylint: disable=broad-except
- future.hub.print_exception((fn, future), *sys.exc_info())
- cbwrap.auto_unlink = True
- return cbwrap
-
- def _wrap(future, fn):
- def f(_):
- fn(future)
- f.auto_unlink = True
- return f
-
- class _FutureProxy(object):
- def __init__(self, asyncresult):
- self.asyncresult = asyncresult
-
- # Internal implementation details of a c.f.Future
-
- @Lazy
- def _condition(self):
- from gevent import monkey
- if monkey.is_module_patched('threading') or self.done():
- import threading
- return threading.Condition()
- # We can only properly work with conditions
- # when we've been monkey-patched. This is necessary
- # for the wait/as_completed module functions.
- raise AttributeError("_condition")
-
- @Lazy
- def _waiters(self):
- self.asyncresult.rawlink(self.__when_done)
- return []
-
- def __when_done(self, _):
- # We should only be called when _waiters has
- # already been accessed.
- waiters = getattr(self, '_waiters')
- for w in waiters: # pylint:disable=not-an-iterable
- if self.successful():
- w.add_result(self)
- else:
- w.add_exception(self)
-
- __when_done.auto_unlink = True
-
- @property
- def _state(self):
- if self.done():
- return cfb.FINISHED
- return cfb.RUNNING
-
- def set_running_or_notify_cancel(self):
- # Does nothing, not even any consistency checks. It's
- # meant to be internal to the executor and we don't use it.
- return
-
- def result(self, timeout=None):
- try:
- return self.asyncresult.result(timeout=timeout)
- except GTimeout:
- # XXX: Theoretically this could be a completely
- # unrelated timeout instance. Do we care about that?
- raise concurrent.futures.TimeoutError()
-
- def exception(self, timeout=None):
- try:
- self.asyncresult.get(timeout=timeout)
- return self.asyncresult.exception
- except GTimeout:
- raise concurrent.futures.TimeoutError()
-
- def add_done_callback(self, fn):
- if self.done():
- fn(self)
- else:
- self.asyncresult.rawlink(_wrap_error(self, fn))
-
- def rawlink(self, fn):
- self.asyncresult.rawlink(_wrap(self, fn))
-
- def __str__(self):
- return str(self.asyncresult)
-
- def __getattr__(self, name):
- return getattr(self.asyncresult, name)
-
- class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
- """
- A version of :class:`concurrent.futures.ThreadPoolExecutor` that
- always uses native threads, even when threading is monkey-patched.
-
- The ``Future`` objects returned from this object can be used
- with gevent waiting primitives like :func:`gevent.wait`.
-
- .. caution:: If threading is *not* monkey-patched, then the ``Future``
- objects returned by this object are not guaranteed to work with
- :func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
- The individual blocking methods like :meth:`~concurrent.futures.Future.result`
- and :meth:`~concurrent.futures.Future.exception` will always work.
-
- .. versionadded:: 1.2a1
- This is a provisional API.
- """
-
- def __init__(self, max_workers):
- super(ThreadPoolExecutor, self).__init__(max_workers)
- self._threadpool = ThreadPool(max_workers)
- self._threadpool._destroy_worker_hub = True
-
- def submit(self, fn, *args, **kwargs):
- with self._shutdown_lock: # pylint:disable=not-context-manager
- if self._shutdown:
- raise RuntimeError('cannot schedule new futures after shutdown')
-
- future = self._threadpool.spawn(fn, *args, **kwargs)
- return _FutureProxy(future)
-
- def shutdown(self, wait=True):
- super(ThreadPoolExecutor, self).shutdown(wait)
- # XXX: We don't implement wait properly
- kill = getattr(self._threadpool, 'kill', None)
- if kill: # pylint:disable=using-constant-test
- self._threadpool.kill()
- self._threadpool = None
-
- kill = shutdown # greentest compat
-
- def _adjust_thread_count(self):
- # Does nothing. We don't want to spawn any "threads",
- # let the threadpool handle that.
- pass
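-
-    # Editor's sketch, not part of the original module: the intended use
-    # of this executor. gevent.wait() can multiplex the returned futures
-    # alongside other gevent primitives:
-    #
-    #   import gevent
-    #   executor = ThreadPoolExecutor(max_workers=4)
-    #   futures = [executor.submit(pow, 2, n) for n in range(8)]
-    #   gevent.wait(futures)
-    #   values = [f.result() for f in futures]
-    #   executor.shutdown()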