diff options
Diffstat (limited to 'python/gevent/hub.py')
-rw-r--r-- | python/gevent/hub.py | 841 |
1 files changed, 261 insertions, 580 deletions
diff --git a/python/gevent/hub.py b/python/gevent/hub.py index 001cd06..ca88d20 100644 --- a/python/gevent/hub.py +++ b/python/gevent/hub.py @@ -2,15 +2,19 @@ """ Event-loop hub. """ -from __future__ import absolute_import +from __future__ import absolute_import, print_function # XXX: FIXME: Refactor to make this smaller # pylint:disable=too-many-lines from functools import partial as _functools_partial -import os + import sys import traceback -from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit + +from greenlet import greenlet as RawGreenlet +from greenlet import getcurrent +from greenlet import GreenletExit + __all__ = [ @@ -26,76 +30,38 @@ __all__ = [ 'Waiter', ] -from gevent._compat import string_types -from gevent._compat import xrange -from gevent._util import _NONE +from gevent._config import config as GEVENT_CONFIG +from gevent._compat import thread_mod_name from gevent._util import readproperty +from gevent._util import Lazy +from gevent._util import gmctime +from gevent._ident import IdentRegistry -if sys.version_info[0] <= 2: - import thread # pylint:disable=import-error -else: - import _thread as thread # python 2 pylint:disable=import-error - -# These must be the "real" native thread versions, -# not monkey-patched. -threadlocal = thread._local - - -class _threadlocal(threadlocal): - - def __init__(self): - # Use a class with an initializer so that we can test - # for 'is None' instead of catching AttributeError, making - # the code cleaner and possibly solving some corner cases - # (like #687) - threadlocal.__init__(self) - self.Hub = None - self.loop = None - self.hub = None - -_threadlocal = _threadlocal() - -get_ident = thread.get_ident -MAIN_THREAD = get_ident() - +from gevent._hub_local import get_hub +from gevent._hub_local import get_loop +from gevent._hub_local import set_hub +from gevent._hub_local import set_loop +from gevent._hub_local import get_hub_if_exists as _get_hub +from gevent._hub_local import get_hub_noargs as _get_hub_noargs +from gevent._hub_local import set_default_hub_class +from gevent._greenlet_primitives import TrackedRawGreenlet +from gevent._hub_primitives import WaitOperationsGreenlet +# Export +from gevent import _hub_primitives +wait = _hub_primitives.wait_on_objects +iwait = _hub_primitives.iwait_on_objects -class LoopExit(Exception): - """ - Exception thrown when the hub finishes running. - - In a normal application, this is never thrown or caught - explicitly. The internal implementation of functions like - :func:`join` and :func:`joinall` may catch it, but user code - generally should not. - - .. caution:: - Errors in application programming can also lead to this exception being - raised. Some examples include (but are not limited too): - - - greenlets deadlocking on a lock; - - using a socket or other gevent object with native thread - affinity from a different thread - - """ - pass +from gevent.exceptions import LoopExit -class BlockingSwitchOutError(AssertionError): - pass +from gevent._waiter import Waiter - -class InvalidSwitchError(AssertionError): - pass - - -class ConcurrentObjectUseError(AssertionError): - # raised when an object is used (waited on) by two greenlets - # independently, meaning the object was entered into a blocking - # state by one greenlet and then another while still blocking in the - # first one - pass +# Need the real get_ident. We're imported early enough (by gevent/__init__.py) +# that we can be sure nothing is monkey patched yet. +get_thread_ident = __import__(thread_mod_name).get_ident +MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread. def spawn_raw(function, *args, **kwargs): @@ -109,6 +75,10 @@ def spawn_raw(function, *args, **kwargs): occasionally be useful as an optimization if there are many greenlets involved. + .. versionchanged:: 1.1a3 + Verify that ``function`` is callable, raising a TypeError if not. Previously, + the spawned greenlet would have failed the first time it was switched to. + .. versionchanged:: 1.1b1 If *function* is not callable, immediately raise a :exc:`TypeError` instead of spawning a greenlet that will raise an uncaught TypeError. @@ -117,24 +87,35 @@ def spawn_raw(function, *args, **kwargs): Accept keyword arguments for ``function`` as previously (incorrectly) documented. Note that this may incur an additional expense. - .. versionchanged:: 1.1a3 - Verify that ``function`` is callable, raising a TypeError if not. Previously, - the spawned greenlet would have failed the first time it was switched to. + .. versionchanged:: 1.3a2 + Populate the ``spawning_greenlet`` and ``spawn_tree_locals`` + attributes of the returned greenlet. + + .. versionchanged:: 1.3b1 + *Only* populate ``spawning_greenlet`` and ``spawn_tree_locals`` + if ``GEVENT_TRACK_GREENLET_TREE`` is enabled (the default). If not enabled, + those attributes will not be set. + """ if not callable(function): raise TypeError("function must be callable") - hub = get_hub() + + # The hub is always the parent. + hub = _get_hub_noargs() + + factory = TrackedRawGreenlet if GEVENT_CONFIG.track_greenlet_tree else RawGreenlet # The callback class object that we use to run this doesn't # accept kwargs (and those objects are heavily used, as well as being # implemented twice in core.ppyx and corecffi.py) so do it with a partial if kwargs: function = _functools_partial(function, *args, **kwargs) - g = RawGreenlet(function, hub) + g = factory(function, hub) hub.loop.run_callback(g.switch) else: - g = RawGreenlet(function, hub) + g = factory(function, hub) hub.loop.run_callback(g.switch, *args) + return g @@ -157,16 +138,25 @@ def sleep(seconds=0, ref=True): If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait` from exiting. + .. versionchanged:: 1.3a1 + Sleeping with a value of 0 will now be bounded to approximately block the + loop for no longer than :func:`gevent.getswitchinterval`. + .. seealso:: :func:`idle` """ - hub = get_hub() + hub = _get_hub_noargs() loop = hub.loop if seconds <= 0: - waiter = Waiter() - loop.run_callback(waiter.switch) + waiter = Waiter(hub) + loop.run_callback(waiter.switch, None) waiter.get() else: - hub.wait(loop.timer(seconds, ref=ref)) + with loop.timer(seconds, ref=ref) as t: + # Sleeping is expected to be an "absolute" measure with + # respect to time.time(), not a relative measure, so it's + # important to update the loop's notion of now before we start + loop.update_now() + hub.wait(t) def idle(priority=0): @@ -180,7 +170,7 @@ def idle(priority=0): .. seealso:: :func:`sleep` """ - hub = get_hub() + hub = _get_hub_noargs() watcher = hub.loop.idle() if priority: watcher.priority = priority @@ -214,7 +204,7 @@ def kill(greenlet, exception=GreenletExit): # after it's been killed greenlet.kill(exception=exception, block=False) else: - get_hub().loop.run_callback(greenlet.throw, exception) + _get_hub_noargs().loop.run_callback(greenlet.throw, exception) class signal(object): @@ -245,7 +235,7 @@ class signal(object): if not callable(handler): raise TypeError("signal handler must be callable.") - self.hub = get_hub() + self.hub = _get_hub_noargs() self.watcher = self.hub.loop.signal(signalnum, ref=False) self.watcher.start(self._start) self.handler = handler @@ -281,8 +271,10 @@ class signal(object): self.hub.handle_error(None, *sys.exc_info()) -def reinit(): +def reinit(hub=None): """ + reinit() -> None + Prepare the gevent hub to run in a new (forked) process. This should be called *immediately* after :func:`os.fork` in the @@ -304,159 +296,64 @@ def reinit(): if the fork process can be more smoothly managed. .. warning:: See remarks in :func:`gevent.os.fork` about greenlets - and libev watchers in the child process. + and event loop watchers in the child process. """ + # Note the signature line in the docstring: hub is not a public param. + # The loop reinit function in turn calls libev's ev_loop_fork # function. - hub = _get_hub() - - if hub is not None: - # Note that we reinit the existing loop, not destroy it. - # See https://github.com/gevent/gevent/issues/200. - hub.loop.reinit() - # libev's fork watchers are slow to fire because the only fire - # at the beginning of a loop; due to our use of callbacks that - # run at the end of the loop, that may be too late. The - # threadpool and resolvers depend on the fork handlers being - # run (specifically, the threadpool will fail in the forked - # child if there were any threads in it, which there will be - # if the resolver_thread was in use (the default) before the - # fork.) - # - # If the forked process wants to use the threadpool or - # resolver immediately (in a queued callback), it would hang. - # - # The below is a workaround. Fortunately, both of these - # methods are idempotent and can be called multiple times - # following a fork if the suddenly started working, or were - # already working on some platforms. Other threadpools and fork handlers - # will be called at an arbitrary time later ('soon') - if hasattr(hub.threadpool, '_on_fork'): - hub.threadpool._on_fork() - # resolver_ares also has a fork watcher that's not firing - if hasattr(hub.resolver, '_on_fork'): - hub.resolver._on_fork() - - # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a - # pass around before returning to this greenlet. That will allow any - # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if - # we do this, certain tests that heavily mix threads and forking, - # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear - # why. - #sleep(0.00001) - #sleep(0.00001) - - -def get_hub_class(): - """Return the type of hub to use for the current thread. - - If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used. - """ - hubtype = _threadlocal.Hub - if hubtype is None: - hubtype = _threadlocal.Hub = Hub - return hubtype - - -def get_hub(*args, **kwargs): - """ - Return the hub for the current thread. - - If a hub does not exist in the current thread, a new one is - created of the type returned by :func:`get_hub_class`. - """ - hub = _threadlocal.hub + hub = _get_hub() if hub is None else hub if hub is None: - hubtype = get_hub_class() - hub = _threadlocal.hub = hubtype(*args, **kwargs) - return hub - - -def _get_hub(): - """Return the hub for the current thread. + return - Return ``None`` if no hub has been created yet. + # Note that we reinit the existing loop, not destroy it. + # See https://github.com/gevent/gevent/issues/200. + hub.loop.reinit() + # libev's fork watchers are slow to fire because the only fire + # at the beginning of a loop; due to our use of callbacks that + # run at the end of the loop, that may be too late. The + # threadpool and resolvers depend on the fork handlers being + # run (specifically, the threadpool will fail in the forked + # child if there were any threads in it, which there will be + # if the resolver_thread was in use (the default) before the + # fork.) + # + # If the forked process wants to use the threadpool or + # resolver immediately (in a queued callback), it would hang. + # + # The below is a workaround. Fortunately, all of these + # methods are idempotent and can be called multiple times + # following a fork if the suddenly started working, or were + # already working on some platforms. Other threadpools and fork handlers + # will be called at an arbitrary time later ('soon') + for obj in (hub._threadpool, hub._resolver, hub.periodic_monitoring_thread): + getattr(obj, '_on_fork', lambda: None)() + + # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a + # pass around before returning to this greenlet. That will allow any + # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if + # we do this, certain tests that heavily mix threads and forking, + # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear + # why. + #sleep(0.00001) + #sleep(0.00001) + + +hub_ident_registry = IdentRegistry() + +class Hub(WaitOperationsGreenlet): """ - return _threadlocal.hub - - -def set_hub(hub): - _threadlocal.hub = hub - - -def _import(path): - # pylint:disable=too-many-branches - if isinstance(path, list): - if not path: - raise ImportError('Cannot import from empty list: %r' % (path, )) - - for item in path[:-1]: - try: - return _import(item) - except ImportError: - pass - - return _import(path[-1]) - - if not isinstance(path, string_types): - return path - - if '.' not in path: - raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path) - - if '/' in path: - package_path, path = path.rsplit('/', 1) - sys.path = [package_path] + sys.path - else: - package_path = None - - try: - module, item = path.rsplit('.', 1) - x = __import__(module) - for attr in path.split('.')[1:]: - oldx = x - x = getattr(x, attr, _NONE) - if x is _NONE: - raise ImportError('Cannot import %r from %r' % (attr, oldx)) - return x - finally: - try: - sys.path.remove(package_path) - except ValueError: - pass - - -def config(default, envvar): - result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member - if isinstance(result, string_types): - return result.split(',') - return result - - -def resolver_config(default, envvar): - result = config(default, envvar) - return [_resolvers.get(x, x) for x in result] - - -_resolvers = {'ares': 'gevent.resolver_ares.Resolver', - 'thread': 'gevent.resolver_thread.Resolver', - 'block': 'gevent.socket.BlockingResolver'} - - -_DEFAULT_LOOP_CLASS = 'gevent.core.loop' - - -class Hub(RawGreenlet): - """A greenlet that runs the event loop. + A greenlet that runs the event loop. It is created automatically by :func:`get_hub`. - **Switching** + .. rubric:: Switching - Every time this greenlet (i.e., the event loop) is switched *to*, if - the current greenlet has a ``switch_out`` method, it will be called. This - allows a greenlet to take some cleanup actions before yielding control. This method - should not call any gevent blocking functions. + Every time this greenlet (i.e., the event loop) is switched *to*, + if the current greenlet has a ``switch_out`` method, it will be + called. This allows a greenlet to take some cleanup actions before + yielding control. This method should not call any gevent blocking + functions. """ #: If instances of these classes are raised into the event loop, @@ -468,57 +365,71 @@ class Hub(RawGreenlet): #: do not get logged/printed when raised by the event loop. NOT_ERROR = (GreenletExit, SystemExit) - loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP') - # For the standard class, go ahead and import it when this class - # is defined. This is no loss of generality because the envvar is - # only read when this class is defined, and we know that the - # standard class will be available. This can solve problems with - # the class being imported from multiple threads at once, leading - # to one of the imports failing. Only do this for the object we - # need in the constructor, as the rest of the factories are - # themselves handled lazily. See #687. (People using a custom loop_class - # can probably manage to get_hub() from the main thread or otherwise import - # that loop_class themselves.) - if loop_class == [_DEFAULT_LOOP_CLASS]: - loop_class = [_import(loop_class)] - - resolver_class = ['gevent.resolver_thread.Resolver', - 'gevent.resolver_ares.Resolver', - 'gevent.socket.BlockingResolver'] - #: The class or callable object, or the name of a factory function or class, - #: that will be used to create :attr:`resolver`. By default, configured according to - #: :doc:`dns`. If a list, a list of objects in preference order. - resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER') - threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL') - backend = config(None, 'GEVENT_BACKEND') + #: The size we use for our threadpool. Either use a subclass + #: for this, or change it immediately after creating the hub. threadpool_size = 10 - # using pprint.pformat can override custom __repr__ methods on dict/list - # subclasses, which can be a security concern - format_context = 'pprint.saferepr' + # An instance of PeriodicMonitoringThread, if started. + periodic_monitoring_thread = None + + # The ident of the thread we were created in, which should be the + # thread that we run in. + thread_ident = None + #: A string giving the name of this hub. Useful for associating hubs + #: with particular threads. Printed as part of the default repr. + #: + #: .. versionadded:: 1.3b1 + name = '' + + # NOTE: We cannot define a class-level 'loop' attribute + # because that conflicts with the slot we inherit from the + # Cythonized-bases. def __init__(self, loop=None, default=None): - RawGreenlet.__init__(self) + WaitOperationsGreenlet.__init__(self, None, None) + self.thread_ident = get_thread_ident() if hasattr(loop, 'run'): if default is not None: raise TypeError("Unexpected argument: default") self.loop = loop - elif _threadlocal.loop is not None: + elif get_loop() is not None: # Reuse a loop instance previously set by # destroying a hub without destroying the associated # loop. See #237 and #238. - self.loop = _threadlocal.loop + self.loop = get_loop() else: - if default is None and get_ident() != MAIN_THREAD: + if default is None and self.thread_ident != MAIN_THREAD_IDENT: default = False - loop_class = _import(self.loop_class) + if loop is None: loop = self.backend - self.loop = loop_class(flags=loop, default=default) + self.loop = self.loop_class(flags=loop, default=default) # pylint:disable=not-callable self._resolver = None self._threadpool = None - self.format_context = _import(self.format_context) + self.format_context = GEVENT_CONFIG.format_context + self.minimal_ident = hub_ident_registry.get_ident(self) + + @Lazy + def ident_registry(self): + return IdentRegistry() + + @property + def loop_class(self): + return GEVENT_CONFIG.loop + + @property + def backend(self): + return GEVENT_CONFIG.libev_backend + + @property + def main_hub(self): + """ + Is this the hub for the main thread? + + .. versionadded:: 1.3b1 + """ + return self.thread_ident == MAIN_THREAD_IDENT def __repr__(self): if self.loop is None: @@ -528,11 +439,16 @@ class Hub(RawGreenlet): info = self.loop._format() except Exception as ex: # pylint:disable=broad-except info = str(ex) or repr(ex) or 'error' - result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info) + result = '<%s %r at 0x%x %s' % ( + self.__class__.__name__, + self.name, + id(self), + info) if self._resolver is not None: result += ' resolver=%r' % self._resolver if self._threadpool is not None: result += ' threadpool=%r' % self._threadpool + result += ' thread_ident=%s' % (hex(self.thread_ident), ) return result + '>' def handle_error(self, context, type, value, tb): @@ -560,6 +476,13 @@ class Hub(RawGreenlet): self.handle_system_error(type, value) def handle_system_error(self, type, value): + """ + Called from `handle_error` when the exception type is determined + to be a :attr:`system error <SYSTEM_ERROR>`. + + System errors cause the exception to be raised in the main + greenlet (the parent of this hub). + """ current = getcurrent() if current is self or current is self.parent or self.loop is None: self.parent.throw(type, value) @@ -606,8 +529,7 @@ class Hub(RawGreenlet): del tb try: - import time - errstream.write(time.ctime()) + errstream.write(gmctime()) errstream.write(' ' if context is not None else '\n') except: # pylint:disable=bare-except # Possible not safe to import under certain @@ -623,76 +545,54 @@ class Hub(RawGreenlet): context = repr(context) errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), )) - def switch(self): - switch_out = getattr(getcurrent(), 'switch_out', None) - if switch_out is not None: - switch_out() - return RawGreenlet.switch(self) - - def switch_out(self): - raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback') - - def wait(self, watcher): - """ - Wait until the *watcher* (which should not be started) is ready. - - The current greenlet will be unscheduled during this time. - - .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`, - :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`, - :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`, - :class:`gevent.core.child`, :class:`gevent.core.stat` - - """ - waiter = Waiter() - unique = object() - watcher.start(waiter.switch, unique) - try: - result = waiter.get() - if result is not unique: - raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)) - finally: - watcher.stop() - - def cancel_wait(self, watcher, error): - """ - Cancel an in-progress call to :meth:`wait` by throwing the given *error* - in the waiting greenlet. - """ - if watcher.callback is not None: - self.loop.run_callback(self._cancel_wait, watcher, error) - - def _cancel_wait(self, watcher, error): - if watcher.active: - switch = watcher.callback - if switch is not None: - greenlet = getattr(switch, '__self__', None) - if greenlet is not None: - greenlet.throw(error) def run(self): """ Entry-point to running the loop. This method is called automatically when the hub greenlet is scheduled; do not call it directly. - :raises LoopExit: If the loop finishes running. This means + :raises gevent.exceptions.LoopExit: If the loop finishes running. This means that there are no other scheduled greenlets, and no active watchers or servers. In some situations, this indicates a programming error. """ assert self is getcurrent(), 'Do not call Hub.run() directly' - while True: + self.start_periodic_monitoring_thread() + while 1: loop = self.loop loop.error_handler = self try: loop.run() finally: loop.error_handler = None # break the refcount cycle - self.parent.throw(LoopExit('This operation would block forever', self)) + debug = [] + if hasattr(loop, 'debug'): + debug = loop.debug() + self.parent.throw(LoopExit('This operation would block forever', self, debug)) # this function must never return, as it will cause switch() in the parent greenlet # to return an unexpected value # It is still possible to kill this greenlet with throw. However, in that case - # switching to it is no longer safe, as switch will return immediatelly + # switching to it is no longer safe, as switch will return immediately + + def start_periodic_monitoring_thread(self): + if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread: + # Note that it is possible for one real thread to + # (temporarily) wind up with multiple monitoring threads, + # if hubs are started and stopped within the thread. This shows up + # in the threadpool tests. The monitoring threads will eventually notice their + # hub object is gone. + from gevent._monitor import PeriodicMonitoringThread + from gevent.events import PeriodicMonitorThreadStartedEvent + from gevent.events import notify_and_call_entry_points + self.periodic_monitoring_thread = PeriodicMonitoringThread(self) + + if self.main_hub: + self.periodic_monitoring_thread.install_monitor_memory_usage() + + notify_and_call_entry_points(PeriodicMonitorThreadStartedEvent( + self.periodic_monitoring_thread)) + + return self.periodic_monitoring_thread def join(self, timeout=None): """Wait for the event loop to finish. Exits only when there are @@ -707,11 +607,11 @@ class Hub(RawGreenlet): if self.dead: return True - waiter = Waiter() + waiter = Waiter(self) if timeout is not None: timeout = self.loop.timer(timeout, ref=False) - timeout.start(waiter.switch) + timeout.start(waiter.switch, None) try: try: @@ -721,9 +621,19 @@ class Hub(RawGreenlet): finally: if timeout is not None: timeout.stop() + timeout.close() return False def destroy(self, destroy_loop=None): + """ + Destroy this hub and clean up its resources. + + If you manually create hubs, you *should* call this + method before disposing of the hub object reference. + """ + if self.periodic_monitoring_thread is not None: + self.periodic_monitoring_thread.kill() + self.periodic_monitoring_thread = None if self._resolver is not None: self._resolver.close() del self._resolver @@ -733,308 +643,79 @@ class Hub(RawGreenlet): if destroy_loop is None: destroy_loop = not self.loop.default if destroy_loop: - if _threadlocal.loop is self.loop: + if get_loop() is self.loop: # Don't let anyone try to reuse this - _threadlocal.loop = None + set_loop(None) self.loop.destroy() else: # Store in case another hub is created for this # thread. - _threadlocal.loop = self.loop + set_loop(self.loop) + self.loop = None - if _threadlocal.hub is self: - _threadlocal.hub = None + if _get_hub() is self: + set_hub(None) + + + # XXX: We can probably simplify the resolver and threadpool properties. + + @property + def resolver_class(self): + return GEVENT_CONFIG.resolver def _get_resolver(self): if self._resolver is None: - if self.resolver_class is not None: - self.resolver_class = _import(self.resolver_class) - self._resolver = self.resolver_class(hub=self) + self._resolver = self.resolver_class(hub=self) # pylint:disable=not-callable return self._resolver def _set_resolver(self, value): self._resolver = value def _del_resolver(self): - del self._resolver + self._resolver = None + + resolver = property(_get_resolver, _set_resolver, _del_resolver, + """ + The DNS resolver that the socket functions will use. + + .. seealso:: :doc:`/dns` + """) - resolver = property(_get_resolver, _set_resolver, _del_resolver) + + @property + def threadpool_class(self): + return GEVENT_CONFIG.threadpool def _get_threadpool(self): if self._threadpool is None: - if self.threadpool_class is not None: - self.threadpool_class = _import(self.threadpool_class) - self._threadpool = self.threadpool_class(self.threadpool_size, hub=self) + # pylint:disable=not-callable + self._threadpool = self.threadpool_class(self.threadpool_size, hub=self) return self._threadpool def _set_threadpool(self, value): self._threadpool = value def _del_threadpool(self): - del self._threadpool - - threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool) - - -class Waiter(object): - """ - A low level communication utility for greenlets. - - Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer: - - * switching will occur only if the waiting greenlet is executing :meth:`get` method currently; - * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw` - * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter` - will store the value/exception. The following :meth:`get` will return the value/raise the exception. - - The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet. - The :meth:`get` method must be called from a greenlet other than :class:`Hub`. - - >>> result = Waiter() - >>> timer = get_hub().loop.timer(0.1) - >>> timer.start(result.switch, 'hello from Waiter') - >>> result.get() # blocks for 0.1 seconds - 'hello from Waiter' - - If switch is called before the greenlet gets a chance to call :meth:`get` then - :class:`Waiter` stores the value. - - >>> result = Waiter() - >>> timer = get_hub().loop.timer(0.1) - >>> timer.start(result.switch, 'hi from Waiter') - >>> sleep(0.2) - >>> result.get() # returns immediatelly without blocking - 'hi from Waiter' - - .. warning:: - - This a limited and dangerous way to communicate between - greenlets. It can easily leave a greenlet unscheduled forever - if used incorrectly. Consider using safer classes such as - :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`, - or :class:`gevent.queue.Queue`. - """ - - __slots__ = ['hub', 'greenlet', 'value', '_exception'] - - def __init__(self, hub=None): - if hub is None: - self.hub = get_hub() - else: - self.hub = hub - self.greenlet = None - self.value = None - self._exception = _NONE - - def clear(self): - self.greenlet = None - self.value = None - self._exception = _NONE - - def __str__(self): - if self._exception is _NONE: - return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet) - if self._exception is None: - return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value) - return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info) - - def ready(self): - """Return true if and only if it holds a value or an exception""" - return self._exception is not _NONE - - def successful(self): - """Return true if and only if it is ready and holds a value""" - return self._exception is None - - @property - def exc_info(self): - "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``." - if self._exception is not _NONE: - return self._exception - - def switch(self, value=None): - """Switch to the greenlet if one's available. Otherwise store the value.""" - greenlet = self.greenlet - if greenlet is None: - self.value = value - self._exception = None - else: - assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" - switch = greenlet.switch - try: - switch(value) - except: # pylint:disable=bare-except - self.hub.handle_error(switch, *sys.exc_info()) - - def switch_args(self, *args): - return self.switch(args) - - def throw(self, *throw_args): - """Switch to the greenlet with the exception. If there's no greenlet, store the exception.""" - greenlet = self.greenlet - if greenlet is None: - self._exception = throw_args - else: - assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" - throw = greenlet.throw - try: - throw(*throw_args) - except: # pylint:disable=bare-except - self.hub.handle_error(throw, *sys.exc_info()) - - def get(self): - """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" - if self._exception is not _NONE: - if self._exception is None: - return self.value - else: - getcurrent().throw(*self._exception) - else: - if self.greenlet is not None: - raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, )) - self.greenlet = getcurrent() - try: - return self.hub.switch() - finally: - self.greenlet = None - - def __call__(self, source): - if source.exception is None: - self.switch(source.value) - else: - self.throw(source.exception) - - # can also have a debugging version, that wraps the value in a tuple (self, value) in switch() - # and unwraps it in wait() thus checking that switch() was indeed called - - -class _MultipleWaiter(Waiter): - """ - An internal extension of Waiter that can be used if multiple objects - must be waited on, and there is a chance that in between waits greenlets - might be switched out. All greenlets that switch to this waiter - will have their value returned. - - This does not handle exceptions or throw methods. - """ - __slots__ = ['_values'] - - def __init__(self, *args, **kwargs): - Waiter.__init__(self, *args, **kwargs) - # we typically expect a relatively small number of these to be outstanding. - # since we pop from the left, a deque might be slightly - # more efficient, but since we're in the hub we avoid imports if - # we can help it to better support monkey-patching, and delaying the import - # here can be impractical (see https://github.com/gevent/gevent/issues/652) - self._values = list() - - def switch(self, value): # pylint:disable=signature-differs - self._values.append(value) - Waiter.switch(self, True) - - def get(self): - if not self._values: - Waiter.get(self) - Waiter.clear(self) - - return self._values.pop(0) - - -def iwait(objects, timeout=None, count=None): - """ - Iteratively yield *objects* as they are ready, until all (or *count*) are ready - or *timeout* expired. - - :param objects: A sequence (supporting :func:`len`) containing objects - implementing the wait protocol (rawlink() and unlink()). - :keyword int count: If not `None`, then a number specifying the maximum number - of objects to wait for. If ``None`` (the default), all objects - are waited for. - :keyword float timeout: If given, specifies a maximum number of seconds - to wait. If the timeout expires before the desired waited-for objects - are available, then this method returns immediately. - - .. seealso:: :func:`wait` - - .. versionchanged:: 1.1a1 - Add the *count* parameter. - .. versionchanged:: 1.1a2 - No longer raise :exc:`LoopExit` if our caller switches greenlets - in between items yielded by this function. - """ - # QQQ would be nice to support iterable here that can be generated slowly (why?) - if objects is None: - yield get_hub().join(timeout=timeout) - return - - count = len(objects) if count is None else min(count, len(objects)) - waiter = _MultipleWaiter() - switch = waiter.switch - - if timeout is not None: - timer = get_hub().loop.timer(timeout, priority=-1) - timer.start(switch, _NONE) - - try: - for obj in objects: - obj.rawlink(switch) - - for _ in xrange(count): - item = waiter.get() - waiter.clear() - if item is _NONE: - return - yield item - finally: - if timeout is not None: - timer.stop() - for aobj in objects: - unlink = getattr(aobj, 'unlink', None) - if unlink: - try: - unlink(switch) - except: # pylint:disable=bare-except - traceback.print_exc() - - -def wait(objects=None, timeout=None, count=None): - """ - Wait for ``objects`` to become ready or for event loop to finish. - - If ``objects`` is provided, it must be a list containing objects - implementing the wait protocol (rawlink() and unlink() methods): - - - :class:`gevent.Greenlet` instance - - :class:`gevent.event.Event` instance - - :class:`gevent.lock.Semaphore` instance - - :class:`gevent.subprocess.Popen` instance - - If ``objects`` is ``None`` (the default), ``wait()`` blocks until - the current event loop has nothing to do (or until ``timeout`` passes): + self._threadpool = None - - all greenlets have finished - - all servers were stopped - - all event loop watchers were stopped. + threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool, + """ + The threadpool associated with this hub. - If ``count`` is ``None`` (the default), wait for all ``objects`` - to become ready. + Usually this is a + :class:`gevent.threadpool.ThreadPool`, but + you :attr:`can customize that + <gevent._config.Config.threadpool>`. - If ``count`` is a number, wait for (up to) ``count`` objects to become - ready. (For example, if count is ``1`` then the function exits - when any object in the list is ready). + Use this object to schedule blocking + (non-cooperative) operations in a different + thread to prevent them from halting the event loop. + """) - If ``timeout`` is provided, it specifies the maximum number of - seconds ``wait()`` will block. - Returns the list of ready objects, in the order in which they were - ready. +set_default_hub_class(Hub) - .. seealso:: :func:`iwait` - """ - if objects is None: - return get_hub().join(timeout=timeout) - return list(iwait(objects, timeout, count)) class linkproxy(object): |