diff options
author | James Taylor <user234683@users.noreply.github.com> | 2018-07-12 23:40:30 -0700 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2018-07-12 23:41:07 -0700 |
commit | c3b9f8c4582882cd1f768b0727eca75475bb4f94 (patch) | |
tree | 5b4a1c693fd5b7416f1d5a75862e633502e77ca7 /python/gevent/os.py | |
parent | fe9fe8257740529f5880693992e4eeca35c7ea3e (diff) | |
download | yt-local-c3b9f8c4582882cd1f768b0727eca75475bb4f94.tar.lz yt-local-c3b9f8c4582882cd1f768b0727eca75475bb4f94.tar.xz yt-local-c3b9f8c4582882cd1f768b0727eca75475bb4f94.zip |
track embedded python distribution
Diffstat (limited to 'python/gevent/os.py')
-rw-r--r-- | python/gevent/os.py | 468 |
1 files changed, 468 insertions, 0 deletions
diff --git a/python/gevent/os.py b/python/gevent/os.py new file mode 100644 index 0000000..fffd0dc --- /dev/null +++ b/python/gevent/os.py @@ -0,0 +1,468 @@ +""" +Low-level operating system functions from :mod:`os`. + +Cooperative I/O +=============== + +This module provides cooperative versions of :func:`os.read` and +:func:`os.write`. These functions are *not* monkey-patched; you +must explicitly call them or monkey patch them yourself. + +POSIX functions +--------------- + +On POSIX, non-blocking IO is available. + +- :func:`nb_read` +- :func:`nb_write` +- :func:`make_nonblocking` + +All Platforms +------------- + +On non-POSIX platforms (e.g., Windows), non-blocking IO is not +available. On those platforms (and on POSIX), cooperative IO can +be done with the threadpool. + +- :func:`tp_read` +- :func:`tp_write` + +Child Processes +=============== + +The functions :func:`fork` and (on POSIX) :func:`forkpty` and :func:`waitpid` can be used +to manage child processes. + +.. warning:: + + Forking a process that uses greenlets does not eliminate all non-running + greenlets. Any that were scheduled in the hub of the forking thread in the parent + remain scheduled in the child; compare this to how normal threads operate. (This behaviour + may change is a subsequent major release.) +""" + +from __future__ import absolute_import + +import os +import sys +from gevent.hub import get_hub, reinit +from gevent._compat import PY3 +from gevent._util import copy_globals +import errno + +EAGAIN = getattr(errno, 'EAGAIN', 11) + +try: + import fcntl +except ImportError: + fcntl = None + +__implements__ = ['fork'] +__extensions__ = ['tp_read', 'tp_write'] + +_read = os.read +_write = os.write + + +ignored_errors = [EAGAIN, errno.EINTR] + + +if fcntl: + + __extensions__ += ['make_nonblocking', 'nb_read', 'nb_write'] + + def make_nonblocking(fd): + """Put the file descriptor *fd* into non-blocking mode if possible. + + :return: A boolean value that evaluates to True if successful.""" + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + if not bool(flags & os.O_NONBLOCK): + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + return True + + def nb_read(fd, n): + """Read up to `n` bytes from file descriptor `fd`. Return a string + containing the bytes read. If end-of-file is reached, an empty string + is returned. + + The descriptor must be in non-blocking mode. + """ + hub, event = None, None + while True: + try: + return _read(fd, n) + except OSError as e: + if e.errno not in ignored_errors: + raise + if not PY3: + sys.exc_clear() + if hub is None: + hub = get_hub() + event = hub.loop.io(fd, 1) + hub.wait(event) + + def nb_write(fd, buf): + """Write bytes from buffer `buf` to file descriptor `fd`. Return the + number of bytes written. + + The file descriptor must be in non-blocking mode. + """ + hub, event = None, None + while True: + try: + return _write(fd, buf) + except OSError as e: + if e.errno not in ignored_errors: + raise + if not PY3: + sys.exc_clear() + if hub is None: + hub = get_hub() + event = hub.loop.io(fd, 2) + hub.wait(event) + + +def tp_read(fd, n): + """Read up to *n* bytes from file descriptor *fd*. Return a string + containing the bytes read. If end-of-file is reached, an empty string + is returned. + + Reading is done using the threadpool. + """ + return get_hub().threadpool.apply(_read, (fd, n)) + + +def tp_write(fd, buf): + """Write bytes from buffer *buf* to file descriptor *fd*. Return the + number of bytes written. + + Writing is done using the threadpool. + """ + return get_hub().threadpool.apply(_write, (fd, buf)) + + +if hasattr(os, 'fork'): + # pylint:disable=function-redefined,redefined-outer-name + + _raw_fork = os.fork + + def fork_gevent(): + """ + Forks the process using :func:`os.fork` and prepares the + child process to continue using gevent before returning. + + .. note:: + + The PID returned by this function may not be waitable with + either the original :func:`os.waitpid` or this module's + :func:`waitpid` and it may not generate SIGCHLD signals if + libev child watchers are or ever have been in use. For + example, the :mod:`gevent.subprocess` module uses libev + child watchers (which parts of gevent use libev child + watchers is subject to change at any time). Most + applications should use :func:`fork_and_watch`, which is + monkey-patched as the default replacement for + :func:`os.fork` and implements the ``fork`` function of + this module by default, unless the environment variable + ``GEVENT_NOWAITPID`` is defined before this module is + imported. + + .. versionadded:: 1.1b2 + """ + result = _raw_fork() + if not result: + reinit() + return result + + def fork(): + """ + A wrapper for :func:`fork_gevent` for non-POSIX platforms. + """ + return fork_gevent() + + if hasattr(os, 'forkpty'): + _raw_forkpty = os.forkpty + + def forkpty_gevent(): + """ + Forks the process using :func:`os.forkpty` and prepares the + child process to continue using gevent before returning. + + Returns a tuple (pid, master_fd). The `master_fd` is *not* put into + non-blocking mode. + + Availability: Some Unix systems. + + .. seealso:: This function has the same limitations as :func:`fork_gevent`. + + .. versionadded:: 1.1b5 + """ + pid, master_fd = _raw_forkpty() + if not pid: + reinit() + return pid, master_fd + + forkpty = forkpty_gevent + + __implements__.append('forkpty') + __extensions__.append("forkpty_gevent") + + if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'): + # We can only do this on POSIX + import time + + _waitpid = os.waitpid + _WNOHANG = os.WNOHANG + + # replaced by the signal module. + _on_child_hook = lambda: None + + # {pid -> watcher or tuple(pid, rstatus, timestamp)} + _watched_children = {} + + def _on_child(watcher, callback): + # XXX: Could handle tracing here by not stopping + # until the pid is terminated + watcher.stop() + _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time()) + if callback: + callback(watcher) + # dispatch an "event"; used by gevent.signal.signal + _on_child_hook() + # now is as good a time as any to reap children + _reap_children() + + def _reap_children(timeout=60): + # Remove all the dead children that haven't been waited on + # for the *timeout* seconds. + # Some platforms queue delivery of SIGCHLD for all children that die; + # in that case, a well-behaved application should call waitpid() for each + # signal. + # Some platforms (linux) only guarantee one delivery if multiple children + # die. On that platform, the well-behave application calls waitpid() in a loop + # until it gets back -1, indicating no more dead children need to be waited for. + # In either case, waitpid should be called the same number of times as dead children, + # thus removing all the watchers when a SIGCHLD arrives. The (generous) timeout + # is to work with applications that neglect to call waitpid and prevent "unlimited" + # growth. + # Note that we don't watch for the case of pid wraparound. That is, we fork a new + # child with the same pid as an existing watcher, but the child is already dead, + # just not waited on yet. + now = time.time() + oldest_allowed = now - timeout + dead = [pid for pid, val + in _watched_children.items() + if isinstance(val, tuple) and val[2] < oldest_allowed] + for pid in dead: + del _watched_children[pid] + + def waitpid(pid, options): + """ + Wait for a child process to finish. + + If the child process was spawned using + :func:`fork_and_watch`, then this function behaves + cooperatively. If not, it *may* have race conditions; see + :func:`fork_gevent` for more information. + + The arguments are as for the underlying + :func:`os.waitpid`. Some combinations of *options* may not + be supported cooperatively (as of 1.1 that includes + WUNTRACED). Using a *pid* of 0 to request waiting on only processes + from the current process group is not cooperative. + + Availability: POSIX. + + .. versionadded:: 1.1b1 + .. versionchanged:: 1.2a1 + More cases are handled in a cooperative manner. + """ + # XXX Does not handle tracing children + + # So long as libev's loop doesn't run, it's OK to add + # child watchers. The SIGCHLD handler only feeds events + # for the next iteration of the loop to handle. (And the + # signal handler itself is only called from the next loop + # iteration.) + + if pid <= 0: + # magic functions for multiple children. + if pid == -1: + # Any child. If we have one that we're watching and that finished, + # we will use that one. Otherwise, let the OS take care of it. + for k, v in _watched_children.items(): + if isinstance(v, tuple): + pid = k + break + if pid <= 0: + # We didn't have one that was ready. If there are + # no funky options set, and the pid was -1 + # (meaning any process, not 0, which means process + # group--- libev doesn't know about process + # groups) then we can use a child watcher of pid 0; otherwise, + # pass through to the OS. + if pid == -1 and options == 0: + hub = get_hub() + watcher = hub.loop.child(0, False) + hub.wait(watcher) + return watcher.rpid, watcher.rstatus + # There were funky options/pid, so we must go to the OS. + return _waitpid(pid, options) + + if pid in _watched_children: + # yes, we're watching it + if options & _WNOHANG or isinstance(_watched_children[pid], tuple): + # We're either asked not to block, or it already finished, in which + # case blocking doesn't matter + result = _watched_children[pid] + if isinstance(result, tuple): + # it finished. libev child watchers + # are one-shot + del _watched_children[pid] + return result[:2] + # it's not finished + return (0, 0) + else: + # Ok, we need to "block". Do so via a watcher so that we're + # cooperative. We know it's our child, etc, so this should work. + watcher = _watched_children[pid] + # We can't start a watcher that's already started, + # so we can't reuse the existing watcher. + new_watcher = watcher.loop.child(pid, False) + get_hub().wait(new_watcher) + # Ok, so now the new watcher is done. That means + # the old watcher's callback (_on_child) should + # have fired, potentially taking this child out of + # _watched_children (but that could depend on how + # many callbacks there were to run, so use the + # watcher object directly; libev sets all the + # watchers at the same time). + return watcher.rpid, watcher.rstatus + + # we're not watching it and it may not even be our child, + # so we must go to the OS to be sure to get the right semantics (exception) + return _waitpid(pid, options) + + def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent): + """ + Fork a child process and start a child watcher for it in the parent process. + + This call cooperates with :func:`waitpid` to enable cooperatively waiting + for children to finish. When monkey-patching, these functions are patched in as + :func:`os.fork` and :func:`os.waitpid`, respectively. + + In the child process, this function calls :func:`gevent.hub.reinit` before returning. + + Availability: POSIX. + + :keyword callback: If given, a callable that will be called with the child watcher + when the child finishes. + :keyword loop: The loop to start the watcher in. Defaults to the + loop of the current hub. + :keyword fork: The fork function. Defaults to :func:`the one defined in this + module <gevent.os.fork_gevent>` (which automatically calls :func:`gevent.hub.reinit`). + Pass the builtin :func:`os.fork` function if you do not need to + initialize gevent in the child process. + + .. versionadded:: 1.1b1 + .. seealso:: + :func:`gevent.monkey.get_original` To access the builtin :func:`os.fork`. + """ + pid = fork() + if pid: + # parent + loop = loop or get_hub().loop + watcher = loop.child(pid, ref=ref) + _watched_children[pid] = watcher + watcher.start(_on_child, watcher, callback) + return pid + + __extensions__.append('fork_and_watch') + __extensions__.append('fork_gevent') + + if 'forkpty' in __implements__: + def forkpty_and_watch(callback=None, loop=None, ref=False, forkpty=forkpty_gevent): + """ + Like :func:`fork_and_watch`, except using :func:`forkpty_gevent`. + + Availability: Some Unix systems. + + .. versionadded:: 1.1b5 + """ + result = [] + + def _fork(): + pid_and_fd = forkpty() + result.append(pid_and_fd) + return pid_and_fd[0] + fork_and_watch(callback, loop, ref, _fork) + return result[0] + + __extensions__.append('forkpty_and_watch') + + # Watch children by default + if not os.getenv('GEVENT_NOWAITPID'): + # Broken out into separate functions instead of simple name aliases + # for documentation purposes. + def fork(*args, **kwargs): + """ + Forks a child process and starts a child watcher for it in the + parent process so that ``waitpid`` and SIGCHLD work as expected. + + This implementation of ``fork`` is a wrapper for :func:`fork_and_watch` + when the environment variable ``GEVENT_NOWAITPID`` is *not* defined. + This is the default and should be used by most applications. + + .. versionchanged:: 1.1b2 + """ + # take any args to match fork_and_watch + return fork_and_watch(*args, **kwargs) + + if 'forkpty' in __implements__: + def forkpty(*args, **kwargs): + """ + Like :func:`fork`, but using :func:`forkpty_gevent`. + + This implementation of ``forkpty`` is a wrapper for :func:`forkpty_and_watch` + when the environment variable ``GEVENT_NOWAITPID`` is *not* defined. + This is the default and should be used by most applications. + + .. versionadded:: 1.1b5 + """ + # take any args to match fork_and_watch + return forkpty_and_watch(*args, **kwargs) + __implements__.append("waitpid") + else: + def fork(): + """ + Forks a child process, initializes gevent in the child, + but *does not* prepare the parent to wait for the child or receive SIGCHLD. + + This implementation of ``fork`` is a wrapper for :func:`fork_gevent` + when the environment variable ``GEVENT_NOWAITPID`` *is* defined. + This is not recommended for most applications. + """ + return fork_gevent() + + if 'forkpty' in __implements__: + def forkpty(): + """ + Like :func:`fork`, but using :func:`os.forkpty` + + This implementation of ``forkpty`` is a wrapper for :func:`forkpty_gevent` + when the environment variable ``GEVENT_NOWAITPID`` *is* defined. + This is not recommended for most applications. + + .. versionadded:: 1.1b5 + """ + return forkpty_gevent() + __extensions__.append("waitpid") + +else: + __implements__.remove('fork') + +__imports__ = copy_globals(os, globals(), + names_to_ignore=__implements__ + __extensions__, + dunder_names_to_keep=()) + +__all__ = list(set(__implements__ + __extensions__)) |