diff options
author | James Taylor <user234683@users.noreply.github.com> | 2019-09-06 16:31:13 -0700 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2019-09-06 16:31:13 -0700 |
commit | 3d57e14df7ba5f14a634295caf3b2e60da50bfe2 (patch) | |
tree | 4903bcb79a49ad714a1a9129765b9545405c9978 /python/gevent/libuv/loop.py | |
parent | ac32b24b2a011292b704a3f27e8fd08a7ae9424b (diff) | |
download | yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.tar.lz yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.tar.xz yt-local-3d57e14df7ba5f14a634295caf3b2e60da50bfe2.zip |
Remove windows python distribution from repo and add requirements.txt
Diffstat (limited to 'python/gevent/libuv/loop.py')
-rw-r--r-- | python/gevent/libuv/loop.py | 601 |
1 files changed, 0 insertions, 601 deletions
diff --git a/python/gevent/libuv/loop.py b/python/gevent/libuv/loop.py deleted file mode 100644 index 0f317c0..0000000 --- a/python/gevent/libuv/loop.py +++ /dev/null @@ -1,601 +0,0 @@ -""" -libuv loop implementation -""" -# pylint: disable=no-member -from __future__ import absolute_import, print_function - -import os -from collections import defaultdict -from collections import namedtuple -from operator import delitem -import signal - -from gevent._ffi import _dbg # pylint: disable=unused-import -from gevent._ffi.loop import AbstractLoop -from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error -from gevent._ffi.loop import assign_standard_callbacks -from gevent._ffi.loop import AbstractCallbacks -from gevent._util import implementer -from gevent._interfaces import ILoop - -ffi = _corecffi.ffi -libuv = _corecffi.lib - -__all__ = [ -] - - -class _Callbacks(AbstractCallbacks): - - def _find_loop_from_c_watcher(self, watcher_ptr): - loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data - return self.from_handle(loop_handle) if loop_handle else None - - def python_sigchld_callback(self, watcher_ptr, _signum): - self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback() - - def python_timer0_callback(self, watcher_ptr): - return self.python_prepare_callback(watcher_ptr) - - def python_queue_callback(self, watcher_ptr, revents): - watcher_handle = watcher_ptr.data - the_watcher = self.from_handle(watcher_handle) - - the_watcher.loop._queue_callback(watcher_ptr, revents) - - -_callbacks = assign_standard_callbacks( - ffi, libuv, _Callbacks, - [('python_sigchld_callback', None), - ('python_timer0_callback', None), - ('python_queue_callback', None)]) - -from gevent._ffi.loop import EVENTS -GEVENT_CORE_EVENTS = EVENTS # export - -from gevent.libuv import watcher as _watchers # pylint:disable=no-name-in-module - -_events_to_str = _watchers._events_to_str # export - -READ = libuv.UV_READABLE -WRITE = libuv.UV_WRITABLE - -def get_version(): - uv_bytes = ffi.string(libuv.uv_version_string()) - if not isinstance(uv_bytes, str): - # Py3 - uv_str = uv_bytes.decode("ascii") - else: - uv_str = uv_bytes - - return 'libuv-' + uv_str - -def get_header_version(): - return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH) - -def supported_backends(): - return ['default'] - -@implementer(ILoop) -class loop(AbstractLoop): - - # XXX: Undocumented. Maybe better named 'timer_resolution'? We can't - # know this in general on libev - min_sleep_time = 0.001 # 1ms - - error_handler = None - - _CHECK_POINTER = 'uv_check_t *' - - _PREPARE_POINTER = 'uv_prepare_t *' - _PREPARE_CALLBACK_SIG = "void(*)(void*)" - - _TIMER_POINTER = _CHECK_POINTER # This is poorly named. It's for the callback "timer" - - def __init__(self, flags=None, default=None): - AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default) - self.__loop_pid = os.getpid() - self._child_watchers = defaultdict(list) - self._io_watchers = dict() - self._fork_watchers = set() - self._pid = os.getpid() - self._default = self._ptr == libuv.uv_default_loop() - self._queued_callbacks = [] - - def _queue_callback(self, watcher_ptr, revents): - self._queued_callbacks.append((watcher_ptr, revents)) - - def _init_loop(self, flags, default): - if default is None: - default = True - # Unlike libev, libuv creates a new default - # loop automatically if the old default loop was - # closed. - - if default: - # XXX: If the default loop had been destroyed, this - # will create a new one, but we won't destroy it - ptr = libuv.uv_default_loop() - else: - ptr = libuv.uv_loop_new() - - - if not ptr: - raise SystemError("Failed to get loop") - - # Track whether or not any object has destroyed - # this loop. See _can_destroy_default_loop - ptr.data = ptr - return ptr - - _signal_idle = None - - def _init_and_start_check(self): - libuv.uv_check_init(self._ptr, self._check) - libuv.uv_check_start(self._check, libuv.python_check_callback) - libuv.uv_unref(self._check) - - # We also have to have an idle watcher to be able to handle - # signals in a timely manner. Without them, libuv won't loop again - # and call into its check and prepare handlers. - # Note that this basically forces us into a busy-loop - # XXX: As predicted, using an idle watcher causes our process - # to eat 100% CPU time. We instead use a timer with a max of a .3 second - # delay to notice signals. Note that this timeout also implements fork - # watchers, effectively. - - # XXX: Perhaps we could optimize this to notice when there are other - # timers in the loop and start/stop it then. When we have a callback - # scheduled, this should also be the same and unnecessary? - # libev does takes this basic approach on Windows. - self._signal_idle = ffi.new("uv_timer_t*") - libuv.uv_timer_init(self._ptr, self._signal_idle) - self._signal_idle.data = self._handle_to_self - sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback) - libuv.uv_timer_start(self._signal_idle, - sig_cb, - 300, - 300) - libuv.uv_unref(self._signal_idle) - - def _run_callbacks(self): - # Manually handle fork watchers. - curpid = os.getpid() - if curpid != self._pid: - self._pid = curpid - for watcher in self._fork_watchers: - watcher._on_fork() - - - # The contents of queued_callbacks at this point should be timers - # that expired when the loop began along with any idle watchers. - # We need to run them so that any manual callbacks they want to schedule - # get added to the list and ran next before we go on to poll for IO. - # This is critical for libuv on linux: closing a socket schedules some manual - # callbacks to actually stop the watcher; if those don't run before - # we poll for IO, then libuv can abort the process for the closed file descriptor. - - # XXX: There's still a race condition here because we may not run *all* the manual - # callbacks. We need a way to prioritize those. - - # Running these before the manual callbacks lead to some - # random test failures. In test__event.TestEvent_SetThenClear - # we would get a LoopExit sometimes. The problem occurred when - # a timer expired on entering the first loop; we would process - # it there, and then process the callback that it created - # below, leaving nothing for the loop to do. Having the - # self.run() manually process manual callbacks before - # continuing solves the problem. (But we must still run callbacks - # here again.) - self._prepare_ran_callbacks = self.__run_queued_callbacks() - - super(loop, self)._run_callbacks() - - def _init_and_start_prepare(self): - libuv.uv_prepare_init(self._ptr, self._prepare) - libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback) - libuv.uv_unref(self._prepare) - - def _init_callback_timer(self): - libuv.uv_check_init(self._ptr, self._timer0) - - def _stop_callback_timer(self): - libuv.uv_check_stop(self._timer0) - - def _start_callback_timer(self): - # The purpose of the callback timer is to ensure that we run - # callbacks as soon as possible on the next iteration of the event loop. - - # In libev, we set a 0 duration timer with a no-op callback. - # This executes immediately *after* the IO poll is done (it - # actually determines the time that the IO poll will block - # for), so having the timer present simply spins the loop, and - # our normal prepare watcher kicks in to run the callbacks. - - # In libuv, however, timers are run *first*, before prepare - # callbacks and before polling for IO. So a no-op 0 duration - # timer actually does *nothing*. (Also note that libev queues all - # watchers found during IO poll to run at the end (I think), while libuv - # runs them in uv__io_poll itself.) - - # From the loop inside uv_run: - # while True: - # uv__update_time(loop); - # uv__run_timers(loop); - # # we don't use pending watchers. They are how libuv - # # implements the pipe/udp/tcp streams. - # ran_pending = uv__run_pending(loop); - # uv__run_idle(loop); - # uv__run_prepare(loop); - # ... - # uv__io_poll(loop, timeout); # <--- IO watchers run here! - # uv__run_check(loop); - - # libev looks something like this (pseudo code because the real code is - # hard to read): - # - # do { - # run_fork_callbacks(); - # run_prepare_callbacks(); - # timeout = min(time of all timers or normal block time) - # io_poll() # <--- Only queues IO callbacks - # update_now(); calculate_expired_timers(); - # run callbacks in this order: (although specificying priorities changes it) - # check - # stat - # child - # signal - # timer - # io - # } - - # So instead of running a no-op and letting the side-effect of spinning - # the loop run the callbacks, we must explicitly run them here. - - # If we don't, test__systemerror:TestCallback will be flaky, failing - # one time out of ~20, depending on timing. - - # To get them to run immediately after this current loop, - # we use a check watcher, instead of a 0 duration timer entirely. - # If we use a 0 duration timer, we can get stuck in a timer loop. - # Python 3.6 fails in test_ftplib.py - - # As a final note, if we have not yet entered the loop *at - # all*, and a timer was created with a duration shorter than - # the amount of time it took for us to enter the loop in the - # first place, it may expire and get called before our callback - # does. This could also lead to test__systemerror:TestCallback - # appearing to be flaky. - - # As yet another final note, if we are currently running a - # timer callback, meaning we're inside uv__run_timers() in C, - # and the Python starts a new timer, if the Python code then - # update's the loop's time, it's possible that timer will - # expire *and be run in the same iteration of the loop*. This - # is trivial to do: In sequential code, anything after - # `gevent.sleep(0.1)` is running in a timer callback. Starting - # a new timer---e.g., another gevent.sleep() call---will - # update the time, *before* uv__run_timers exits, meaning - # other timers get a chance to run before our check or prepare - # watcher callbacks do. Therefore, we do indeed have to have a 0 - # timer to run callbacks---it gets inserted before any other user - # timers---ideally, this should be especially careful about how much time - # it runs for. - - # AND YET: We can't actually do that. We get timeouts that I haven't fully - # investigated if we do. Probably stuck in a timer loop. - - # As a partial remedy to this, unlike libev, our timer watcher - # class doesn't update the loop time by default. - - libuv.uv_check_start(self._timer0, libuv.python_timer0_callback) - - - def _stop_aux_watchers(self): - assert self._prepare - assert self._check - assert self._signal_idle - libuv.uv_prepare_stop(self._prepare) - libuv.uv_ref(self._prepare) # Why are we doing this? - - libuv.uv_check_stop(self._check) - libuv.uv_ref(self._check) - - libuv.uv_timer_stop(self._signal_idle) - libuv.uv_ref(self._signal_idle) - - libuv.uv_check_stop(self._timer0) - - def _setup_for_run_callback(self): - self._start_callback_timer() - libuv.uv_ref(self._timer0) - - - def _can_destroy_loop(self, ptr): - # We're being asked to destroy a loop that's, - # at the time it was constructed, was the default loop. - # If loop objects were constructed more than once, - # it may have already been destroyed, though. - # We track this in the data member. - return ptr.data - - def _destroy_loop(self, ptr): - ptr.data = ffi.NULL - libuv.uv_stop(ptr) - - libuv.gevent_close_all_handles(ptr) - - closed_failed = libuv.uv_loop_close(ptr) - if closed_failed: - assert closed_failed == libuv.UV_EBUSY - # We already closed all the handles. Run the loop - # once to let them be cut off from the loop. - ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE) - if ran_has_more_callbacks: - libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT) - closed_failed = libuv.uv_loop_close(ptr) - assert closed_failed == 0, closed_failed - - # Destroy the native resources *after* we have closed - # the loop. If we do it before, walking the handles - # attached to the loop is likely to segfault. - - libuv.gevent_zero_check(self._check) - libuv.gevent_zero_check(self._timer0) - libuv.gevent_zero_prepare(self._prepare) - libuv.gevent_zero_timer(self._signal_idle) - del self._check - del self._prepare - del self._signal_idle - del self._timer0 - - libuv.gevent_zero_loop(ptr) - - # Destroy any watchers we're still holding on to. - del self._io_watchers - del self._fork_watchers - del self._child_watchers - - - def debug(self): - """ - Return all the handles that are open and their ref status. - """ - handle_state = namedtuple("HandleState", - ['handle', - 'type', - 'watcher', - 'ref', - 'active', - 'closing']) - handles = [] - - # XXX: Convert this to a modern callback. - def walk(handle, _arg): - data = handle.data - if data: - watcher = ffi.from_handle(data) - else: - watcher = None - handles.append(handle_state(handle, - ffi.string(libuv.uv_handle_type_name(handle.type)), - watcher, - libuv.uv_has_ref(handle), - libuv.uv_is_active(handle), - libuv.uv_is_closing(handle))) - - libuv.uv_walk(self._ptr, - ffi.callback("void(*)(uv_handle_t*,void*)", - walk), - ffi.NULL) - return handles - - def ref(self): - pass - - def unref(self): - # XXX: Called by _run_callbacks. - pass - - def break_(self, how=None): - libuv.uv_stop(self._ptr) - - def reinit(self): - # TODO: How to implement? We probably have to simply - # re-__init__ this whole class? Does it matter? - # OR maybe we need to uv_walk() and close all the handles? - - # XXX: libuv < 1.12 simply CANNOT handle a fork unless you immediately - # exec() in the child. There are multiple calls to abort() that - # will kill the child process: - # - The OS X poll implementation (kqueue) aborts on an error return - # value; since kqueue FDs can't be inherited, then the next call - # to kqueue in the child will fail and get aborted; fork() is likely - # to be called during the gevent loop, meaning we're deep inside the - # runloop already, so we can't even close the loop that we're in: - # it's too late, the next call to kqueue is already scheduled. - # - The threadpool, should it be in use, also aborts - # (https://github.com/joyent/libuv/pull/1136) - # - There global shared state that breaks signal handling - # and leads to an abort() in the child, EVEN IF the loop in the parent - # had already been closed - # (https://github.com/joyent/libuv/issues/1405) - - # In 1.12, the uv_loop_fork function was added (by gevent!) - libuv.uv_loop_fork(self._ptr) - - _prepare_ran_callbacks = False - - def __run_queued_callbacks(self): - if not self._queued_callbacks: - return False - - cbs = list(self._queued_callbacks) - self._queued_callbacks = [] - - for watcher_ptr, arg in cbs: - handle = watcher_ptr.data - if not handle: - # It's been stopped and possibly closed - assert not libuv.uv_is_active(watcher_ptr) - continue - val = _callbacks.python_callback(handle, arg) - if val == -1: - _callbacks.python_handle_error(handle, arg) - elif val == 1: - if not libuv.uv_is_active(watcher_ptr): - if watcher_ptr.data != handle: - if watcher_ptr.data: - _callbacks.python_stop(None) - else: - _callbacks.python_stop(handle) - return True - - - def run(self, nowait=False, once=False): - # we can only respect one flag or the other. - # nowait takes precedence because it can't block - mode = libuv.UV_RUN_DEFAULT - if once: - mode = libuv.UV_RUN_ONCE - if nowait: - mode = libuv.UV_RUN_NOWAIT - - if mode == libuv.UV_RUN_DEFAULT: - while self._ptr and self._ptr.data: - # This is here to better preserve order guarantees. See _run_callbacks - # for details. - # It may get run again from the prepare watcher, so potentially we - # could take twice as long as the switch interval. - self._run_callbacks() - self._prepare_ran_callbacks = False - ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE) - # Note that we run queued callbacks when the prepare watcher runs, - # thus accounting for timers that expired before polling for IO, - # and idle watchers. This next call should get IO callbacks and - # callbacks from timers that expired *after* polling for IO. - ran_callbacks = self.__run_queued_callbacks() - - if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks: - # A return of 0 means there are no referenced and - # active handles. The loop is over. - # If we didn't run any callbacks, then we couldn't schedule - # anything to switch in the future, so there's no point - # running again. - return ran_status - return 0 # Somebody closed the loop - - result = libuv.uv_run(self._ptr, mode) - self.__run_queued_callbacks() - return result - - def now(self): - # libuv's now is expressed as an integer number of - # milliseconds, so to get it compatible with time.time units - # that this method is supposed to return, we have to divide by 1000.0 - now = libuv.uv_now(self._ptr) - return now / 1000.0 - - def update_now(self): - libuv.uv_update_time(self._ptr) - - def fileno(self): - if self._ptr: - fd = libuv.uv_backend_fd(self._ptr) - if fd >= 0: - return fd - - _sigchld_watcher = None - _sigchld_callback_ffi = None - - def install_sigchld(self): - if not self.default: - return - - if self._sigchld_watcher: - return - - self._sigchld_watcher = ffi.new('uv_signal_t*') - libuv.uv_signal_init(self._ptr, self._sigchld_watcher) - self._sigchld_watcher.data = self._handle_to_self - - libuv.uv_signal_start(self._sigchld_watcher, - libuv.python_sigchld_callback, - signal.SIGCHLD) - - def reset_sigchld(self): - if not self.default or not self._sigchld_watcher: - return - - libuv.uv_signal_stop(self._sigchld_watcher) - # Must go through this to manage the memory lifetime - # correctly. Alternately, we could just stop it and restart - # it in install_sigchld? - _watchers.watcher._watcher_ffi_close(self._sigchld_watcher) - del self._sigchld_watcher - - - def _sigchld_callback(self): - # Signals can arrive at (relatively) any time. To eliminate - # race conditions, and behave more like libev, we "queue" - # sigchld to run when we run callbacks. - while True: - try: - pid, status, _usage = os.wait3(os.WNOHANG) - except OSError: - # Python 3 raises ChildProcessError - break - - if pid == 0: - break - children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) - for watcher in children_watchers: - self.run_callback(watcher._set_waitpid_status, pid, status) - - # Don't invoke child watchers for 0 more than once - self._child_watchers[0] = [] - - def _register_child_watcher(self, watcher): - self._child_watchers[watcher._pid].append(watcher) - - def _unregister_child_watcher(self, watcher): - try: - # stop() should be idempotent - self._child_watchers[watcher._pid].remove(watcher) - except ValueError: - pass - - # Now's a good time to clean up any dead lists we don't need - # anymore - for pid in list(self._child_watchers): - if not self._child_watchers[pid]: - del self._child_watchers[pid] - - def io(self, fd, events, ref=True, priority=None): - # We rely on hard references here and explicit calls to - # close() on the returned object to correctly manage - # the watcher lifetimes. - - io_watchers = self._io_watchers - try: - io_watcher = io_watchers[fd] - assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher) - except KeyError: - # Start the watcher with just the events that we're interested in. - # as multiplexers are added, the real event mask will be updated to keep in sync. - # If we watch for too much, we get spurious wakeups and busy loops. - io_watcher = self._watchers.io(self, fd, 0) - io_watchers[fd] = io_watcher - io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd) - - return io_watcher.multiplex(events) - - def prepare(self, ref=True, priority=None): - # We run arbitrary code in python_prepare_callback. That could switch - # greenlets. If it does that while also manipulating the active prepare - # watchers, we could corrupt the process state, since the prepare watcher - # queue is iterated on the stack (on unix). We could workaround this by implementing - # prepare watchers in pure Python. - # See https://github.com/gevent/gevent/issues/1126 - raise TypeError("prepare watchers are not currently supported in libuv. " - "If you need them, please contact the maintainers.") |