aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/hub.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/hub.py')
-rw-r--r--python/gevent/hub.py1052
1 files changed, 1052 insertions, 0 deletions
diff --git a/python/gevent/hub.py b/python/gevent/hub.py
new file mode 100644
index 0000000..001cd06
--- /dev/null
+++ b/python/gevent/hub.py
@@ -0,0 +1,1052 @@
+# Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details.
+"""
+Event-loop hub.
+"""
+from __future__ import absolute_import
+# XXX: FIXME: Refactor to make this smaller
+# pylint:disable=too-many-lines
+from functools import partial as _functools_partial
+import os
+import sys
+import traceback
+
+from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit
+
+
+__all__ = [
+ 'getcurrent',
+ 'GreenletExit',
+ 'spawn_raw',
+ 'sleep',
+ 'kill',
+ 'signal',
+ 'reinit',
+ 'get_hub',
+ 'Hub',
+ 'Waiter',
+]
+
+from gevent._compat import string_types
+from gevent._compat import xrange
+from gevent._util import _NONE
+from gevent._util import readproperty
+
+if sys.version_info[0] <= 2:
+ import thread # pylint:disable=import-error
+else:
+ import _thread as thread # python 2 pylint:disable=import-error
+
+# These must be the "real" native thread versions,
+# not monkey-patched.
+threadlocal = thread._local
+
+
+class _threadlocal(threadlocal):
+
+ def __init__(self):
+ # Use a class with an initializer so that we can test
+ # for 'is None' instead of catching AttributeError, making
+ # the code cleaner and possibly solving some corner cases
+ # (like #687)
+ threadlocal.__init__(self)
+ self.Hub = None
+ self.loop = None
+ self.hub = None
+
+_threadlocal = _threadlocal()
+
+get_ident = thread.get_ident
+MAIN_THREAD = get_ident()
+
+
+
+
+class LoopExit(Exception):
+ """
+ Exception thrown when the hub finishes running.
+
+ In a normal application, this is never thrown or caught
+ explicitly. The internal implementation of functions like
+ :func:`join` and :func:`joinall` may catch it, but user code
+ generally should not.
+
+ .. caution::
+ Errors in application programming can also lead to this exception being
+ raised. Some examples include (but are not limited too):
+
+ - greenlets deadlocking on a lock;
+ - using a socket or other gevent object with native thread
+ affinity from a different thread
+
+ """
+ pass
+
+
+class BlockingSwitchOutError(AssertionError):
+ pass
+
+
+class InvalidSwitchError(AssertionError):
+ pass
+
+
+class ConcurrentObjectUseError(AssertionError):
+ # raised when an object is used (waited on) by two greenlets
+ # independently, meaning the object was entered into a blocking
+ # state by one greenlet and then another while still blocking in the
+ # first one
+ pass
+
+
+def spawn_raw(function, *args, **kwargs):
+ """
+ Create a new :class:`greenlet.greenlet` object and schedule it to
+ run ``function(*args, **kwargs)``.
+
+ This returns a raw :class:`~greenlet.greenlet` which does not have all the useful
+ methods that :class:`gevent.Greenlet` has. Typically, applications
+ should prefer :func:`~gevent.spawn`, but this method may
+ occasionally be useful as an optimization if there are many
+ greenlets involved.
+
+ .. versionchanged:: 1.1b1
+ If *function* is not callable, immediately raise a :exc:`TypeError`
+ instead of spawning a greenlet that will raise an uncaught TypeError.
+
+ .. versionchanged:: 1.1rc2
+ Accept keyword arguments for ``function`` as previously (incorrectly)
+ documented. Note that this may incur an additional expense.
+
+ .. versionchanged:: 1.1a3
+ Verify that ``function`` is callable, raising a TypeError if not. Previously,
+ the spawned greenlet would have failed the first time it was switched to.
+ """
+ if not callable(function):
+ raise TypeError("function must be callable")
+ hub = get_hub()
+
+ # The callback class object that we use to run this doesn't
+ # accept kwargs (and those objects are heavily used, as well as being
+ # implemented twice in core.ppyx and corecffi.py) so do it with a partial
+ if kwargs:
+ function = _functools_partial(function, *args, **kwargs)
+ g = RawGreenlet(function, hub)
+ hub.loop.run_callback(g.switch)
+ else:
+ g = RawGreenlet(function, hub)
+ hub.loop.run_callback(g.switch, *args)
+ return g
+
+
+def sleep(seconds=0, ref=True):
+ """
+ Put the current greenlet to sleep for at least *seconds*.
+
+ *seconds* may be specified as an integer, or a float if fractional
+ seconds are desired.
+
+ .. tip:: In the current implementation, a value of 0 (the default)
+ means to yield execution to any other runnable greenlets, but
+ this greenlet may be scheduled again before the event loop
+ cycles (in an extreme case, a greenlet that repeatedly sleeps
+ with 0 can prevent greenlets that are ready to do I/O from
+ being scheduled for some (small) period of time); a value greater than
+ 0, on the other hand, will delay running this greenlet until
+ the next iteration of the loop.
+
+ If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait`
+ from exiting.
+
+ .. seealso:: :func:`idle`
+ """
+ hub = get_hub()
+ loop = hub.loop
+ if seconds <= 0:
+ waiter = Waiter()
+ loop.run_callback(waiter.switch)
+ waiter.get()
+ else:
+ hub.wait(loop.timer(seconds, ref=ref))
+
+
+def idle(priority=0):
+ """
+ Cause the calling greenlet to wait until the event loop is idle.
+
+ Idle is defined as having no other events of the same or higher
+ *priority* pending. That is, as long as sockets, timeouts or even
+ signals of the same or higher priority are being processed, the loop
+ is not idle.
+
+ .. seealso:: :func:`sleep`
+ """
+ hub = get_hub()
+ watcher = hub.loop.idle()
+ if priority:
+ watcher.priority = priority
+ hub.wait(watcher)
+
+
+def kill(greenlet, exception=GreenletExit):
+ """
+ Kill greenlet asynchronously. The current greenlet is not unscheduled.
+
+ .. note::
+
+ The method :meth:`Greenlet.kill` method does the same and
+ more (and the same caveats listed there apply here). However, the MAIN
+ greenlet - the one that exists initially - does not have a
+ ``kill()`` method, and neither do any created with :func:`spawn_raw`,
+ so you have to use this function.
+
+ .. caution:: Use care when killing greenlets. If they are not prepared for
+ exceptions, this could result in corrupted state.
+
+ .. versionchanged:: 1.1a2
+ If the ``greenlet`` has a :meth:`kill <Greenlet.kill>` method, calls it. This prevents a
+ greenlet from being switched to for the first time after it's been
+ killed but not yet executed.
+ """
+ if not greenlet.dead:
+ if hasattr(greenlet, 'kill'):
+ # dealing with gevent.greenlet.Greenlet. Use it, especially
+ # to avoid allowing one to be switched to for the first time
+ # after it's been killed
+ greenlet.kill(exception=exception, block=False)
+ else:
+ get_hub().loop.run_callback(greenlet.throw, exception)
+
+
+class signal(object):
+ """
+ Call the *handler* with the *args* and *kwargs* when the process
+ receives the signal *signalnum*.
+
+ The *handler* will be run in a new greenlet when the signal is delivered.
+
+ This returns an object with the useful method ``cancel``, which, when called,
+ will prevent future deliveries of *signalnum* from calling *handler*.
+
+ .. note::
+
+ This may not operate correctly with SIGCHLD if libev child watchers
+ are used (as they are by default with os.fork).
+
+ .. versionchanged:: 1.2a1
+ The ``handler`` argument is required to be callable at construction time.
+ """
+
+ # XXX: This is manually documented in gevent.rst while it is aliased in
+ # the gevent module.
+
+ greenlet_class = None
+
+ def __init__(self, signalnum, handler, *args, **kwargs):
+ if not callable(handler):
+ raise TypeError("signal handler must be callable.")
+
+ self.hub = get_hub()
+ self.watcher = self.hub.loop.signal(signalnum, ref=False)
+ self.watcher.start(self._start)
+ self.handler = handler
+ self.args = args
+ self.kwargs = kwargs
+ if self.greenlet_class is None:
+ from gevent import Greenlet
+ self.greenlet_class = Greenlet
+
+ def _get_ref(self):
+ return self.watcher.ref
+
+ def _set_ref(self, value):
+ self.watcher.ref = value
+
+ ref = property(_get_ref, _set_ref)
+ del _get_ref, _set_ref
+
+ def cancel(self):
+ self.watcher.stop()
+
+ def _start(self):
+ try:
+ greenlet = self.greenlet_class(self.handle)
+ greenlet.switch()
+ except: # pylint:disable=bare-except
+ self.hub.handle_error(None, *sys._exc_info()) # pylint:disable=no-member
+
+ def handle(self):
+ try:
+ self.handler(*self.args, **self.kwargs)
+ except: # pylint:disable=bare-except
+ self.hub.handle_error(None, *sys.exc_info())
+
+
+def reinit():
+ """
+ Prepare the gevent hub to run in a new (forked) process.
+
+ This should be called *immediately* after :func:`os.fork` in the
+ child process. This is done automatically by
+ :func:`gevent.os.fork` or if the :mod:`os` module has been
+ monkey-patched. If this function is not called in a forked
+ process, symptoms may include hanging of functions like
+ :func:`socket.getaddrinfo`, and the hub's threadpool is unlikely
+ to work.
+
+ .. note:: Registered fork watchers may or may not run before
+ this function (and thus ``gevent.os.fork``) return. If they have
+ not run, they will run "soon", after an iteration of the event loop.
+ You can force this by inserting a few small (but non-zero) calls to :func:`sleep`
+ after fork returns. (As of gevent 1.1 and before, fork watchers will
+ not have run, but this may change in the future.)
+
+ .. note:: This function may be removed in a future major release
+ if the fork process can be more smoothly managed.
+
+ .. warning:: See remarks in :func:`gevent.os.fork` about greenlets
+ and libev watchers in the child process.
+ """
+ # The loop reinit function in turn calls libev's ev_loop_fork
+ # function.
+ hub = _get_hub()
+
+ if hub is not None:
+ # Note that we reinit the existing loop, not destroy it.
+ # See https://github.com/gevent/gevent/issues/200.
+ hub.loop.reinit()
+ # libev's fork watchers are slow to fire because the only fire
+ # at the beginning of a loop; due to our use of callbacks that
+ # run at the end of the loop, that may be too late. The
+ # threadpool and resolvers depend on the fork handlers being
+ # run (specifically, the threadpool will fail in the forked
+ # child if there were any threads in it, which there will be
+ # if the resolver_thread was in use (the default) before the
+ # fork.)
+ #
+ # If the forked process wants to use the threadpool or
+ # resolver immediately (in a queued callback), it would hang.
+ #
+ # The below is a workaround. Fortunately, both of these
+ # methods are idempotent and can be called multiple times
+ # following a fork if the suddenly started working, or were
+ # already working on some platforms. Other threadpools and fork handlers
+ # will be called at an arbitrary time later ('soon')
+ if hasattr(hub.threadpool, '_on_fork'):
+ hub.threadpool._on_fork()
+ # resolver_ares also has a fork watcher that's not firing
+ if hasattr(hub.resolver, '_on_fork'):
+ hub.resolver._on_fork()
+
+ # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
+ # pass around before returning to this greenlet. That will allow any
+ # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
+ # we do this, certain tests that heavily mix threads and forking,
+ # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
+ # why.
+ #sleep(0.00001)
+ #sleep(0.00001)
+
+
+def get_hub_class():
+ """Return the type of hub to use for the current thread.
+
+ If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
+ """
+ hubtype = _threadlocal.Hub
+ if hubtype is None:
+ hubtype = _threadlocal.Hub = Hub
+ return hubtype
+
+
+def get_hub(*args, **kwargs):
+ """
+ Return the hub for the current thread.
+
+ If a hub does not exist in the current thread, a new one is
+ created of the type returned by :func:`get_hub_class`.
+ """
+ hub = _threadlocal.hub
+ if hub is None:
+ hubtype = get_hub_class()
+ hub = _threadlocal.hub = hubtype(*args, **kwargs)
+ return hub
+
+
+def _get_hub():
+ """Return the hub for the current thread.
+
+ Return ``None`` if no hub has been created yet.
+ """
+ return _threadlocal.hub
+
+
+def set_hub(hub):
+ _threadlocal.hub = hub
+
+
+def _import(path):
+ # pylint:disable=too-many-branches
+ if isinstance(path, list):
+ if not path:
+ raise ImportError('Cannot import from empty list: %r' % (path, ))
+
+ for item in path[:-1]:
+ try:
+ return _import(item)
+ except ImportError:
+ pass
+
+ return _import(path[-1])
+
+ if not isinstance(path, string_types):
+ return path
+
+ if '.' not in path:
+ raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path)
+
+ if '/' in path:
+ package_path, path = path.rsplit('/', 1)
+ sys.path = [package_path] + sys.path
+ else:
+ package_path = None
+
+ try:
+ module, item = path.rsplit('.', 1)
+ x = __import__(module)
+ for attr in path.split('.')[1:]:
+ oldx = x
+ x = getattr(x, attr, _NONE)
+ if x is _NONE:
+ raise ImportError('Cannot import %r from %r' % (attr, oldx))
+ return x
+ finally:
+ try:
+ sys.path.remove(package_path)
+ except ValueError:
+ pass
+
+
+def config(default, envvar):
+ result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
+ if isinstance(result, string_types):
+ return result.split(',')
+ return result
+
+
+def resolver_config(default, envvar):
+ result = config(default, envvar)
+ return [_resolvers.get(x, x) for x in result]
+
+
+_resolvers = {'ares': 'gevent.resolver_ares.Resolver',
+ 'thread': 'gevent.resolver_thread.Resolver',
+ 'block': 'gevent.socket.BlockingResolver'}
+
+
+_DEFAULT_LOOP_CLASS = 'gevent.core.loop'
+
+
+class Hub(RawGreenlet):
+ """A greenlet that runs the event loop.
+
+ It is created automatically by :func:`get_hub`.
+
+ **Switching**
+
+ Every time this greenlet (i.e., the event loop) is switched *to*, if
+ the current greenlet has a ``switch_out`` method, it will be called. This
+ allows a greenlet to take some cleanup actions before yielding control. This method
+ should not call any gevent blocking functions.
+ """
+
+ #: If instances of these classes are raised into the event loop,
+ #: they will be propagated out to the main greenlet (where they will
+ #: usually be caught by Python itself)
+ SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)
+
+ #: Instances of these classes are not considered to be errors and
+ #: do not get logged/printed when raised by the event loop.
+ NOT_ERROR = (GreenletExit, SystemExit)
+
+ loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP')
+ # For the standard class, go ahead and import it when this class
+ # is defined. This is no loss of generality because the envvar is
+ # only read when this class is defined, and we know that the
+ # standard class will be available. This can solve problems with
+ # the class being imported from multiple threads at once, leading
+ # to one of the imports failing. Only do this for the object we
+ # need in the constructor, as the rest of the factories are
+ # themselves handled lazily. See #687. (People using a custom loop_class
+ # can probably manage to get_hub() from the main thread or otherwise import
+ # that loop_class themselves.)
+ if loop_class == [_DEFAULT_LOOP_CLASS]:
+ loop_class = [_import(loop_class)]
+
+ resolver_class = ['gevent.resolver_thread.Resolver',
+ 'gevent.resolver_ares.Resolver',
+ 'gevent.socket.BlockingResolver']
+ #: The class or callable object, or the name of a factory function or class,
+ #: that will be used to create :attr:`resolver`. By default, configured according to
+ #: :doc:`dns`. If a list, a list of objects in preference order.
+ resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER')
+ threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL')
+ backend = config(None, 'GEVENT_BACKEND')
+ threadpool_size = 10
+
+ # using pprint.pformat can override custom __repr__ methods on dict/list
+ # subclasses, which can be a security concern
+ format_context = 'pprint.saferepr'
+
+
+ def __init__(self, loop=None, default=None):
+ RawGreenlet.__init__(self)
+ if hasattr(loop, 'run'):
+ if default is not None:
+ raise TypeError("Unexpected argument: default")
+ self.loop = loop
+ elif _threadlocal.loop is not None:
+ # Reuse a loop instance previously set by
+ # destroying a hub without destroying the associated
+ # loop. See #237 and #238.
+ self.loop = _threadlocal.loop
+ else:
+ if default is None and get_ident() != MAIN_THREAD:
+ default = False
+ loop_class = _import(self.loop_class)
+ if loop is None:
+ loop = self.backend
+ self.loop = loop_class(flags=loop, default=default)
+ self._resolver = None
+ self._threadpool = None
+ self.format_context = _import(self.format_context)
+
+ def __repr__(self):
+ if self.loop is None:
+ info = 'destroyed'
+ else:
+ try:
+ info = self.loop._format()
+ except Exception as ex: # pylint:disable=broad-except
+ info = str(ex) or repr(ex) or 'error'
+ result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
+ if self._resolver is not None:
+ result += ' resolver=%r' % self._resolver
+ if self._threadpool is not None:
+ result += ' threadpool=%r' % self._threadpool
+ return result + '>'
+
+ def handle_error(self, context, type, value, tb):
+ """
+ Called by the event loop when an error occurs. The arguments
+ type, value, and tb are the standard tuple returned by :func:`sys.exc_info`.
+
+ Applications can set a property on the hub with this same signature
+ to override the error handling provided by this class.
+
+ Errors that are :attr:`system errors <SYSTEM_ERROR>` are passed
+ to :meth:`handle_system_error`.
+
+ :param context: If this is ``None``, indicates a system error that
+ should generally result in exiting the loop and being thrown to the
+ parent greenlet.
+ """
+ if isinstance(value, str):
+ # Cython can raise errors where the value is a plain string
+ # e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
+ value = type(value)
+ if not issubclass(type, self.NOT_ERROR):
+ self.print_exception(context, type, value, tb)
+ if context is None or issubclass(type, self.SYSTEM_ERROR):
+ self.handle_system_error(type, value)
+
+ def handle_system_error(self, type, value):
+ current = getcurrent()
+ if current is self or current is self.parent or self.loop is None:
+ self.parent.throw(type, value)
+ else:
+ # in case system error was handled and life goes on
+ # switch back to this greenlet as well
+ cb = None
+ try:
+ cb = self.loop.run_callback(current.switch)
+ except: # pylint:disable=bare-except
+ traceback.print_exc(file=self.exception_stream)
+ try:
+ self.parent.throw(type, value)
+ finally:
+ if cb is not None:
+ cb.stop()
+
+ @readproperty
+ def exception_stream(self):
+ """
+ The stream to which exceptions will be written.
+ Defaults to ``sys.stderr`` unless assigned to.
+
+ .. versionadded:: 1.2a1
+ """
+ # Unwrap any FileObjectThread we have thrown around sys.stderr
+ # (because it can't be used in the hub). Tricky because we are
+ # called in error situations when it's not safe to import.
+ stderr = sys.stderr
+ if type(stderr).__name__ == 'FileObjectThread':
+ stderr = stderr.io # pylint:disable=no-member
+ return stderr
+
+ def print_exception(self, context, type, value, tb):
+ # Python 3 does not gracefully handle None value or tb in
+ # traceback.print_exception() as previous versions did.
+ # pylint:disable=no-member
+ errstream = self.exception_stream
+
+ if value is None:
+ errstream.write('%s\n' % type.__name__)
+ else:
+ traceback.print_exception(type, value, tb, file=errstream)
+ del tb
+
+ try:
+ import time
+ errstream.write(time.ctime())
+ errstream.write(' ' if context is not None else '\n')
+ except: # pylint:disable=bare-except
+ # Possible not safe to import under certain
+ # error conditions in Python 2
+ pass
+
+ if context is not None:
+ if not isinstance(context, str):
+ try:
+ context = self.format_context(context)
+ except: # pylint:disable=bare-except
+ traceback.print_exc(file=self.exception_stream)
+ context = repr(context)
+ errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
+
+ def switch(self):
+ switch_out = getattr(getcurrent(), 'switch_out', None)
+ if switch_out is not None:
+ switch_out()
+ return RawGreenlet.switch(self)
+
+ def switch_out(self):
+ raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
+
+ def wait(self, watcher):
+ """
+ Wait until the *watcher* (which should not be started) is ready.
+
+ The current greenlet will be unscheduled during this time.
+
+ .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
+ :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
+ :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
+ :class:`gevent.core.child`, :class:`gevent.core.stat`
+
+ """
+ waiter = Waiter()
+ unique = object()
+ watcher.start(waiter.switch, unique)
+ try:
+ result = waiter.get()
+ if result is not unique:
+ raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
+ finally:
+ watcher.stop()
+
+ def cancel_wait(self, watcher, error):
+ """
+ Cancel an in-progress call to :meth:`wait` by throwing the given *error*
+ in the waiting greenlet.
+ """
+ if watcher.callback is not None:
+ self.loop.run_callback(self._cancel_wait, watcher, error)
+
+ def _cancel_wait(self, watcher, error):
+ if watcher.active:
+ switch = watcher.callback
+ if switch is not None:
+ greenlet = getattr(switch, '__self__', None)
+ if greenlet is not None:
+ greenlet.throw(error)
+
+ def run(self):
+ """
+ Entry-point to running the loop. This method is called automatically
+ when the hub greenlet is scheduled; do not call it directly.
+
+ :raises LoopExit: If the loop finishes running. This means
+ that there are no other scheduled greenlets, and no active
+ watchers or servers. In some situations, this indicates a
+ programming error.
+ """
+ assert self is getcurrent(), 'Do not call Hub.run() directly'
+ while True:
+ loop = self.loop
+ loop.error_handler = self
+ try:
+ loop.run()
+ finally:
+ loop.error_handler = None # break the refcount cycle
+ self.parent.throw(LoopExit('This operation would block forever', self))
+ # this function must never return, as it will cause switch() in the parent greenlet
+ # to return an unexpected value
+ # It is still possible to kill this greenlet with throw. However, in that case
+ # switching to it is no longer safe, as switch will return immediatelly
+
+ def join(self, timeout=None):
+ """Wait for the event loop to finish. Exits only when there are
+ no more spawned greenlets, started servers, active timeouts or watchers.
+
+ If *timeout* is provided, wait no longer for the specified number of seconds.
+
+ Returns True if exited because the loop finished execution.
+ Returns False if exited because of timeout expired.
+ """
+ assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
+ if self.dead:
+ return True
+
+ waiter = Waiter()
+
+ if timeout is not None:
+ timeout = self.loop.timer(timeout, ref=False)
+ timeout.start(waiter.switch)
+
+ try:
+ try:
+ waiter.get()
+ except LoopExit:
+ return True
+ finally:
+ if timeout is not None:
+ timeout.stop()
+ return False
+
+ def destroy(self, destroy_loop=None):
+ if self._resolver is not None:
+ self._resolver.close()
+ del self._resolver
+ if self._threadpool is not None:
+ self._threadpool.kill()
+ del self._threadpool
+ if destroy_loop is None:
+ destroy_loop = not self.loop.default
+ if destroy_loop:
+ if _threadlocal.loop is self.loop:
+ # Don't let anyone try to reuse this
+ _threadlocal.loop = None
+ self.loop.destroy()
+ else:
+ # Store in case another hub is created for this
+ # thread.
+ _threadlocal.loop = self.loop
+
+ self.loop = None
+ if _threadlocal.hub is self:
+ _threadlocal.hub = None
+
+ def _get_resolver(self):
+ if self._resolver is None:
+ if self.resolver_class is not None:
+ self.resolver_class = _import(self.resolver_class)
+ self._resolver = self.resolver_class(hub=self)
+ return self._resolver
+
+ def _set_resolver(self, value):
+ self._resolver = value
+
+ def _del_resolver(self):
+ del self._resolver
+
+ resolver = property(_get_resolver, _set_resolver, _del_resolver)
+
+ def _get_threadpool(self):
+ if self._threadpool is None:
+ if self.threadpool_class is not None:
+ self.threadpool_class = _import(self.threadpool_class)
+ self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
+ return self._threadpool
+
+ def _set_threadpool(self, value):
+ self._threadpool = value
+
+ def _del_threadpool(self):
+ del self._threadpool
+
+ threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
+
+
+class Waiter(object):
+ """
+ A low level communication utility for greenlets.
+
+ Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
+
+ * switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
+ * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
+ * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
+ will store the value/exception. The following :meth:`get` will return the value/raise the exception.
+
+ The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
+ The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
+
+ >>> result = Waiter()
+ >>> timer = get_hub().loop.timer(0.1)
+ >>> timer.start(result.switch, 'hello from Waiter')
+ >>> result.get() # blocks for 0.1 seconds
+ 'hello from Waiter'
+
+ If switch is called before the greenlet gets a chance to call :meth:`get` then
+ :class:`Waiter` stores the value.
+
+ >>> result = Waiter()
+ >>> timer = get_hub().loop.timer(0.1)
+ >>> timer.start(result.switch, 'hi from Waiter')
+ >>> sleep(0.2)
+ >>> result.get() # returns immediatelly without blocking
+ 'hi from Waiter'
+
+ .. warning::
+
+ This a limited and dangerous way to communicate between
+ greenlets. It can easily leave a greenlet unscheduled forever
+ if used incorrectly. Consider using safer classes such as
+ :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
+ or :class:`gevent.queue.Queue`.
+ """
+
+ __slots__ = ['hub', 'greenlet', 'value', '_exception']
+
+ def __init__(self, hub=None):
+ if hub is None:
+ self.hub = get_hub()
+ else:
+ self.hub = hub
+ self.greenlet = None
+ self.value = None
+ self._exception = _NONE
+
+ def clear(self):
+ self.greenlet = None
+ self.value = None
+ self._exception = _NONE
+
+ def __str__(self):
+ if self._exception is _NONE:
+ return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
+ if self._exception is None:
+ return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
+ return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
+
+ def ready(self):
+ """Return true if and only if it holds a value or an exception"""
+ return self._exception is not _NONE
+
+ def successful(self):
+ """Return true if and only if it is ready and holds a value"""
+ return self._exception is None
+
+ @property
+ def exc_info(self):
+ "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
+ if self._exception is not _NONE:
+ return self._exception
+
+ def switch(self, value=None):
+ """Switch to the greenlet if one's available. Otherwise store the value."""
+ greenlet = self.greenlet
+ if greenlet is None:
+ self.value = value
+ self._exception = None
+ else:
+ assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
+ switch = greenlet.switch
+ try:
+ switch(value)
+ except: # pylint:disable=bare-except
+ self.hub.handle_error(switch, *sys.exc_info())
+
+ def switch_args(self, *args):
+ return self.switch(args)
+
+ def throw(self, *throw_args):
+ """Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
+ greenlet = self.greenlet
+ if greenlet is None:
+ self._exception = throw_args
+ else:
+ assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
+ throw = greenlet.throw
+ try:
+ throw(*throw_args)
+ except: # pylint:disable=bare-except
+ self.hub.handle_error(throw, *sys.exc_info())
+
+ def get(self):
+ """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
+ if self._exception is not _NONE:
+ if self._exception is None:
+ return self.value
+ else:
+ getcurrent().throw(*self._exception)
+ else:
+ if self.greenlet is not None:
+ raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
+ self.greenlet = getcurrent()
+ try:
+ return self.hub.switch()
+ finally:
+ self.greenlet = None
+
+ def __call__(self, source):
+ if source.exception is None:
+ self.switch(source.value)
+ else:
+ self.throw(source.exception)
+
+ # can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
+ # and unwraps it in wait() thus checking that switch() was indeed called
+
+
+class _MultipleWaiter(Waiter):
+ """
+ An internal extension of Waiter that can be used if multiple objects
+ must be waited on, and there is a chance that in between waits greenlets
+ might be switched out. All greenlets that switch to this waiter
+ will have their value returned.
+
+ This does not handle exceptions or throw methods.
+ """
+ __slots__ = ['_values']
+
+ def __init__(self, *args, **kwargs):
+ Waiter.__init__(self, *args, **kwargs)
+ # we typically expect a relatively small number of these to be outstanding.
+ # since we pop from the left, a deque might be slightly
+ # more efficient, but since we're in the hub we avoid imports if
+ # we can help it to better support monkey-patching, and delaying the import
+ # here can be impractical (see https://github.com/gevent/gevent/issues/652)
+ self._values = list()
+
+ def switch(self, value): # pylint:disable=signature-differs
+ self._values.append(value)
+ Waiter.switch(self, True)
+
+ def get(self):
+ if not self._values:
+ Waiter.get(self)
+ Waiter.clear(self)
+
+ return self._values.pop(0)
+
+
+def iwait(objects, timeout=None, count=None):
+ """
+ Iteratively yield *objects* as they are ready, until all (or *count*) are ready
+ or *timeout* expired.
+
+ :param objects: A sequence (supporting :func:`len`) containing objects
+ implementing the wait protocol (rawlink() and unlink()).
+ :keyword int count: If not `None`, then a number specifying the maximum number
+ of objects to wait for. If ``None`` (the default), all objects
+ are waited for.
+ :keyword float timeout: If given, specifies a maximum number of seconds
+ to wait. If the timeout expires before the desired waited-for objects
+ are available, then this method returns immediately.
+
+ .. seealso:: :func:`wait`
+
+ .. versionchanged:: 1.1a1
+ Add the *count* parameter.
+ .. versionchanged:: 1.1a2
+ No longer raise :exc:`LoopExit` if our caller switches greenlets
+ in between items yielded by this function.
+ """
+ # QQQ would be nice to support iterable here that can be generated slowly (why?)
+ if objects is None:
+ yield get_hub().join(timeout=timeout)
+ return
+
+ count = len(objects) if count is None else min(count, len(objects))
+ waiter = _MultipleWaiter()
+ switch = waiter.switch
+
+ if timeout is not None:
+ timer = get_hub().loop.timer(timeout, priority=-1)
+ timer.start(switch, _NONE)
+
+ try:
+ for obj in objects:
+ obj.rawlink(switch)
+
+ for _ in xrange(count):
+ item = waiter.get()
+ waiter.clear()
+ if item is _NONE:
+ return
+ yield item
+ finally:
+ if timeout is not None:
+ timer.stop()
+ for aobj in objects:
+ unlink = getattr(aobj, 'unlink', None)
+ if unlink:
+ try:
+ unlink(switch)
+ except: # pylint:disable=bare-except
+ traceback.print_exc()
+
+
+def wait(objects=None, timeout=None, count=None):
+ """
+ Wait for ``objects`` to become ready or for event loop to finish.
+
+ If ``objects`` is provided, it must be a list containing objects
+ implementing the wait protocol (rawlink() and unlink() methods):
+
+ - :class:`gevent.Greenlet` instance
+ - :class:`gevent.event.Event` instance
+ - :class:`gevent.lock.Semaphore` instance
+ - :class:`gevent.subprocess.Popen` instance
+
+ If ``objects`` is ``None`` (the default), ``wait()`` blocks until
+ the current event loop has nothing to do (or until ``timeout`` passes):
+
+ - all greenlets have finished
+ - all servers were stopped
+ - all event loop watchers were stopped.
+
+ If ``count`` is ``None`` (the default), wait for all ``objects``
+ to become ready.
+
+ If ``count`` is a number, wait for (up to) ``count`` objects to become
+ ready. (For example, if count is ``1`` then the function exits
+ when any object in the list is ready).
+
+ If ``timeout`` is provided, it specifies the maximum number of
+ seconds ``wait()`` will block.
+
+ Returns the list of ready objects, in the order in which they were
+ ready.
+
+ .. seealso:: :func:`iwait`
+ """
+ if objects is None:
+ return get_hub().join(timeout=timeout)
+ return list(iwait(objects, timeout, count))
+
+
+class linkproxy(object):
+ __slots__ = ['callback', 'obj']
+
+ def __init__(self, callback, obj):
+ self.callback = callback
+ self.obj = obj
+
+ def __call__(self, *args):
+ callback = self.callback
+ obj = self.obj
+ self.callback = None
+ self.obj = None
+ callback(obj)