aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/libuv/loop.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/libuv/loop.py')
-rw-r--r--python/gevent/libuv/loop.py601
1 files changed, 601 insertions, 0 deletions
diff --git a/python/gevent/libuv/loop.py b/python/gevent/libuv/loop.py
new file mode 100644
index 0000000..0f317c0
--- /dev/null
+++ b/python/gevent/libuv/loop.py
@@ -0,0 +1,601 @@
+"""
+libuv loop implementation
+"""
+# pylint: disable=no-member
+from __future__ import absolute_import, print_function
+
+import os
+from collections import defaultdict
+from collections import namedtuple
+from operator import delitem
+import signal
+
+from gevent._ffi import _dbg # pylint: disable=unused-import
+from gevent._ffi.loop import AbstractLoop
+from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
+from gevent._ffi.loop import assign_standard_callbacks
+from gevent._ffi.loop import AbstractCallbacks
+from gevent._util import implementer
+from gevent._interfaces import ILoop
+
+ffi = _corecffi.ffi
+libuv = _corecffi.lib
+
+__all__ = [
+]
+
+
+class _Callbacks(AbstractCallbacks):
+
+ def _find_loop_from_c_watcher(self, watcher_ptr):
+ loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data
+ return self.from_handle(loop_handle) if loop_handle else None
+
+ def python_sigchld_callback(self, watcher_ptr, _signum):
+ self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback()
+
+ def python_timer0_callback(self, watcher_ptr):
+ return self.python_prepare_callback(watcher_ptr)
+
+ def python_queue_callback(self, watcher_ptr, revents):
+ watcher_handle = watcher_ptr.data
+ the_watcher = self.from_handle(watcher_handle)
+
+ the_watcher.loop._queue_callback(watcher_ptr, revents)
+
+
+_callbacks = assign_standard_callbacks(
+ ffi, libuv, _Callbacks,
+ [('python_sigchld_callback', None),
+ ('python_timer0_callback', None),
+ ('python_queue_callback', None)])
+
+from gevent._ffi.loop import EVENTS
+GEVENT_CORE_EVENTS = EVENTS # export
+
+from gevent.libuv import watcher as _watchers # pylint:disable=no-name-in-module
+
+_events_to_str = _watchers._events_to_str # export
+
+READ = libuv.UV_READABLE
+WRITE = libuv.UV_WRITABLE
+
+def get_version():
+ uv_bytes = ffi.string(libuv.uv_version_string())
+ if not isinstance(uv_bytes, str):
+ # Py3
+ uv_str = uv_bytes.decode("ascii")
+ else:
+ uv_str = uv_bytes
+
+ return 'libuv-' + uv_str
+
+def get_header_version():
+ return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH)
+
+def supported_backends():
+ return ['default']
+
+@implementer(ILoop)
+class loop(AbstractLoop):
+
+ # XXX: Undocumented. Maybe better named 'timer_resolution'? We can't
+ # know this in general on libev
+ min_sleep_time = 0.001 # 1ms
+
+ error_handler = None
+
+ _CHECK_POINTER = 'uv_check_t *'
+
+ _PREPARE_POINTER = 'uv_prepare_t *'
+ _PREPARE_CALLBACK_SIG = "void(*)(void*)"
+
+ _TIMER_POINTER = _CHECK_POINTER # This is poorly named. It's for the callback "timer"
+
+ def __init__(self, flags=None, default=None):
+ AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
+ self.__loop_pid = os.getpid()
+ self._child_watchers = defaultdict(list)
+ self._io_watchers = dict()
+ self._fork_watchers = set()
+ self._pid = os.getpid()
+ self._default = self._ptr == libuv.uv_default_loop()
+ self._queued_callbacks = []
+
+ def _queue_callback(self, watcher_ptr, revents):
+ self._queued_callbacks.append((watcher_ptr, revents))
+
+ def _init_loop(self, flags, default):
+ if default is None:
+ default = True
+ # Unlike libev, libuv creates a new default
+ # loop automatically if the old default loop was
+ # closed.
+
+ if default:
+ # XXX: If the default loop had been destroyed, this
+ # will create a new one, but we won't destroy it
+ ptr = libuv.uv_default_loop()
+ else:
+ ptr = libuv.uv_loop_new()
+
+
+ if not ptr:
+ raise SystemError("Failed to get loop")
+
+ # Track whether or not any object has destroyed
+ # this loop. See _can_destroy_default_loop
+ ptr.data = ptr
+ return ptr
+
+ _signal_idle = None
+
+ def _init_and_start_check(self):
+ libuv.uv_check_init(self._ptr, self._check)
+ libuv.uv_check_start(self._check, libuv.python_check_callback)
+ libuv.uv_unref(self._check)
+
+ # We also have to have an idle watcher to be able to handle
+ # signals in a timely manner. Without them, libuv won't loop again
+ # and call into its check and prepare handlers.
+ # Note that this basically forces us into a busy-loop
+ # XXX: As predicted, using an idle watcher causes our process
+ # to eat 100% CPU time. We instead use a timer with a max of a .3 second
+ # delay to notice signals. Note that this timeout also implements fork
+ # watchers, effectively.
+
+ # XXX: Perhaps we could optimize this to notice when there are other
+ # timers in the loop and start/stop it then. When we have a callback
+ # scheduled, this should also be the same and unnecessary?
+ # libev does takes this basic approach on Windows.
+ self._signal_idle = ffi.new("uv_timer_t*")
+ libuv.uv_timer_init(self._ptr, self._signal_idle)
+ self._signal_idle.data = self._handle_to_self
+ sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback)
+ libuv.uv_timer_start(self._signal_idle,
+ sig_cb,
+ 300,
+ 300)
+ libuv.uv_unref(self._signal_idle)
+
+ def _run_callbacks(self):
+ # Manually handle fork watchers.
+ curpid = os.getpid()
+ if curpid != self._pid:
+ self._pid = curpid
+ for watcher in self._fork_watchers:
+ watcher._on_fork()
+
+
+ # The contents of queued_callbacks at this point should be timers
+ # that expired when the loop began along with any idle watchers.
+ # We need to run them so that any manual callbacks they want to schedule
+ # get added to the list and ran next before we go on to poll for IO.
+ # This is critical for libuv on linux: closing a socket schedules some manual
+ # callbacks to actually stop the watcher; if those don't run before
+ # we poll for IO, then libuv can abort the process for the closed file descriptor.
+
+ # XXX: There's still a race condition here because we may not run *all* the manual
+ # callbacks. We need a way to prioritize those.
+
+ # Running these before the manual callbacks lead to some
+ # random test failures. In test__event.TestEvent_SetThenClear
+ # we would get a LoopExit sometimes. The problem occurred when
+ # a timer expired on entering the first loop; we would process
+ # it there, and then process the callback that it created
+ # below, leaving nothing for the loop to do. Having the
+ # self.run() manually process manual callbacks before
+ # continuing solves the problem. (But we must still run callbacks
+ # here again.)
+ self._prepare_ran_callbacks = self.__run_queued_callbacks()
+
+ super(loop, self)._run_callbacks()
+
+ def _init_and_start_prepare(self):
+ libuv.uv_prepare_init(self._ptr, self._prepare)
+ libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback)
+ libuv.uv_unref(self._prepare)
+
+ def _init_callback_timer(self):
+ libuv.uv_check_init(self._ptr, self._timer0)
+
+ def _stop_callback_timer(self):
+ libuv.uv_check_stop(self._timer0)
+
+ def _start_callback_timer(self):
+ # The purpose of the callback timer is to ensure that we run
+ # callbacks as soon as possible on the next iteration of the event loop.
+
+ # In libev, we set a 0 duration timer with a no-op callback.
+ # This executes immediately *after* the IO poll is done (it
+ # actually determines the time that the IO poll will block
+ # for), so having the timer present simply spins the loop, and
+ # our normal prepare watcher kicks in to run the callbacks.
+
+ # In libuv, however, timers are run *first*, before prepare
+ # callbacks and before polling for IO. So a no-op 0 duration
+ # timer actually does *nothing*. (Also note that libev queues all
+ # watchers found during IO poll to run at the end (I think), while libuv
+ # runs them in uv__io_poll itself.)
+
+ # From the loop inside uv_run:
+ # while True:
+ # uv__update_time(loop);
+ # uv__run_timers(loop);
+ # # we don't use pending watchers. They are how libuv
+ # # implements the pipe/udp/tcp streams.
+ # ran_pending = uv__run_pending(loop);
+ # uv__run_idle(loop);
+ # uv__run_prepare(loop);
+ # ...
+ # uv__io_poll(loop, timeout); # <--- IO watchers run here!
+ # uv__run_check(loop);
+
+ # libev looks something like this (pseudo code because the real code is
+ # hard to read):
+ #
+ # do {
+ # run_fork_callbacks();
+ # run_prepare_callbacks();
+ # timeout = min(time of all timers or normal block time)
+ # io_poll() # <--- Only queues IO callbacks
+ # update_now(); calculate_expired_timers();
+ # run callbacks in this order: (although specificying priorities changes it)
+ # check
+ # stat
+ # child
+ # signal
+ # timer
+ # io
+ # }
+
+ # So instead of running a no-op and letting the side-effect of spinning
+ # the loop run the callbacks, we must explicitly run them here.
+
+ # If we don't, test__systemerror:TestCallback will be flaky, failing
+ # one time out of ~20, depending on timing.
+
+ # To get them to run immediately after this current loop,
+ # we use a check watcher, instead of a 0 duration timer entirely.
+ # If we use a 0 duration timer, we can get stuck in a timer loop.
+ # Python 3.6 fails in test_ftplib.py
+
+ # As a final note, if we have not yet entered the loop *at
+ # all*, and a timer was created with a duration shorter than
+ # the amount of time it took for us to enter the loop in the
+ # first place, it may expire and get called before our callback
+ # does. This could also lead to test__systemerror:TestCallback
+ # appearing to be flaky.
+
+ # As yet another final note, if we are currently running a
+ # timer callback, meaning we're inside uv__run_timers() in C,
+ # and the Python starts a new timer, if the Python code then
+ # update's the loop's time, it's possible that timer will
+ # expire *and be run in the same iteration of the loop*. This
+ # is trivial to do: In sequential code, anything after
+ # `gevent.sleep(0.1)` is running in a timer callback. Starting
+ # a new timer---e.g., another gevent.sleep() call---will
+ # update the time, *before* uv__run_timers exits, meaning
+ # other timers get a chance to run before our check or prepare
+ # watcher callbacks do. Therefore, we do indeed have to have a 0
+ # timer to run callbacks---it gets inserted before any other user
+ # timers---ideally, this should be especially careful about how much time
+ # it runs for.
+
+ # AND YET: We can't actually do that. We get timeouts that I haven't fully
+ # investigated if we do. Probably stuck in a timer loop.
+
+ # As a partial remedy to this, unlike libev, our timer watcher
+ # class doesn't update the loop time by default.
+
+ libuv.uv_check_start(self._timer0, libuv.python_timer0_callback)
+
+
+ def _stop_aux_watchers(self):
+ assert self._prepare
+ assert self._check
+ assert self._signal_idle
+ libuv.uv_prepare_stop(self._prepare)
+ libuv.uv_ref(self._prepare) # Why are we doing this?
+
+ libuv.uv_check_stop(self._check)
+ libuv.uv_ref(self._check)
+
+ libuv.uv_timer_stop(self._signal_idle)
+ libuv.uv_ref(self._signal_idle)
+
+ libuv.uv_check_stop(self._timer0)
+
+ def _setup_for_run_callback(self):
+ self._start_callback_timer()
+ libuv.uv_ref(self._timer0)
+
+
+ def _can_destroy_loop(self, ptr):
+ # We're being asked to destroy a loop that's,
+ # at the time it was constructed, was the default loop.
+ # If loop objects were constructed more than once,
+ # it may have already been destroyed, though.
+ # We track this in the data member.
+ return ptr.data
+
+ def _destroy_loop(self, ptr):
+ ptr.data = ffi.NULL
+ libuv.uv_stop(ptr)
+
+ libuv.gevent_close_all_handles(ptr)
+
+ closed_failed = libuv.uv_loop_close(ptr)
+ if closed_failed:
+ assert closed_failed == libuv.UV_EBUSY
+ # We already closed all the handles. Run the loop
+ # once to let them be cut off from the loop.
+ ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
+ if ran_has_more_callbacks:
+ libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT)
+ closed_failed = libuv.uv_loop_close(ptr)
+ assert closed_failed == 0, closed_failed
+
+ # Destroy the native resources *after* we have closed
+ # the loop. If we do it before, walking the handles
+ # attached to the loop is likely to segfault.
+
+ libuv.gevent_zero_check(self._check)
+ libuv.gevent_zero_check(self._timer0)
+ libuv.gevent_zero_prepare(self._prepare)
+ libuv.gevent_zero_timer(self._signal_idle)
+ del self._check
+ del self._prepare
+ del self._signal_idle
+ del self._timer0
+
+ libuv.gevent_zero_loop(ptr)
+
+ # Destroy any watchers we're still holding on to.
+ del self._io_watchers
+ del self._fork_watchers
+ del self._child_watchers
+
+
+ def debug(self):
+ """
+ Return all the handles that are open and their ref status.
+ """
+ handle_state = namedtuple("HandleState",
+ ['handle',
+ 'type',
+ 'watcher',
+ 'ref',
+ 'active',
+ 'closing'])
+ handles = []
+
+ # XXX: Convert this to a modern callback.
+ def walk(handle, _arg):
+ data = handle.data
+ if data:
+ watcher = ffi.from_handle(data)
+ else:
+ watcher = None
+ handles.append(handle_state(handle,
+ ffi.string(libuv.uv_handle_type_name(handle.type)),
+ watcher,
+ libuv.uv_has_ref(handle),
+ libuv.uv_is_active(handle),
+ libuv.uv_is_closing(handle)))
+
+ libuv.uv_walk(self._ptr,
+ ffi.callback("void(*)(uv_handle_t*,void*)",
+ walk),
+ ffi.NULL)
+ return handles
+
+ def ref(self):
+ pass
+
+ def unref(self):
+ # XXX: Called by _run_callbacks.
+ pass
+
+ def break_(self, how=None):
+ libuv.uv_stop(self._ptr)
+
+ def reinit(self):
+ # TODO: How to implement? We probably have to simply
+ # re-__init__ this whole class? Does it matter?
+ # OR maybe we need to uv_walk() and close all the handles?
+
+ # XXX: libuv < 1.12 simply CANNOT handle a fork unless you immediately
+ # exec() in the child. There are multiple calls to abort() that
+ # will kill the child process:
+ # - The OS X poll implementation (kqueue) aborts on an error return
+ # value; since kqueue FDs can't be inherited, then the next call
+ # to kqueue in the child will fail and get aborted; fork() is likely
+ # to be called during the gevent loop, meaning we're deep inside the
+ # runloop already, so we can't even close the loop that we're in:
+ # it's too late, the next call to kqueue is already scheduled.
+ # - The threadpool, should it be in use, also aborts
+ # (https://github.com/joyent/libuv/pull/1136)
+ # - There global shared state that breaks signal handling
+ # and leads to an abort() in the child, EVEN IF the loop in the parent
+ # had already been closed
+ # (https://github.com/joyent/libuv/issues/1405)
+
+ # In 1.12, the uv_loop_fork function was added (by gevent!)
+ libuv.uv_loop_fork(self._ptr)
+
+ _prepare_ran_callbacks = False
+
+ def __run_queued_callbacks(self):
+ if not self._queued_callbacks:
+ return False
+
+ cbs = list(self._queued_callbacks)
+ self._queued_callbacks = []
+
+ for watcher_ptr, arg in cbs:
+ handle = watcher_ptr.data
+ if not handle:
+ # It's been stopped and possibly closed
+ assert not libuv.uv_is_active(watcher_ptr)
+ continue
+ val = _callbacks.python_callback(handle, arg)
+ if val == -1:
+ _callbacks.python_handle_error(handle, arg)
+ elif val == 1:
+ if not libuv.uv_is_active(watcher_ptr):
+ if watcher_ptr.data != handle:
+ if watcher_ptr.data:
+ _callbacks.python_stop(None)
+ else:
+ _callbacks.python_stop(handle)
+ return True
+
+
+ def run(self, nowait=False, once=False):
+ # we can only respect one flag or the other.
+ # nowait takes precedence because it can't block
+ mode = libuv.UV_RUN_DEFAULT
+ if once:
+ mode = libuv.UV_RUN_ONCE
+ if nowait:
+ mode = libuv.UV_RUN_NOWAIT
+
+ if mode == libuv.UV_RUN_DEFAULT:
+ while self._ptr and self._ptr.data:
+ # This is here to better preserve order guarantees. See _run_callbacks
+ # for details.
+ # It may get run again from the prepare watcher, so potentially we
+ # could take twice as long as the switch interval.
+ self._run_callbacks()
+ self._prepare_ran_callbacks = False
+ ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE)
+ # Note that we run queued callbacks when the prepare watcher runs,
+ # thus accounting for timers that expired before polling for IO,
+ # and idle watchers. This next call should get IO callbacks and
+ # callbacks from timers that expired *after* polling for IO.
+ ran_callbacks = self.__run_queued_callbacks()
+
+ if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks:
+ # A return of 0 means there are no referenced and
+ # active handles. The loop is over.
+ # If we didn't run any callbacks, then we couldn't schedule
+ # anything to switch in the future, so there's no point
+ # running again.
+ return ran_status
+ return 0 # Somebody closed the loop
+
+ result = libuv.uv_run(self._ptr, mode)
+ self.__run_queued_callbacks()
+ return result
+
+ def now(self):
+ # libuv's now is expressed as an integer number of
+ # milliseconds, so to get it compatible with time.time units
+ # that this method is supposed to return, we have to divide by 1000.0
+ now = libuv.uv_now(self._ptr)
+ return now / 1000.0
+
+ def update_now(self):
+ libuv.uv_update_time(self._ptr)
+
+ def fileno(self):
+ if self._ptr:
+ fd = libuv.uv_backend_fd(self._ptr)
+ if fd >= 0:
+ return fd
+
+ _sigchld_watcher = None
+ _sigchld_callback_ffi = None
+
+ def install_sigchld(self):
+ if not self.default:
+ return
+
+ if self._sigchld_watcher:
+ return
+
+ self._sigchld_watcher = ffi.new('uv_signal_t*')
+ libuv.uv_signal_init(self._ptr, self._sigchld_watcher)
+ self._sigchld_watcher.data = self._handle_to_self
+
+ libuv.uv_signal_start(self._sigchld_watcher,
+ libuv.python_sigchld_callback,
+ signal.SIGCHLD)
+
+ def reset_sigchld(self):
+ if not self.default or not self._sigchld_watcher:
+ return
+
+ libuv.uv_signal_stop(self._sigchld_watcher)
+ # Must go through this to manage the memory lifetime
+ # correctly. Alternately, we could just stop it and restart
+ # it in install_sigchld?
+ _watchers.watcher._watcher_ffi_close(self._sigchld_watcher)
+ del self._sigchld_watcher
+
+
+ def _sigchld_callback(self):
+ # Signals can arrive at (relatively) any time. To eliminate
+ # race conditions, and behave more like libev, we "queue"
+ # sigchld to run when we run callbacks.
+ while True:
+ try:
+ pid, status, _usage = os.wait3(os.WNOHANG)
+ except OSError:
+ # Python 3 raises ChildProcessError
+ break
+
+ if pid == 0:
+ break
+ children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, [])
+ for watcher in children_watchers:
+ self.run_callback(watcher._set_waitpid_status, pid, status)
+
+ # Don't invoke child watchers for 0 more than once
+ self._child_watchers[0] = []
+
+ def _register_child_watcher(self, watcher):
+ self._child_watchers[watcher._pid].append(watcher)
+
+ def _unregister_child_watcher(self, watcher):
+ try:
+ # stop() should be idempotent
+ self._child_watchers[watcher._pid].remove(watcher)
+ except ValueError:
+ pass
+
+ # Now's a good time to clean up any dead lists we don't need
+ # anymore
+ for pid in list(self._child_watchers):
+ if not self._child_watchers[pid]:
+ del self._child_watchers[pid]
+
+ def io(self, fd, events, ref=True, priority=None):
+ # We rely on hard references here and explicit calls to
+ # close() on the returned object to correctly manage
+ # the watcher lifetimes.
+
+ io_watchers = self._io_watchers
+ try:
+ io_watcher = io_watchers[fd]
+ assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher)
+ except KeyError:
+ # Start the watcher with just the events that we're interested in.
+ # as multiplexers are added, the real event mask will be updated to keep in sync.
+ # If we watch for too much, we get spurious wakeups and busy loops.
+ io_watcher = self._watchers.io(self, fd, 0)
+ io_watchers[fd] = io_watcher
+ io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd)
+
+ return io_watcher.multiplex(events)
+
+ def prepare(self, ref=True, priority=None):
+ # We run arbitrary code in python_prepare_callback. That could switch
+ # greenlets. If it does that while also manipulating the active prepare
+ # watchers, we could corrupt the process state, since the prepare watcher
+ # queue is iterated on the stack (on unix). We could workaround this by implementing
+ # prepare watchers in pure Python.
+ # See https://github.com/gevent/gevent/issues/1126
+ raise TypeError("prepare watchers are not currently supported in libuv. "
+ "If you need them, please contact the maintainers.")