diff options
Diffstat (limited to 'python/gevent/libuv/watcher.py')
-rw-r--r-- | python/gevent/libuv/watcher.py | 732 |
1 files changed, 0 insertions, 732 deletions
diff --git a/python/gevent/libuv/watcher.py b/python/gevent/libuv/watcher.py deleted file mode 100644 index 035cb43..0000000 --- a/python/gevent/libuv/watcher.py +++ /dev/null @@ -1,732 +0,0 @@ -# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable -# pylint: disable=no-member -from __future__ import absolute_import, print_function - -import functools -import sys - -from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error - -ffi = _corecffi.ffi -libuv = _corecffi.lib - -from gevent._ffi import watcher as _base -from gevent._ffi import _dbg - -_closing_watchers = set() - -# In debug mode, it would be nice to be able to clear the memory of -# the watcher (its size determined by -# libuv.uv_handle_size(ffi_watcher.type)) using memset so that if we -# are using it after it's supposedly been closed and deleted, we'd -# catch it sooner. BUT doing so breaks test__threadpool. We get errors -# about `pthread_mutex_lock[3]: Invalid argument` (and sometimes we -# crash) suggesting either that we're writing on memory that doesn't -# belong to us, somehow, or that we haven't actually lost all -# references... -_uv_close_callback = ffi.def_extern(name='_uv_close_callback')(_closing_watchers.remove) - - -_events = [(libuv.UV_READABLE, "READ"), - (libuv.UV_WRITABLE, "WRITE")] - -def _events_to_str(events): # export - return _base.events_to_str(events, _events) - -class UVFuncallError(ValueError): - pass - -class libuv_error_wrapper(object): - # Makes sure that everything stored as a function - # on the wrapper instances (classes, actually, - # because this is used by the metaclass) - # checks its return value and raises an error. - # This expects that everything we call has an int - # or void return value and follows the conventions - # of error handling (that negative values are errors) - def __init__(self, uv): - self._libuv = uv - - def __getattr__(self, name): - libuv_func = getattr(self._libuv, name) - - @functools.wraps(libuv_func) - def wrap(*args, **kwargs): - if args and isinstance(args[0], watcher): - args = args[1:] - res = libuv_func(*args, **kwargs) - if res is not None and res < 0: - raise UVFuncallError( - str(ffi.string(libuv.uv_err_name(res)).decode('ascii') - + ' ' - + ffi.string(libuv.uv_strerror(res)).decode('ascii')) - + " Args: " + repr(args) + " KWARGS: " + repr(kwargs) - ) - return res - - setattr(self, name, wrap) - - return wrap - - -class ffi_unwrapper(object): - # undoes the wrapping of libuv_error_wrapper for - # the methods used by the metaclass that care - - def __init__(self, ff): - self._ffi = ff - - def __getattr__(self, name): - return getattr(self._ffi, name) - - def addressof(self, lib, name): - assert isinstance(lib, libuv_error_wrapper) - return self._ffi.addressof(libuv, name) - - -class watcher(_base.watcher): - _FFI = ffi_unwrapper(ffi) - _LIB = libuv_error_wrapper(libuv) - - _watcher_prefix = 'uv' - _watcher_struct_pattern = '%s_t' - - @classmethod - def _watcher_ffi_close(cls, ffi_watcher): - # Managing the lifetime of _watcher is tricky. - # They have to be uv_close()'d, but that only - # queues them to be closed in the *next* loop iteration. - # The memory must stay valid for at least that long, - # or assert errors are triggered. We can't use a ffi.gc() - # pointer to queue the uv_close, because by the time the - # destructor is called, there's no way to keep the memory alive - # and it could be re-used. - # So here we resort to resurrecting the pointer object out - # of our scope, keeping it alive past this object's lifetime. - # We then use the uv_close callback to handle removing that - # reference. There's no context passed to the close callback, - # so we have to do this globally. - - # Sadly, doing this causes crashes if there were multiple - # watchers for a given FD, so we have to take special care - # about that. See https://github.com/gevent/gevent/issues/790#issuecomment-208076604 - - # Note that this cannot be a __del__ method, because we store - # the CFFI handle to self on self, which is a cycle, and - # objects with a __del__ method cannot be collected on CPython < 3.4 - - # Instead, this is arranged as a callback to GC when the - # watcher class dies. Obviously it's important to keep the ffi - # watcher alive. - # We can pass in "subclasses" if uv_handle_t that line up at the C level, - # but that don't in CFFI without a cast. But be careful what we use the cast - # for, don't pass it back to C. - ffi_handle_watcher = cls._FFI.cast('uv_handle_t*', ffi_watcher) - if ffi_handle_watcher.type and not libuv.uv_is_closing(ffi_watcher): - # If the type isn't set, we were never properly initialized, - # and trying to close it results in libuv terminating the process. - # Sigh. Same thing if it's already in the process of being - # closed. - _closing_watchers.add(ffi_watcher) - libuv.uv_close(ffi_watcher, libuv._uv_close_callback) - - ffi_handle_watcher.data = ffi.NULL - - - def _watcher_ffi_set_init_ref(self, ref): - self.ref = ref - - def _watcher_ffi_init(self, args): - # TODO: we could do a better job chokepointing this - return self._watcher_init(self.loop.ptr, - self._watcher, - *args) - - def _watcher_ffi_start(self): - self._watcher_start(self._watcher, self._watcher_callback) - - def _watcher_ffi_stop(self): - if self._watcher: - # The multiplexed io watcher deletes self._watcher - # when it closes down. If that's in the process of - # an error handler, AbstractCallbacks.unhandled_onerror - # will try to close us again. - self._watcher_stop(self._watcher) - - @_base.only_if_watcher - def _watcher_ffi_ref(self): - libuv.uv_ref(self._watcher) - - @_base.only_if_watcher - def _watcher_ffi_unref(self): - libuv.uv_unref(self._watcher) - - def _watcher_ffi_start_unref(self): - pass - - def _watcher_ffi_stop_ref(self): - pass - - def _get_ref(self): - # Convert 1/0 to True/False - if self._watcher is None: - return None - return True if libuv.uv_has_ref(self._watcher) else False - - def _set_ref(self, value): - if value: - self._watcher_ffi_ref() - else: - self._watcher_ffi_unref() - - ref = property(_get_ref, _set_ref) - - def feed(self, _revents, _callback, *_args): - raise Exception("Not implemented") - -class io(_base.IoMixin, watcher): - _watcher_type = 'poll' - _watcher_callback_name = '_gevent_poll_callback2' - - # On Windows is critical to be able to garbage collect these - # objects in a timely fashion so that they don't get reused - # for multiplexing completely different sockets. This is because - # uv_poll_init_socket does a lot of setup for the socket to make - # polling work. If get reused for another socket that has the same - # fileno, things break badly. (In theory this could be a problem - # on posix too, but in practice it isn't). - - # TODO: We should probably generalize this to all - # ffi watchers. Avoiding GC cycles as much as possible - # is a good thing, and potentially allocating new handles - # as needed gets us better memory locality. - - # Especially on Windows, we must also account for the case that a - # reference to this object has leaked (e.g., the socket object is - # still around), but the fileno has been closed and a new one - # opened. We must still get a new native watcher at that point. We - # handle this case by simply making sure that we don't even have - # a native watcher until the object is started, and we shut it down - # when the object is stopped. - - # XXX: I was able to solve at least Windows test_ftplib.py issues - # with more of a careful use of io objects in socket.py, so - # delaying this entirely is at least temporarily on hold. Instead - # sticking with the _watcher_create function override for the - # moment. - - # XXX: Note 2: Moving to a deterministic close model, which was necessary - # for PyPy, also seems to solve the Windows issues. So we're completely taking - # this object out of the loop's registration; we don't want GC callbacks and - # uv_close anywhere *near* this object. - - _watcher_registers_with_loop_on_create = False - - EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE | libuv.UV_DISCONNECT - - _multiplex_watchers = () - - def __init__(self, loop, fd, events, ref=True, priority=None): - super(io, self).__init__(loop, fd, events, ref=ref, priority=priority, _args=(fd,)) - self._fd = fd - self._events = events - self._multiplex_watchers = [] - - def _get_fd(self): - return self._fd - - @_base.not_while_active - def _set_fd(self, fd): - self._fd = fd - self._watcher_ffi_init((fd,)) - - def _get_events(self): - return self._events - - def _set_events(self, events): - if events == self._events: - return - self._events = events - if self.active: - # We're running but libuv specifically says we can - # call start again to change our event mask. - assert self._handle is not None - self._watcher_start(self._watcher, self._events, self._watcher_callback) - - events = property(_get_events, _set_events) - - def _watcher_ffi_start(self): - self._watcher_start(self._watcher, self._events, self._watcher_callback) - - if sys.platform.startswith('win32'): - # uv_poll can only handle sockets on Windows, but the plain - # uv_poll_init we call on POSIX assumes that the fileno - # argument is already a C fileno, as created by - # _get_osfhandle. C filenos are limited resources, must be - # closed with _close. So there are lifetime issues with that: - # calling the C function _close to dispose of the fileno - # *also* closes the underlying win32 handle, possibly - # prematurely. (XXX: Maybe could do something with weak - # references? But to what?) - - # All libuv wants to do with the fileno in uv_poll_init is - # turn it back into a Win32 SOCKET handle. - - # Now, libuv provides uv_poll_init_socket, which instead of - # taking a C fileno takes the SOCKET, avoiding the need to dance with - # the C runtime. - - # It turns out that SOCKET (win32 handles in general) can be - # represented with `intptr_t`. It further turns out that - # CPython *directly* exposes the SOCKET handle as the value of - # fileno (32-bit PyPy does some munging on it, which should - # rarely matter). So we can pass socket.fileno() through - # to uv_poll_init_socket. - - # See _corecffi_build. - _watcher_init = watcher._LIB.uv_poll_init_socket - - - class _multiplexwatcher(object): - - callback = None - args = () - pass_events = False - ref = True - - def __init__(self, events, watcher): - self._events = events - - # References: - # These objects must keep the original IO object alive; - # the IO object SHOULD NOT keep these alive to avoid cycles - # We MUST NOT rely on GC to clean up the IO objects, but the explicit - # calls to close(); see _multiplex_closed. - self._watcher_ref = watcher - - events = property( - lambda self: self._events, - _base.not_while_active(lambda self, nv: setattr(self, '_events', nv))) - - def start(self, callback, *args, **kwargs): - self.pass_events = kwargs.get("pass_events") - self.callback = callback - self.args = args - - watcher = self._watcher_ref - if watcher is not None: - if not watcher.active: - watcher._io_start() - else: - # Make sure we're in the event mask - watcher._calc_and_update_events() - - def stop(self): - self.callback = None - self.pass_events = None - self.args = None - watcher = self._watcher_ref - if watcher is not None: - watcher._io_maybe_stop() - - def close(self): - if self._watcher_ref is not None: - self._watcher_ref._multiplex_closed(self) - self._watcher_ref = None - - @property - def active(self): - return self.callback is not None - - @property - def _watcher(self): - # For testing. - return self._watcher_ref._watcher - - # ares.pyx depends on this property, - # and test__core uses it too - fd = property(lambda self: getattr(self._watcher_ref, '_fd', -1), - lambda self, nv: self._watcher_ref._set_fd(nv)) - - def _io_maybe_stop(self): - self._calc_and_update_events() - for w in self._multiplex_watchers: - if w.callback is not None: - # There's still a reference to it, and it's started, - # so we can't stop. - return - # If we get here, nothing was started - # so we can take ourself out of the polling set - self.stop() - - def _io_start(self): - self._calc_and_update_events() - self.start(self._io_callback, pass_events=True) - - def _calc_and_update_events(self): - events = 0 - for watcher in self._multiplex_watchers: - if watcher.callback is not None: - # Only ask for events that are active. - events |= watcher.events - self._set_events(events) - - - def multiplex(self, events): - watcher = self._multiplexwatcher(events, self) - self._multiplex_watchers.append(watcher) - self._calc_and_update_events() - return watcher - - def close(self): - super(io, self).close() - del self._multiplex_watchers - - def _multiplex_closed(self, watcher): - self._multiplex_watchers.remove(watcher) - if not self._multiplex_watchers: - self.stop() # should already be stopped - self._no_more_watchers() - # It is absolutely critical that we control when the call - # to uv_close() gets made. uv_close() of a uv_poll_t - # handle winds up calling uv__platform_invalidate_fd, - # which, as the name implies, destroys any outstanding - # events for the *fd* that haven't been delivered yet, and also removes - # the *fd* from the poll set. So if this happens later, at some - # non-deterministic time when (cyclic or otherwise) GC runs, - # *and* we've opened a new watcher for the fd, that watcher will - # suddenly and mysteriously stop seeing events. So we do this now; - # this method is smart enough not to close the handle twice. - self.close() - else: - self._calc_and_update_events() - - def _no_more_watchers(self): - # The loop sets this on an individual watcher to delete it from - # the active list where it keeps hard references. - pass - - def _io_callback(self, events): - if events < 0: - # actually a status error code - _dbg("Callback error on", self._fd, - ffi.string(libuv.uv_err_name(events)), - ffi.string(libuv.uv_strerror(events))) - # XXX: We've seen one half of a FileObjectPosix pair - # (the read side of a pipe) report errno 11 'bad file descriptor' - # after the write side was closed and its watcher removed. But - # we still need to attempt to read from it to clear out what's in - # its buffers--if we return with the watcher inactive before proceeding to wake up - # the reader, we get a LoopExit. So we can't return here and arguably shouldn't print it - # either. The negative events mask will match the watcher's mask. - # See test__fileobject.py:Test.test_newlines for an example. - - # On Windows (at least with PyPy), we can get ENOTSOCK (socket operation on non-socket) - # if a socket gets closed. If we don't pass the events on, we hang. - # See test__makefile_ref.TestSSL for examples. - # return - - for watcher in self._multiplex_watchers: - if not watcher.callback: - # Stopped - continue - assert watcher._watcher_ref is self, (self, watcher._watcher_ref) - - send_event = (events & watcher.events) or events < 0 - if send_event: - if not watcher.pass_events: - watcher.callback(*watcher.args) - else: - watcher.callback(events, *watcher.args) - -class _SimulatedWithAsyncMixin(object): - _watcher_skip_ffi = True - - def __init__(self, loop, *args, **kwargs): - self._async = loop.async_() - try: - super(_SimulatedWithAsyncMixin, self).__init__(loop, *args, **kwargs) - except: - self._async.close() - raise - - def _watcher_create(self, _args): - return - - @property - def _watcher_handle(self): - return None - - def _watcher_ffi_init(self, _args): - return - - def _watcher_ffi_set_init_ref(self, ref): - self._async.ref = ref - - @property - def active(self): - return self._async.active - - def start(self, cb, *args): - self._register_loop_callback() - self.callback = cb - self.args = args - self._async.start(cb, *args) - #watcher.start(self, cb, *args) - - def stop(self): - self._unregister_loop_callback() - self.callback = None - self.args = None - self._async.stop() - - def close(self): - if self._async is not None: - a = self._async - #self._async = None - a.close() - - def _register_loop_callback(self): - # called from start() - raise NotImplementedError() - - def _unregister_loop_callback(self): - # called from stop - raise NotImplementedError() - -class fork(_SimulatedWithAsyncMixin, - _base.ForkMixin, - watcher): - # We'll have to implement this one completely manually - # Right now it doesn't matter much since libuv doesn't survive - # a fork anyway. (That's a work in progress) - _watcher_skip_ffi = False - - def _register_loop_callback(self): - self.loop._fork_watchers.add(self) - - def _unregister_loop_callback(self): - try: - # stop() should be idempotent - self.loop._fork_watchers.remove(self) - except KeyError: - pass - - def _on_fork(self): - self._async.send() - - -class child(_SimulatedWithAsyncMixin, - _base.ChildMixin, - watcher): - _watcher_skip_ffi = True - # We'll have to implement this one completely manually. - # Our approach is to use a SIGCHLD handler and the original - # os.waitpid call. - - # On Unix, libuv's uv_process_t and uv_spawn use SIGCHLD, - # just like libev does for its child watchers. So - # we're not adding any new SIGCHLD related issues not already - # present in libev. - - - def _register_loop_callback(self): - self.loop._register_child_watcher(self) - - def _unregister_loop_callback(self): - self.loop._unregister_child_watcher(self) - - def _set_waitpid_status(self, pid, status): - self._rpid = pid - self._rstatus = status - self._async.send() - - -class async_(_base.AsyncMixin, watcher): - _watcher_callback_name = '_gevent_async_callback0' - - def _watcher_ffi_init(self, args): - # It's dangerous to have a raw, non-initted struct - # around; it will crash in uv_close() when we get GC'd, - # and send() will also crash. - # NOTE: uv_async_init is NOT idempotent. Calling it more than - # once adds the uv_async_t to the internal queue multiple times, - # and uv_close only cleans up one of them, meaning that we tend to - # crash. Thus we have to be very careful not to allow that. - return self._watcher_init(self.loop.ptr, self._watcher, ffi.NULL) - - def _watcher_ffi_start(self): - # we're created in a started state, but we didn't provide a - # callback (because if we did and we don't have a value in our - # callback attribute, then python_callback would crash.) Note that - # uv_async_t->async_cb is not technically documented as public. - self._watcher.async_cb = self._watcher_callback - - def _watcher_ffi_stop(self): - self._watcher.async_cb = ffi.NULL - # We have to unref this because we're setting the cb behind libuv's - # back, basically: once a async watcher is started, it can't ever be - # stopped through libuv interfaces, so it would never lose its active - # status, and thus if it stays reffed it would keep the event loop - # from exiting. - self._watcher_ffi_unref() - - def send(self): - if libuv.uv_is_closing(self._watcher): - raise Exception("Closing handle") - libuv.uv_async_send(self._watcher) - - @property - def pending(self): - return None - -locals()['async'] = async_ - -class timer(_base.TimerMixin, watcher): - - _watcher_callback_name = '_gevent_timer_callback0' - - # In libuv, timer callbacks continue running while any timer is - # expired, including newly added timers. Newly added non-zero - # timers (especially of small duration) can be seen to be expired - # if the loop time is updated while we are in a timer callback. - # This can lead to us being stuck running timers for a terribly - # long time, which is not good. So default to not updating the - # time. - - # Also, newly-added timers of 0 duration can *also* stall the - # loop, because they'll be seen to be expired immediately. - # Updating the time can prevent that, *if* there was already a - # timer for a longer duration scheduled. - - # To mitigate the above problems, our loop implementation turns - # zero duration timers into check watchers instead using OneShotCheck. - # This ensures the loop cycles. Of course, the 'again' method does - # nothing on them and doesn't exist. In practice that's not an issue. - - _again = False - - def _watcher_ffi_init(self, args): - self._watcher_init(self.loop._ptr, self._watcher) - self._after, self._repeat = args - if self._after and self._after < 0.001: - import warnings - # XXX: The stack level is hard to determine, could be getting here - # through a number of different ways. - warnings.warn("libuv only supports millisecond timer resolution; " - "all times less will be set to 1 ms", - stacklevel=6) - # The alternative is to effectively pass in int(0.1) == 0, which - # means no sleep at all, which leads to excessive wakeups - self._after = 0.001 - if self._repeat and self._repeat < 0.001: - import warnings - warnings.warn("libuv only supports millisecond timer resolution; " - "all times less will be set to 1 ms", - stacklevel=6) - self._repeat = 0.001 - - def _watcher_ffi_start(self): - if self._again: - libuv.uv_timer_again(self._watcher) - else: - try: - self._watcher_start(self._watcher, self._watcher_callback, - int(self._after * 1000), - int(self._repeat * 1000)) - except ValueError: - # in case of non-ints in _after/_repeat - raise TypeError() - - def again(self, callback, *args, **kw): - if not self.active: - # If we've never been started, this is the same as starting us. - # libuv makes the distinction, libev doesn't. - self.start(callback, *args, **kw) - return - - self._again = True - try: - self.start(callback, *args, **kw) - finally: - del self._again - - -class stat(_base.StatMixin, watcher): - _watcher_type = 'fs_poll' - _watcher_struct_name = 'gevent_fs_poll_t' - _watcher_callback_name = '_gevent_fs_poll_callback3' - - def _watcher_set_data(self, the_watcher, data): - the_watcher.handle.data = data - return data - - def _watcher_ffi_init(self, args): - return self._watcher_init(self.loop._ptr, self._watcher) - - MIN_STAT_INTERVAL = 0.1074891 # match libev; 0.0 is default - - def _watcher_ffi_start(self): - # libev changes this when the watcher is started - if self._interval < self.MIN_STAT_INTERVAL: - self._interval = self.MIN_STAT_INTERVAL - self._watcher_start(self._watcher, self._watcher_callback, - self._cpath, - int(self._interval * 1000)) - - @property - def _watcher_handle(self): - return self._watcher.handle.data - - @property - def attr(self): - if not self._watcher.curr.st_nlink: - return - return self._watcher.curr - - @property - def prev(self): - if not self._watcher.prev.st_nlink: - return - return self._watcher.prev - - -class signal(_base.SignalMixin, watcher): - _watcher_callback_name = '_gevent_signal_callback1' - - def _watcher_ffi_init(self, args): - self._watcher_init(self.loop._ptr, self._watcher) - self.ref = False # libev doesn't ref these by default - - - def _watcher_ffi_start(self): - self._watcher_start(self._watcher, self._watcher_callback, - self._signalnum) - - -class idle(_base.IdleMixin, watcher): - # Because libuv doesn't support priorities, idle watchers are - # potentially quite a bit different than under libev - _watcher_callback_name = '_gevent_idle_callback0' - - -class check(_base.CheckMixin, watcher): - _watcher_callback_name = '_gevent_check_callback0' - -class OneShotCheck(check): - - _watcher_skip_ffi = True - - def __make_cb(self, func): - stop = self.stop - @functools.wraps(func) - def cb(*args): - stop() - return func(*args) - return cb - - def start(self, callback, *args): - return check.start(self, self.__make_cb(callback), *args) - -class prepare(_base.PrepareMixin, watcher): - _watcher_callback_name = '_gevent_prepare_callback0' |