diff options
Diffstat (limited to 'python/gevent/libuv/loop.py')
-rw-r--r-- | python/gevent/libuv/loop.py | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/python/gevent/libuv/loop.py b/python/gevent/libuv/loop.py new file mode 100644 index 0000000..0f317c0 --- /dev/null +++ b/python/gevent/libuv/loop.py @@ -0,0 +1,601 @@ +""" +libuv loop implementation +""" +# pylint: disable=no-member +from __future__ import absolute_import, print_function + +import os +from collections import defaultdict +from collections import namedtuple +from operator import delitem +import signal + +from gevent._ffi import _dbg # pylint: disable=unused-import +from gevent._ffi.loop import AbstractLoop +from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error +from gevent._ffi.loop import assign_standard_callbacks +from gevent._ffi.loop import AbstractCallbacks +from gevent._util import implementer +from gevent._interfaces import ILoop + +ffi = _corecffi.ffi +libuv = _corecffi.lib + +__all__ = [ +] + + +class _Callbacks(AbstractCallbacks): + + def _find_loop_from_c_watcher(self, watcher_ptr): + loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data + return self.from_handle(loop_handle) if loop_handle else None + + def python_sigchld_callback(self, watcher_ptr, _signum): + self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback() + + def python_timer0_callback(self, watcher_ptr): + return self.python_prepare_callback(watcher_ptr) + + def python_queue_callback(self, watcher_ptr, revents): + watcher_handle = watcher_ptr.data + the_watcher = self.from_handle(watcher_handle) + + the_watcher.loop._queue_callback(watcher_ptr, revents) + + +_callbacks = assign_standard_callbacks( + ffi, libuv, _Callbacks, + [('python_sigchld_callback', None), + ('python_timer0_callback', None), + ('python_queue_callback', None)]) + +from gevent._ffi.loop import EVENTS +GEVENT_CORE_EVENTS = EVENTS # export + +from gevent.libuv import watcher as _watchers # pylint:disable=no-name-in-module + +_events_to_str = _watchers._events_to_str # export + +READ = libuv.UV_READABLE +WRITE = libuv.UV_WRITABLE + +def get_version(): + uv_bytes = ffi.string(libuv.uv_version_string()) + if not isinstance(uv_bytes, str): + # Py3 + uv_str = uv_bytes.decode("ascii") + else: + uv_str = uv_bytes + + return 'libuv-' + uv_str + +def get_header_version(): + return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH) + +def supported_backends(): + return ['default'] + +@implementer(ILoop) +class loop(AbstractLoop): + + # XXX: Undocumented. Maybe better named 'timer_resolution'? We can't + # know this in general on libev + min_sleep_time = 0.001 # 1ms + + error_handler = None + + _CHECK_POINTER = 'uv_check_t *' + + _PREPARE_POINTER = 'uv_prepare_t *' + _PREPARE_CALLBACK_SIG = "void(*)(void*)" + + _TIMER_POINTER = _CHECK_POINTER # This is poorly named. It's for the callback "timer" + + def __init__(self, flags=None, default=None): + AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default) + self.__loop_pid = os.getpid() + self._child_watchers = defaultdict(list) + self._io_watchers = dict() + self._fork_watchers = set() + self._pid = os.getpid() + self._default = self._ptr == libuv.uv_default_loop() + self._queued_callbacks = [] + + def _queue_callback(self, watcher_ptr, revents): + self._queued_callbacks.append((watcher_ptr, revents)) + + def _init_loop(self, flags, default): + if default is None: + default = True + # Unlike libev, libuv creates a new default + # loop automatically if the old default loop was + # closed. + + if default: + # XXX: If the default loop had been destroyed, this + # will create a new one, but we won't destroy it + ptr = libuv.uv_default_loop() + else: + ptr = libuv.uv_loop_new() + + + if not ptr: + raise SystemError("Failed to get loop") + + # Track whether or not any object has destroyed + # this loop. See _can_destroy_default_loop + ptr.data = ptr + return ptr + + _signal_idle = None + + def _init_and_start_check(self): + libuv.uv_check_init(self._ptr, self._check) + libuv.uv_check_start(self._check, libuv.python_check_callback) + libuv.uv_unref(self._check) + + # We also have to have an idle watcher to be able to handle + # signals in a timely manner. Without them, libuv won't loop again + # and call into its check and prepare handlers. + # Note that this basically forces us into a busy-loop + # XXX: As predicted, using an idle watcher causes our process + # to eat 100% CPU time. We instead use a timer with a max of a .3 second + # delay to notice signals. Note that this timeout also implements fork + # watchers, effectively. + + # XXX: Perhaps we could optimize this to notice when there are other + # timers in the loop and start/stop it then. When we have a callback + # scheduled, this should also be the same and unnecessary? + # libev does takes this basic approach on Windows. + self._signal_idle = ffi.new("uv_timer_t*") + libuv.uv_timer_init(self._ptr, self._signal_idle) + self._signal_idle.data = self._handle_to_self + sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback) + libuv.uv_timer_start(self._signal_idle, + sig_cb, + 300, + 300) + libuv.uv_unref(self._signal_idle) + + def _run_callbacks(self): + # Manually handle fork watchers. + curpid = os.getpid() + if curpid != self._pid: + self._pid = curpid + for watcher in self._fork_watchers: + watcher._on_fork() + + + # The contents of queued_callbacks at this point should be timers + # that expired when the loop began along with any idle watchers. + # We need to run them so that any manual callbacks they want to schedule + # get added to the list and ran next before we go on to poll for IO. + # This is critical for libuv on linux: closing a socket schedules some manual + # callbacks to actually stop the watcher; if those don't run before + # we poll for IO, then libuv can abort the process for the closed file descriptor. + + # XXX: There's still a race condition here because we may not run *all* the manual + # callbacks. We need a way to prioritize those. + + # Running these before the manual callbacks lead to some + # random test failures. In test__event.TestEvent_SetThenClear + # we would get a LoopExit sometimes. The problem occurred when + # a timer expired on entering the first loop; we would process + # it there, and then process the callback that it created + # below, leaving nothing for the loop to do. Having the + # self.run() manually process manual callbacks before + # continuing solves the problem. (But we must still run callbacks + # here again.) + self._prepare_ran_callbacks = self.__run_queued_callbacks() + + super(loop, self)._run_callbacks() + + def _init_and_start_prepare(self): + libuv.uv_prepare_init(self._ptr, self._prepare) + libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback) + libuv.uv_unref(self._prepare) + + def _init_callback_timer(self): + libuv.uv_check_init(self._ptr, self._timer0) + + def _stop_callback_timer(self): + libuv.uv_check_stop(self._timer0) + + def _start_callback_timer(self): + # The purpose of the callback timer is to ensure that we run + # callbacks as soon as possible on the next iteration of the event loop. + + # In libev, we set a 0 duration timer with a no-op callback. + # This executes immediately *after* the IO poll is done (it + # actually determines the time that the IO poll will block + # for), so having the timer present simply spins the loop, and + # our normal prepare watcher kicks in to run the callbacks. + + # In libuv, however, timers are run *first*, before prepare + # callbacks and before polling for IO. So a no-op 0 duration + # timer actually does *nothing*. (Also note that libev queues all + # watchers found during IO poll to run at the end (I think), while libuv + # runs them in uv__io_poll itself.) + + # From the loop inside uv_run: + # while True: + # uv__update_time(loop); + # uv__run_timers(loop); + # # we don't use pending watchers. They are how libuv + # # implements the pipe/udp/tcp streams. + # ran_pending = uv__run_pending(loop); + # uv__run_idle(loop); + # uv__run_prepare(loop); + # ... + # uv__io_poll(loop, timeout); # <--- IO watchers run here! + # uv__run_check(loop); + + # libev looks something like this (pseudo code because the real code is + # hard to read): + # + # do { + # run_fork_callbacks(); + # run_prepare_callbacks(); + # timeout = min(time of all timers or normal block time) + # io_poll() # <--- Only queues IO callbacks + # update_now(); calculate_expired_timers(); + # run callbacks in this order: (although specificying priorities changes it) + # check + # stat + # child + # signal + # timer + # io + # } + + # So instead of running a no-op and letting the side-effect of spinning + # the loop run the callbacks, we must explicitly run them here. + + # If we don't, test__systemerror:TestCallback will be flaky, failing + # one time out of ~20, depending on timing. + + # To get them to run immediately after this current loop, + # we use a check watcher, instead of a 0 duration timer entirely. + # If we use a 0 duration timer, we can get stuck in a timer loop. + # Python 3.6 fails in test_ftplib.py + + # As a final note, if we have not yet entered the loop *at + # all*, and a timer was created with a duration shorter than + # the amount of time it took for us to enter the loop in the + # first place, it may expire and get called before our callback + # does. This could also lead to test__systemerror:TestCallback + # appearing to be flaky. + + # As yet another final note, if we are currently running a + # timer callback, meaning we're inside uv__run_timers() in C, + # and the Python starts a new timer, if the Python code then + # update's the loop's time, it's possible that timer will + # expire *and be run in the same iteration of the loop*. This + # is trivial to do: In sequential code, anything after + # `gevent.sleep(0.1)` is running in a timer callback. Starting + # a new timer---e.g., another gevent.sleep() call---will + # update the time, *before* uv__run_timers exits, meaning + # other timers get a chance to run before our check or prepare + # watcher callbacks do. Therefore, we do indeed have to have a 0 + # timer to run callbacks---it gets inserted before any other user + # timers---ideally, this should be especially careful about how much time + # it runs for. + + # AND YET: We can't actually do that. We get timeouts that I haven't fully + # investigated if we do. Probably stuck in a timer loop. + + # As a partial remedy to this, unlike libev, our timer watcher + # class doesn't update the loop time by default. + + libuv.uv_check_start(self._timer0, libuv.python_timer0_callback) + + + def _stop_aux_watchers(self): + assert self._prepare + assert self._check + assert self._signal_idle + libuv.uv_prepare_stop(self._prepare) + libuv.uv_ref(self._prepare) # Why are we doing this? + + libuv.uv_check_stop(self._check) + libuv.uv_ref(self._check) + + libuv.uv_timer_stop(self._signal_idle) + libuv.uv_ref(self._signal_idle) + + libuv.uv_check_stop(self._timer0) + + def _setup_for_run_callback(self): + self._start_callback_timer() + libuv.uv_ref(self._timer0) + + + def _can_destroy_loop(self, ptr): + # We're being asked to destroy a loop that's, + # at the time it was constructed, was the default loop. + # If loop objects were constructed more than once, + # it may have already been destroyed, though. + # We track this in the data member. + return ptr.data + + def _destroy_loop(self, ptr): + ptr.data = ffi.NULL + libuv.uv_stop(ptr) + + libuv.gevent_close_all_handles(ptr) + + closed_failed = libuv.uv_loop_close(ptr) + if closed_failed: + assert closed_failed == libuv.UV_EBUSY + # We already closed all the handles. Run the loop + # once to let them be cut off from the loop. + ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE) + if ran_has_more_callbacks: + libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT) + closed_failed = libuv.uv_loop_close(ptr) + assert closed_failed == 0, closed_failed + + # Destroy the native resources *after* we have closed + # the loop. If we do it before, walking the handles + # attached to the loop is likely to segfault. + + libuv.gevent_zero_check(self._check) + libuv.gevent_zero_check(self._timer0) + libuv.gevent_zero_prepare(self._prepare) + libuv.gevent_zero_timer(self._signal_idle) + del self._check + del self._prepare + del self._signal_idle + del self._timer0 + + libuv.gevent_zero_loop(ptr) + + # Destroy any watchers we're still holding on to. + del self._io_watchers + del self._fork_watchers + del self._child_watchers + + + def debug(self): + """ + Return all the handles that are open and their ref status. + """ + handle_state = namedtuple("HandleState", + ['handle', + 'type', + 'watcher', + 'ref', + 'active', + 'closing']) + handles = [] + + # XXX: Convert this to a modern callback. + def walk(handle, _arg): + data = handle.data + if data: + watcher = ffi.from_handle(data) + else: + watcher = None + handles.append(handle_state(handle, + ffi.string(libuv.uv_handle_type_name(handle.type)), + watcher, + libuv.uv_has_ref(handle), + libuv.uv_is_active(handle), + libuv.uv_is_closing(handle))) + + libuv.uv_walk(self._ptr, + ffi.callback("void(*)(uv_handle_t*,void*)", + walk), + ffi.NULL) + return handles + + def ref(self): + pass + + def unref(self): + # XXX: Called by _run_callbacks. + pass + + def break_(self, how=None): + libuv.uv_stop(self._ptr) + + def reinit(self): + # TODO: How to implement? We probably have to simply + # re-__init__ this whole class? Does it matter? + # OR maybe we need to uv_walk() and close all the handles? + + # XXX: libuv < 1.12 simply CANNOT handle a fork unless you immediately + # exec() in the child. There are multiple calls to abort() that + # will kill the child process: + # - The OS X poll implementation (kqueue) aborts on an error return + # value; since kqueue FDs can't be inherited, then the next call + # to kqueue in the child will fail and get aborted; fork() is likely + # to be called during the gevent loop, meaning we're deep inside the + # runloop already, so we can't even close the loop that we're in: + # it's too late, the next call to kqueue is already scheduled. + # - The threadpool, should it be in use, also aborts + # (https://github.com/joyent/libuv/pull/1136) + # - There global shared state that breaks signal handling + # and leads to an abort() in the child, EVEN IF the loop in the parent + # had already been closed + # (https://github.com/joyent/libuv/issues/1405) + + # In 1.12, the uv_loop_fork function was added (by gevent!) + libuv.uv_loop_fork(self._ptr) + + _prepare_ran_callbacks = False + + def __run_queued_callbacks(self): + if not self._queued_callbacks: + return False + + cbs = list(self._queued_callbacks) + self._queued_callbacks = [] + + for watcher_ptr, arg in cbs: + handle = watcher_ptr.data + if not handle: + # It's been stopped and possibly closed + assert not libuv.uv_is_active(watcher_ptr) + continue + val = _callbacks.python_callback(handle, arg) + if val == -1: + _callbacks.python_handle_error(handle, arg) + elif val == 1: + if not libuv.uv_is_active(watcher_ptr): + if watcher_ptr.data != handle: + if watcher_ptr.data: + _callbacks.python_stop(None) + else: + _callbacks.python_stop(handle) + return True + + + def run(self, nowait=False, once=False): + # we can only respect one flag or the other. + # nowait takes precedence because it can't block + mode = libuv.UV_RUN_DEFAULT + if once: + mode = libuv.UV_RUN_ONCE + if nowait: + mode = libuv.UV_RUN_NOWAIT + + if mode == libuv.UV_RUN_DEFAULT: + while self._ptr and self._ptr.data: + # This is here to better preserve order guarantees. See _run_callbacks + # for details. + # It may get run again from the prepare watcher, so potentially we + # could take twice as long as the switch interval. + self._run_callbacks() + self._prepare_ran_callbacks = False + ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE) + # Note that we run queued callbacks when the prepare watcher runs, + # thus accounting for timers that expired before polling for IO, + # and idle watchers. This next call should get IO callbacks and + # callbacks from timers that expired *after* polling for IO. + ran_callbacks = self.__run_queued_callbacks() + + if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks: + # A return of 0 means there are no referenced and + # active handles. The loop is over. + # If we didn't run any callbacks, then we couldn't schedule + # anything to switch in the future, so there's no point + # running again. + return ran_status + return 0 # Somebody closed the loop + + result = libuv.uv_run(self._ptr, mode) + self.__run_queued_callbacks() + return result + + def now(self): + # libuv's now is expressed as an integer number of + # milliseconds, so to get it compatible with time.time units + # that this method is supposed to return, we have to divide by 1000.0 + now = libuv.uv_now(self._ptr) + return now / 1000.0 + + def update_now(self): + libuv.uv_update_time(self._ptr) + + def fileno(self): + if self._ptr: + fd = libuv.uv_backend_fd(self._ptr) + if fd >= 0: + return fd + + _sigchld_watcher = None + _sigchld_callback_ffi = None + + def install_sigchld(self): + if not self.default: + return + + if self._sigchld_watcher: + return + + self._sigchld_watcher = ffi.new('uv_signal_t*') + libuv.uv_signal_init(self._ptr, self._sigchld_watcher) + self._sigchld_watcher.data = self._handle_to_self + + libuv.uv_signal_start(self._sigchld_watcher, + libuv.python_sigchld_callback, + signal.SIGCHLD) + + def reset_sigchld(self): + if not self.default or not self._sigchld_watcher: + return + + libuv.uv_signal_stop(self._sigchld_watcher) + # Must go through this to manage the memory lifetime + # correctly. Alternately, we could just stop it and restart + # it in install_sigchld? + _watchers.watcher._watcher_ffi_close(self._sigchld_watcher) + del self._sigchld_watcher + + + def _sigchld_callback(self): + # Signals can arrive at (relatively) any time. To eliminate + # race conditions, and behave more like libev, we "queue" + # sigchld to run when we run callbacks. + while True: + try: + pid, status, _usage = os.wait3(os.WNOHANG) + except OSError: + # Python 3 raises ChildProcessError + break + + if pid == 0: + break + children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) + for watcher in children_watchers: + self.run_callback(watcher._set_waitpid_status, pid, status) + + # Don't invoke child watchers for 0 more than once + self._child_watchers[0] = [] + + def _register_child_watcher(self, watcher): + self._child_watchers[watcher._pid].append(watcher) + + def _unregister_child_watcher(self, watcher): + try: + # stop() should be idempotent + self._child_watchers[watcher._pid].remove(watcher) + except ValueError: + pass + + # Now's a good time to clean up any dead lists we don't need + # anymore + for pid in list(self._child_watchers): + if not self._child_watchers[pid]: + del self._child_watchers[pid] + + def io(self, fd, events, ref=True, priority=None): + # We rely on hard references here and explicit calls to + # close() on the returned object to correctly manage + # the watcher lifetimes. + + io_watchers = self._io_watchers + try: + io_watcher = io_watchers[fd] + assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher) + except KeyError: + # Start the watcher with just the events that we're interested in. + # as multiplexers are added, the real event mask will be updated to keep in sync. + # If we watch for too much, we get spurious wakeups and busy loops. + io_watcher = self._watchers.io(self, fd, 0) + io_watchers[fd] = io_watcher + io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd) + + return io_watcher.multiplex(events) + + def prepare(self, ref=True, priority=None): + # We run arbitrary code in python_prepare_callback. That could switch + # greenlets. If it does that while also manipulating the active prepare + # watchers, we could corrupt the process state, since the prepare watcher + # queue is iterated on the stack (on unix). We could workaround this by implementing + # prepare watchers in pure Python. + # See https://github.com/gevent/gevent/issues/1126 + raise TypeError("prepare watchers are not currently supported in libuv. " + "If you need them, please contact the maintainers.") |