diff options
Diffstat (limited to 'python/gevent/_ffi')
-rw-r--r-- | python/gevent/_ffi/__init__.py | 27 | ||||
-rw-r--r-- | python/gevent/_ffi/callback.py | 58 | ||||
-rw-r--r-- | python/gevent/_ffi/loop.py | 709 | ||||
-rw-r--r-- | python/gevent/_ffi/watcher.py | 641 |
4 files changed, 1435 insertions, 0 deletions
diff --git a/python/gevent/_ffi/__init__.py b/python/gevent/_ffi/__init__.py new file mode 100644 index 0000000..56f1e96 --- /dev/null +++ b/python/gevent/_ffi/__init__.py @@ -0,0 +1,27 @@ +""" +Internal helpers for FFI implementations. +""" +from __future__ import print_function, absolute_import + +import os +import sys + +def _dbg(*args, **kwargs): + # pylint:disable=unused-argument + pass + +#_dbg = print + +def _pid_dbg(*args, **kwargs): + kwargs['file'] = sys.stderr + print(os.getpid(), *args, **kwargs) + +CRITICAL = 1 +ERROR = 3 +DEBUG = 5 +TRACE = 9 + +GEVENT_DEBUG_LEVEL = vars()[os.getenv("GEVENT_DEBUG", 'CRITICAL').upper()] + +if GEVENT_DEBUG_LEVEL >= TRACE: + _dbg = _pid_dbg diff --git a/python/gevent/_ffi/callback.py b/python/gevent/_ffi/callback.py new file mode 100644 index 0000000..df59a9f --- /dev/null +++ b/python/gevent/_ffi/callback.py @@ -0,0 +1,58 @@ +from __future__ import absolute_import, print_function + +__all__ = [ + 'callback', +] + + +# For times when *args is captured but often not passed (empty), +# we can avoid keeping the new tuple that was created for *args +# around by using a constant. +_NOARGS = () + + +class callback(object): + + __slots__ = ('callback', 'args') + + def __init__(self, cb, args): + self.callback = cb + self.args = args or _NOARGS + + def stop(self): + self.callback = None + self.args = None + + close = stop + + # Note that __nonzero__ and pending are different + # bool() is used in contexts where we need to know whether to schedule another callback, + # so it's true if it's pending or currently running + # 'pending' has the same meaning as libev watchers: it is cleared before actually + # running the callback + + def __nonzero__(self): + # it's nonzero if it's pending or currently executing + # NOTE: This depends on loop._run_callbacks setting the args property + # to None. + return self.args is not None + __bool__ = __nonzero__ + + @property + def pending(self): + return self.callback is not None + + def _format(self): + return '' + + def __repr__(self): + result = "<%s at 0x%x" % (self.__class__.__name__, id(self)) + if self.pending: + result += " pending" + if self.callback is not None: + result += " callback=%r" % (self.callback, ) + if self.args is not None: + result += " args=%r" % (self.args, ) + if self.callback is None and self.args is None: + result += " stopped" + return result + ">" diff --git a/python/gevent/_ffi/loop.py b/python/gevent/_ffi/loop.py new file mode 100644 index 0000000..efe4462 --- /dev/null +++ b/python/gevent/_ffi/loop.py @@ -0,0 +1,709 @@ +""" +Basic loop implementation for ffi-based cores. +""" +# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable +from __future__ import absolute_import, print_function + +from collections import deque +import sys +import os +import traceback + +from gevent._ffi import _dbg +from gevent._ffi import GEVENT_DEBUG_LEVEL +from gevent._ffi import TRACE +from gevent._ffi.callback import callback +from gevent._compat import PYPY + +from gevent import getswitchinterval + +__all__ = [ + 'AbstractLoop', + 'assign_standard_callbacks', +] + + +class _EVENTSType(object): + def __repr__(self): + return 'gevent.core.EVENTS' + +EVENTS = GEVENT_CORE_EVENTS = _EVENTSType() + + +##### +## Note on CFFI objects, callbacks and the lifecycle of watcher objects +# +# Each subclass of `watcher` allocates a C structure of the +# appropriate type e.g., struct gevent_ev_io and holds this pointer in +# its `_gwatcher` attribute. When that watcher instance is garbage +# collected, then the C structure is also freed. The C structure is +# passed to libev from the watcher's start() method and then to the +# appropriate C callback function, e.g., _gevent_ev_io_callback, which +# passes it back to python's _python_callback where we need the +# watcher instance. Therefore, as long as that callback is active (the +# watcher is started), the watcher instance must not be allowed to get +# GC'd---any access at the C level or even the FFI level to the freed +# memory could crash the process. +# +# However, the typical idiom calls for writing something like this: +# loop.io(fd, python_cb).start() +# thus forgetting the newly created watcher subclass and allowing it to be immediately +# GC'd. To combat this, when the watcher is started, it places itself into the loop's +# `_keepaliveset`, and it only removes itself when the watcher's `stop()` method is called. +# Often, this is the *only* reference keeping the watcher object, and hence its C structure, +# alive. +# +# This is slightly complicated by the fact that the python-level +# callback, called from the C callback, could choose to manually stop +# the watcher. When we return to the C level callback, we now have an +# invalid pointer, and attempting to pass it back to Python (e.g., to +# handle an error) could crash. Hence, _python_callback, +# _gevent_io_callback, and _python_handle_error cooperate to make sure +# that the watcher instance stays in the loops `_keepaliveset` while +# the C code could be running---and if it gets removed, to not call back +# to Python again. +# See also https://github.com/gevent/gevent/issues/676 +#### +class AbstractCallbacks(object): + + + def __init__(self, ffi): + self.ffi = ffi + self.callbacks = [] + if GEVENT_DEBUG_LEVEL < TRACE: + self.from_handle = ffi.from_handle + + def from_handle(self, handle): # pylint:disable=method-hidden + x = self.ffi.from_handle(handle) + return x + + def python_callback(self, handle, revents): + """ + Returns an integer having one of three values: + + - -1 + An exception occurred during the callback and you must call + :func:`_python_handle_error` to deal with it. The Python watcher + object will have the exception tuple saved in ``_exc_info``. + - 1 + Everything went according to plan. You should check to see if the libev + watcher is still active, and call :func:`python_stop` if it is not. This will + clean up the memory. Finding the watcher still active at the event loop level, + but not having stopped itself at the gevent level is a buggy scenario and + shouldn't happen. + - 2 + Everything went according to plan, but the watcher has already + been stopped. Its memory may no longer be valid. + + This function should never return 0, as that's the default value that + Python exceptions will produce. + """ + #print("Running callback", handle) + orig_ffi_watcher = None + try: + # Even dereferencing the handle needs to be inside the try/except; + # if we don't return normally (e.g., a signal) then we wind up going + # to the 'onerror' handler (unhandled_onerror), which + # is not what we want; that can permanently wedge the loop depending + # on which callback was executing. + # XXX: See comments in that function. We may be able to restart and do better? + if not handle: + # Hmm, a NULL handle. That's not supposed to happen. + # We can easily get into a loop if we deref it and allow that + # to raise. + _dbg("python_callback got null handle") + return 1 + the_watcher = self.from_handle(handle) + orig_ffi_watcher = the_watcher._watcher + args = the_watcher.args + if args is None: + # Legacy behaviour from corecext: convert None into () + # See test__core_watcher.py + args = _NOARGS + if args and args[0] == GEVENT_CORE_EVENTS: + args = (revents, ) + args[1:] + #print("Calling function", the_watcher.callback, args) + the_watcher.callback(*args) + except: # pylint:disable=bare-except + _dbg("Got exception servicing watcher with handle", handle, sys.exc_info()) + # It's possible for ``the_watcher`` to be undefined (UnboundLocalError) + # if we threw an exception (signal) on the line that created that variable. + # This is typically the case with a signal under libuv + try: + the_watcher + except UnboundLocalError: + the_watcher = self.from_handle(handle) + the_watcher._exc_info = sys.exc_info() + # Depending on when the exception happened, the watcher + # may or may not have been stopped. We need to make sure its + # memory stays valid so we can stop it at the ev level if needed. + the_watcher.loop._keepaliveset.add(the_watcher) + return -1 + else: + if (the_watcher.loop is not None + and the_watcher in the_watcher.loop._keepaliveset + and the_watcher._watcher is orig_ffi_watcher): + # It didn't stop itself, *and* it didn't stop itself, reset + # its watcher, and start itself again. libuv's io watchers MAY + # do that. + # The normal, expected scenario when we find the watcher still + # in the keepaliveset is that it is still active at the event loop + # level, so we don't expect that python_stop gets called. + #_dbg("The watcher has not stopped itself, possibly still active", the_watcher) + return 1 + return 2 # it stopped itself + + def python_handle_error(self, handle, _revents): + _dbg("Handling error for handle", handle) + if not handle: + return + try: + watcher = self.from_handle(handle) + exc_info = watcher._exc_info + del watcher._exc_info + # In the past, we passed the ``watcher`` itself as the context, + # which typically meant that the Hub would just print + # the exception. This is a problem because sometimes we can't + # detect signals until late in ``python_callback``; specifically, + # test_selectors.py:DefaultSelectorTest.test_select_interrupt_exc + # installs a SIGALRM handler that raises an exception. That exception can happen + # before we enter ``python_callback`` or at any point within it because of the way + # libuv swallows signals. By passing None, we get the exception prapagated into + # the main greenlet (which is probably *also* not what we always want, but + # I see no way to distinguish the cases). + watcher.loop.handle_error(None, *exc_info) + finally: + # XXX Since we're here on an error condition, and we + # made sure that the watcher object was put in loop._keepaliveset, + # what about not stopping the watcher? Looks like a possible + # memory leak? + # XXX: This used to do "if revents & (libev.EV_READ | libev.EV_WRITE)" + # before stopping. Why? + try: + watcher.stop() + except: # pylint:disable=bare-except + watcher.loop.handle_error(watcher, *sys.exc_info()) + return # pylint:disable=lost-exception + + def unhandled_onerror(self, t, v, tb): + # This is supposed to be called for signals, etc. + # This is the onerror= value for CFFI. + # If we return None, C will get a value of 0/NULL; + # if we raise, CFFI will print the exception and then + # return 0/NULL; (unless error= was configured) + # If things go as planned, we return the value that asks + # C to call back and check on if the watcher needs to be closed or + # not. + + # XXX: TODO: Could this cause events to be lost? Maybe we need to return + # a value that causes the C loop to try the callback again? + # at least for signals under libuv, which are delivered at very odd times. + # Hopefully the event still shows up when we poll the next time. + watcher = None + handle = tb.tb_frame.f_locals['handle'] if tb is not None else None + if handle: # handle could be NULL + watcher = self.from_handle(handle) + if watcher is not None: + watcher.loop.handle_error(None, t, v, tb) + return 1 + + # Raising it causes a lot of noise from CFFI + print("WARNING: gevent: Unhandled error with no watcher", + file=sys.stderr) + traceback.print_exception(t, v, tb) + + def python_stop(self, handle): + if not handle: # pragma: no cover + print( + "WARNING: gevent: Unable to dereference handle; not stopping watcher. " + "Native resources may leak. This is most likely a bug in gevent.", + file=sys.stderr) + # The alternative is to crash with no helpful information + # NOTE: Raising exceptions here does nothing, they're swallowed by CFFI. + # Since the C level passed in a null pointer, even dereferencing the handle + # will just produce some exceptions. + return + watcher = self.from_handle(handle) + watcher.stop() + + if not PYPY: + def python_check_callback(self, watcher_ptr): # pylint:disable=unused-argument + # If we have the onerror callback, this is a no-op; all the real + # work to rethrow the exception is done by the onerror callback + + # NOTE: Unlike the rest of the functions, this is called with a pointer + # to the C level structure, *not* a pointer to the void* that represents a + # <cdata> for the Python Watcher object. + pass + else: # PyPy + # On PyPy, we need the function to have some sort of body, otherwise + # the signal exceptions don't always get caught, *especially* with + # libuv (however, there's no reason to expect this to only be a libuv + # issue; it's just that we don't depend on the periodic signal timer + # under libev, so the issue is much more pronounced under libuv) + # test_socket's test_sendall_interrupted can hang. + # See https://github.com/gevent/gevent/issues/1112 + + def python_check_callback(self, watcher_ptr): # pylint:disable=unused-argument + # Things we've tried that *don't* work: + # greenlet.getcurrent() + # 1 + 1 + try: + raise MemoryError() + except MemoryError: + pass + + def python_prepare_callback(self, watcher_ptr): + loop = self._find_loop_from_c_watcher(watcher_ptr) + if loop is None: # pragma: no cover + print("WARNING: gevent: running prepare callbacks from a destroyed handle: ", + watcher_ptr) + return + loop._run_callbacks() + + def check_callback_onerror(self, t, v, tb): + watcher_ptr = tb.tb_frame.f_locals['watcher_ptr'] if tb is not None else None + if watcher_ptr: + loop = self._find_loop_from_c_watcher(watcher_ptr) + if loop is not None: + # None as the context argument causes the exception to be raised + # in the main greenlet. + loop.handle_error(None, t, v, tb) + return None + raise v # Let CFFI print + + def _find_loop_from_c_watcher(self, watcher_ptr): + raise NotImplementedError() + + + +def assign_standard_callbacks(ffi, lib, callbacks_class, extras=()): # pylint:disable=unused-argument + # callbacks keeps these cdata objects alive at the python level + callbacks = callbacks_class(ffi) + extras = tuple([(getattr(callbacks, name), error) for name, error in extras]) + for (func, error_func) in ((callbacks.python_callback, None), + (callbacks.python_handle_error, None), + (callbacks.python_stop, None), + (callbacks.python_check_callback, + callbacks.check_callback_onerror), + (callbacks.python_prepare_callback, + callbacks.check_callback_onerror)) + extras: + # The name of the callback function matches the 'extern Python' declaration. + error_func = error_func or callbacks.unhandled_onerror + callback = ffi.def_extern(onerror=error_func)(func) + # keep alive the cdata + # (def_extern returns the original function, and it requests that + # the function be "global", so maybe it keeps a hard reference to it somewhere now + # unlike ffi.callback(), and we don't need to do this?) + callbacks.callbacks.append(callback) + + # At this point, the library C variable (static function, actually) + # is filled in. + + return callbacks + + +if sys.version_info[0] >= 3: + basestring = (bytes, str) + integer_types = (int,) +else: + import __builtin__ # pylint:disable=import-error + basestring = (__builtin__.basestring,) + integer_types = (int, __builtin__.long) + + + + +_NOARGS = () + +CALLBACK_CHECK_COUNT = 50 + +class AbstractLoop(object): + # pylint:disable=too-many-public-methods,too-many-instance-attributes + + error_handler = None + + _CHECK_POINTER = None + + _TIMER_POINTER = None + _TIMER_CALLBACK_SIG = None + + _PREPARE_POINTER = None + + starting_timer_may_update_loop_time = False + + # Subclasses should set this in __init__ to reflect + # whether they were the default loop. + _default = None + + def __init__(self, ffi, lib, watchers, flags=None, default=None): + self._ffi = ffi + self._lib = lib + self._ptr = None + self._handle_to_self = self._ffi.new_handle(self) # XXX: Reference cycle? + self._watchers = watchers + self._in_callback = False + self._callbacks = deque() + # Stores python watcher objects while they are started + self._keepaliveset = set() + self._init_loop_and_aux_watchers(flags, default) + + + def _init_loop_and_aux_watchers(self, flags=None, default=None): + + self._ptr = self._init_loop(flags, default) + + + # self._check is a watcher that runs in each iteration of the + # mainloop, just after the blocking call. It's point is to handle + # signals. It doesn't run watchers or callbacks, it just exists to give + # CFFI a chance to raise signal exceptions so we can handle them. + self._check = self._ffi.new(self._CHECK_POINTER) + self._check.data = self._handle_to_self + self._init_and_start_check() + + # self._prepare is a watcher that runs in each iteration of the mainloop, + # just before the blocking call. It's where we run deferred callbacks + # from self.run_callback. This cooperates with _setup_for_run_callback() + # to schedule self._timer0 if needed. + self._prepare = self._ffi.new(self._PREPARE_POINTER) + self._prepare.data = self._handle_to_self + self._init_and_start_prepare() + + # A timer we start and stop on demand. If we have callbacks, + # too many to run in one iteration of _run_callbacks, we turn this + # on so as to have the next iteration of the run loop return to us + # as quickly as possible. + # TODO: There may be a more efficient way to do this using ev_timer_again; + # see the "ev_timer" section of the ev manpage (http://linux.die.net/man/3/ev) + # Alternatively, setting the ev maximum block time may also work. + self._timer0 = self._ffi.new(self._TIMER_POINTER) + self._timer0.data = self._handle_to_self + self._init_callback_timer() + + # TODO: We may be able to do something nicer and use the existing python_callback + # combined with onerror and the class check/timer/prepare to simplify things + # and unify our handling + + def _init_loop(self, flags, default): + """ + Called by __init__ to create or find the loop. The return value + is assigned to self._ptr. + """ + raise NotImplementedError() + + def _init_and_start_check(self): + raise NotImplementedError() + + def _init_and_start_prepare(self): + raise NotImplementedError() + + def _init_callback_timer(self): + raise NotImplementedError() + + def _stop_callback_timer(self): + raise NotImplementedError() + + def _start_callback_timer(self): + raise NotImplementedError() + + def _check_callback_handle_error(self, t, v, tb): + self.handle_error(None, t, v, tb) + + def _run_callbacks(self): # pylint:disable=too-many-branches + # When we're running callbacks, its safe for timers to + # update the notion of the current time (because if we're here, + # we're not running in a timer callback that may let other timers + # run; this is mostly an issue for libuv). + + # That's actually a bit of a lie: on libev, self._timer0 really is + # a timer, and so sometimes this is running in a timer callback, not + # a prepare callback. But that's OK, libev doesn't suffer from cascading + # timer expiration and its safe to update the loop time at any + # moment there. + self.starting_timer_may_update_loop_time = True + try: + count = CALLBACK_CHECK_COUNT + now = self.now() + expiration = now + getswitchinterval() + self._stop_callback_timer() + while self._callbacks: + cb = self._callbacks.popleft() # pylint:disable=assignment-from-no-return + count -= 1 + self.unref() # XXX: libuv doesn't have a global ref count! + callback = cb.callback + cb.callback = None + args = cb.args + if callback is None or args is None: + # it's been stopped + continue + + try: + callback(*args) + except: # pylint:disable=bare-except + # If we allow an exception to escape this method (while we are running the ev callback), + # then CFFI will print the error and libev will continue executing. + # There are two problems with this. The first is that the code after + # the loop won't run. The second is that any remaining callbacks scheduled + # for this loop iteration will be silently dropped; they won't run, but they'll + # also not be *stopped* (which is not a huge deal unless you're looking for + # consistency or checking the boolean/pending status; the loop doesn't keep + # a reference to them like it does to watchers...*UNLESS* the callback itself had + # a reference to a watcher; then I don't know what would happen, it depends on + # the state of the watcher---a leak or crash is not totally inconceivable). + # The Cython implementation in core.ppyx uses gevent_call from callbacks.c + # to run the callback, which uses gevent_handle_error to handle any errors the + # Python callback raises...it unconditionally simply prints any error raised + # by loop.handle_error and clears it, so callback handling continues. + # We take a similar approach (but are extra careful about printing) + try: + self.handle_error(cb, *sys.exc_info()) + except: # pylint:disable=bare-except + try: + print("Exception while handling another error", file=sys.stderr) + traceback.print_exc() + except: # pylint:disable=bare-except + pass # Nothing we can do here + finally: + # NOTE: this must be reset here, because cb.args is used as a flag in + # the callback class so that bool(cb) of a callback that has been run + # becomes False + cb.args = None + + # We've finished running one group of callbacks + # but we may have more, so before looping check our + # switch interval. + if count == 0 and self._callbacks: + count = CALLBACK_CHECK_COUNT + self.update_now() + if self.now() >= expiration: + now = 0 + break + + # Update the time before we start going again, if we didn't + # just do so. + if now != 0: + self.update_now() + + if self._callbacks: + self._start_callback_timer() + finally: + self.starting_timer_may_update_loop_time = False + + def _stop_aux_watchers(self): + raise NotImplementedError() + + def destroy(self): + if self._ptr: + try: + if not self._can_destroy_loop(self._ptr): + return False + self._stop_aux_watchers() + self._destroy_loop(self._ptr) + finally: + # not ffi.NULL, we don't want something that can be + # passed to C and crash later. This will create nice friendly + # TypeError from CFFI. + self._ptr = None + del self._handle_to_self + del self._callbacks + del self._keepaliveset + + return True + + def _can_destroy_loop(self, ptr): + raise NotImplementedError() + + def _destroy_loop(self, ptr): + raise NotImplementedError() + + @property + def ptr(self): + return self._ptr + + @property + def WatcherType(self): + return self._watchers.watcher + + @property + def MAXPRI(self): + return 1 + + @property + def MINPRI(self): + return 1 + + def _handle_syserr(self, message, errno): + try: + errno = os.strerror(errno) + except: # pylint:disable=bare-except + traceback.print_exc() + try: + message = '%s: %s' % (message, errno) + except: # pylint:disable=bare-except + traceback.print_exc() + self.handle_error(None, SystemError, SystemError(message), None) + + def handle_error(self, context, type, value, tb): + handle_error = None + error_handler = self.error_handler + if error_handler is not None: + # we do want to do getattr every time so that setting Hub.handle_error property just works + handle_error = getattr(error_handler, 'handle_error', error_handler) + handle_error(context, type, value, tb) + else: + self._default_handle_error(context, type, value, tb) + + def _default_handle_error(self, context, type, value, tb): # pylint:disable=unused-argument + # note: Hub sets its own error handler so this is not used by gevent + # this is here to make core.loop usable without the rest of gevent + # Should cause the loop to stop running. + traceback.print_exception(type, value, tb) + + + def run(self, nowait=False, once=False): + raise NotImplementedError() + + def reinit(self): + raise NotImplementedError() + + def ref(self): + # XXX: libuv doesn't do it this way + raise NotImplementedError() + + def unref(self): + raise NotImplementedError() + + def break_(self, how=None): + raise NotImplementedError() + + def verify(self): + pass + + def now(self): + raise NotImplementedError() + + def update_now(self): + raise NotImplementedError() + + def update(self): + import warnings + warnings.warn("'update' is deprecated; use 'update_now'", + DeprecationWarning, + stacklevel=2) + self.update_now() + + def __repr__(self): + return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self._format()) + + @property + def default(self): + return self._default if self._ptr else False + + @property + def iteration(self): + return -1 + + @property + def depth(self): + return -1 + + @property + def backend_int(self): + return 0 + + @property + def backend(self): + return "default" + + @property + def pendingcnt(self): + return 0 + + def io(self, fd, events, ref=True, priority=None): + return self._watchers.io(self, fd, events, ref, priority) + + def timer(self, after, repeat=0.0, ref=True, priority=None): + return self._watchers.timer(self, after, repeat, ref, priority) + + def signal(self, signum, ref=True, priority=None): + return self._watchers.signal(self, signum, ref, priority) + + def idle(self, ref=True, priority=None): + return self._watchers.idle(self, ref, priority) + + def prepare(self, ref=True, priority=None): + return self._watchers.prepare(self, ref, priority) + + def check(self, ref=True, priority=None): + return self._watchers.check(self, ref, priority) + + def fork(self, ref=True, priority=None): + return self._watchers.fork(self, ref, priority) + + def async_(self, ref=True, priority=None): + return self._watchers.async_(self, ref, priority) + + # Provide BWC for those that can use 'async' as is + locals()['async'] = async_ + + if sys.platform != "win32": + + def child(self, pid, trace=0, ref=True): + return self._watchers.child(self, pid, trace, ref) + + def install_sigchld(self): + pass + + def stat(self, path, interval=0.0, ref=True, priority=None): + return self._watchers.stat(self, path, interval, ref, priority) + + def callback(self, priority=None): + return callback(self, priority) + + def _setup_for_run_callback(self): + raise NotImplementedError() + + def run_callback(self, func, *args): + # If we happen to already be running callbacks (inside + # _run_callbacks), this could happen almost immediately, + # without the loop cycling. + cb = callback(func, args) + self._callbacks.append(cb) + self._setup_for_run_callback() + + return cb + + def _format(self): + if not self._ptr: + return 'destroyed' + msg = self.backend + if self.default: + msg += ' default' + msg += ' pending=%s' % self.pendingcnt + msg += self._format_details() + return msg + + def _format_details(self): + msg = '' + fileno = self.fileno() # pylint:disable=assignment-from-none + try: + activecnt = self.activecnt + except AttributeError: + activecnt = None + if activecnt is not None: + msg += ' ref=' + repr(activecnt) + if fileno is not None: + msg += ' fileno=' + repr(fileno) + #if sigfd is not None and sigfd != -1: + # msg += ' sigfd=' + repr(sigfd) + return msg + + def fileno(self): + return None + + @property + def activecnt(self): + if not self._ptr: + raise ValueError('operation on destroyed loop') + return 0 diff --git a/python/gevent/_ffi/watcher.py b/python/gevent/_ffi/watcher.py new file mode 100644 index 0000000..3f880ce --- /dev/null +++ b/python/gevent/_ffi/watcher.py @@ -0,0 +1,641 @@ +""" +Useful base classes for watchers. The available +watchers will depend on the specific event loop. +""" +# pylint:disable=not-callable +from __future__ import absolute_import, print_function + +import signal as signalmodule +import functools +import warnings + +from gevent._config import config + +try: + from tracemalloc import get_object_traceback + + def tracemalloc(init): + # PYTHONTRACEMALLOC env var controls this on Python 3. + return init +except ImportError: # Python < 3.4 + + if config.trace_malloc: + # Use the same env var to turn this on for Python 2 + import traceback + + class _TB(object): + __slots__ = ('lines',) + + def __init__(self, lines): + # These end in newlines, which we don't want for consistency + self.lines = [x.rstrip() for x in lines] + + def format(self): + return self.lines + + def tracemalloc(init): + @functools.wraps(init) + def traces(self, *args, **kwargs): + init(self, *args, **kwargs) + self._captured_malloc = _TB(traceback.format_stack()) + return traces + + def get_object_traceback(obj): + return obj._captured_malloc + + else: + def get_object_traceback(_obj): + return None + + def tracemalloc(init): + return init + +from gevent._compat import fsencode + +from gevent._ffi import _dbg # pylint:disable=unused-import +from gevent._ffi import GEVENT_DEBUG_LEVEL +from gevent._ffi import DEBUG +from gevent._ffi.loop import GEVENT_CORE_EVENTS +from gevent._ffi.loop import _NOARGS + +ALLOW_WATCHER_DEL = GEVENT_DEBUG_LEVEL >= DEBUG + +__all__ = [ + +] + +try: + ResourceWarning +except NameError: + class ResourceWarning(Warning): + "Python 2 fallback" + +class _NoWatcherResult(int): + + def __repr__(self): + return "<NoWatcher>" + +_NoWatcherResult = _NoWatcherResult(0) + +def events_to_str(event_field, all_events): + result = [] + for (flag, string) in all_events: + c_flag = flag + if event_field & c_flag: + result.append(string) + event_field = event_field & (~c_flag) + if not event_field: + break + if event_field: + result.append(hex(event_field)) + return '|'.join(result) + + +def not_while_active(func): + @functools.wraps(func) + def nw(self, *args, **kwargs): + if self.active: + raise ValueError("not while active") + func(self, *args, **kwargs) + return nw + +def only_if_watcher(func): + @functools.wraps(func) + def if_w(self): + if self._watcher: + return func(self) + return _NoWatcherResult + return if_w + + +class LazyOnClass(object): + + @classmethod + def lazy(cls, cls_dict, func): + "Put a LazyOnClass object in *cls_dict* with the same name as *func*" + cls_dict[func.__name__] = cls(func) + + def __init__(self, func, name=None): + self.name = name or func.__name__ + self.func = func + + def __get__(self, inst, klass): + if inst is None: # pragma: no cover + return self + + val = self.func(inst) + setattr(klass, self.name, val) + return val + + +class AbstractWatcherType(type): + """ + Base metaclass for watchers. + + To use, you will: + + - subclass the watcher class defined from this type. + - optionally subclass this type + """ + # pylint:disable=bad-mcs-classmethod-argument + + _FFI = None + _LIB = None + + def __new__(cls, name, bases, cls_dict): + if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'): + cls._fill_watcher(name, bases, cls_dict) + if '__del__' in cls_dict and not ALLOW_WATCHER_DEL: # pragma: no cover + raise TypeError("CFFI watchers are not allowed to have __del__") + return type.__new__(cls, name, bases, cls_dict) + + @classmethod + def _fill_watcher(cls, name, bases, cls_dict): + # TODO: refactor smaller + # pylint:disable=too-many-locals + if name.endswith('_'): + # Strip trailing _ added to avoid keyword duplications + # e.g., async_ + name = name[:-1] + + def _mro_get(attr, bases, error=True): + for b in bases: + try: + return getattr(b, attr) + except AttributeError: + continue + if error: # pragma: no cover + raise AttributeError(attr) + _watcher_prefix = cls_dict.get('_watcher_prefix') or _mro_get('_watcher_prefix', bases) + + if '_watcher_type' not in cls_dict: + watcher_type = _watcher_prefix + '_' + name + cls_dict['_watcher_type'] = watcher_type + elif not cls_dict['_watcher_type'].startswith(_watcher_prefix): + watcher_type = _watcher_prefix + '_' + cls_dict['_watcher_type'] + cls_dict['_watcher_type'] = watcher_type + + active_name = _watcher_prefix + '_is_active' + + def _watcher_is_active(self): + return getattr(self._LIB, active_name) + + LazyOnClass.lazy(cls_dict, _watcher_is_active) + + watcher_struct_name = cls_dict.get('_watcher_struct_name') + if not watcher_struct_name: + watcher_struct_pattern = (cls_dict.get('_watcher_struct_pattern') + or _mro_get('_watcher_struct_pattern', bases, False) + or 'struct %s') + watcher_struct_name = watcher_struct_pattern % (watcher_type,) + + def _watcher_struct_pointer_type(self): + return self._FFI.typeof(watcher_struct_name + ' *') + + LazyOnClass.lazy(cls_dict, _watcher_struct_pointer_type) + + callback_name = (cls_dict.get('_watcher_callback_name') + or _mro_get('_watcher_callback_name', bases, False) + or '_gevent_generic_callback') + + def _watcher_callback(self): + return self._FFI.addressof(self._LIB, callback_name) + + LazyOnClass.lazy(cls_dict, _watcher_callback) + + def _make_meth(name, watcher_name): + def meth(self): + lib_name = self._watcher_type + '_' + name + return getattr(self._LIB, lib_name) + meth.__name__ = watcher_name + return meth + + for meth_name in 'start', 'stop', 'init': + watcher_name = '_watcher' + '_' + meth_name + if watcher_name not in cls_dict: + LazyOnClass.lazy(cls_dict, _make_meth(meth_name, watcher_name)) + + def new_handle(cls, obj): + return cls._FFI.new_handle(obj) + + def new(cls, kind): + return cls._FFI.new(kind) + +class watcher(object): + + _callback = None + _args = None + _watcher = None + # self._handle has a reference to self, keeping it alive. + # We must keep self._handle alive for ffi.from_handle() to be + # able to work. We only fill this in when we are started, + # and when we are stopped we destroy it. + # NOTE: This is a GC cycle, so we keep it around for as short + # as possible. + _handle = None + + @tracemalloc + def __init__(self, _loop, ref=True, priority=None, args=_NOARGS): + self.loop = _loop + self.__init_priority = priority + self.__init_args = args + self.__init_ref = ref + self._watcher_full_init() + + + def _watcher_full_init(self): + priority = self.__init_priority + ref = self.__init_ref + args = self.__init_args + + self._watcher_create(ref) + + if priority is not None: + self._watcher_ffi_set_priority(priority) + + try: + self._watcher_ffi_init(args) + except: + # Let these be GC'd immediately. + # If we keep them around to when *we* are gc'd, + # they're probably invalid, meaning any native calls + # we do then to close() them are likely to fail + self._watcher = None + raise + self._watcher_ffi_set_init_ref(ref) + + @classmethod + def _watcher_ffi_close(cls, ffi_watcher): + pass + + def _watcher_create(self, ref): # pylint:disable=unused-argument + self._watcher = self._watcher_new() + + def _watcher_new(self): + return type(self).new(self._watcher_struct_pointer_type) # pylint:disable=no-member + + def _watcher_ffi_set_init_ref(self, ref): + pass + + def _watcher_ffi_set_priority(self, priority): + pass + + def _watcher_ffi_init(self, args): + raise NotImplementedError() + + def _watcher_ffi_start(self): + raise NotImplementedError() + + def _watcher_ffi_stop(self): + self._watcher_stop(self.loop._ptr, self._watcher) + + def _watcher_ffi_ref(self): + raise NotImplementedError() + + def _watcher_ffi_unref(self): + raise NotImplementedError() + + def _watcher_ffi_start_unref(self): + # While a watcher is active, we don't keep it + # referenced. This allows a timer, for example, to be started, + # and still allow the loop to end if there is nothing + # else to do. see test__order.TestSleep0 for one example. + self._watcher_ffi_unref() + + def _watcher_ffi_stop_ref(self): + self._watcher_ffi_ref() + + # A string identifying the type of libev object we watch, e.g., 'ev_io' + # This should be a class attribute. + _watcher_type = None + # A class attribute that is the callback on the libev object that init's the C struct, + # e.g., libev.ev_io_init. If None, will be set by _init_subclasses. + _watcher_init = None + # A class attribute that is the callback on the libev object that starts the C watcher, + # e.g., libev.ev_io_start. If None, will be set by _init_subclasses. + _watcher_start = None + # A class attribute that is the callback on the libev object that stops the C watcher, + # e.g., libev.ev_io_stop. If None, will be set by _init_subclasses. + _watcher_stop = None + # A cffi ctype object identifying the struct pointer we create. + # This is a class attribute set based on the _watcher_type + _watcher_struct_pointer_type = None + # The attribute of the libev object identifying the custom + # callback function for this type of watcher. This is a class + # attribute set based on the _watcher_type in _init_subclasses. + _watcher_callback = None + _watcher_is_active = None + + def close(self): + if self._watcher is None: + return + + self.stop() + _watcher = self._watcher + self._watcher = None + self._watcher_set_data(_watcher, self._FFI.NULL) # pylint: disable=no-member + self._watcher_ffi_close(_watcher) + self.loop = None + + def _watcher_set_data(self, the_watcher, data): + # This abstraction exists for the sole benefit of + # libuv.watcher.stat, which "subclasses" uv_handle_t. + # Can we do something to avoid this extra function call? + the_watcher.data = data + return data + + def __enter__(self): + return self + + def __exit__(self, t, v, tb): + self.close() + + if ALLOW_WATCHER_DEL: + def __del__(self): + if self._watcher: + tb = get_object_traceback(self) + tb_msg = '' + if tb is not None: + tb_msg = '\n'.join(tb.format()) + tb_msg = '\nTraceback:\n' + tb_msg + warnings.warn("Failed to close watcher %r%s" % (self, tb_msg), + ResourceWarning) + + # may fail if __init__ did; will be harmlessly printed + self.close() + + + def __repr__(self): + formats = self._format() + result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats) + if self.pending: + result += " pending" + if self.callback is not None: + fself = getattr(self.callback, '__self__', None) + if fself is self: + result += " callback=<bound method %s of self>" % (self.callback.__name__) + else: + result += " callback=%r" % (self.callback, ) + if self.args is not None: + result += " args=%r" % (self.args, ) + if self.callback is None and self.args is None: + result += " stopped" + result += " watcher=%s" % (self._watcher) + result += " handle=%s" % (self._watcher_handle) + result += " ref=%s" % (self.ref) + return result + ">" + + @property + def _watcher_handle(self): + if self._watcher: + return self._watcher.data + + def _format(self): + return '' + + @property + def ref(self): + raise NotImplementedError() + + def _get_callback(self): + return self._callback + + def _set_callback(self, cb): + if not callable(cb) and cb is not None: + raise TypeError("Expected callable, not %r" % (cb, )) + if cb is None: + if '_callback' in self.__dict__: + del self._callback + else: + self._callback = cb + callback = property(_get_callback, _set_callback) + + def _get_args(self): + return self._args + + def _set_args(self, args): + if not isinstance(args, tuple) and args is not None: + raise TypeError("args must be a tuple or None") + if args is None: + if '_args' in self.__dict__: + del self._args + else: + self._args = args + + args = property(_get_args, _set_args) + + def start(self, callback, *args): + if callback is None: + raise TypeError('callback must be callable, not None') + self.callback = callback + self.args = args or _NOARGS + self.loop._keepaliveset.add(self) + self._handle = self._watcher_set_data(self._watcher, type(self).new_handle(self)) # pylint:disable=no-member + self._watcher_ffi_start() + self._watcher_ffi_start_unref() + + def stop(self): + if self._callback is None: + assert self.loop is None or self not in self.loop._keepaliveset + return + self._watcher_ffi_stop_ref() + self._watcher_ffi_stop() + self.loop._keepaliveset.discard(self) + self._handle = None + self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member + self.callback = None + self.args = None + + def _get_priority(self): + return None + + @not_while_active + def _set_priority(self, priority): + pass + + priority = property(_get_priority, _set_priority) + + + @property + def active(self): + if self._watcher is not None and self._watcher_is_active(self._watcher): + return True + return False + + @property + def pending(self): + return False + +watcher = AbstractWatcherType('watcher', (object,), dict(watcher.__dict__)) + +class IoMixin(object): + + EVENT_MASK = 0 + + def __init__(self, loop, fd, events, ref=True, priority=None, _args=None): + # Win32 only works with sockets, and only when we use libuv, because + # we don't use _open_osfhandle. See libuv/watchers.py:io for a description. + if fd < 0: + raise ValueError('fd must be non-negative: %r' % fd) + if events & ~self.EVENT_MASK: + raise ValueError('illegal event mask: %r' % events) + self._fd = fd + super(IoMixin, self).__init__(loop, ref=ref, priority=priority, + args=_args or (fd, events)) + + def start(self, callback, *args, **kwargs): + args = args or _NOARGS + if kwargs.get('pass_events'): + args = (GEVENT_CORE_EVENTS, ) + args + super(IoMixin, self).start(callback, *args) + + def _format(self): + return ' fd=%d' % self._fd + +class TimerMixin(object): + _watcher_type = 'timer' + + def __init__(self, loop, after=0.0, repeat=0.0, ref=True, priority=None): + if repeat < 0.0: + raise ValueError("repeat must be positive or zero: %r" % repeat) + self._after = after + self._repeat = repeat + super(TimerMixin, self).__init__(loop, ref=ref, priority=priority, args=(after, repeat)) + + def start(self, callback, *args, **kw): + update = kw.get("update", self.loop.starting_timer_may_update_loop_time) + if update: + # Quoth the libev doc: "This is a costly operation and is + # usually done automatically within ev_run(). This + # function is rarely useful, but when some event callback + # runs for a very long time without entering the event + # loop, updating libev's idea of the current time is a + # good idea." + + # 1.3 changed the default for this to False *unless* the loop is + # running a callback; see libuv for details. Note that + # starting Timeout objects still sets this to true. + + self.loop.update_now() + super(TimerMixin, self).start(callback, *args) + + def again(self, callback, *args, **kw): + raise NotImplementedError() + + +class SignalMixin(object): + _watcher_type = 'signal' + + def __init__(self, loop, signalnum, ref=True, priority=None): + if signalnum < 1 or signalnum >= signalmodule.NSIG: + raise ValueError('illegal signal number: %r' % signalnum) + # still possible to crash on one of libev's asserts: + # 1) "libev: ev_signal_start called with illegal signal number" + # EV_NSIG might be different from signal.NSIG on some platforms + # 2) "libev: a signal must not be attached to two different loops" + # we probably could check that in LIBEV_EMBED mode, but not in general + self._signalnum = signalnum + super(SignalMixin, self).__init__(loop, ref=ref, priority=priority, args=(signalnum, )) + + +class IdleMixin(object): + _watcher_type = 'idle' + + +class PrepareMixin(object): + _watcher_type = 'prepare' + + +class CheckMixin(object): + _watcher_type = 'check' + + +class ForkMixin(object): + _watcher_type = 'fork' + + +class AsyncMixin(object): + _watcher_type = 'async' + + def send(self): + raise NotImplementedError() + + @property + def pending(self): + raise NotImplementedError() + + +class ChildMixin(object): + + # hack for libuv which doesn't extend watcher + _CALL_SUPER_INIT = True + + def __init__(self, loop, pid, trace=0, ref=True): + if not loop.default: + raise TypeError('child watchers are only available on the default loop') + loop.install_sigchld() + self._pid = pid + if self._CALL_SUPER_INIT: + super(ChildMixin, self).__init__(loop, ref=ref, args=(pid, trace)) + + def _format(self): + return ' pid=%r rstatus=%r' % (self.pid, self.rstatus) + + @property + def pid(self): + return self._pid + + @property + def rpid(self): + # The received pid, the result of the waitpid() call. + return self._rpid + + _rpid = None + _rstatus = 0 + + @property + def rstatus(self): + return self._rstatus + +class StatMixin(object): + + @staticmethod + def _encode_path(path): + return fsencode(path) + + def __init__(self, _loop, path, interval=0.0, ref=True, priority=None): + # Store the encoded path in the same attribute that corecext does + self._paths = self._encode_path(path) + + # Keep the original path to avoid re-encoding, especially on Python 3 + self._path = path + + # Although CFFI would automatically convert a bytes object into a char* when + # calling ev_stat_init(..., char*, ...), on PyPy the char* pointer is not + # guaranteed to live past the function call. On CPython, only with a constant/interned + # bytes object is the pointer guaranteed to last path the function call. (And since + # Python 3 is pretty much guaranteed to produce a newly-encoded bytes object above, thats + # rarely the case). Therefore, we must keep a reference to the produced cdata object + # so that the struct ev_stat_watcher's `path` pointer doesn't become invalid/deallocated + self._cpath = self._FFI.new('char[]', self._paths) + + self._interval = interval + super(StatMixin, self).__init__(_loop, ref=ref, priority=priority, + args=(self._cpath, + interval)) + + @property + def path(self): + return self._path + + @property + def attr(self): + raise NotImplementedError + + @property + def prev(self): + raise NotImplementedError + + @property + def interval(self): + return self._interval |