# Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details. """ Event-loop hub. """ from __future__ import absolute_import # XXX: FIXME: Refactor to make this smaller # pylint:disable=too-many-lines from functools import partial as _functools_partial import os import sys import traceback from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit __all__ = [ 'getcurrent', 'GreenletExit', 'spawn_raw', 'sleep', 'kill', 'signal', 'reinit', 'get_hub', 'Hub', 'Waiter', ] from gevent._compat import string_types from gevent._compat import xrange from gevent._util import _NONE from gevent._util import readproperty if sys.version_info[0] <= 2: import thread # pylint:disable=import-error else: import _thread as thread # python 2 pylint:disable=import-error # These must be the "real" native thread versions, # not monkey-patched. threadlocal = thread._local class _threadlocal(threadlocal): def __init__(self): # Use a class with an initializer so that we can test # for 'is None' instead of catching AttributeError, making # the code cleaner and possibly solving some corner cases # (like #687) threadlocal.__init__(self) self.Hub = None self.loop = None self.hub = None _threadlocal = _threadlocal() get_ident = thread.get_ident MAIN_THREAD = get_ident() class LoopExit(Exception): """ Exception thrown when the hub finishes running. In a normal application, this is never thrown or caught explicitly. The internal implementation of functions like :func:`join` and :func:`joinall` may catch it, but user code generally should not. .. caution:: Errors in application programming can also lead to this exception being raised. Some examples include (but are not limited too): - greenlets deadlocking on a lock; - using a socket or other gevent object with native thread affinity from a different thread """ pass class BlockingSwitchOutError(AssertionError): pass class InvalidSwitchError(AssertionError): pass class ConcurrentObjectUseError(AssertionError): # raised when an object is used (waited on) by two greenlets # independently, meaning the object was entered into a blocking # state by one greenlet and then another while still blocking in the # first one pass def spawn_raw(function, *args, **kwargs): """ Create a new :class:`greenlet.greenlet` object and schedule it to run ``function(*args, **kwargs)``. This returns a raw :class:`~greenlet.greenlet` which does not have all the useful methods that :class:`gevent.Greenlet` has. Typically, applications should prefer :func:`~gevent.spawn`, but this method may occasionally be useful as an optimization if there are many greenlets involved. .. versionchanged:: 1.1b1 If *function* is not callable, immediately raise a :exc:`TypeError` instead of spawning a greenlet that will raise an uncaught TypeError. .. versionchanged:: 1.1rc2 Accept keyword arguments for ``function`` as previously (incorrectly) documented. Note that this may incur an additional expense. .. versionchanged:: 1.1a3 Verify that ``function`` is callable, raising a TypeError if not. Previously, the spawned greenlet would have failed the first time it was switched to. """ if not callable(function): raise TypeError("function must be callable") hub = get_hub() # The callback class object that we use to run this doesn't # accept kwargs (and those objects are heavily used, as well as being # implemented twice in core.ppyx and corecffi.py) so do it with a partial if kwargs: function = _functools_partial(function, *args, **kwargs) g = RawGreenlet(function, hub) hub.loop.run_callback(g.switch) else: g = RawGreenlet(function, hub) hub.loop.run_callback(g.switch, *args) return g def sleep(seconds=0, ref=True): """ Put the current greenlet to sleep for at least *seconds*. *seconds* may be specified as an integer, or a float if fractional seconds are desired. .. tip:: In the current implementation, a value of 0 (the default) means to yield execution to any other runnable greenlets, but this greenlet may be scheduled again before the event loop cycles (in an extreme case, a greenlet that repeatedly sleeps with 0 can prevent greenlets that are ready to do I/O from being scheduled for some (small) period of time); a value greater than 0, on the other hand, will delay running this greenlet until the next iteration of the loop. If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait` from exiting. .. seealso:: :func:`idle` """ hub = get_hub() loop = hub.loop if seconds <= 0: waiter = Waiter() loop.run_callback(waiter.switch) waiter.get() else: hub.wait(loop.timer(seconds, ref=ref)) def idle(priority=0): """ Cause the calling greenlet to wait until the event loop is idle. Idle is defined as having no other events of the same or higher *priority* pending. That is, as long as sockets, timeouts or even signals of the same or higher priority are being processed, the loop is not idle. .. seealso:: :func:`sleep` """ hub = get_hub() watcher = hub.loop.idle() if priority: watcher.priority = priority hub.wait(watcher) def kill(greenlet, exception=GreenletExit): """ Kill greenlet asynchronously. The current greenlet is not unscheduled. .. note:: The method :meth:`Greenlet.kill` method does the same and more (and the same caveats listed there apply here). However, the MAIN greenlet - the one that exists initially - does not have a ``kill()`` method, and neither do any created with :func:`spawn_raw`, so you have to use this function. .. caution:: Use care when killing greenlets. If they are not prepared for exceptions, this could result in corrupted state. .. versionchanged:: 1.1a2 If the ``greenlet`` has a :meth:`kill ` method, calls it. This prevents a greenlet from being switched to for the first time after it's been killed but not yet executed. """ if not greenlet.dead: if hasattr(greenlet, 'kill'): # dealing with gevent.greenlet.Greenlet. Use it, especially # to avoid allowing one to be switched to for the first time # after it's been killed greenlet.kill(exception=exception, block=False) else: get_hub().loop.run_callback(greenlet.throw, exception) class signal(object): """ Call the *handler* with the *args* and *kwargs* when the process receives the signal *signalnum*. The *handler* will be run in a new greenlet when the signal is delivered. This returns an object with the useful method ``cancel``, which, when called, will prevent future deliveries of *signalnum* from calling *handler*. .. note:: This may not operate correctly with SIGCHLD if libev child watchers are used (as they are by default with os.fork). .. versionchanged:: 1.2a1 The ``handler`` argument is required to be callable at construction time. """ # XXX: This is manually documented in gevent.rst while it is aliased in # the gevent module. greenlet_class = None def __init__(self, signalnum, handler, *args, **kwargs): if not callable(handler): raise TypeError("signal handler must be callable.") self.hub = get_hub() self.watcher = self.hub.loop.signal(signalnum, ref=False) self.watcher.start(self._start) self.handler = handler self.args = args self.kwargs = kwargs if self.greenlet_class is None: from gevent import Greenlet self.greenlet_class = Greenlet def _get_ref(self): return self.watcher.ref def _set_ref(self, value): self.watcher.ref = value ref = property(_get_ref, _set_ref) del _get_ref, _set_ref def cancel(self): self.watcher.stop() def _start(self): try: greenlet = self.greenlet_class(self.handle) greenlet.switch() except: # pylint:disable=bare-except self.hub.handle_error(None, *sys._exc_info()) # pylint:disable=no-member def handle(self): try: self.handler(*self.args, **self.kwargs) except: # pylint:disable=bare-except self.hub.handle_error(None, *sys.exc_info()) def reinit(): """ Prepare the gevent hub to run in a new (forked) process. This should be called *immediately* after :func:`os.fork` in the child process. This is done automatically by :func:`gevent.os.fork` or if the :mod:`os` module has been monkey-patched. If this function is not called in a forked process, symptoms may include hanging of functions like :func:`socket.getaddrinfo`, and the hub's threadpool is unlikely to work. .. note:: Registered fork watchers may or may not run before this function (and thus ``gevent.os.fork``) return. If they have not run, they will run "soon", after an iteration of the event loop. You can force this by inserting a few small (but non-zero) calls to :func:`sleep` after fork returns. (As of gevent 1.1 and before, fork watchers will not have run, but this may change in the future.) .. note:: This function may be removed in a future major release if the fork process can be more smoothly managed. .. warning:: See remarks in :func:`gevent.os.fork` about greenlets and libev watchers in the child process. """ # The loop reinit function in turn calls libev's ev_loop_fork # function. hub = _get_hub() if hub is not None: # Note that we reinit the existing loop, not destroy it. # See https://github.com/gevent/gevent/issues/200. hub.loop.reinit() # libev's fork watchers are slow to fire because the only fire # at the beginning of a loop; due to our use of callbacks that # run at the end of the loop, that may be too late. The # threadpool and resolvers depend on the fork handlers being # run (specifically, the threadpool will fail in the forked # child if there were any threads in it, which there will be # if the resolver_thread was in use (the default) before the # fork.) # # If the forked process wants to use the threadpool or # resolver immediately (in a queued callback), it would hang. # # The below is a workaround. Fortunately, both of these # methods are idempotent and can be called multiple times # following a fork if the suddenly started working, or were # already working on some platforms. Other threadpools and fork handlers # will be called at an arbitrary time later ('soon') if hasattr(hub.threadpool, '_on_fork'): hub.threadpool._on_fork() # resolver_ares also has a fork watcher that's not firing if hasattr(hub.resolver, '_on_fork'): hub.resolver._on_fork() # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a # pass around before returning to this greenlet. That will allow any # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if # we do this, certain tests that heavily mix threads and forking, # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear # why. #sleep(0.00001) #sleep(0.00001) def get_hub_class(): """Return the type of hub to use for the current thread. If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used. """ hubtype = _threadlocal.Hub if hubtype is None: hubtype = _threadlocal.Hub = Hub return hubtype def get_hub(*args, **kwargs): """ Return the hub for the current thread. If a hub does not exist in the current thread, a new one is created of the type returned by :func:`get_hub_class`. """ hub = _threadlocal.hub if hub is None: hubtype = get_hub_class() hub = _threadlocal.hub = hubtype(*args, **kwargs) return hub def _get_hub(): """Return the hub for the current thread. Return ``None`` if no hub has been created yet. """ return _threadlocal.hub def set_hub(hub): _threadlocal.hub = hub def _import(path): # pylint:disable=too-many-branches if isinstance(path, list): if not path: raise ImportError('Cannot import from empty list: %r' % (path, )) for item in path[:-1]: try: return _import(item) except ImportError: pass return _import(path[-1]) if not isinstance(path, string_types): return path if '.' not in path: raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path) if '/' in path: package_path, path = path.rsplit('/', 1) sys.path = [package_path] + sys.path else: package_path = None try: module, item = path.rsplit('.', 1) x = __import__(module) for attr in path.split('.')[1:]: oldx = x x = getattr(x, attr, _NONE) if x is _NONE: raise ImportError('Cannot import %r from %r' % (attr, oldx)) return x finally: try: sys.path.remove(package_path) except ValueError: pass def config(default, envvar): result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member if isinstance(result, string_types): return result.split(',') return result def resolver_config(default, envvar): result = config(default, envvar) return [_resolvers.get(x, x) for x in result] _resolvers = {'ares': 'gevent.resolver_ares.Resolver', 'thread': 'gevent.resolver_thread.Resolver', 'block': 'gevent.socket.BlockingResolver'} _DEFAULT_LOOP_CLASS = 'gevent.core.loop' class Hub(RawGreenlet): """A greenlet that runs the event loop. It is created automatically by :func:`get_hub`. **Switching** Every time this greenlet (i.e., the event loop) is switched *to*, if the current greenlet has a ``switch_out`` method, it will be called. This allows a greenlet to take some cleanup actions before yielding control. This method should not call any gevent blocking functions. """ #: If instances of these classes are raised into the event loop, #: they will be propagated out to the main greenlet (where they will #: usually be caught by Python itself) SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError) #: Instances of these classes are not considered to be errors and #: do not get logged/printed when raised by the event loop. NOT_ERROR = (GreenletExit, SystemExit) loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP') # For the standard class, go ahead and import it when this class # is defined. This is no loss of generality because the envvar is # only read when this class is defined, and we know that the # standard class will be available. This can solve problems with # the class being imported from multiple threads at once, leading # to one of the imports failing. Only do this for the object we # need in the constructor, as the rest of the factories are # themselves handled lazily. See #687. (People using a custom loop_class # can probably manage to get_hub() from the main thread or otherwise import # that loop_class themselves.) if loop_class == [_DEFAULT_LOOP_CLASS]: loop_class = [_import(loop_class)] resolver_class = ['gevent.resolver_thread.Resolver', 'gevent.resolver_ares.Resolver', 'gevent.socket.BlockingResolver'] #: The class or callable object, or the name of a factory function or class, #: that will be used to create :attr:`resolver`. By default, configured according to #: :doc:`dns`. If a list, a list of objects in preference order. resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER') threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL') backend = config(None, 'GEVENT_BACKEND') threadpool_size = 10 # using pprint.pformat can override custom __repr__ methods on dict/list # subclasses, which can be a security concern format_context = 'pprint.saferepr' def __init__(self, loop=None, default=None): RawGreenlet.__init__(self) if hasattr(loop, 'run'): if default is not None: raise TypeError("Unexpected argument: default") self.loop = loop elif _threadlocal.loop is not None: # Reuse a loop instance previously set by # destroying a hub without destroying the associated # loop. See #237 and #238. self.loop = _threadlocal.loop else: if default is None and get_ident() != MAIN_THREAD: default = False loop_class = _import(self.loop_class) if loop is None: loop = self.backend self.loop = loop_class(flags=loop, default=default) self._resolver = None self._threadpool = None self.format_context = _import(self.format_context) def __repr__(self): if self.loop is None: info = 'destroyed' else: try: info = self.loop._format() except Exception as ex: # pylint:disable=broad-except info = str(ex) or repr(ex) or 'error' result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info) if self._resolver is not None: result += ' resolver=%r' % self._resolver if self._threadpool is not None: result += ' threadpool=%r' % self._threadpool return result + '>' def handle_error(self, context, type, value, tb): """ Called by the event loop when an error occurs. The arguments type, value, and tb are the standard tuple returned by :func:`sys.exc_info`. Applications can set a property on the hub with this same signature to override the error handling provided by this class. Errors that are :attr:`system errors ` are passed to :meth:`handle_system_error`. :param context: If this is ``None``, indicates a system error that should generally result in exiting the loop and being thrown to the parent greenlet. """ if isinstance(value, str): # Cython can raise errors where the value is a plain string # e.g., AttributeError, "_semaphore.Semaphore has no attr", value = type(value) if not issubclass(type, self.NOT_ERROR): self.print_exception(context, type, value, tb) if context is None or issubclass(type, self.SYSTEM_ERROR): self.handle_system_error(type, value) def handle_system_error(self, type, value): current = getcurrent() if current is self or current is self.parent or self.loop is None: self.parent.throw(type, value) else: # in case system error was handled and life goes on # switch back to this greenlet as well cb = None try: cb = self.loop.run_callback(current.switch) except: # pylint:disable=bare-except traceback.print_exc(file=self.exception_stream) try: self.parent.throw(type, value) finally: if cb is not None: cb.stop() @readproperty def exception_stream(self): """ The stream to which exceptions will be written. Defaults to ``sys.stderr`` unless assigned to. .. versionadded:: 1.2a1 """ # Unwrap any FileObjectThread we have thrown around sys.stderr # (because it can't be used in the hub). Tricky because we are # called in error situations when it's not safe to import. stderr = sys.stderr if type(stderr).__name__ == 'FileObjectThread': stderr = stderr.io # pylint:disable=no-member return stderr def print_exception(self, context, type, value, tb): # Python 3 does not gracefully handle None value or tb in # traceback.print_exception() as previous versions did. # pylint:disable=no-member errstream = self.exception_stream if value is None: errstream.write('%s\n' % type.__name__) else: traceback.print_exception(type, value, tb, file=errstream) del tb try: import time errstream.write(time.ctime()) errstream.write(' ' if context is not None else '\n') except: # pylint:disable=bare-except # Possible not safe to import under certain # error conditions in Python 2 pass if context is not None: if not isinstance(context, str): try: context = self.format_context(context) except: # pylint:disable=bare-except traceback.print_exc(file=self.exception_stream) context = repr(context) errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), )) def switch(self): switch_out = getattr(getcurrent(), 'switch_out', None) if switch_out is not None: switch_out() return RawGreenlet.switch(self) def switch_out(self): raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback') def wait(self, watcher): """ Wait until the *watcher* (which should not be started) is ready. The current greenlet will be unscheduled during this time. .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`, :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`, :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`, :class:`gevent.core.child`, :class:`gevent.core.stat` """ waiter = Waiter() unique = object() watcher.start(waiter.switch, unique) try: result = waiter.get() if result is not unique: raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)) finally: watcher.stop() def cancel_wait(self, watcher, error): """ Cancel an in-progress call to :meth:`wait` by throwing the given *error* in the waiting greenlet. """ if watcher.callback is not None: self.loop.run_callback(self._cancel_wait, watcher, error) def _cancel_wait(self, watcher, error): if watcher.active: switch = watcher.callback if switch is not None: greenlet = getattr(switch, '__self__', None) if greenlet is not None: greenlet.throw(error) def run(self): """ Entry-point to running the loop. This method is called automatically when the hub greenlet is scheduled; do not call it directly. :raises LoopExit: If the loop finishes running. This means that there are no other scheduled greenlets, and no active watchers or servers. In some situations, this indicates a programming error. """ assert self is getcurrent(), 'Do not call Hub.run() directly' while True: loop = self.loop loop.error_handler = self try: loop.run() finally: loop.error_handler = None # break the refcount cycle self.parent.throw(LoopExit('This operation would block forever', self)) # this function must never return, as it will cause switch() in the parent greenlet # to return an unexpected value # It is still possible to kill this greenlet with throw. However, in that case # switching to it is no longer safe, as switch will return immediatelly def join(self, timeout=None): """Wait for the event loop to finish. Exits only when there are no more spawned greenlets, started servers, active timeouts or watchers. If *timeout* is provided, wait no longer for the specified number of seconds. Returns True if exited because the loop finished execution. Returns False if exited because of timeout expired. """ assert getcurrent() is self.parent, "only possible from the MAIN greenlet" if self.dead: return True waiter = Waiter() if timeout is not None: timeout = self.loop.timer(timeout, ref=False) timeout.start(waiter.switch) try: try: waiter.get() except LoopExit: return True finally: if timeout is not None: timeout.stop() return False def destroy(self, destroy_loop=None): if self._resolver is not None: self._resolver.close() del self._resolver if self._threadpool is not None: self._threadpool.kill() del self._threadpool if destroy_loop is None: destroy_loop = not self.loop.default if destroy_loop: if _threadlocal.loop is self.loop: # Don't let anyone try to reuse this _threadlocal.loop = None self.loop.destroy() else: # Store in case another hub is created for this # thread. _threadlocal.loop = self.loop self.loop = None if _threadlocal.hub is self: _threadlocal.hub = None def _get_resolver(self): if self._resolver is None: if self.resolver_class is not None: self.resolver_class = _import(self.resolver_class) self._resolver = self.resolver_class(hub=self) return self._resolver def _set_resolver(self, value): self._resolver = value def _del_resolver(self): del self._resolver resolver = property(_get_resolver, _set_resolver, _del_resolver) def _get_threadpool(self): if self._threadpool is None: if self.threadpool_class is not None: self.threadpool_class = _import(self.threadpool_class) self._threadpool = self.threadpool_class(self.threadpool_size, hub=self) return self._threadpool def _set_threadpool(self, value): self._threadpool = value def _del_threadpool(self): del self._threadpool threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool) class Waiter(object): """ A low level communication utility for greenlets. Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer: * switching will occur only if the waiting greenlet is executing :meth:`get` method currently; * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw` * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter` will store the value/exception. The following :meth:`get` will return the value/raise the exception. The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet. The :meth:`get` method must be called from a greenlet other than :class:`Hub`. >>> result = Waiter() >>> timer = get_hub().loop.timer(0.1) >>> timer.start(result.switch, 'hello from Waiter') >>> result.get() # blocks for 0.1 seconds 'hello from Waiter' If switch is called before the greenlet gets a chance to call :meth:`get` then :class:`Waiter` stores the value. >>> result = Waiter() >>> timer = get_hub().loop.timer(0.1) >>> timer.start(result.switch, 'hi from Waiter') >>> sleep(0.2) >>> result.get() # returns immediatelly without blocking 'hi from Waiter' .. warning:: This a limited and dangerous way to communicate between greenlets. It can easily leave a greenlet unscheduled forever if used incorrectly. Consider using safer classes such as :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`, or :class:`gevent.queue.Queue`. """ __slots__ = ['hub', 'greenlet', 'value', '_exception'] def __init__(self, hub=None): if hub is None: self.hub = get_hub() else: self.hub = hub self.greenlet = None self.value = None self._exception = _NONE def clear(self): self.greenlet = None self.value = None self._exception = _NONE def __str__(self): if self._exception is _NONE: return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet) if self._exception is None: return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value) return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info) def ready(self): """Return true if and only if it holds a value or an exception""" return self._exception is not _NONE def successful(self): """Return true if and only if it is ready and holds a value""" return self._exception is None @property def exc_info(self): "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``." if self._exception is not _NONE: return self._exception def switch(self, value=None): """Switch to the greenlet if one's available. Otherwise store the value.""" greenlet = self.greenlet if greenlet is None: self.value = value self._exception = None else: assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" switch = greenlet.switch try: switch(value) except: # pylint:disable=bare-except self.hub.handle_error(switch, *sys.exc_info()) def switch_args(self, *args): return self.switch(args) def throw(self, *throw_args): """Switch to the greenlet with the exception. If there's no greenlet, store the exception.""" greenlet = self.greenlet if greenlet is None: self._exception = throw_args else: assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" throw = greenlet.throw try: throw(*throw_args) except: # pylint:disable=bare-except self.hub.handle_error(throw, *sys.exc_info()) def get(self): """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: if self.greenlet is not None: raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, )) self.greenlet = getcurrent() try: return self.hub.switch() finally: self.greenlet = None def __call__(self, source): if source.exception is None: self.switch(source.value) else: self.throw(source.exception) # can also have a debugging version, that wraps the value in a tuple (self, value) in switch() # and unwraps it in wait() thus checking that switch() was indeed called class _MultipleWaiter(Waiter): """ An internal extension of Waiter that can be used if multiple objects must be waited on, and there is a chance that in between waits greenlets might be switched out. All greenlets that switch to this waiter will have their value returned. This does not handle exceptions or throw methods. """ __slots__ = ['_values'] def __init__(self, *args, **kwargs): Waiter.__init__(self, *args, **kwargs) # we typically expect a relatively small number of these to be outstanding. # since we pop from the left, a deque might be slightly # more efficient, but since we're in the hub we avoid imports if # we can help it to better support monkey-patching, and delaying the import # here can be impractical (see https://github.com/gevent/gevent/issues/652) self._values = list() def switch(self, value): # pylint:disable=signature-differs self._values.append(value) Waiter.switch(self, True) def get(self): if not self._values: Waiter.get(self) Waiter.clear(self) return self._values.pop(0) def iwait(objects, timeout=None, count=None): """ Iteratively yield *objects* as they are ready, until all (or *count*) are ready or *timeout* expired. :param objects: A sequence (supporting :func:`len`) containing objects implementing the wait protocol (rawlink() and unlink()). :keyword int count: If not `None`, then a number specifying the maximum number of objects to wait for. If ``None`` (the default), all objects are waited for. :keyword float timeout: If given, specifies a maximum number of seconds to wait. If the timeout expires before the desired waited-for objects are available, then this method returns immediately. .. seealso:: :func:`wait` .. versionchanged:: 1.1a1 Add the *count* parameter. .. versionchanged:: 1.1a2 No longer raise :exc:`LoopExit` if our caller switches greenlets in between items yielded by this function. """ # QQQ would be nice to support iterable here that can be generated slowly (why?) if objects is None: yield get_hub().join(timeout=timeout) return count = len(objects) if count is None else min(count, len(objects)) waiter = _MultipleWaiter() switch = waiter.switch if timeout is not None: timer = get_hub().loop.timer(timeout, priority=-1) timer.start(switch, _NONE) try: for obj in objects: obj.rawlink(switch) for _ in xrange(count): item = waiter.get() waiter.clear() if item is _NONE: return yield item finally: if timeout is not None: timer.stop() for aobj in objects: unlink = getattr(aobj, 'unlink', None) if unlink: try: unlink(switch) except: # pylint:disable=bare-except traceback.print_exc() def wait(objects=None, timeout=None, count=None): """ Wait for ``objects`` to become ready or for event loop to finish. If ``objects`` is provided, it must be a list containing objects implementing the wait protocol (rawlink() and unlink() methods): - :class:`gevent.Greenlet` instance - :class:`gevent.event.Event` instance - :class:`gevent.lock.Semaphore` instance - :class:`gevent.subprocess.Popen` instance If ``objects`` is ``None`` (the default), ``wait()`` blocks until the current event loop has nothing to do (or until ``timeout`` passes): - all greenlets have finished - all servers were stopped - all event loop watchers were stopped. If ``count`` is ``None`` (the default), wait for all ``objects`` to become ready. If ``count`` is a number, wait for (up to) ``count`` objects to become ready. (For example, if count is ``1`` then the function exits when any object in the list is ready). If ``timeout`` is provided, it specifies the maximum number of seconds ``wait()`` will block. Returns the list of ready objects, in the order in which they were ready. .. seealso:: :func:`iwait` """ if objects is None: return get_hub().join(timeout=timeout) return list(iwait(objects, timeout, count)) class linkproxy(object): __slots__ = ['callback', 'obj'] def __init__(self, callback, obj): self.callback = callback self.obj = obj def __call__(self, *args): callback = self.callback obj = self.obj self.callback = None self.obj = None callback(obj)