diff options
Diffstat (limited to 'python/gevent/_ffi')
-rw-r--r-- | python/gevent/_ffi/__init__.py | 27 | ||||
-rw-r--r-- | python/gevent/_ffi/callback.py | 58 | ||||
-rw-r--r-- | python/gevent/_ffi/loop.py | 709 | ||||
-rw-r--r-- | python/gevent/_ffi/watcher.py | 641 |
4 files changed, 0 insertions, 1435 deletions
diff --git a/python/gevent/_ffi/__init__.py b/python/gevent/_ffi/__init__.py deleted file mode 100644 index 56f1e96..0000000 --- a/python/gevent/_ffi/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -""" -Internal helpers for FFI implementations. -""" -from __future__ import print_function, absolute_import - -import os -import sys - -def _dbg(*args, **kwargs): - # pylint:disable=unused-argument - pass - -#_dbg = print - -def _pid_dbg(*args, **kwargs): - kwargs['file'] = sys.stderr - print(os.getpid(), *args, **kwargs) - -CRITICAL = 1 -ERROR = 3 -DEBUG = 5 -TRACE = 9 - -GEVENT_DEBUG_LEVEL = vars()[os.getenv("GEVENT_DEBUG", 'CRITICAL').upper()] - -if GEVENT_DEBUG_LEVEL >= TRACE: - _dbg = _pid_dbg diff --git a/python/gevent/_ffi/callback.py b/python/gevent/_ffi/callback.py deleted file mode 100644 index df59a9f..0000000 --- a/python/gevent/_ffi/callback.py +++ /dev/null @@ -1,58 +0,0 @@ -from __future__ import absolute_import, print_function - -__all__ = [ - 'callback', -] - - -# For times when *args is captured but often not passed (empty), -# we can avoid keeping the new tuple that was created for *args -# around by using a constant. -_NOARGS = () - - -class callback(object): - - __slots__ = ('callback', 'args') - - def __init__(self, cb, args): - self.callback = cb - self.args = args or _NOARGS - - def stop(self): - self.callback = None - self.args = None - - close = stop - - # Note that __nonzero__ and pending are different - # bool() is used in contexts where we need to know whether to schedule another callback, - # so it's true if it's pending or currently running - # 'pending' has the same meaning as libev watchers: it is cleared before actually - # running the callback - - def __nonzero__(self): - # it's nonzero if it's pending or currently executing - # NOTE: This depends on loop._run_callbacks setting the args property - # to None. - return self.args is not None - __bool__ = __nonzero__ - - @property - def pending(self): - return self.callback is not None - - def _format(self): - return '' - - def __repr__(self): - result = "<%s at 0x%x" % (self.__class__.__name__, id(self)) - if self.pending: - result += " pending" - if self.callback is not None: - result += " callback=%r" % (self.callback, ) - if self.args is not None: - result += " args=%r" % (self.args, ) - if self.callback is None and self.args is None: - result += " stopped" - return result + ">" diff --git a/python/gevent/_ffi/loop.py b/python/gevent/_ffi/loop.py deleted file mode 100644 index efe4462..0000000 --- a/python/gevent/_ffi/loop.py +++ /dev/null @@ -1,709 +0,0 @@ -""" -Basic loop implementation for ffi-based cores. -""" -# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable -from __future__ import absolute_import, print_function - -from collections import deque -import sys -import os -import traceback - -from gevent._ffi import _dbg -from gevent._ffi import GEVENT_DEBUG_LEVEL -from gevent._ffi import TRACE -from gevent._ffi.callback import callback -from gevent._compat import PYPY - -from gevent import getswitchinterval - -__all__ = [ - 'AbstractLoop', - 'assign_standard_callbacks', -] - - -class _EVENTSType(object): - def __repr__(self): - return 'gevent.core.EVENTS' - -EVENTS = GEVENT_CORE_EVENTS = _EVENTSType() - - -##### -## Note on CFFI objects, callbacks and the lifecycle of watcher objects -# -# Each subclass of `watcher` allocates a C structure of the -# appropriate type e.g., struct gevent_ev_io and holds this pointer in -# its `_gwatcher` attribute. When that watcher instance is garbage -# collected, then the C structure is also freed. The C structure is -# passed to libev from the watcher's start() method and then to the -# appropriate C callback function, e.g., _gevent_ev_io_callback, which -# passes it back to python's _python_callback where we need the -# watcher instance. Therefore, as long as that callback is active (the -# watcher is started), the watcher instance must not be allowed to get -# GC'd---any access at the C level or even the FFI level to the freed -# memory could crash the process. -# -# However, the typical idiom calls for writing something like this: -# loop.io(fd, python_cb).start() -# thus forgetting the newly created watcher subclass and allowing it to be immediately -# GC'd. To combat this, when the watcher is started, it places itself into the loop's -# `_keepaliveset`, and it only removes itself when the watcher's `stop()` method is called. -# Often, this is the *only* reference keeping the watcher object, and hence its C structure, -# alive. -# -# This is slightly complicated by the fact that the python-level -# callback, called from the C callback, could choose to manually stop -# the watcher. When we return to the C level callback, we now have an -# invalid pointer, and attempting to pass it back to Python (e.g., to -# handle an error) could crash. Hence, _python_callback, -# _gevent_io_callback, and _python_handle_error cooperate to make sure -# that the watcher instance stays in the loops `_keepaliveset` while -# the C code could be running---and if it gets removed, to not call back -# to Python again. -# See also https://github.com/gevent/gevent/issues/676 -#### -class AbstractCallbacks(object): - - - def __init__(self, ffi): - self.ffi = ffi - self.callbacks = [] - if GEVENT_DEBUG_LEVEL < TRACE: - self.from_handle = ffi.from_handle - - def from_handle(self, handle): # pylint:disable=method-hidden - x = self.ffi.from_handle(handle) - return x - - def python_callback(self, handle, revents): - """ - Returns an integer having one of three values: - - - -1 - An exception occurred during the callback and you must call - :func:`_python_handle_error` to deal with it. The Python watcher - object will have the exception tuple saved in ``_exc_info``. - - 1 - Everything went according to plan. You should check to see if the libev - watcher is still active, and call :func:`python_stop` if it is not. This will - clean up the memory. Finding the watcher still active at the event loop level, - but not having stopped itself at the gevent level is a buggy scenario and - shouldn't happen. - - 2 - Everything went according to plan, but the watcher has already - been stopped. Its memory may no longer be valid. - - This function should never return 0, as that's the default value that - Python exceptions will produce. - """ - #print("Running callback", handle) - orig_ffi_watcher = None - try: - # Even dereferencing the handle needs to be inside the try/except; - # if we don't return normally (e.g., a signal) then we wind up going - # to the 'onerror' handler (unhandled_onerror), which - # is not what we want; that can permanently wedge the loop depending - # on which callback was executing. - # XXX: See comments in that function. We may be able to restart and do better? - if not handle: - # Hmm, a NULL handle. That's not supposed to happen. - # We can easily get into a loop if we deref it and allow that - # to raise. - _dbg("python_callback got null handle") - return 1 - the_watcher = self.from_handle(handle) - orig_ffi_watcher = the_watcher._watcher - args = the_watcher.args - if args is None: - # Legacy behaviour from corecext: convert None into () - # See test__core_watcher.py - args = _NOARGS - if args and args[0] == GEVENT_CORE_EVENTS: - args = (revents, ) + args[1:] - #print("Calling function", the_watcher.callback, args) - the_watcher.callback(*args) - except: # pylint:disable=bare-except - _dbg("Got exception servicing watcher with handle", handle, sys.exc_info()) - # It's possible for ``the_watcher`` to be undefined (UnboundLocalError) - # if we threw an exception (signal) on the line that created that variable. - # This is typically the case with a signal under libuv - try: - the_watcher - except UnboundLocalError: - the_watcher = self.from_handle(handle) - the_watcher._exc_info = sys.exc_info() - # Depending on when the exception happened, the watcher - # may or may not have been stopped. We need to make sure its - # memory stays valid so we can stop it at the ev level if needed. - the_watcher.loop._keepaliveset.add(the_watcher) - return -1 - else: - if (the_watcher.loop is not None - and the_watcher in the_watcher.loop._keepaliveset - and the_watcher._watcher is orig_ffi_watcher): - # It didn't stop itself, *and* it didn't stop itself, reset - # its watcher, and start itself again. libuv's io watchers MAY - # do that. - # The normal, expected scenario when we find the watcher still - # in the keepaliveset is that it is still active at the event loop - # level, so we don't expect that python_stop gets called. - #_dbg("The watcher has not stopped itself, possibly still active", the_watcher) - return 1 - return 2 # it stopped itself - - def python_handle_error(self, handle, _revents): - _dbg("Handling error for handle", handle) - if not handle: - return - try: - watcher = self.from_handle(handle) - exc_info = watcher._exc_info - del watcher._exc_info - # In the past, we passed the ``watcher`` itself as the context, - # which typically meant that the Hub would just print - # the exception. This is a problem because sometimes we can't - # detect signals until late in ``python_callback``; specifically, - # test_selectors.py:DefaultSelectorTest.test_select_interrupt_exc - # installs a SIGALRM handler that raises an exception. That exception can happen - # before we enter ``python_callback`` or at any point within it because of the way - # libuv swallows signals. By passing None, we get the exception prapagated into - # the main greenlet (which is probably *also* not what we always want, but - # I see no way to distinguish the cases). - watcher.loop.handle_error(None, *exc_info) - finally: - # XXX Since we're here on an error condition, and we - # made sure that the watcher object was put in loop._keepaliveset, - # what about not stopping the watcher? Looks like a possible - # memory leak? - # XXX: This used to do "if revents & (libev.EV_READ | libev.EV_WRITE)" - # before stopping. Why? - try: - watcher.stop() - except: # pylint:disable=bare-except - watcher.loop.handle_error(watcher, *sys.exc_info()) - return # pylint:disable=lost-exception - - def unhandled_onerror(self, t, v, tb): - # This is supposed to be called for signals, etc. - # This is the onerror= value for CFFI. - # If we return None, C will get a value of 0/NULL; - # if we raise, CFFI will print the exception and then - # return 0/NULL; (unless error= was configured) - # If things go as planned, we return the value that asks - # C to call back and check on if the watcher needs to be closed or - # not. - - # XXX: TODO: Could this cause events to be lost? Maybe we need to return - # a value that causes the C loop to try the callback again? - # at least for signals under libuv, which are delivered at very odd times. - # Hopefully the event still shows up when we poll the next time. - watcher = None - handle = tb.tb_frame.f_locals['handle'] if tb is not None else None - if handle: # handle could be NULL - watcher = self.from_handle(handle) - if watcher is not None: - watcher.loop.handle_error(None, t, v, tb) - return 1 - - # Raising it causes a lot of noise from CFFI - print("WARNING: gevent: Unhandled error with no watcher", - file=sys.stderr) - traceback.print_exception(t, v, tb) - - def python_stop(self, handle): - if not handle: # pragma: no cover - print( - "WARNING: gevent: Unable to dereference handle; not stopping watcher. " - "Native resources may leak. This is most likely a bug in gevent.", - file=sys.stderr) - # The alternative is to crash with no helpful information - # NOTE: Raising exceptions here does nothing, they're swallowed by CFFI. - # Since the C level passed in a null pointer, even dereferencing the handle - # will just produce some exceptions. - return - watcher = self.from_handle(handle) - watcher.stop() - - if not PYPY: - def python_check_callback(self, watcher_ptr): # pylint:disable=unused-argument - # If we have the onerror callback, this is a no-op; all the real - # work to rethrow the exception is done by the onerror callback - - # NOTE: Unlike the rest of the functions, this is called with a pointer - # to the C level structure, *not* a pointer to the void* that represents a - # <cdata> for the Python Watcher object. - pass - else: # PyPy - # On PyPy, we need the function to have some sort of body, otherwise - # the signal exceptions don't always get caught, *especially* with - # libuv (however, there's no reason to expect this to only be a libuv - # issue; it's just that we don't depend on the periodic signal timer - # under libev, so the issue is much more pronounced under libuv) - # test_socket's test_sendall_interrupted can hang. - # See https://github.com/gevent/gevent/issues/1112 - - def python_check_callback(self, watcher_ptr): # pylint:disable=unused-argument - # Things we've tried that *don't* work: - # greenlet.getcurrent() - # 1 + 1 - try: - raise MemoryError() - except MemoryError: - pass - - def python_prepare_callback(self, watcher_ptr): - loop = self._find_loop_from_c_watcher(watcher_ptr) - if loop is None: # pragma: no cover - print("WARNING: gevent: running prepare callbacks from a destroyed handle: ", - watcher_ptr) - return - loop._run_callbacks() - - def check_callback_onerror(self, t, v, tb): - watcher_ptr = tb.tb_frame.f_locals['watcher_ptr'] if tb is not None else None - if watcher_ptr: - loop = self._find_loop_from_c_watcher(watcher_ptr) - if loop is not None: - # None as the context argument causes the exception to be raised - # in the main greenlet. - loop.handle_error(None, t, v, tb) - return None - raise v # Let CFFI print - - def _find_loop_from_c_watcher(self, watcher_ptr): - raise NotImplementedError() - - - -def assign_standard_callbacks(ffi, lib, callbacks_class, extras=()): # pylint:disable=unused-argument - # callbacks keeps these cdata objects alive at the python level - callbacks = callbacks_class(ffi) - extras = tuple([(getattr(callbacks, name), error) for name, error in extras]) - for (func, error_func) in ((callbacks.python_callback, None), - (callbacks.python_handle_error, None), - (callbacks.python_stop, None), - (callbacks.python_check_callback, - callbacks.check_callback_onerror), - (callbacks.python_prepare_callback, - callbacks.check_callback_onerror)) + extras: - # The name of the callback function matches the 'extern Python' declaration. - error_func = error_func or callbacks.unhandled_onerror - callback = ffi.def_extern(onerror=error_func)(func) - # keep alive the cdata - # (def_extern returns the original function, and it requests that - # the function be "global", so maybe it keeps a hard reference to it somewhere now - # unlike ffi.callback(), and we don't need to do this?) - callbacks.callbacks.append(callback) - - # At this point, the library C variable (static function, actually) - # is filled in. - - return callbacks - - -if sys.version_info[0] >= 3: - basestring = (bytes, str) - integer_types = (int,) -else: - import __builtin__ # pylint:disable=import-error - basestring = (__builtin__.basestring,) - integer_types = (int, __builtin__.long) - - - - -_NOARGS = () - -CALLBACK_CHECK_COUNT = 50 - -class AbstractLoop(object): - # pylint:disable=too-many-public-methods,too-many-instance-attributes - - error_handler = None - - _CHECK_POINTER = None - - _TIMER_POINTER = None - _TIMER_CALLBACK_SIG = None - - _PREPARE_POINTER = None - - starting_timer_may_update_loop_time = False - - # Subclasses should set this in __init__ to reflect - # whether they were the default loop. - _default = None - - def __init__(self, ffi, lib, watchers, flags=None, default=None): - self._ffi = ffi - self._lib = lib - self._ptr = None - self._handle_to_self = self._ffi.new_handle(self) # XXX: Reference cycle? - self._watchers = watchers - self._in_callback = False - self._callbacks = deque() - # Stores python watcher objects while they are started - self._keepaliveset = set() - self._init_loop_and_aux_watchers(flags, default) - - - def _init_loop_and_aux_watchers(self, flags=None, default=None): - - self._ptr = self._init_loop(flags, default) - - - # self._check is a watcher that runs in each iteration of the - # mainloop, just after the blocking call. It's point is to handle - # signals. It doesn't run watchers or callbacks, it just exists to give - # CFFI a chance to raise signal exceptions so we can handle them. - self._check = self._ffi.new(self._CHECK_POINTER) - self._check.data = self._handle_to_self - self._init_and_start_check() - - # self._prepare is a watcher that runs in each iteration of the mainloop, - # just before the blocking call. It's where we run deferred callbacks - # from self.run_callback. This cooperates with _setup_for_run_callback() - # to schedule self._timer0 if needed. - self._prepare = self._ffi.new(self._PREPARE_POINTER) - self._prepare.data = self._handle_to_self - self._init_and_start_prepare() - - # A timer we start and stop on demand. If we have callbacks, - # too many to run in one iteration of _run_callbacks, we turn this - # on so as to have the next iteration of the run loop return to us - # as quickly as possible. - # TODO: There may be a more efficient way to do this using ev_timer_again; - # see the "ev_timer" section of the ev manpage (http://linux.die.net/man/3/ev) - # Alternatively, setting the ev maximum block time may also work. - self._timer0 = self._ffi.new(self._TIMER_POINTER) - self._timer0.data = self._handle_to_self - self._init_callback_timer() - - # TODO: We may be able to do something nicer and use the existing python_callback - # combined with onerror and the class check/timer/prepare to simplify things - # and unify our handling - - def _init_loop(self, flags, default): - """ - Called by __init__ to create or find the loop. The return value - is assigned to self._ptr. - """ - raise NotImplementedError() - - def _init_and_start_check(self): - raise NotImplementedError() - - def _init_and_start_prepare(self): - raise NotImplementedError() - - def _init_callback_timer(self): - raise NotImplementedError() - - def _stop_callback_timer(self): - raise NotImplementedError() - - def _start_callback_timer(self): - raise NotImplementedError() - - def _check_callback_handle_error(self, t, v, tb): - self.handle_error(None, t, v, tb) - - def _run_callbacks(self): # pylint:disable=too-many-branches - # When we're running callbacks, its safe for timers to - # update the notion of the current time (because if we're here, - # we're not running in a timer callback that may let other timers - # run; this is mostly an issue for libuv). - - # That's actually a bit of a lie: on libev, self._timer0 really is - # a timer, and so sometimes this is running in a timer callback, not - # a prepare callback. But that's OK, libev doesn't suffer from cascading - # timer expiration and its safe to update the loop time at any - # moment there. - self.starting_timer_may_update_loop_time = True - try: - count = CALLBACK_CHECK_COUNT - now = self.now() - expiration = now + getswitchinterval() - self._stop_callback_timer() - while self._callbacks: - cb = self._callbacks.popleft() # pylint:disable=assignment-from-no-return - count -= 1 - self.unref() # XXX: libuv doesn't have a global ref count! - callback = cb.callback - cb.callback = None - args = cb.args - if callback is None or args is None: - # it's been stopped - continue - - try: - callback(*args) - except: # pylint:disable=bare-except - # If we allow an exception to escape this method (while we are running the ev callback), - # then CFFI will print the error and libev will continue executing. - # There are two problems with this. The first is that the code after - # the loop won't run. The second is that any remaining callbacks scheduled - # for this loop iteration will be silently dropped; they won't run, but they'll - # also not be *stopped* (which is not a huge deal unless you're looking for - # consistency or checking the boolean/pending status; the loop doesn't keep - # a reference to them like it does to watchers...*UNLESS* the callback itself had - # a reference to a watcher; then I don't know what would happen, it depends on - # the state of the watcher---a leak or crash is not totally inconceivable). - # The Cython implementation in core.ppyx uses gevent_call from callbacks.c - # to run the callback, which uses gevent_handle_error to handle any errors the - # Python callback raises...it unconditionally simply prints any error raised - # by loop.handle_error and clears it, so callback handling continues. - # We take a similar approach (but are extra careful about printing) - try: - self.handle_error(cb, *sys.exc_info()) - except: # pylint:disable=bare-except - try: - print("Exception while handling another error", file=sys.stderr) - traceback.print_exc() - except: # pylint:disable=bare-except - pass # Nothing we can do here - finally: - # NOTE: this must be reset here, because cb.args is used as a flag in - # the callback class so that bool(cb) of a callback that has been run - # becomes False - cb.args = None - - # We've finished running one group of callbacks - # but we may have more, so before looping check our - # switch interval. - if count == 0 and self._callbacks: - count = CALLBACK_CHECK_COUNT - self.update_now() - if self.now() >= expiration: - now = 0 - break - - # Update the time before we start going again, if we didn't - # just do so. - if now != 0: - self.update_now() - - if self._callbacks: - self._start_callback_timer() - finally: - self.starting_timer_may_update_loop_time = False - - def _stop_aux_watchers(self): - raise NotImplementedError() - - def destroy(self): - if self._ptr: - try: - if not self._can_destroy_loop(self._ptr): - return False - self._stop_aux_watchers() - self._destroy_loop(self._ptr) - finally: - # not ffi.NULL, we don't want something that can be - # passed to C and crash later. This will create nice friendly - # TypeError from CFFI. - self._ptr = None - del self._handle_to_self - del self._callbacks - del self._keepaliveset - - return True - - def _can_destroy_loop(self, ptr): - raise NotImplementedError() - - def _destroy_loop(self, ptr): - raise NotImplementedError() - - @property - def ptr(self): - return self._ptr - - @property - def WatcherType(self): - return self._watchers.watcher - - @property - def MAXPRI(self): - return 1 - - @property - def MINPRI(self): - return 1 - - def _handle_syserr(self, message, errno): - try: - errno = os.strerror(errno) - except: # pylint:disable=bare-except - traceback.print_exc() - try: - message = '%s: %s' % (message, errno) - except: # pylint:disable=bare-except - traceback.print_exc() - self.handle_error(None, SystemError, SystemError(message), None) - - def handle_error(self, context, type, value, tb): - handle_error = None - error_handler = self.error_handler - if error_handler is not None: - # we do want to do getattr every time so that setting Hub.handle_error property just works - handle_error = getattr(error_handler, 'handle_error', error_handler) - handle_error(context, type, value, tb) - else: - self._default_handle_error(context, type, value, tb) - - def _default_handle_error(self, context, type, value, tb): # pylint:disable=unused-argument - # note: Hub sets its own error handler so this is not used by gevent - # this is here to make core.loop usable without the rest of gevent - # Should cause the loop to stop running. - traceback.print_exception(type, value, tb) - - - def run(self, nowait=False, once=False): - raise NotImplementedError() - - def reinit(self): - raise NotImplementedError() - - def ref(self): - # XXX: libuv doesn't do it this way - raise NotImplementedError() - - def unref(self): - raise NotImplementedError() - - def break_(self, how=None): - raise NotImplementedError() - - def verify(self): - pass - - def now(self): - raise NotImplementedError() - - def update_now(self): - raise NotImplementedError() - - def update(self): - import warnings - warnings.warn("'update' is deprecated; use 'update_now'", - DeprecationWarning, - stacklevel=2) - self.update_now() - - def __repr__(self): - return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self._format()) - - @property - def default(self): - return self._default if self._ptr else False - - @property - def iteration(self): - return -1 - - @property - def depth(self): - return -1 - - @property - def backend_int(self): - return 0 - - @property - def backend(self): - return "default" - - @property - def pendingcnt(self): - return 0 - - def io(self, fd, events, ref=True, priority=None): - return self._watchers.io(self, fd, events, ref, priority) - - def timer(self, after, repeat=0.0, ref=True, priority=None): - return self._watchers.timer(self, after, repeat, ref, priority) - - def signal(self, signum, ref=True, priority=None): - return self._watchers.signal(self, signum, ref, priority) - - def idle(self, ref=True, priority=None): - return self._watchers.idle(self, ref, priority) - - def prepare(self, ref=True, priority=None): - return self._watchers.prepare(self, ref, priority) - - def check(self, ref=True, priority=None): - return self._watchers.check(self, ref, priority) - - def fork(self, ref=True, priority=None): - return self._watchers.fork(self, ref, priority) - - def async_(self, ref=True, priority=None): - return self._watchers.async_(self, ref, priority) - - # Provide BWC for those that can use 'async' as is - locals()['async'] = async_ - - if sys.platform != "win32": - - def child(self, pid, trace=0, ref=True): - return self._watchers.child(self, pid, trace, ref) - - def install_sigchld(self): - pass - - def stat(self, path, interval=0.0, ref=True, priority=None): - return self._watchers.stat(self, path, interval, ref, priority) - - def callback(self, priority=None): - return callback(self, priority) - - def _setup_for_run_callback(self): - raise NotImplementedError() - - def run_callback(self, func, *args): - # If we happen to already be running callbacks (inside - # _run_callbacks), this could happen almost immediately, - # without the loop cycling. - cb = callback(func, args) - self._callbacks.append(cb) - self._setup_for_run_callback() - - return cb - - def _format(self): - if not self._ptr: - return 'destroyed' - msg = self.backend - if self.default: - msg += ' default' - msg += ' pending=%s' % self.pendingcnt - msg += self._format_details() - return msg - - def _format_details(self): - msg = '' - fileno = self.fileno() # pylint:disable=assignment-from-none - try: - activecnt = self.activecnt - except AttributeError: - activecnt = None - if activecnt is not None: - msg += ' ref=' + repr(activecnt) - if fileno is not None: - msg += ' fileno=' + repr(fileno) - #if sigfd is not None and sigfd != -1: - # msg += ' sigfd=' + repr(sigfd) - return msg - - def fileno(self): - return None - - @property - def activecnt(self): - if not self._ptr: - raise ValueError('operation on destroyed loop') - return 0 diff --git a/python/gevent/_ffi/watcher.py b/python/gevent/_ffi/watcher.py deleted file mode 100644 index 3f880ce..0000000 --- a/python/gevent/_ffi/watcher.py +++ /dev/null @@ -1,641 +0,0 @@ -""" -Useful base classes for watchers. The available -watchers will depend on the specific event loop. -""" -# pylint:disable=not-callable -from __future__ import absolute_import, print_function - -import signal as signalmodule -import functools -import warnings - -from gevent._config import config - -try: - from tracemalloc import get_object_traceback - - def tracemalloc(init): - # PYTHONTRACEMALLOC env var controls this on Python 3. - return init -except ImportError: # Python < 3.4 - - if config.trace_malloc: - # Use the same env var to turn this on for Python 2 - import traceback - - class _TB(object): - __slots__ = ('lines',) - - def __init__(self, lines): - # These end in newlines, which we don't want for consistency - self.lines = [x.rstrip() for x in lines] - - def format(self): - return self.lines - - def tracemalloc(init): - @functools.wraps(init) - def traces(self, *args, **kwargs): - init(self, *args, **kwargs) - self._captured_malloc = _TB(traceback.format_stack()) - return traces - - def get_object_traceback(obj): - return obj._captured_malloc - - else: - def get_object_traceback(_obj): - return None - - def tracemalloc(init): - return init - -from gevent._compat import fsencode - -from gevent._ffi import _dbg # pylint:disable=unused-import -from gevent._ffi import GEVENT_DEBUG_LEVEL -from gevent._ffi import DEBUG -from gevent._ffi.loop import GEVENT_CORE_EVENTS -from gevent._ffi.loop import _NOARGS - -ALLOW_WATCHER_DEL = GEVENT_DEBUG_LEVEL >= DEBUG - -__all__ = [ - -] - -try: - ResourceWarning -except NameError: - class ResourceWarning(Warning): - "Python 2 fallback" - -class _NoWatcherResult(int): - - def __repr__(self): - return "<NoWatcher>" - -_NoWatcherResult = _NoWatcherResult(0) - -def events_to_str(event_field, all_events): - result = [] - for (flag, string) in all_events: - c_flag = flag - if event_field & c_flag: - result.append(string) - event_field = event_field & (~c_flag) - if not event_field: - break - if event_field: - result.append(hex(event_field)) - return '|'.join(result) - - -def not_while_active(func): - @functools.wraps(func) - def nw(self, *args, **kwargs): - if self.active: - raise ValueError("not while active") - func(self, *args, **kwargs) - return nw - -def only_if_watcher(func): - @functools.wraps(func) - def if_w(self): - if self._watcher: - return func(self) - return _NoWatcherResult - return if_w - - -class LazyOnClass(object): - - @classmethod - def lazy(cls, cls_dict, func): - "Put a LazyOnClass object in *cls_dict* with the same name as *func*" - cls_dict[func.__name__] = cls(func) - - def __init__(self, func, name=None): - self.name = name or func.__name__ - self.func = func - - def __get__(self, inst, klass): - if inst is None: # pragma: no cover - return self - - val = self.func(inst) - setattr(klass, self.name, val) - return val - - -class AbstractWatcherType(type): - """ - Base metaclass for watchers. - - To use, you will: - - - subclass the watcher class defined from this type. - - optionally subclass this type - """ - # pylint:disable=bad-mcs-classmethod-argument - - _FFI = None - _LIB = None - - def __new__(cls, name, bases, cls_dict): - if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'): - cls._fill_watcher(name, bases, cls_dict) - if '__del__' in cls_dict and not ALLOW_WATCHER_DEL: # pragma: no cover - raise TypeError("CFFI watchers are not allowed to have __del__") - return type.__new__(cls, name, bases, cls_dict) - - @classmethod - def _fill_watcher(cls, name, bases, cls_dict): - # TODO: refactor smaller - # pylint:disable=too-many-locals - if name.endswith('_'): - # Strip trailing _ added to avoid keyword duplications - # e.g., async_ - name = name[:-1] - - def _mro_get(attr, bases, error=True): - for b in bases: - try: - return getattr(b, attr) - except AttributeError: - continue - if error: # pragma: no cover - raise AttributeError(attr) - _watcher_prefix = cls_dict.get('_watcher_prefix') or _mro_get('_watcher_prefix', bases) - - if '_watcher_type' not in cls_dict: - watcher_type = _watcher_prefix + '_' + name - cls_dict['_watcher_type'] = watcher_type - elif not cls_dict['_watcher_type'].startswith(_watcher_prefix): - watcher_type = _watcher_prefix + '_' + cls_dict['_watcher_type'] - cls_dict['_watcher_type'] = watcher_type - - active_name = _watcher_prefix + '_is_active' - - def _watcher_is_active(self): - return getattr(self._LIB, active_name) - - LazyOnClass.lazy(cls_dict, _watcher_is_active) - - watcher_struct_name = cls_dict.get('_watcher_struct_name') - if not watcher_struct_name: - watcher_struct_pattern = (cls_dict.get('_watcher_struct_pattern') - or _mro_get('_watcher_struct_pattern', bases, False) - or 'struct %s') - watcher_struct_name = watcher_struct_pattern % (watcher_type,) - - def _watcher_struct_pointer_type(self): - return self._FFI.typeof(watcher_struct_name + ' *') - - LazyOnClass.lazy(cls_dict, _watcher_struct_pointer_type) - - callback_name = (cls_dict.get('_watcher_callback_name') - or _mro_get('_watcher_callback_name', bases, False) - or '_gevent_generic_callback') - - def _watcher_callback(self): - return self._FFI.addressof(self._LIB, callback_name) - - LazyOnClass.lazy(cls_dict, _watcher_callback) - - def _make_meth(name, watcher_name): - def meth(self): - lib_name = self._watcher_type + '_' + name - return getattr(self._LIB, lib_name) - meth.__name__ = watcher_name - return meth - - for meth_name in 'start', 'stop', 'init': - watcher_name = '_watcher' + '_' + meth_name - if watcher_name not in cls_dict: - LazyOnClass.lazy(cls_dict, _make_meth(meth_name, watcher_name)) - - def new_handle(cls, obj): - return cls._FFI.new_handle(obj) - - def new(cls, kind): - return cls._FFI.new(kind) - -class watcher(object): - - _callback = None - _args = None - _watcher = None - # self._handle has a reference to self, keeping it alive. - # We must keep self._handle alive for ffi.from_handle() to be - # able to work. We only fill this in when we are started, - # and when we are stopped we destroy it. - # NOTE: This is a GC cycle, so we keep it around for as short - # as possible. - _handle = None - - @tracemalloc - def __init__(self, _loop, ref=True, priority=None, args=_NOARGS): - self.loop = _loop - self.__init_priority = priority - self.__init_args = args - self.__init_ref = ref - self._watcher_full_init() - - - def _watcher_full_init(self): - priority = self.__init_priority - ref = self.__init_ref - args = self.__init_args - - self._watcher_create(ref) - - if priority is not None: - self._watcher_ffi_set_priority(priority) - - try: - self._watcher_ffi_init(args) - except: - # Let these be GC'd immediately. - # If we keep them around to when *we* are gc'd, - # they're probably invalid, meaning any native calls - # we do then to close() them are likely to fail - self._watcher = None - raise - self._watcher_ffi_set_init_ref(ref) - - @classmethod - def _watcher_ffi_close(cls, ffi_watcher): - pass - - def _watcher_create(self, ref): # pylint:disable=unused-argument - self._watcher = self._watcher_new() - - def _watcher_new(self): - return type(self).new(self._watcher_struct_pointer_type) # pylint:disable=no-member - - def _watcher_ffi_set_init_ref(self, ref): - pass - - def _watcher_ffi_set_priority(self, priority): - pass - - def _watcher_ffi_init(self, args): - raise NotImplementedError() - - def _watcher_ffi_start(self): - raise NotImplementedError() - - def _watcher_ffi_stop(self): - self._watcher_stop(self.loop._ptr, self._watcher) - - def _watcher_ffi_ref(self): - raise NotImplementedError() - - def _watcher_ffi_unref(self): - raise NotImplementedError() - - def _watcher_ffi_start_unref(self): - # While a watcher is active, we don't keep it - # referenced. This allows a timer, for example, to be started, - # and still allow the loop to end if there is nothing - # else to do. see test__order.TestSleep0 for one example. - self._watcher_ffi_unref() - - def _watcher_ffi_stop_ref(self): - self._watcher_ffi_ref() - - # A string identifying the type of libev object we watch, e.g., 'ev_io' - # This should be a class attribute. - _watcher_type = None - # A class attribute that is the callback on the libev object that init's the C struct, - # e.g., libev.ev_io_init. If None, will be set by _init_subclasses. - _watcher_init = None - # A class attribute that is the callback on the libev object that starts the C watcher, - # e.g., libev.ev_io_start. If None, will be set by _init_subclasses. - _watcher_start = None - # A class attribute that is the callback on the libev object that stops the C watcher, - # e.g., libev.ev_io_stop. If None, will be set by _init_subclasses. - _watcher_stop = None - # A cffi ctype object identifying the struct pointer we create. - # This is a class attribute set based on the _watcher_type - _watcher_struct_pointer_type = None - # The attribute of the libev object identifying the custom - # callback function for this type of watcher. This is a class - # attribute set based on the _watcher_type in _init_subclasses. - _watcher_callback = None - _watcher_is_active = None - - def close(self): - if self._watcher is None: - return - - self.stop() - _watcher = self._watcher - self._watcher = None - self._watcher_set_data(_watcher, self._FFI.NULL) # pylint: disable=no-member - self._watcher_ffi_close(_watcher) - self.loop = None - - def _watcher_set_data(self, the_watcher, data): - # This abstraction exists for the sole benefit of - # libuv.watcher.stat, which "subclasses" uv_handle_t. - # Can we do something to avoid this extra function call? - the_watcher.data = data - return data - - def __enter__(self): - return self - - def __exit__(self, t, v, tb): - self.close() - - if ALLOW_WATCHER_DEL: - def __del__(self): - if self._watcher: - tb = get_object_traceback(self) - tb_msg = '' - if tb is not None: - tb_msg = '\n'.join(tb.format()) - tb_msg = '\nTraceback:\n' + tb_msg - warnings.warn("Failed to close watcher %r%s" % (self, tb_msg), - ResourceWarning) - - # may fail if __init__ did; will be harmlessly printed - self.close() - - - def __repr__(self): - formats = self._format() - result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats) - if self.pending: - result += " pending" - if self.callback is not None: - fself = getattr(self.callback, '__self__', None) - if fself is self: - result += " callback=<bound method %s of self>" % (self.callback.__name__) - else: - result += " callback=%r" % (self.callback, ) - if self.args is not None: - result += " args=%r" % (self.args, ) - if self.callback is None and self.args is None: - result += " stopped" - result += " watcher=%s" % (self._watcher) - result += " handle=%s" % (self._watcher_handle) - result += " ref=%s" % (self.ref) - return result + ">" - - @property - def _watcher_handle(self): - if self._watcher: - return self._watcher.data - - def _format(self): - return '' - - @property - def ref(self): - raise NotImplementedError() - - def _get_callback(self): - return self._callback - - def _set_callback(self, cb): - if not callable(cb) and cb is not None: - raise TypeError("Expected callable, not %r" % (cb, )) - if cb is None: - if '_callback' in self.__dict__: - del self._callback - else: - self._callback = cb - callback = property(_get_callback, _set_callback) - - def _get_args(self): - return self._args - - def _set_args(self, args): - if not isinstance(args, tuple) and args is not None: - raise TypeError("args must be a tuple or None") - if args is None: - if '_args' in self.__dict__: - del self._args - else: - self._args = args - - args = property(_get_args, _set_args) - - def start(self, callback, *args): - if callback is None: - raise TypeError('callback must be callable, not None') - self.callback = callback - self.args = args or _NOARGS - self.loop._keepaliveset.add(self) - self._handle = self._watcher_set_data(self._watcher, type(self).new_handle(self)) # pylint:disable=no-member - self._watcher_ffi_start() - self._watcher_ffi_start_unref() - - def stop(self): - if self._callback is None: - assert self.loop is None or self not in self.loop._keepaliveset - return - self._watcher_ffi_stop_ref() - self._watcher_ffi_stop() - self.loop._keepaliveset.discard(self) - self._handle = None - self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member - self.callback = None - self.args = None - - def _get_priority(self): - return None - - @not_while_active - def _set_priority(self, priority): - pass - - priority = property(_get_priority, _set_priority) - - - @property - def active(self): - if self._watcher is not None and self._watcher_is_active(self._watcher): - return True - return False - - @property - def pending(self): - return False - -watcher = AbstractWatcherType('watcher', (object,), dict(watcher.__dict__)) - -class IoMixin(object): - - EVENT_MASK = 0 - - def __init__(self, loop, fd, events, ref=True, priority=None, _args=None): - # Win32 only works with sockets, and only when we use libuv, because - # we don't use _open_osfhandle. See libuv/watchers.py:io for a description. - if fd < 0: - raise ValueError('fd must be non-negative: %r' % fd) - if events & ~self.EVENT_MASK: - raise ValueError('illegal event mask: %r' % events) - self._fd = fd - super(IoMixin, self).__init__(loop, ref=ref, priority=priority, - args=_args or (fd, events)) - - def start(self, callback, *args, **kwargs): - args = args or _NOARGS - if kwargs.get('pass_events'): - args = (GEVENT_CORE_EVENTS, ) + args - super(IoMixin, self).start(callback, *args) - - def _format(self): - return ' fd=%d' % self._fd - -class TimerMixin(object): - _watcher_type = 'timer' - - def __init__(self, loop, after=0.0, repeat=0.0, ref=True, priority=None): - if repeat < 0.0: - raise ValueError("repeat must be positive or zero: %r" % repeat) - self._after = after - self._repeat = repeat - super(TimerMixin, self).__init__(loop, ref=ref, priority=priority, args=(after, repeat)) - - def start(self, callback, *args, **kw): - update = kw.get("update", self.loop.starting_timer_may_update_loop_time) - if update: - # Quoth the libev doc: "This is a costly operation and is - # usually done automatically within ev_run(). This - # function is rarely useful, but when some event callback - # runs for a very long time without entering the event - # loop, updating libev's idea of the current time is a - # good idea." - - # 1.3 changed the default for this to False *unless* the loop is - # running a callback; see libuv for details. Note that - # starting Timeout objects still sets this to true. - - self.loop.update_now() - super(TimerMixin, self).start(callback, *args) - - def again(self, callback, *args, **kw): - raise NotImplementedError() - - -class SignalMixin(object): - _watcher_type = 'signal' - - def __init__(self, loop, signalnum, ref=True, priority=None): - if signalnum < 1 or signalnum >= signalmodule.NSIG: - raise ValueError('illegal signal number: %r' % signalnum) - # still possible to crash on one of libev's asserts: - # 1) "libev: ev_signal_start called with illegal signal number" - # EV_NSIG might be different from signal.NSIG on some platforms - # 2) "libev: a signal must not be attached to two different loops" - # we probably could check that in LIBEV_EMBED mode, but not in general - self._signalnum = signalnum - super(SignalMixin, self).__init__(loop, ref=ref, priority=priority, args=(signalnum, )) - - -class IdleMixin(object): - _watcher_type = 'idle' - - -class PrepareMixin(object): - _watcher_type = 'prepare' - - -class CheckMixin(object): - _watcher_type = 'check' - - -class ForkMixin(object): - _watcher_type = 'fork' - - -class AsyncMixin(object): - _watcher_type = 'async' - - def send(self): - raise NotImplementedError() - - @property - def pending(self): - raise NotImplementedError() - - -class ChildMixin(object): - - # hack for libuv which doesn't extend watcher - _CALL_SUPER_INIT = True - - def __init__(self, loop, pid, trace=0, ref=True): - if not loop.default: - raise TypeError('child watchers are only available on the default loop') - loop.install_sigchld() - self._pid = pid - if self._CALL_SUPER_INIT: - super(ChildMixin, self).__init__(loop, ref=ref, args=(pid, trace)) - - def _format(self): - return ' pid=%r rstatus=%r' % (self.pid, self.rstatus) - - @property - def pid(self): - return self._pid - - @property - def rpid(self): - # The received pid, the result of the waitpid() call. - return self._rpid - - _rpid = None - _rstatus = 0 - - @property - def rstatus(self): - return self._rstatus - -class StatMixin(object): - - @staticmethod - def _encode_path(path): - return fsencode(path) - - def __init__(self, _loop, path, interval=0.0, ref=True, priority=None): - # Store the encoded path in the same attribute that corecext does - self._paths = self._encode_path(path) - - # Keep the original path to avoid re-encoding, especially on Python 3 - self._path = path - - # Although CFFI would automatically convert a bytes object into a char* when - # calling ev_stat_init(..., char*, ...), on PyPy the char* pointer is not - # guaranteed to live past the function call. On CPython, only with a constant/interned - # bytes object is the pointer guaranteed to last path the function call. (And since - # Python 3 is pretty much guaranteed to produce a newly-encoded bytes object above, thats - # rarely the case). Therefore, we must keep a reference to the produced cdata object - # so that the struct ev_stat_watcher's `path` pointer doesn't become invalid/deallocated - self._cpath = self._FFI.new('char[]', self._paths) - - self._interval = interval - super(StatMixin, self).__init__(_loop, ref=ref, priority=priority, - args=(self._cpath, - interval)) - - @property - def path(self): - return self._path - - @property - def attr(self): - raise NotImplementedError - - @property - def prev(self): - raise NotImplementedError - - @property - def interval(self): - return self._interval |