aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/hub.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
committerJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
commit4212164e91ba2f49583cf44ad623a29b36db8f77 (patch)
tree47aefe3c0162f03e0c823b43873356f69c1cd636 /python/gevent/hub.py
parent6ca20ff7010f2bafc7fefcb8cad982be27a8aeae (diff)
downloadyt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.lz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.xz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.zip
Windows: Use 32-bit distribution of python
Diffstat (limited to 'python/gevent/hub.py')
-rw-r--r--python/gevent/hub.py841
1 files changed, 261 insertions, 580 deletions
diff --git a/python/gevent/hub.py b/python/gevent/hub.py
index 001cd06..ca88d20 100644
--- a/python/gevent/hub.py
+++ b/python/gevent/hub.py
@@ -2,15 +2,19 @@
"""
Event-loop hub.
"""
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
# XXX: FIXME: Refactor to make this smaller
# pylint:disable=too-many-lines
from functools import partial as _functools_partial
-import os
+
import sys
import traceback
-from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit
+
+from greenlet import greenlet as RawGreenlet
+from greenlet import getcurrent
+from greenlet import GreenletExit
+
__all__ = [
@@ -26,76 +30,38 @@ __all__ = [
'Waiter',
]
-from gevent._compat import string_types
-from gevent._compat import xrange
-from gevent._util import _NONE
+from gevent._config import config as GEVENT_CONFIG
+from gevent._compat import thread_mod_name
from gevent._util import readproperty
+from gevent._util import Lazy
+from gevent._util import gmctime
+from gevent._ident import IdentRegistry
-if sys.version_info[0] <= 2:
- import thread # pylint:disable=import-error
-else:
- import _thread as thread # python 2 pylint:disable=import-error
-
-# These must be the "real" native thread versions,
-# not monkey-patched.
-threadlocal = thread._local
-
-
-class _threadlocal(threadlocal):
-
- def __init__(self):
- # Use a class with an initializer so that we can test
- # for 'is None' instead of catching AttributeError, making
- # the code cleaner and possibly solving some corner cases
- # (like #687)
- threadlocal.__init__(self)
- self.Hub = None
- self.loop = None
- self.hub = None
-
-_threadlocal = _threadlocal()
-
-get_ident = thread.get_ident
-MAIN_THREAD = get_ident()
-
+from gevent._hub_local import get_hub
+from gevent._hub_local import get_loop
+from gevent._hub_local import set_hub
+from gevent._hub_local import set_loop
+from gevent._hub_local import get_hub_if_exists as _get_hub
+from gevent._hub_local import get_hub_noargs as _get_hub_noargs
+from gevent._hub_local import set_default_hub_class
+from gevent._greenlet_primitives import TrackedRawGreenlet
+from gevent._hub_primitives import WaitOperationsGreenlet
+# Export
+from gevent import _hub_primitives
+wait = _hub_primitives.wait_on_objects
+iwait = _hub_primitives.iwait_on_objects
-class LoopExit(Exception):
- """
- Exception thrown when the hub finishes running.
-
- In a normal application, this is never thrown or caught
- explicitly. The internal implementation of functions like
- :func:`join` and :func:`joinall` may catch it, but user code
- generally should not.
-
- .. caution::
- Errors in application programming can also lead to this exception being
- raised. Some examples include (but are not limited too):
-
- - greenlets deadlocking on a lock;
- - using a socket or other gevent object with native thread
- affinity from a different thread
-
- """
- pass
+from gevent.exceptions import LoopExit
-class BlockingSwitchOutError(AssertionError):
- pass
+from gevent._waiter import Waiter
-
-class InvalidSwitchError(AssertionError):
- pass
-
-
-class ConcurrentObjectUseError(AssertionError):
- # raised when an object is used (waited on) by two greenlets
- # independently, meaning the object was entered into a blocking
- # state by one greenlet and then another while still blocking in the
- # first one
- pass
+# Need the real get_ident. We're imported early enough (by gevent/__init__.py)
+# that we can be sure nothing is monkey patched yet.
+get_thread_ident = __import__(thread_mod_name).get_ident
+MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
def spawn_raw(function, *args, **kwargs):
@@ -109,6 +75,10 @@ def spawn_raw(function, *args, **kwargs):
occasionally be useful as an optimization if there are many
greenlets involved.
+ .. versionchanged:: 1.1a3
+ Verify that ``function`` is callable, raising a TypeError if not. Previously,
+ the spawned greenlet would have failed the first time it was switched to.
+
.. versionchanged:: 1.1b1
If *function* is not callable, immediately raise a :exc:`TypeError`
instead of spawning a greenlet that will raise an uncaught TypeError.
@@ -117,24 +87,35 @@ def spawn_raw(function, *args, **kwargs):
Accept keyword arguments for ``function`` as previously (incorrectly)
documented. Note that this may incur an additional expense.
- .. versionchanged:: 1.1a3
- Verify that ``function`` is callable, raising a TypeError if not. Previously,
- the spawned greenlet would have failed the first time it was switched to.
+ .. versionchanged:: 1.3a2
+ Populate the ``spawning_greenlet`` and ``spawn_tree_locals``
+ attributes of the returned greenlet.
+
+ .. versionchanged:: 1.3b1
+ *Only* populate ``spawning_greenlet`` and ``spawn_tree_locals``
+ if ``GEVENT_TRACK_GREENLET_TREE`` is enabled (the default). If not enabled,
+ those attributes will not be set.
+
"""
if not callable(function):
raise TypeError("function must be callable")
- hub = get_hub()
+
+ # The hub is always the parent.
+ hub = _get_hub_noargs()
+
+ factory = TrackedRawGreenlet if GEVENT_CONFIG.track_greenlet_tree else RawGreenlet
# The callback class object that we use to run this doesn't
# accept kwargs (and those objects are heavily used, as well as being
# implemented twice in core.ppyx and corecffi.py) so do it with a partial
if kwargs:
function = _functools_partial(function, *args, **kwargs)
- g = RawGreenlet(function, hub)
+ g = factory(function, hub)
hub.loop.run_callback(g.switch)
else:
- g = RawGreenlet(function, hub)
+ g = factory(function, hub)
hub.loop.run_callback(g.switch, *args)
+
return g
@@ -157,16 +138,25 @@ def sleep(seconds=0, ref=True):
If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait`
from exiting.
+ .. versionchanged:: 1.3a1
+ Sleeping with a value of 0 will now be bounded to approximately block the
+ loop for no longer than :func:`gevent.getswitchinterval`.
+
.. seealso:: :func:`idle`
"""
- hub = get_hub()
+ hub = _get_hub_noargs()
loop = hub.loop
if seconds <= 0:
- waiter = Waiter()
- loop.run_callback(waiter.switch)
+ waiter = Waiter(hub)
+ loop.run_callback(waiter.switch, None)
waiter.get()
else:
- hub.wait(loop.timer(seconds, ref=ref))
+ with loop.timer(seconds, ref=ref) as t:
+ # Sleeping is expected to be an "absolute" measure with
+ # respect to time.time(), not a relative measure, so it's
+ # important to update the loop's notion of now before we start
+ loop.update_now()
+ hub.wait(t)
def idle(priority=0):
@@ -180,7 +170,7 @@ def idle(priority=0):
.. seealso:: :func:`sleep`
"""
- hub = get_hub()
+ hub = _get_hub_noargs()
watcher = hub.loop.idle()
if priority:
watcher.priority = priority
@@ -214,7 +204,7 @@ def kill(greenlet, exception=GreenletExit):
# after it's been killed
greenlet.kill(exception=exception, block=False)
else:
- get_hub().loop.run_callback(greenlet.throw, exception)
+ _get_hub_noargs().loop.run_callback(greenlet.throw, exception)
class signal(object):
@@ -245,7 +235,7 @@ class signal(object):
if not callable(handler):
raise TypeError("signal handler must be callable.")
- self.hub = get_hub()
+ self.hub = _get_hub_noargs()
self.watcher = self.hub.loop.signal(signalnum, ref=False)
self.watcher.start(self._start)
self.handler = handler
@@ -281,8 +271,10 @@ class signal(object):
self.hub.handle_error(None, *sys.exc_info())
-def reinit():
+def reinit(hub=None):
"""
+ reinit() -> None
+
Prepare the gevent hub to run in a new (forked) process.
This should be called *immediately* after :func:`os.fork` in the
@@ -304,159 +296,64 @@ def reinit():
if the fork process can be more smoothly managed.
.. warning:: See remarks in :func:`gevent.os.fork` about greenlets
- and libev watchers in the child process.
+ and event loop watchers in the child process.
"""
+ # Note the signature line in the docstring: hub is not a public param.
+
# The loop reinit function in turn calls libev's ev_loop_fork
# function.
- hub = _get_hub()
-
- if hub is not None:
- # Note that we reinit the existing loop, not destroy it.
- # See https://github.com/gevent/gevent/issues/200.
- hub.loop.reinit()
- # libev's fork watchers are slow to fire because the only fire
- # at the beginning of a loop; due to our use of callbacks that
- # run at the end of the loop, that may be too late. The
- # threadpool and resolvers depend on the fork handlers being
- # run (specifically, the threadpool will fail in the forked
- # child if there were any threads in it, which there will be
- # if the resolver_thread was in use (the default) before the
- # fork.)
- #
- # If the forked process wants to use the threadpool or
- # resolver immediately (in a queued callback), it would hang.
- #
- # The below is a workaround. Fortunately, both of these
- # methods are idempotent and can be called multiple times
- # following a fork if the suddenly started working, or were
- # already working on some platforms. Other threadpools and fork handlers
- # will be called at an arbitrary time later ('soon')
- if hasattr(hub.threadpool, '_on_fork'):
- hub.threadpool._on_fork()
- # resolver_ares also has a fork watcher that's not firing
- if hasattr(hub.resolver, '_on_fork'):
- hub.resolver._on_fork()
-
- # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
- # pass around before returning to this greenlet. That will allow any
- # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
- # we do this, certain tests that heavily mix threads and forking,
- # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
- # why.
- #sleep(0.00001)
- #sleep(0.00001)
-
-
-def get_hub_class():
- """Return the type of hub to use for the current thread.
-
- If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
- """
- hubtype = _threadlocal.Hub
- if hubtype is None:
- hubtype = _threadlocal.Hub = Hub
- return hubtype
-
-
-def get_hub(*args, **kwargs):
- """
- Return the hub for the current thread.
-
- If a hub does not exist in the current thread, a new one is
- created of the type returned by :func:`get_hub_class`.
- """
- hub = _threadlocal.hub
+ hub = _get_hub() if hub is None else hub
if hub is None:
- hubtype = get_hub_class()
- hub = _threadlocal.hub = hubtype(*args, **kwargs)
- return hub
-
-
-def _get_hub():
- """Return the hub for the current thread.
+ return
- Return ``None`` if no hub has been created yet.
+ # Note that we reinit the existing loop, not destroy it.
+ # See https://github.com/gevent/gevent/issues/200.
+ hub.loop.reinit()
+ # libev's fork watchers are slow to fire because the only fire
+ # at the beginning of a loop; due to our use of callbacks that
+ # run at the end of the loop, that may be too late. The
+ # threadpool and resolvers depend on the fork handlers being
+ # run (specifically, the threadpool will fail in the forked
+ # child if there were any threads in it, which there will be
+ # if the resolver_thread was in use (the default) before the
+ # fork.)
+ #
+ # If the forked process wants to use the threadpool or
+ # resolver immediately (in a queued callback), it would hang.
+ #
+ # The below is a workaround. Fortunately, all of these
+ # methods are idempotent and can be called multiple times
+ # following a fork if the suddenly started working, or were
+ # already working on some platforms. Other threadpools and fork handlers
+ # will be called at an arbitrary time later ('soon')
+ for obj in (hub._threadpool, hub._resolver, hub.periodic_monitoring_thread):
+ getattr(obj, '_on_fork', lambda: None)()
+
+ # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
+ # pass around before returning to this greenlet. That will allow any
+ # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
+ # we do this, certain tests that heavily mix threads and forking,
+ # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
+ # why.
+ #sleep(0.00001)
+ #sleep(0.00001)
+
+
+hub_ident_registry = IdentRegistry()
+
+class Hub(WaitOperationsGreenlet):
"""
- return _threadlocal.hub
-
-
-def set_hub(hub):
- _threadlocal.hub = hub
-
-
-def _import(path):
- # pylint:disable=too-many-branches
- if isinstance(path, list):
- if not path:
- raise ImportError('Cannot import from empty list: %r' % (path, ))
-
- for item in path[:-1]:
- try:
- return _import(item)
- except ImportError:
- pass
-
- return _import(path[-1])
-
- if not isinstance(path, string_types):
- return path
-
- if '.' not in path:
- raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path)
-
- if '/' in path:
- package_path, path = path.rsplit('/', 1)
- sys.path = [package_path] + sys.path
- else:
- package_path = None
-
- try:
- module, item = path.rsplit('.', 1)
- x = __import__(module)
- for attr in path.split('.')[1:]:
- oldx = x
- x = getattr(x, attr, _NONE)
- if x is _NONE:
- raise ImportError('Cannot import %r from %r' % (attr, oldx))
- return x
- finally:
- try:
- sys.path.remove(package_path)
- except ValueError:
- pass
-
-
-def config(default, envvar):
- result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
- if isinstance(result, string_types):
- return result.split(',')
- return result
-
-
-def resolver_config(default, envvar):
- result = config(default, envvar)
- return [_resolvers.get(x, x) for x in result]
-
-
-_resolvers = {'ares': 'gevent.resolver_ares.Resolver',
- 'thread': 'gevent.resolver_thread.Resolver',
- 'block': 'gevent.socket.BlockingResolver'}
-
-
-_DEFAULT_LOOP_CLASS = 'gevent.core.loop'
-
-
-class Hub(RawGreenlet):
- """A greenlet that runs the event loop.
+ A greenlet that runs the event loop.
It is created automatically by :func:`get_hub`.
- **Switching**
+ .. rubric:: Switching
- Every time this greenlet (i.e., the event loop) is switched *to*, if
- the current greenlet has a ``switch_out`` method, it will be called. This
- allows a greenlet to take some cleanup actions before yielding control. This method
- should not call any gevent blocking functions.
+ Every time this greenlet (i.e., the event loop) is switched *to*,
+ if the current greenlet has a ``switch_out`` method, it will be
+ called. This allows a greenlet to take some cleanup actions before
+ yielding control. This method should not call any gevent blocking
+ functions.
"""
#: If instances of these classes are raised into the event loop,
@@ -468,57 +365,71 @@ class Hub(RawGreenlet):
#: do not get logged/printed when raised by the event loop.
NOT_ERROR = (GreenletExit, SystemExit)
- loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP')
- # For the standard class, go ahead and import it when this class
- # is defined. This is no loss of generality because the envvar is
- # only read when this class is defined, and we know that the
- # standard class will be available. This can solve problems with
- # the class being imported from multiple threads at once, leading
- # to one of the imports failing. Only do this for the object we
- # need in the constructor, as the rest of the factories are
- # themselves handled lazily. See #687. (People using a custom loop_class
- # can probably manage to get_hub() from the main thread or otherwise import
- # that loop_class themselves.)
- if loop_class == [_DEFAULT_LOOP_CLASS]:
- loop_class = [_import(loop_class)]
-
- resolver_class = ['gevent.resolver_thread.Resolver',
- 'gevent.resolver_ares.Resolver',
- 'gevent.socket.BlockingResolver']
- #: The class or callable object, or the name of a factory function or class,
- #: that will be used to create :attr:`resolver`. By default, configured according to
- #: :doc:`dns`. If a list, a list of objects in preference order.
- resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER')
- threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL')
- backend = config(None, 'GEVENT_BACKEND')
+ #: The size we use for our threadpool. Either use a subclass
+ #: for this, or change it immediately after creating the hub.
threadpool_size = 10
- # using pprint.pformat can override custom __repr__ methods on dict/list
- # subclasses, which can be a security concern
- format_context = 'pprint.saferepr'
+ # An instance of PeriodicMonitoringThread, if started.
+ periodic_monitoring_thread = None
+
+ # The ident of the thread we were created in, which should be the
+ # thread that we run in.
+ thread_ident = None
+ #: A string giving the name of this hub. Useful for associating hubs
+ #: with particular threads. Printed as part of the default repr.
+ #:
+ #: .. versionadded:: 1.3b1
+ name = ''
+
+ # NOTE: We cannot define a class-level 'loop' attribute
+ # because that conflicts with the slot we inherit from the
+ # Cythonized-bases.
def __init__(self, loop=None, default=None):
- RawGreenlet.__init__(self)
+ WaitOperationsGreenlet.__init__(self, None, None)
+ self.thread_ident = get_thread_ident()
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
- elif _threadlocal.loop is not None:
+ elif get_loop() is not None:
# Reuse a loop instance previously set by
# destroying a hub without destroying the associated
# loop. See #237 and #238.
- self.loop = _threadlocal.loop
+ self.loop = get_loop()
else:
- if default is None and get_ident() != MAIN_THREAD:
+ if default is None and self.thread_ident != MAIN_THREAD_IDENT:
default = False
- loop_class = _import(self.loop_class)
+
if loop is None:
loop = self.backend
- self.loop = loop_class(flags=loop, default=default)
+ self.loop = self.loop_class(flags=loop, default=default) # pylint:disable=not-callable
self._resolver = None
self._threadpool = None
- self.format_context = _import(self.format_context)
+ self.format_context = GEVENT_CONFIG.format_context
+ self.minimal_ident = hub_ident_registry.get_ident(self)
+
+ @Lazy
+ def ident_registry(self):
+ return IdentRegistry()
+
+ @property
+ def loop_class(self):
+ return GEVENT_CONFIG.loop
+
+ @property
+ def backend(self):
+ return GEVENT_CONFIG.libev_backend
+
+ @property
+ def main_hub(self):
+ """
+ Is this the hub for the main thread?
+
+ .. versionadded:: 1.3b1
+ """
+ return self.thread_ident == MAIN_THREAD_IDENT
def __repr__(self):
if self.loop is None:
@@ -528,11 +439,16 @@ class Hub(RawGreenlet):
info = self.loop._format()
except Exception as ex: # pylint:disable=broad-except
info = str(ex) or repr(ex) or 'error'
- result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
+ result = '<%s %r at 0x%x %s' % (
+ self.__class__.__name__,
+ self.name,
+ id(self),
+ info)
if self._resolver is not None:
result += ' resolver=%r' % self._resolver
if self._threadpool is not None:
result += ' threadpool=%r' % self._threadpool
+ result += ' thread_ident=%s' % (hex(self.thread_ident), )
return result + '>'
def handle_error(self, context, type, value, tb):
@@ -560,6 +476,13 @@ class Hub(RawGreenlet):
self.handle_system_error(type, value)
def handle_system_error(self, type, value):
+ """
+ Called from `handle_error` when the exception type is determined
+ to be a :attr:`system error <SYSTEM_ERROR>`.
+
+ System errors cause the exception to be raised in the main
+ greenlet (the parent of this hub).
+ """
current = getcurrent()
if current is self or current is self.parent or self.loop is None:
self.parent.throw(type, value)
@@ -606,8 +529,7 @@ class Hub(RawGreenlet):
del tb
try:
- import time
- errstream.write(time.ctime())
+ errstream.write(gmctime())
errstream.write(' ' if context is not None else '\n')
except: # pylint:disable=bare-except
# Possible not safe to import under certain
@@ -623,76 +545,54 @@ class Hub(RawGreenlet):
context = repr(context)
errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
- def switch(self):
- switch_out = getattr(getcurrent(), 'switch_out', None)
- if switch_out is not None:
- switch_out()
- return RawGreenlet.switch(self)
-
- def switch_out(self):
- raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
-
- def wait(self, watcher):
- """
- Wait until the *watcher* (which should not be started) is ready.
-
- The current greenlet will be unscheduled during this time.
-
- .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
- :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
- :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
- :class:`gevent.core.child`, :class:`gevent.core.stat`
-
- """
- waiter = Waiter()
- unique = object()
- watcher.start(waiter.switch, unique)
- try:
- result = waiter.get()
- if result is not unique:
- raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
- finally:
- watcher.stop()
-
- def cancel_wait(self, watcher, error):
- """
- Cancel an in-progress call to :meth:`wait` by throwing the given *error*
- in the waiting greenlet.
- """
- if watcher.callback is not None:
- self.loop.run_callback(self._cancel_wait, watcher, error)
-
- def _cancel_wait(self, watcher, error):
- if watcher.active:
- switch = watcher.callback
- if switch is not None:
- greenlet = getattr(switch, '__self__', None)
- if greenlet is not None:
- greenlet.throw(error)
def run(self):
"""
Entry-point to running the loop. This method is called automatically
when the hub greenlet is scheduled; do not call it directly.
- :raises LoopExit: If the loop finishes running. This means
+ :raises gevent.exceptions.LoopExit: If the loop finishes running. This means
that there are no other scheduled greenlets, and no active
watchers or servers. In some situations, this indicates a
programming error.
"""
assert self is getcurrent(), 'Do not call Hub.run() directly'
- while True:
+ self.start_periodic_monitoring_thread()
+ while 1:
loop = self.loop
loop.error_handler = self
try:
loop.run()
finally:
loop.error_handler = None # break the refcount cycle
- self.parent.throw(LoopExit('This operation would block forever', self))
+ debug = []
+ if hasattr(loop, 'debug'):
+ debug = loop.debug()
+ self.parent.throw(LoopExit('This operation would block forever', self, debug))
# this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case
- # switching to it is no longer safe, as switch will return immediatelly
+ # switching to it is no longer safe, as switch will return immediately
+
+ def start_periodic_monitoring_thread(self):
+ if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
+ # Note that it is possible for one real thread to
+ # (temporarily) wind up with multiple monitoring threads,
+ # if hubs are started and stopped within the thread. This shows up
+ # in the threadpool tests. The monitoring threads will eventually notice their
+ # hub object is gone.
+ from gevent._monitor import PeriodicMonitoringThread
+ from gevent.events import PeriodicMonitorThreadStartedEvent
+ from gevent.events import notify_and_call_entry_points
+ self.periodic_monitoring_thread = PeriodicMonitoringThread(self)
+
+ if self.main_hub:
+ self.periodic_monitoring_thread.install_monitor_memory_usage()
+
+ notify_and_call_entry_points(PeriodicMonitorThreadStartedEvent(
+ self.periodic_monitoring_thread))
+
+ return self.periodic_monitoring_thread
def join(self, timeout=None):
"""Wait for the event loop to finish. Exits only when there are
@@ -707,11 +607,11 @@ class Hub(RawGreenlet):
if self.dead:
return True
- waiter = Waiter()
+ waiter = Waiter(self)
if timeout is not None:
timeout = self.loop.timer(timeout, ref=False)
- timeout.start(waiter.switch)
+ timeout.start(waiter.switch, None)
try:
try:
@@ -721,9 +621,19 @@ class Hub(RawGreenlet):
finally:
if timeout is not None:
timeout.stop()
+ timeout.close()
return False
def destroy(self, destroy_loop=None):
+ """
+ Destroy this hub and clean up its resources.
+
+ If you manually create hubs, you *should* call this
+ method before disposing of the hub object reference.
+ """
+ if self.periodic_monitoring_thread is not None:
+ self.periodic_monitoring_thread.kill()
+ self.periodic_monitoring_thread = None
if self._resolver is not None:
self._resolver.close()
del self._resolver
@@ -733,308 +643,79 @@ class Hub(RawGreenlet):
if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop:
- if _threadlocal.loop is self.loop:
+ if get_loop() is self.loop:
# Don't let anyone try to reuse this
- _threadlocal.loop = None
+ set_loop(None)
self.loop.destroy()
else:
# Store in case another hub is created for this
# thread.
- _threadlocal.loop = self.loop
+ set_loop(self.loop)
+
self.loop = None
- if _threadlocal.hub is self:
- _threadlocal.hub = None
+ if _get_hub() is self:
+ set_hub(None)
+
+
+ # XXX: We can probably simplify the resolver and threadpool properties.
+
+ @property
+ def resolver_class(self):
+ return GEVENT_CONFIG.resolver
def _get_resolver(self):
if self._resolver is None:
- if self.resolver_class is not None:
- self.resolver_class = _import(self.resolver_class)
- self._resolver = self.resolver_class(hub=self)
+ self._resolver = self.resolver_class(hub=self) # pylint:disable=not-callable
return self._resolver
def _set_resolver(self, value):
self._resolver = value
def _del_resolver(self):
- del self._resolver
+ self._resolver = None
+
+ resolver = property(_get_resolver, _set_resolver, _del_resolver,
+ """
+ The DNS resolver that the socket functions will use.
+
+ .. seealso:: :doc:`/dns`
+ """)
- resolver = property(_get_resolver, _set_resolver, _del_resolver)
+
+ @property
+ def threadpool_class(self):
+ return GEVENT_CONFIG.threadpool
def _get_threadpool(self):
if self._threadpool is None:
- if self.threadpool_class is not None:
- self.threadpool_class = _import(self.threadpool_class)
- self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
+ # pylint:disable=not-callable
+ self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
return self._threadpool
def _set_threadpool(self, value):
self._threadpool = value
def _del_threadpool(self):
- del self._threadpool
-
- threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
-
-
-class Waiter(object):
- """
- A low level communication utility for greenlets.
-
- Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
-
- * switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
- * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
- * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
- will store the value/exception. The following :meth:`get` will return the value/raise the exception.
-
- The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
- The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
-
- >>> result = Waiter()
- >>> timer = get_hub().loop.timer(0.1)
- >>> timer.start(result.switch, 'hello from Waiter')
- >>> result.get() # blocks for 0.1 seconds
- 'hello from Waiter'
-
- If switch is called before the greenlet gets a chance to call :meth:`get` then
- :class:`Waiter` stores the value.
-
- >>> result = Waiter()
- >>> timer = get_hub().loop.timer(0.1)
- >>> timer.start(result.switch, 'hi from Waiter')
- >>> sleep(0.2)
- >>> result.get() # returns immediatelly without blocking
- 'hi from Waiter'
-
- .. warning::
-
- This a limited and dangerous way to communicate between
- greenlets. It can easily leave a greenlet unscheduled forever
- if used incorrectly. Consider using safer classes such as
- :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
- or :class:`gevent.queue.Queue`.
- """
-
- __slots__ = ['hub', 'greenlet', 'value', '_exception']
-
- def __init__(self, hub=None):
- if hub is None:
- self.hub = get_hub()
- else:
- self.hub = hub
- self.greenlet = None
- self.value = None
- self._exception = _NONE
-
- def clear(self):
- self.greenlet = None
- self.value = None
- self._exception = _NONE
-
- def __str__(self):
- if self._exception is _NONE:
- return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
- if self._exception is None:
- return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
- return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
-
- def ready(self):
- """Return true if and only if it holds a value or an exception"""
- return self._exception is not _NONE
-
- def successful(self):
- """Return true if and only if it is ready and holds a value"""
- return self._exception is None
-
- @property
- def exc_info(self):
- "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
- if self._exception is not _NONE:
- return self._exception
-
- def switch(self, value=None):
- """Switch to the greenlet if one's available. Otherwise store the value."""
- greenlet = self.greenlet
- if greenlet is None:
- self.value = value
- self._exception = None
- else:
- assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
- switch = greenlet.switch
- try:
- switch(value)
- except: # pylint:disable=bare-except
- self.hub.handle_error(switch, *sys.exc_info())
-
- def switch_args(self, *args):
- return self.switch(args)
-
- def throw(self, *throw_args):
- """Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
- greenlet = self.greenlet
- if greenlet is None:
- self._exception = throw_args
- else:
- assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
- throw = greenlet.throw
- try:
- throw(*throw_args)
- except: # pylint:disable=bare-except
- self.hub.handle_error(throw, *sys.exc_info())
-
- def get(self):
- """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
- if self._exception is not _NONE:
- if self._exception is None:
- return self.value
- else:
- getcurrent().throw(*self._exception)
- else:
- if self.greenlet is not None:
- raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
- self.greenlet = getcurrent()
- try:
- return self.hub.switch()
- finally:
- self.greenlet = None
-
- def __call__(self, source):
- if source.exception is None:
- self.switch(source.value)
- else:
- self.throw(source.exception)
-
- # can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
- # and unwraps it in wait() thus checking that switch() was indeed called
-
-
-class _MultipleWaiter(Waiter):
- """
- An internal extension of Waiter that can be used if multiple objects
- must be waited on, and there is a chance that in between waits greenlets
- might be switched out. All greenlets that switch to this waiter
- will have their value returned.
-
- This does not handle exceptions or throw methods.
- """
- __slots__ = ['_values']
-
- def __init__(self, *args, **kwargs):
- Waiter.__init__(self, *args, **kwargs)
- # we typically expect a relatively small number of these to be outstanding.
- # since we pop from the left, a deque might be slightly
- # more efficient, but since we're in the hub we avoid imports if
- # we can help it to better support monkey-patching, and delaying the import
- # here can be impractical (see https://github.com/gevent/gevent/issues/652)
- self._values = list()
-
- def switch(self, value): # pylint:disable=signature-differs
- self._values.append(value)
- Waiter.switch(self, True)
-
- def get(self):
- if not self._values:
- Waiter.get(self)
- Waiter.clear(self)
-
- return self._values.pop(0)
-
-
-def iwait(objects, timeout=None, count=None):
- """
- Iteratively yield *objects* as they are ready, until all (or *count*) are ready
- or *timeout* expired.
-
- :param objects: A sequence (supporting :func:`len`) containing objects
- implementing the wait protocol (rawlink() and unlink()).
- :keyword int count: If not `None`, then a number specifying the maximum number
- of objects to wait for. If ``None`` (the default), all objects
- are waited for.
- :keyword float timeout: If given, specifies a maximum number of seconds
- to wait. If the timeout expires before the desired waited-for objects
- are available, then this method returns immediately.
-
- .. seealso:: :func:`wait`
-
- .. versionchanged:: 1.1a1
- Add the *count* parameter.
- .. versionchanged:: 1.1a2
- No longer raise :exc:`LoopExit` if our caller switches greenlets
- in between items yielded by this function.
- """
- # QQQ would be nice to support iterable here that can be generated slowly (why?)
- if objects is None:
- yield get_hub().join(timeout=timeout)
- return
-
- count = len(objects) if count is None else min(count, len(objects))
- waiter = _MultipleWaiter()
- switch = waiter.switch
-
- if timeout is not None:
- timer = get_hub().loop.timer(timeout, priority=-1)
- timer.start(switch, _NONE)
-
- try:
- for obj in objects:
- obj.rawlink(switch)
-
- for _ in xrange(count):
- item = waiter.get()
- waiter.clear()
- if item is _NONE:
- return
- yield item
- finally:
- if timeout is not None:
- timer.stop()
- for aobj in objects:
- unlink = getattr(aobj, 'unlink', None)
- if unlink:
- try:
- unlink(switch)
- except: # pylint:disable=bare-except
- traceback.print_exc()
-
-
-def wait(objects=None, timeout=None, count=None):
- """
- Wait for ``objects`` to become ready or for event loop to finish.
-
- If ``objects`` is provided, it must be a list containing objects
- implementing the wait protocol (rawlink() and unlink() methods):
-
- - :class:`gevent.Greenlet` instance
- - :class:`gevent.event.Event` instance
- - :class:`gevent.lock.Semaphore` instance
- - :class:`gevent.subprocess.Popen` instance
-
- If ``objects`` is ``None`` (the default), ``wait()`` blocks until
- the current event loop has nothing to do (or until ``timeout`` passes):
+ self._threadpool = None
- - all greenlets have finished
- - all servers were stopped
- - all event loop watchers were stopped.
+ threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool,
+ """
+ The threadpool associated with this hub.
- If ``count`` is ``None`` (the default), wait for all ``objects``
- to become ready.
+ Usually this is a
+ :class:`gevent.threadpool.ThreadPool`, but
+ you :attr:`can customize that
+ <gevent._config.Config.threadpool>`.
- If ``count`` is a number, wait for (up to) ``count`` objects to become
- ready. (For example, if count is ``1`` then the function exits
- when any object in the list is ready).
+ Use this object to schedule blocking
+ (non-cooperative) operations in a different
+ thread to prevent them from halting the event loop.
+ """)
- If ``timeout`` is provided, it specifies the maximum number of
- seconds ``wait()`` will block.
- Returns the list of ready objects, in the order in which they were
- ready.
+set_default_hub_class(Hub)
- .. seealso:: :func:`iwait`
- """
- if objects is None:
- return get_hub().join(timeout=timeout)
- return list(iwait(objects, timeout, count))
class linkproxy(object):