aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_ffi
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
committerJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
commit4212164e91ba2f49583cf44ad623a29b36db8f77 (patch)
tree47aefe3c0162f03e0c823b43873356f69c1cd636 /python/gevent/_ffi
parent6ca20ff7010f2bafc7fefcb8cad982be27a8aeae (diff)
downloadyt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.lz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.xz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.zip
Windows: Use 32-bit distribution of python
Diffstat (limited to 'python/gevent/_ffi')
-rw-r--r--python/gevent/_ffi/__init__.py27
-rw-r--r--python/gevent/_ffi/callback.py58
-rw-r--r--python/gevent/_ffi/loop.py709
-rw-r--r--python/gevent/_ffi/watcher.py641
4 files changed, 1435 insertions, 0 deletions
diff --git a/python/gevent/_ffi/__init__.py b/python/gevent/_ffi/__init__.py
new file mode 100644
index 0000000..56f1e96
--- /dev/null
+++ b/python/gevent/_ffi/__init__.py
@@ -0,0 +1,27 @@
+"""
+Internal helpers for FFI implementations.
+"""
+from __future__ import print_function, absolute_import
+
+import os
+import sys
+
+def _dbg(*args, **kwargs):
+ # pylint:disable=unused-argument
+ pass
+
+#_dbg = print
+
+def _pid_dbg(*args, **kwargs):
+ kwargs['file'] = sys.stderr
+ print(os.getpid(), *args, **kwargs)
+
+CRITICAL = 1
+ERROR = 3
+DEBUG = 5
+TRACE = 9
+
+GEVENT_DEBUG_LEVEL = vars()[os.getenv("GEVENT_DEBUG", 'CRITICAL').upper()]
+
+if GEVENT_DEBUG_LEVEL >= TRACE:
+ _dbg = _pid_dbg
diff --git a/python/gevent/_ffi/callback.py b/python/gevent/_ffi/callback.py
new file mode 100644
index 0000000..df59a9f
--- /dev/null
+++ b/python/gevent/_ffi/callback.py
@@ -0,0 +1,58 @@
+from __future__ import absolute_import, print_function
+
+__all__ = [
+ 'callback',
+]
+
+
+# For times when *args is captured but often not passed (empty),
+# we can avoid keeping the new tuple that was created for *args
+# around by using a constant.
+_NOARGS = ()
+
+
+class callback(object):
+
+ __slots__ = ('callback', 'args')
+
+ def __init__(self, cb, args):
+ self.callback = cb
+ self.args = args or _NOARGS
+
+ def stop(self):
+ self.callback = None
+ self.args = None
+
+ close = stop
+
+ # Note that __nonzero__ and pending are different
+ # bool() is used in contexts where we need to know whether to schedule another callback,
+ # so it's true if it's pending or currently running
+ # 'pending' has the same meaning as libev watchers: it is cleared before actually
+ # running the callback
+
+ def __nonzero__(self):
+ # it's nonzero if it's pending or currently executing
+ # NOTE: This depends on loop._run_callbacks setting the args property
+ # to None.
+ return self.args is not None
+ __bool__ = __nonzero__
+
+ @property
+ def pending(self):
+ return self.callback is not None
+
+ def _format(self):
+ return ''
+
+ def __repr__(self):
+ result = "<%s at 0x%x" % (self.__class__.__name__, id(self))
+ if self.pending:
+ result += " pending"
+ if self.callback is not None:
+ result += " callback=%r" % (self.callback, )
+ if self.args is not None:
+ result += " args=%r" % (self.args, )
+ if self.callback is None and self.args is None:
+ result += " stopped"
+ return result + ">"
diff --git a/python/gevent/_ffi/loop.py b/python/gevent/_ffi/loop.py
new file mode 100644
index 0000000..efe4462
--- /dev/null
+++ b/python/gevent/_ffi/loop.py
@@ -0,0 +1,709 @@
+"""
+Basic loop implementation for ffi-based cores.
+"""
+# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable
+from __future__ import absolute_import, print_function
+
+from collections import deque
+import sys
+import os
+import traceback
+
+from gevent._ffi import _dbg
+from gevent._ffi import GEVENT_DEBUG_LEVEL
+from gevent._ffi import TRACE
+from gevent._ffi.callback import callback
+from gevent._compat import PYPY
+
+from gevent import getswitchinterval
+
+__all__ = [
+ 'AbstractLoop',
+ 'assign_standard_callbacks',
+]
+
+
+class _EVENTSType(object):
+ def __repr__(self):
+ return 'gevent.core.EVENTS'
+
+EVENTS = GEVENT_CORE_EVENTS = _EVENTSType()
+
+
+#####
+## Note on CFFI objects, callbacks and the lifecycle of watcher objects
+#
+# Each subclass of `watcher` allocates a C structure of the
+# appropriate type e.g., struct gevent_ev_io and holds this pointer in
+# its `_gwatcher` attribute. When that watcher instance is garbage
+# collected, then the C structure is also freed. The C structure is
+# passed to libev from the watcher's start() method and then to the
+# appropriate C callback function, e.g., _gevent_ev_io_callback, which
+# passes it back to python's _python_callback where we need the
+# watcher instance. Therefore, as long as that callback is active (the
+# watcher is started), the watcher instance must not be allowed to get
+# GC'd---any access at the C level or even the FFI level to the freed
+# memory could crash the process.
+#
+# However, the typical idiom calls for writing something like this:
+# loop.io(fd, python_cb).start()
+# thus forgetting the newly created watcher subclass and allowing it to be immediately
+# GC'd. To combat this, when the watcher is started, it places itself into the loop's
+# `_keepaliveset`, and it only removes itself when the watcher's `stop()` method is called.
+# Often, this is the *only* reference keeping the watcher object, and hence its C structure,
+# alive.
+#
+# This is slightly complicated by the fact that the python-level
+# callback, called from the C callback, could choose to manually stop
+# the watcher. When we return to the C level callback, we now have an
+# invalid pointer, and attempting to pass it back to Python (e.g., to
+# handle an error) could crash. Hence, _python_callback,
+# _gevent_io_callback, and _python_handle_error cooperate to make sure
+# that the watcher instance stays in the loops `_keepaliveset` while
+# the C code could be running---and if it gets removed, to not call back
+# to Python again.
+# See also https://github.com/gevent/gevent/issues/676
+####
+class AbstractCallbacks(object):
+
+
+ def __init__(self, ffi):
+ self.ffi = ffi
+ self.callbacks = []
+ if GEVENT_DEBUG_LEVEL < TRACE:
+ self.from_handle = ffi.from_handle
+
+ def from_handle(self, handle): # pylint:disable=method-hidden
+ x = self.ffi.from_handle(handle)
+ return x
+
+ def python_callback(self, handle, revents):
+ """
+ Returns an integer having one of three values:
+
+ - -1
+ An exception occurred during the callback and you must call
+ :func:`_python_handle_error` to deal with it. The Python watcher
+ object will have the exception tuple saved in ``_exc_info``.
+ - 1
+ Everything went according to plan. You should check to see if the libev
+ watcher is still active, and call :func:`python_stop` if it is not. This will
+ clean up the memory. Finding the watcher still active at the event loop level,
+ but not having stopped itself at the gevent level is a buggy scenario and
+ shouldn't happen.
+ - 2
+ Everything went according to plan, but the watcher has already
+ been stopped. Its memory may no longer be valid.
+
+ This function should never return 0, as that's the default value that
+ Python exceptions will produce.
+ """
+ #print("Running callback", handle)
+ orig_ffi_watcher = None
+ try:
+ # Even dereferencing the handle needs to be inside the try/except;
+ # if we don't return normally (e.g., a signal) then we wind up going
+ # to the 'onerror' handler (unhandled_onerror), which
+ # is not what we want; that can permanently wedge the loop depending
+ # on which callback was executing.
+ # XXX: See comments in that function. We may be able to restart and do better?
+ if not handle:
+ # Hmm, a NULL handle. That's not supposed to happen.
+ # We can easily get into a loop if we deref it and allow that
+ # to raise.
+ _dbg("python_callback got null handle")
+ return 1
+ the_watcher = self.from_handle(handle)
+ orig_ffi_watcher = the_watcher._watcher
+ args = the_watcher.args
+ if args is None:
+ # Legacy behaviour from corecext: convert None into ()
+ # See test__core_watcher.py
+ args = _NOARGS
+ if args and args[0] == GEVENT_CORE_EVENTS:
+ args = (revents, ) + args[1:]
+ #print("Calling function", the_watcher.callback, args)
+ the_watcher.callback(*args)
+ except: # pylint:disable=bare-except
+ _dbg("Got exception servicing watcher with handle", handle, sys.exc_info())
+ # It's possible for ``the_watcher`` to be undefined (UnboundLocalError)
+ # if we threw an exception (signal) on the line that created that variable.
+ # This is typically the case with a signal under libuv
+ try:
+ the_watcher
+ except UnboundLocalError:
+ the_watcher = self.from_handle(handle)
+ the_watcher._exc_info = sys.exc_info()
+ # Depending on when the exception happened, the watcher
+ # may or may not have been stopped. We need to make sure its
+ # memory stays valid so we can stop it at the ev level if needed.
+ the_watcher.loop._keepaliveset.add(the_watcher)
+ return -1
+ else:
+ if (the_watcher.loop is not None
+ and the_watcher in the_watcher.loop._keepaliveset
+ and the_watcher._watcher is orig_ffi_watcher):
+ # It didn't stop itself, *and* it didn't stop itself, reset
+ # its watcher, and start itself again. libuv's io watchers MAY
+ # do that.
+ # The normal, expected scenario when we find the watcher still
+ # in the keepaliveset is that it is still active at the event loop
+ # level, so we don't expect that python_stop gets called.
+ #_dbg("The watcher has not stopped itself, possibly still active", the_watcher)
+ return 1
+ return 2 # it stopped itself
+
+ def python_handle_error(self, handle, _revents):
+ _dbg("Handling error for handle", handle)
+ if not handle:
+ return
+ try:
+ watcher = self.from_handle(handle)
+ exc_info = watcher._exc_info
+ del watcher._exc_info
+ # In the past, we passed the ``watcher`` itself as the context,
+ # which typically meant that the Hub would just print
+ # the exception. This is a problem because sometimes we can't
+ # detect signals until late in ``python_callback``; specifically,
+ # test_selectors.py:DefaultSelectorTest.test_select_interrupt_exc
+ # installs a SIGALRM handler that raises an exception. That exception can happen
+ # before we enter ``python_callback`` or at any point within it because of the way
+ # libuv swallows signals. By passing None, we get the exception prapagated into
+ # the main greenlet (which is probably *also* not what we always want, but
+ # I see no way to distinguish the cases).
+ watcher.loop.handle_error(None, *exc_info)
+ finally:
+ # XXX Since we're here on an error condition, and we
+ # made sure that the watcher object was put in loop._keepaliveset,
+ # what about not stopping the watcher? Looks like a possible
+ # memory leak?
+ # XXX: This used to do "if revents & (libev.EV_READ | libev.EV_WRITE)"
+ # before stopping. Why?
+ try:
+ watcher.stop()
+ except: # pylint:disable=bare-except
+ watcher.loop.handle_error(watcher, *sys.exc_info())
+ return # pylint:disable=lost-exception
+
+ def unhandled_onerror(self, t, v, tb):
+ # This is supposed to be called for signals, etc.
+ # This is the onerror= value for CFFI.
+ # If we return None, C will get a value of 0/NULL;
+ # if we raise, CFFI will print the exception and then
+ # return 0/NULL; (unless error= was configured)
+ # If things go as planned, we return the value that asks
+ # C to call back and check on if the watcher needs to be closed or
+ # not.
+
+ # XXX: TODO: Could this cause events to be lost? Maybe we need to return
+ # a value that causes the C loop to try the callback again?
+ # at least for signals under libuv, which are delivered at very odd times.
+ # Hopefully the event still shows up when we poll the next time.
+ watcher = None
+ handle = tb.tb_frame.f_locals['handle'] if tb is not None else None
+ if handle: # handle could be NULL
+ watcher = self.from_handle(handle)
+ if watcher is not None:
+ watcher.loop.handle_error(None, t, v, tb)
+ return 1
+
+ # Raising it causes a lot of noise from CFFI
+ print("WARNING: gevent: Unhandled error with no watcher",
+ file=sys.stderr)
+ traceback.print_exception(t, v, tb)
+
+ def python_stop(self, handle):
+ if not handle: # pragma: no cover
+ print(
+ "WARNING: gevent: Unable to dereference handle; not stopping watcher. "
+ "Native resources may leak. This is most likely a bug in gevent.",
+ file=sys.stderr)
+ # The alternative is to crash with no helpful information
+ # NOTE: Raising exceptions here does nothing, they're swallowed by CFFI.
+ # Since the C level passed in a null pointer, even dereferencing the handle
+ # will just produce some exceptions.
+ return
+ watcher = self.from_handle(handle)
+ watcher.stop()
+
+ if not PYPY:
+ def python_check_callback(self, watcher_ptr): # pylint:disable=unused-argument
+ # If we have the onerror callback, this is a no-op; all the real
+ # work to rethrow the exception is done by the onerror callback
+
+ # NOTE: Unlike the rest of the functions, this is called with a pointer
+ # to the C level structure, *not* a pointer to the void* that represents a
+ # <cdata> for the Python Watcher object.
+ pass
+ else: # PyPy
+ # On PyPy, we need the function to have some sort of body, otherwise
+ # the signal exceptions don't always get caught, *especially* with
+ # libuv (however, there's no reason to expect this to only be a libuv
+ # issue; it's just that we don't depend on the periodic signal timer
+ # under libev, so the issue is much more pronounced under libuv)
+ # test_socket's test_sendall_interrupted can hang.
+ # See https://github.com/gevent/gevent/issues/1112
+
+ def python_check_callback(self, watcher_ptr): # pylint:disable=unused-argument
+ # Things we've tried that *don't* work:
+ # greenlet.getcurrent()
+ # 1 + 1
+ try:
+ raise MemoryError()
+ except MemoryError:
+ pass
+
+ def python_prepare_callback(self, watcher_ptr):
+ loop = self._find_loop_from_c_watcher(watcher_ptr)
+ if loop is None: # pragma: no cover
+ print("WARNING: gevent: running prepare callbacks from a destroyed handle: ",
+ watcher_ptr)
+ return
+ loop._run_callbacks()
+
+ def check_callback_onerror(self, t, v, tb):
+ watcher_ptr = tb.tb_frame.f_locals['watcher_ptr'] if tb is not None else None
+ if watcher_ptr:
+ loop = self._find_loop_from_c_watcher(watcher_ptr)
+ if loop is not None:
+ # None as the context argument causes the exception to be raised
+ # in the main greenlet.
+ loop.handle_error(None, t, v, tb)
+ return None
+ raise v # Let CFFI print
+
+ def _find_loop_from_c_watcher(self, watcher_ptr):
+ raise NotImplementedError()
+
+
+
+def assign_standard_callbacks(ffi, lib, callbacks_class, extras=()): # pylint:disable=unused-argument
+ # callbacks keeps these cdata objects alive at the python level
+ callbacks = callbacks_class(ffi)
+ extras = tuple([(getattr(callbacks, name), error) for name, error in extras])
+ for (func, error_func) in ((callbacks.python_callback, None),
+ (callbacks.python_handle_error, None),
+ (callbacks.python_stop, None),
+ (callbacks.python_check_callback,
+ callbacks.check_callback_onerror),
+ (callbacks.python_prepare_callback,
+ callbacks.check_callback_onerror)) + extras:
+ # The name of the callback function matches the 'extern Python' declaration.
+ error_func = error_func or callbacks.unhandled_onerror
+ callback = ffi.def_extern(onerror=error_func)(func)
+ # keep alive the cdata
+ # (def_extern returns the original function, and it requests that
+ # the function be "global", so maybe it keeps a hard reference to it somewhere now
+ # unlike ffi.callback(), and we don't need to do this?)
+ callbacks.callbacks.append(callback)
+
+ # At this point, the library C variable (static function, actually)
+ # is filled in.
+
+ return callbacks
+
+
+if sys.version_info[0] >= 3:
+ basestring = (bytes, str)
+ integer_types = (int,)
+else:
+ import __builtin__ # pylint:disable=import-error
+ basestring = (__builtin__.basestring,)
+ integer_types = (int, __builtin__.long)
+
+
+
+
+_NOARGS = ()
+
+CALLBACK_CHECK_COUNT = 50
+
+class AbstractLoop(object):
+ # pylint:disable=too-many-public-methods,too-many-instance-attributes
+
+ error_handler = None
+
+ _CHECK_POINTER = None
+
+ _TIMER_POINTER = None
+ _TIMER_CALLBACK_SIG = None
+
+ _PREPARE_POINTER = None
+
+ starting_timer_may_update_loop_time = False
+
+ # Subclasses should set this in __init__ to reflect
+ # whether they were the default loop.
+ _default = None
+
+ def __init__(self, ffi, lib, watchers, flags=None, default=None):
+ self._ffi = ffi
+ self._lib = lib
+ self._ptr = None
+ self._handle_to_self = self._ffi.new_handle(self) # XXX: Reference cycle?
+ self._watchers = watchers
+ self._in_callback = False
+ self._callbacks = deque()
+ # Stores python watcher objects while they are started
+ self._keepaliveset = set()
+ self._init_loop_and_aux_watchers(flags, default)
+
+
+ def _init_loop_and_aux_watchers(self, flags=None, default=None):
+
+ self._ptr = self._init_loop(flags, default)
+
+
+ # self._check is a watcher that runs in each iteration of the
+ # mainloop, just after the blocking call. It's point is to handle
+ # signals. It doesn't run watchers or callbacks, it just exists to give
+ # CFFI a chance to raise signal exceptions so we can handle them.
+ self._check = self._ffi.new(self._CHECK_POINTER)
+ self._check.data = self._handle_to_self
+ self._init_and_start_check()
+
+ # self._prepare is a watcher that runs in each iteration of the mainloop,
+ # just before the blocking call. It's where we run deferred callbacks
+ # from self.run_callback. This cooperates with _setup_for_run_callback()
+ # to schedule self._timer0 if needed.
+ self._prepare = self._ffi.new(self._PREPARE_POINTER)
+ self._prepare.data = self._handle_to_self
+ self._init_and_start_prepare()
+
+ # A timer we start and stop on demand. If we have callbacks,
+ # too many to run in one iteration of _run_callbacks, we turn this
+ # on so as to have the next iteration of the run loop return to us
+ # as quickly as possible.
+ # TODO: There may be a more efficient way to do this using ev_timer_again;
+ # see the "ev_timer" section of the ev manpage (http://linux.die.net/man/3/ev)
+ # Alternatively, setting the ev maximum block time may also work.
+ self._timer0 = self._ffi.new(self._TIMER_POINTER)
+ self._timer0.data = self._handle_to_self
+ self._init_callback_timer()
+
+ # TODO: We may be able to do something nicer and use the existing python_callback
+ # combined with onerror and the class check/timer/prepare to simplify things
+ # and unify our handling
+
+ def _init_loop(self, flags, default):
+ """
+ Called by __init__ to create or find the loop. The return value
+ is assigned to self._ptr.
+ """
+ raise NotImplementedError()
+
+ def _init_and_start_check(self):
+ raise NotImplementedError()
+
+ def _init_and_start_prepare(self):
+ raise NotImplementedError()
+
+ def _init_callback_timer(self):
+ raise NotImplementedError()
+
+ def _stop_callback_timer(self):
+ raise NotImplementedError()
+
+ def _start_callback_timer(self):
+ raise NotImplementedError()
+
+ def _check_callback_handle_error(self, t, v, tb):
+ self.handle_error(None, t, v, tb)
+
+ def _run_callbacks(self): # pylint:disable=too-many-branches
+ # When we're running callbacks, its safe for timers to
+ # update the notion of the current time (because if we're here,
+ # we're not running in a timer callback that may let other timers
+ # run; this is mostly an issue for libuv).
+
+ # That's actually a bit of a lie: on libev, self._timer0 really is
+ # a timer, and so sometimes this is running in a timer callback, not
+ # a prepare callback. But that's OK, libev doesn't suffer from cascading
+ # timer expiration and its safe to update the loop time at any
+ # moment there.
+ self.starting_timer_may_update_loop_time = True
+ try:
+ count = CALLBACK_CHECK_COUNT
+ now = self.now()
+ expiration = now + getswitchinterval()
+ self._stop_callback_timer()
+ while self._callbacks:
+ cb = self._callbacks.popleft() # pylint:disable=assignment-from-no-return
+ count -= 1
+ self.unref() # XXX: libuv doesn't have a global ref count!
+ callback = cb.callback
+ cb.callback = None
+ args = cb.args
+ if callback is None or args is None:
+ # it's been stopped
+ continue
+
+ try:
+ callback(*args)
+ except: # pylint:disable=bare-except
+ # If we allow an exception to escape this method (while we are running the ev callback),
+ # then CFFI will print the error and libev will continue executing.
+ # There are two problems with this. The first is that the code after
+ # the loop won't run. The second is that any remaining callbacks scheduled
+ # for this loop iteration will be silently dropped; they won't run, but they'll
+ # also not be *stopped* (which is not a huge deal unless you're looking for
+ # consistency or checking the boolean/pending status; the loop doesn't keep
+ # a reference to them like it does to watchers...*UNLESS* the callback itself had
+ # a reference to a watcher; then I don't know what would happen, it depends on
+ # the state of the watcher---a leak or crash is not totally inconceivable).
+ # The Cython implementation in core.ppyx uses gevent_call from callbacks.c
+ # to run the callback, which uses gevent_handle_error to handle any errors the
+ # Python callback raises...it unconditionally simply prints any error raised
+ # by loop.handle_error and clears it, so callback handling continues.
+ # We take a similar approach (but are extra careful about printing)
+ try:
+ self.handle_error(cb, *sys.exc_info())
+ except: # pylint:disable=bare-except
+ try:
+ print("Exception while handling another error", file=sys.stderr)
+ traceback.print_exc()
+ except: # pylint:disable=bare-except
+ pass # Nothing we can do here
+ finally:
+ # NOTE: this must be reset here, because cb.args is used as a flag in
+ # the callback class so that bool(cb) of a callback that has been run
+ # becomes False
+ cb.args = None
+
+ # We've finished running one group of callbacks
+ # but we may have more, so before looping check our
+ # switch interval.
+ if count == 0 and self._callbacks:
+ count = CALLBACK_CHECK_COUNT
+ self.update_now()
+ if self.now() >= expiration:
+ now = 0
+ break
+
+ # Update the time before we start going again, if we didn't
+ # just do so.
+ if now != 0:
+ self.update_now()
+
+ if self._callbacks:
+ self._start_callback_timer()
+ finally:
+ self.starting_timer_may_update_loop_time = False
+
+ def _stop_aux_watchers(self):
+ raise NotImplementedError()
+
+ def destroy(self):
+ if self._ptr:
+ try:
+ if not self._can_destroy_loop(self._ptr):
+ return False
+ self._stop_aux_watchers()
+ self._destroy_loop(self._ptr)
+ finally:
+ # not ffi.NULL, we don't want something that can be
+ # passed to C and crash later. This will create nice friendly
+ # TypeError from CFFI.
+ self._ptr = None
+ del self._handle_to_self
+ del self._callbacks
+ del self._keepaliveset
+
+ return True
+
+ def _can_destroy_loop(self, ptr):
+ raise NotImplementedError()
+
+ def _destroy_loop(self, ptr):
+ raise NotImplementedError()
+
+ @property
+ def ptr(self):
+ return self._ptr
+
+ @property
+ def WatcherType(self):
+ return self._watchers.watcher
+
+ @property
+ def MAXPRI(self):
+ return 1
+
+ @property
+ def MINPRI(self):
+ return 1
+
+ def _handle_syserr(self, message, errno):
+ try:
+ errno = os.strerror(errno)
+ except: # pylint:disable=bare-except
+ traceback.print_exc()
+ try:
+ message = '%s: %s' % (message, errno)
+ except: # pylint:disable=bare-except
+ traceback.print_exc()
+ self.handle_error(None, SystemError, SystemError(message), None)
+
+ def handle_error(self, context, type, value, tb):
+ handle_error = None
+ error_handler = self.error_handler
+ if error_handler is not None:
+ # we do want to do getattr every time so that setting Hub.handle_error property just works
+ handle_error = getattr(error_handler, 'handle_error', error_handler)
+ handle_error(context, type, value, tb)
+ else:
+ self._default_handle_error(context, type, value, tb)
+
+ def _default_handle_error(self, context, type, value, tb): # pylint:disable=unused-argument
+ # note: Hub sets its own error handler so this is not used by gevent
+ # this is here to make core.loop usable without the rest of gevent
+ # Should cause the loop to stop running.
+ traceback.print_exception(type, value, tb)
+
+
+ def run(self, nowait=False, once=False):
+ raise NotImplementedError()
+
+ def reinit(self):
+ raise NotImplementedError()
+
+ def ref(self):
+ # XXX: libuv doesn't do it this way
+ raise NotImplementedError()
+
+ def unref(self):
+ raise NotImplementedError()
+
+ def break_(self, how=None):
+ raise NotImplementedError()
+
+ def verify(self):
+ pass
+
+ def now(self):
+ raise NotImplementedError()
+
+ def update_now(self):
+ raise NotImplementedError()
+
+ def update(self):
+ import warnings
+ warnings.warn("'update' is deprecated; use 'update_now'",
+ DeprecationWarning,
+ stacklevel=2)
+ self.update_now()
+
+ def __repr__(self):
+ return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self._format())
+
+ @property
+ def default(self):
+ return self._default if self._ptr else False
+
+ @property
+ def iteration(self):
+ return -1
+
+ @property
+ def depth(self):
+ return -1
+
+ @property
+ def backend_int(self):
+ return 0
+
+ @property
+ def backend(self):
+ return "default"
+
+ @property
+ def pendingcnt(self):
+ return 0
+
+ def io(self, fd, events, ref=True, priority=None):
+ return self._watchers.io(self, fd, events, ref, priority)
+
+ def timer(self, after, repeat=0.0, ref=True, priority=None):
+ return self._watchers.timer(self, after, repeat, ref, priority)
+
+ def signal(self, signum, ref=True, priority=None):
+ return self._watchers.signal(self, signum, ref, priority)
+
+ def idle(self, ref=True, priority=None):
+ return self._watchers.idle(self, ref, priority)
+
+ def prepare(self, ref=True, priority=None):
+ return self._watchers.prepare(self, ref, priority)
+
+ def check(self, ref=True, priority=None):
+ return self._watchers.check(self, ref, priority)
+
+ def fork(self, ref=True, priority=None):
+ return self._watchers.fork(self, ref, priority)
+
+ def async_(self, ref=True, priority=None):
+ return self._watchers.async_(self, ref, priority)
+
+ # Provide BWC for those that can use 'async' as is
+ locals()['async'] = async_
+
+ if sys.platform != "win32":
+
+ def child(self, pid, trace=0, ref=True):
+ return self._watchers.child(self, pid, trace, ref)
+
+ def install_sigchld(self):
+ pass
+
+ def stat(self, path, interval=0.0, ref=True, priority=None):
+ return self._watchers.stat(self, path, interval, ref, priority)
+
+ def callback(self, priority=None):
+ return callback(self, priority)
+
+ def _setup_for_run_callback(self):
+ raise NotImplementedError()
+
+ def run_callback(self, func, *args):
+ # If we happen to already be running callbacks (inside
+ # _run_callbacks), this could happen almost immediately,
+ # without the loop cycling.
+ cb = callback(func, args)
+ self._callbacks.append(cb)
+ self._setup_for_run_callback()
+
+ return cb
+
+ def _format(self):
+ if not self._ptr:
+ return 'destroyed'
+ msg = self.backend
+ if self.default:
+ msg += ' default'
+ msg += ' pending=%s' % self.pendingcnt
+ msg += self._format_details()
+ return msg
+
+ def _format_details(self):
+ msg = ''
+ fileno = self.fileno() # pylint:disable=assignment-from-none
+ try:
+ activecnt = self.activecnt
+ except AttributeError:
+ activecnt = None
+ if activecnt is not None:
+ msg += ' ref=' + repr(activecnt)
+ if fileno is not None:
+ msg += ' fileno=' + repr(fileno)
+ #if sigfd is not None and sigfd != -1:
+ # msg += ' sigfd=' + repr(sigfd)
+ return msg
+
+ def fileno(self):
+ return None
+
+ @property
+ def activecnt(self):
+ if not self._ptr:
+ raise ValueError('operation on destroyed loop')
+ return 0
diff --git a/python/gevent/_ffi/watcher.py b/python/gevent/_ffi/watcher.py
new file mode 100644
index 0000000..3f880ce
--- /dev/null
+++ b/python/gevent/_ffi/watcher.py
@@ -0,0 +1,641 @@
+"""
+Useful base classes for watchers. The available
+watchers will depend on the specific event loop.
+"""
+# pylint:disable=not-callable
+from __future__ import absolute_import, print_function
+
+import signal as signalmodule
+import functools
+import warnings
+
+from gevent._config import config
+
+try:
+ from tracemalloc import get_object_traceback
+
+ def tracemalloc(init):
+ # PYTHONTRACEMALLOC env var controls this on Python 3.
+ return init
+except ImportError: # Python < 3.4
+
+ if config.trace_malloc:
+ # Use the same env var to turn this on for Python 2
+ import traceback
+
+ class _TB(object):
+ __slots__ = ('lines',)
+
+ def __init__(self, lines):
+ # These end in newlines, which we don't want for consistency
+ self.lines = [x.rstrip() for x in lines]
+
+ def format(self):
+ return self.lines
+
+ def tracemalloc(init):
+ @functools.wraps(init)
+ def traces(self, *args, **kwargs):
+ init(self, *args, **kwargs)
+ self._captured_malloc = _TB(traceback.format_stack())
+ return traces
+
+ def get_object_traceback(obj):
+ return obj._captured_malloc
+
+ else:
+ def get_object_traceback(_obj):
+ return None
+
+ def tracemalloc(init):
+ return init
+
+from gevent._compat import fsencode
+
+from gevent._ffi import _dbg # pylint:disable=unused-import
+from gevent._ffi import GEVENT_DEBUG_LEVEL
+from gevent._ffi import DEBUG
+from gevent._ffi.loop import GEVENT_CORE_EVENTS
+from gevent._ffi.loop import _NOARGS
+
+ALLOW_WATCHER_DEL = GEVENT_DEBUG_LEVEL >= DEBUG
+
+__all__ = [
+
+]
+
+try:
+ ResourceWarning
+except NameError:
+ class ResourceWarning(Warning):
+ "Python 2 fallback"
+
+class _NoWatcherResult(int):
+
+ def __repr__(self):
+ return "<NoWatcher>"
+
+_NoWatcherResult = _NoWatcherResult(0)
+
+def events_to_str(event_field, all_events):
+ result = []
+ for (flag, string) in all_events:
+ c_flag = flag
+ if event_field & c_flag:
+ result.append(string)
+ event_field = event_field & (~c_flag)
+ if not event_field:
+ break
+ if event_field:
+ result.append(hex(event_field))
+ return '|'.join(result)
+
+
+def not_while_active(func):
+ @functools.wraps(func)
+ def nw(self, *args, **kwargs):
+ if self.active:
+ raise ValueError("not while active")
+ func(self, *args, **kwargs)
+ return nw
+
+def only_if_watcher(func):
+ @functools.wraps(func)
+ def if_w(self):
+ if self._watcher:
+ return func(self)
+ return _NoWatcherResult
+ return if_w
+
+
+class LazyOnClass(object):
+
+ @classmethod
+ def lazy(cls, cls_dict, func):
+ "Put a LazyOnClass object in *cls_dict* with the same name as *func*"
+ cls_dict[func.__name__] = cls(func)
+
+ def __init__(self, func, name=None):
+ self.name = name or func.__name__
+ self.func = func
+
+ def __get__(self, inst, klass):
+ if inst is None: # pragma: no cover
+ return self
+
+ val = self.func(inst)
+ setattr(klass, self.name, val)
+ return val
+
+
+class AbstractWatcherType(type):
+ """
+ Base metaclass for watchers.
+
+ To use, you will:
+
+ - subclass the watcher class defined from this type.
+ - optionally subclass this type
+ """
+ # pylint:disable=bad-mcs-classmethod-argument
+
+ _FFI = None
+ _LIB = None
+
+ def __new__(cls, name, bases, cls_dict):
+ if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'):
+ cls._fill_watcher(name, bases, cls_dict)
+ if '__del__' in cls_dict and not ALLOW_WATCHER_DEL: # pragma: no cover
+ raise TypeError("CFFI watchers are not allowed to have __del__")
+ return type.__new__(cls, name, bases, cls_dict)
+
+ @classmethod
+ def _fill_watcher(cls, name, bases, cls_dict):
+ # TODO: refactor smaller
+ # pylint:disable=too-many-locals
+ if name.endswith('_'):
+ # Strip trailing _ added to avoid keyword duplications
+ # e.g., async_
+ name = name[:-1]
+
+ def _mro_get(attr, bases, error=True):
+ for b in bases:
+ try:
+ return getattr(b, attr)
+ except AttributeError:
+ continue
+ if error: # pragma: no cover
+ raise AttributeError(attr)
+ _watcher_prefix = cls_dict.get('_watcher_prefix') or _mro_get('_watcher_prefix', bases)
+
+ if '_watcher_type' not in cls_dict:
+ watcher_type = _watcher_prefix + '_' + name
+ cls_dict['_watcher_type'] = watcher_type
+ elif not cls_dict['_watcher_type'].startswith(_watcher_prefix):
+ watcher_type = _watcher_prefix + '_' + cls_dict['_watcher_type']
+ cls_dict['_watcher_type'] = watcher_type
+
+ active_name = _watcher_prefix + '_is_active'
+
+ def _watcher_is_active(self):
+ return getattr(self._LIB, active_name)
+
+ LazyOnClass.lazy(cls_dict, _watcher_is_active)
+
+ watcher_struct_name = cls_dict.get('_watcher_struct_name')
+ if not watcher_struct_name:
+ watcher_struct_pattern = (cls_dict.get('_watcher_struct_pattern')
+ or _mro_get('_watcher_struct_pattern', bases, False)
+ or 'struct %s')
+ watcher_struct_name = watcher_struct_pattern % (watcher_type,)
+
+ def _watcher_struct_pointer_type(self):
+ return self._FFI.typeof(watcher_struct_name + ' *')
+
+ LazyOnClass.lazy(cls_dict, _watcher_struct_pointer_type)
+
+ callback_name = (cls_dict.get('_watcher_callback_name')
+ or _mro_get('_watcher_callback_name', bases, False)
+ or '_gevent_generic_callback')
+
+ def _watcher_callback(self):
+ return self._FFI.addressof(self._LIB, callback_name)
+
+ LazyOnClass.lazy(cls_dict, _watcher_callback)
+
+ def _make_meth(name, watcher_name):
+ def meth(self):
+ lib_name = self._watcher_type + '_' + name
+ return getattr(self._LIB, lib_name)
+ meth.__name__ = watcher_name
+ return meth
+
+ for meth_name in 'start', 'stop', 'init':
+ watcher_name = '_watcher' + '_' + meth_name
+ if watcher_name not in cls_dict:
+ LazyOnClass.lazy(cls_dict, _make_meth(meth_name, watcher_name))
+
+ def new_handle(cls, obj):
+ return cls._FFI.new_handle(obj)
+
+ def new(cls, kind):
+ return cls._FFI.new(kind)
+
+class watcher(object):
+
+ _callback = None
+ _args = None
+ _watcher = None
+ # self._handle has a reference to self, keeping it alive.
+ # We must keep self._handle alive for ffi.from_handle() to be
+ # able to work. We only fill this in when we are started,
+ # and when we are stopped we destroy it.
+ # NOTE: This is a GC cycle, so we keep it around for as short
+ # as possible.
+ _handle = None
+
+ @tracemalloc
+ def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
+ self.loop = _loop
+ self.__init_priority = priority
+ self.__init_args = args
+ self.__init_ref = ref
+ self._watcher_full_init()
+
+
+ def _watcher_full_init(self):
+ priority = self.__init_priority
+ ref = self.__init_ref
+ args = self.__init_args
+
+ self._watcher_create(ref)
+
+ if priority is not None:
+ self._watcher_ffi_set_priority(priority)
+
+ try:
+ self._watcher_ffi_init(args)
+ except:
+ # Let these be GC'd immediately.
+ # If we keep them around to when *we* are gc'd,
+ # they're probably invalid, meaning any native calls
+ # we do then to close() them are likely to fail
+ self._watcher = None
+ raise
+ self._watcher_ffi_set_init_ref(ref)
+
+ @classmethod
+ def _watcher_ffi_close(cls, ffi_watcher):
+ pass
+
+ def _watcher_create(self, ref): # pylint:disable=unused-argument
+ self._watcher = self._watcher_new()
+
+ def _watcher_new(self):
+ return type(self).new(self._watcher_struct_pointer_type) # pylint:disable=no-member
+
+ def _watcher_ffi_set_init_ref(self, ref):
+ pass
+
+ def _watcher_ffi_set_priority(self, priority):
+ pass
+
+ def _watcher_ffi_init(self, args):
+ raise NotImplementedError()
+
+ def _watcher_ffi_start(self):
+ raise NotImplementedError()
+
+ def _watcher_ffi_stop(self):
+ self._watcher_stop(self.loop._ptr, self._watcher)
+
+ def _watcher_ffi_ref(self):
+ raise NotImplementedError()
+
+ def _watcher_ffi_unref(self):
+ raise NotImplementedError()
+
+ def _watcher_ffi_start_unref(self):
+ # While a watcher is active, we don't keep it
+ # referenced. This allows a timer, for example, to be started,
+ # and still allow the loop to end if there is nothing
+ # else to do. see test__order.TestSleep0 for one example.
+ self._watcher_ffi_unref()
+
+ def _watcher_ffi_stop_ref(self):
+ self._watcher_ffi_ref()
+
+ # A string identifying the type of libev object we watch, e.g., 'ev_io'
+ # This should be a class attribute.
+ _watcher_type = None
+ # A class attribute that is the callback on the libev object that init's the C struct,
+ # e.g., libev.ev_io_init. If None, will be set by _init_subclasses.
+ _watcher_init = None
+ # A class attribute that is the callback on the libev object that starts the C watcher,
+ # e.g., libev.ev_io_start. If None, will be set by _init_subclasses.
+ _watcher_start = None
+ # A class attribute that is the callback on the libev object that stops the C watcher,
+ # e.g., libev.ev_io_stop. If None, will be set by _init_subclasses.
+ _watcher_stop = None
+ # A cffi ctype object identifying the struct pointer we create.
+ # This is a class attribute set based on the _watcher_type
+ _watcher_struct_pointer_type = None
+ # The attribute of the libev object identifying the custom
+ # callback function for this type of watcher. This is a class
+ # attribute set based on the _watcher_type in _init_subclasses.
+ _watcher_callback = None
+ _watcher_is_active = None
+
+ def close(self):
+ if self._watcher is None:
+ return
+
+ self.stop()
+ _watcher = self._watcher
+ self._watcher = None
+ self._watcher_set_data(_watcher, self._FFI.NULL) # pylint: disable=no-member
+ self._watcher_ffi_close(_watcher)
+ self.loop = None
+
+ def _watcher_set_data(self, the_watcher, data):
+ # This abstraction exists for the sole benefit of
+ # libuv.watcher.stat, which "subclasses" uv_handle_t.
+ # Can we do something to avoid this extra function call?
+ the_watcher.data = data
+ return data
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, t, v, tb):
+ self.close()
+
+ if ALLOW_WATCHER_DEL:
+ def __del__(self):
+ if self._watcher:
+ tb = get_object_traceback(self)
+ tb_msg = ''
+ if tb is not None:
+ tb_msg = '\n'.join(tb.format())
+ tb_msg = '\nTraceback:\n' + tb_msg
+ warnings.warn("Failed to close watcher %r%s" % (self, tb_msg),
+ ResourceWarning)
+
+ # may fail if __init__ did; will be harmlessly printed
+ self.close()
+
+
+ def __repr__(self):
+ formats = self._format()
+ result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats)
+ if self.pending:
+ result += " pending"
+ if self.callback is not None:
+ fself = getattr(self.callback, '__self__', None)
+ if fself is self:
+ result += " callback=<bound method %s of self>" % (self.callback.__name__)
+ else:
+ result += " callback=%r" % (self.callback, )
+ if self.args is not None:
+ result += " args=%r" % (self.args, )
+ if self.callback is None and self.args is None:
+ result += " stopped"
+ result += " watcher=%s" % (self._watcher)
+ result += " handle=%s" % (self._watcher_handle)
+ result += " ref=%s" % (self.ref)
+ return result + ">"
+
+ @property
+ def _watcher_handle(self):
+ if self._watcher:
+ return self._watcher.data
+
+ def _format(self):
+ return ''
+
+ @property
+ def ref(self):
+ raise NotImplementedError()
+
+ def _get_callback(self):
+ return self._callback
+
+ def _set_callback(self, cb):
+ if not callable(cb) and cb is not None:
+ raise TypeError("Expected callable, not %r" % (cb, ))
+ if cb is None:
+ if '_callback' in self.__dict__:
+ del self._callback
+ else:
+ self._callback = cb
+ callback = property(_get_callback, _set_callback)
+
+ def _get_args(self):
+ return self._args
+
+ def _set_args(self, args):
+ if not isinstance(args, tuple) and args is not None:
+ raise TypeError("args must be a tuple or None")
+ if args is None:
+ if '_args' in self.__dict__:
+ del self._args
+ else:
+ self._args = args
+
+ args = property(_get_args, _set_args)
+
+ def start(self, callback, *args):
+ if callback is None:
+ raise TypeError('callback must be callable, not None')
+ self.callback = callback
+ self.args = args or _NOARGS
+ self.loop._keepaliveset.add(self)
+ self._handle = self._watcher_set_data(self._watcher, type(self).new_handle(self)) # pylint:disable=no-member
+ self._watcher_ffi_start()
+ self._watcher_ffi_start_unref()
+
+ def stop(self):
+ if self._callback is None:
+ assert self.loop is None or self not in self.loop._keepaliveset
+ return
+ self._watcher_ffi_stop_ref()
+ self._watcher_ffi_stop()
+ self.loop._keepaliveset.discard(self)
+ self._handle = None
+ self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member
+ self.callback = None
+ self.args = None
+
+ def _get_priority(self):
+ return None
+
+ @not_while_active
+ def _set_priority(self, priority):
+ pass
+
+ priority = property(_get_priority, _set_priority)
+
+
+ @property
+ def active(self):
+ if self._watcher is not None and self._watcher_is_active(self._watcher):
+ return True
+ return False
+
+ @property
+ def pending(self):
+ return False
+
+watcher = AbstractWatcherType('watcher', (object,), dict(watcher.__dict__))
+
+class IoMixin(object):
+
+ EVENT_MASK = 0
+
+ def __init__(self, loop, fd, events, ref=True, priority=None, _args=None):
+ # Win32 only works with sockets, and only when we use libuv, because
+ # we don't use _open_osfhandle. See libuv/watchers.py:io for a description.
+ if fd < 0:
+ raise ValueError('fd must be non-negative: %r' % fd)
+ if events & ~self.EVENT_MASK:
+ raise ValueError('illegal event mask: %r' % events)
+ self._fd = fd
+ super(IoMixin, self).__init__(loop, ref=ref, priority=priority,
+ args=_args or (fd, events))
+
+ def start(self, callback, *args, **kwargs):
+ args = args or _NOARGS
+ if kwargs.get('pass_events'):
+ args = (GEVENT_CORE_EVENTS, ) + args
+ super(IoMixin, self).start(callback, *args)
+
+ def _format(self):
+ return ' fd=%d' % self._fd
+
+class TimerMixin(object):
+ _watcher_type = 'timer'
+
+ def __init__(self, loop, after=0.0, repeat=0.0, ref=True, priority=None):
+ if repeat < 0.0:
+ raise ValueError("repeat must be positive or zero: %r" % repeat)
+ self._after = after
+ self._repeat = repeat
+ super(TimerMixin, self).__init__(loop, ref=ref, priority=priority, args=(after, repeat))
+
+ def start(self, callback, *args, **kw):
+ update = kw.get("update", self.loop.starting_timer_may_update_loop_time)
+ if update:
+ # Quoth the libev doc: "This is a costly operation and is
+ # usually done automatically within ev_run(). This
+ # function is rarely useful, but when some event callback
+ # runs for a very long time without entering the event
+ # loop, updating libev's idea of the current time is a
+ # good idea."
+
+ # 1.3 changed the default for this to False *unless* the loop is
+ # running a callback; see libuv for details. Note that
+ # starting Timeout objects still sets this to true.
+
+ self.loop.update_now()
+ super(TimerMixin, self).start(callback, *args)
+
+ def again(self, callback, *args, **kw):
+ raise NotImplementedError()
+
+
+class SignalMixin(object):
+ _watcher_type = 'signal'
+
+ def __init__(self, loop, signalnum, ref=True, priority=None):
+ if signalnum < 1 or signalnum >= signalmodule.NSIG:
+ raise ValueError('illegal signal number: %r' % signalnum)
+ # still possible to crash on one of libev's asserts:
+ # 1) "libev: ev_signal_start called with illegal signal number"
+ # EV_NSIG might be different from signal.NSIG on some platforms
+ # 2) "libev: a signal must not be attached to two different loops"
+ # we probably could check that in LIBEV_EMBED mode, but not in general
+ self._signalnum = signalnum
+ super(SignalMixin, self).__init__(loop, ref=ref, priority=priority, args=(signalnum, ))
+
+
+class IdleMixin(object):
+ _watcher_type = 'idle'
+
+
+class PrepareMixin(object):
+ _watcher_type = 'prepare'
+
+
+class CheckMixin(object):
+ _watcher_type = 'check'
+
+
+class ForkMixin(object):
+ _watcher_type = 'fork'
+
+
+class AsyncMixin(object):
+ _watcher_type = 'async'
+
+ def send(self):
+ raise NotImplementedError()
+
+ @property
+ def pending(self):
+ raise NotImplementedError()
+
+
+class ChildMixin(object):
+
+ # hack for libuv which doesn't extend watcher
+ _CALL_SUPER_INIT = True
+
+ def __init__(self, loop, pid, trace=0, ref=True):
+ if not loop.default:
+ raise TypeError('child watchers are only available on the default loop')
+ loop.install_sigchld()
+ self._pid = pid
+ if self._CALL_SUPER_INIT:
+ super(ChildMixin, self).__init__(loop, ref=ref, args=(pid, trace))
+
+ def _format(self):
+ return ' pid=%r rstatus=%r' % (self.pid, self.rstatus)
+
+ @property
+ def pid(self):
+ return self._pid
+
+ @property
+ def rpid(self):
+ # The received pid, the result of the waitpid() call.
+ return self._rpid
+
+ _rpid = None
+ _rstatus = 0
+
+ @property
+ def rstatus(self):
+ return self._rstatus
+
+class StatMixin(object):
+
+ @staticmethod
+ def _encode_path(path):
+ return fsencode(path)
+
+ def __init__(self, _loop, path, interval=0.0, ref=True, priority=None):
+ # Store the encoded path in the same attribute that corecext does
+ self._paths = self._encode_path(path)
+
+ # Keep the original path to avoid re-encoding, especially on Python 3
+ self._path = path
+
+ # Although CFFI would automatically convert a bytes object into a char* when
+ # calling ev_stat_init(..., char*, ...), on PyPy the char* pointer is not
+ # guaranteed to live past the function call. On CPython, only with a constant/interned
+ # bytes object is the pointer guaranteed to last path the function call. (And since
+ # Python 3 is pretty much guaranteed to produce a newly-encoded bytes object above, thats
+ # rarely the case). Therefore, we must keep a reference to the produced cdata object
+ # so that the struct ev_stat_watcher's `path` pointer doesn't become invalid/deallocated
+ self._cpath = self._FFI.new('char[]', self._paths)
+
+ self._interval = interval
+ super(StatMixin, self).__init__(_loop, ref=ref, priority=priority,
+ args=(self._cpath,
+ interval))
+
+ @property
+ def path(self):
+ return self._path
+
+ @property
+ def attr(self):
+ raise NotImplementedError
+
+ @property
+ def prev(self):
+ raise NotImplementedError
+
+ @property
+ def interval(self):
+ return self._interval