""" Low-level operating system functions from :mod:`os`. Cooperative I/O =============== This module provides cooperative versions of :func:`os.read` and :func:`os.write`. These functions are *not* monkey-patched; you must explicitly call them or monkey patch them yourself. POSIX functions --------------- On POSIX, non-blocking IO is available. - :func:`nb_read` - :func:`nb_write` - :func:`make_nonblocking` All Platforms ------------- On non-POSIX platforms (e.g., Windows), non-blocking IO is not available. On those platforms (and on POSIX), cooperative IO can be done with the threadpool. - :func:`tp_read` - :func:`tp_write` Child Processes =============== The functions :func:`fork` and (on POSIX) :func:`forkpty` and :func:`waitpid` can be used to manage child processes. .. warning:: Forking a process that uses greenlets does not eliminate all non-running greenlets. Any that were scheduled in the hub of the forking thread in the parent remain scheduled in the child; compare this to how normal threads operate. (This behaviour may change is a subsequent major release.) """ from __future__ import absolute_import import os import sys from gevent.hub import get_hub, reinit from gevent._compat import PY3 from gevent._util import copy_globals import errno EAGAIN = getattr(errno, 'EAGAIN', 11) try: import fcntl except ImportError: fcntl = None __implements__ = ['fork'] __extensions__ = ['tp_read', 'tp_write'] _read = os.read _write = os.write ignored_errors = [EAGAIN, errno.EINTR] if fcntl: __extensions__ += ['make_nonblocking', 'nb_read', 'nb_write'] def make_nonblocking(fd): """Put the file descriptor *fd* into non-blocking mode if possible. :return: A boolean value that evaluates to True if successful.""" flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) if not bool(flags & os.O_NONBLOCK): fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) return True def nb_read(fd, n): """Read up to `n` bytes from file descriptor `fd`. Return a string containing the bytes read. If end-of-file is reached, an empty string is returned. The descriptor must be in non-blocking mode. """ hub, event = None, None while True: try: return _read(fd, n) except OSError as e: if e.errno not in ignored_errors: raise if not PY3: sys.exc_clear() if hub is None: hub = get_hub() event = hub.loop.io(fd, 1) hub.wait(event) def nb_write(fd, buf): """Write bytes from buffer `buf` to file descriptor `fd`. Return the number of bytes written. The file descriptor must be in non-blocking mode. """ hub, event = None, None while True: try: return _write(fd, buf) except OSError as e: if e.errno not in ignored_errors: raise if not PY3: sys.exc_clear() if hub is None: hub = get_hub() event = hub.loop.io(fd, 2) hub.wait(event) def tp_read(fd, n): """Read up to *n* bytes from file descriptor *fd*. Return a string containing the bytes read. If end-of-file is reached, an empty string is returned. Reading is done using the threadpool. """ return get_hub().threadpool.apply(_read, (fd, n)) def tp_write(fd, buf): """Write bytes from buffer *buf* to file descriptor *fd*. Return the number of bytes written. Writing is done using the threadpool. """ return get_hub().threadpool.apply(_write, (fd, buf)) if hasattr(os, 'fork'): # pylint:disable=function-redefined,redefined-outer-name _raw_fork = os.fork def fork_gevent(): """ Forks the process using :func:`os.fork` and prepares the child process to continue using gevent before returning. .. note:: The PID returned by this function may not be waitable with either the original :func:`os.waitpid` or this module's :func:`waitpid` and it may not generate SIGCHLD signals if libev child watchers are or ever have been in use. For example, the :mod:`gevent.subprocess` module uses libev child watchers (which parts of gevent use libev child watchers is subject to change at any time). Most applications should use :func:`fork_and_watch`, which is monkey-patched as the default replacement for :func:`os.fork` and implements the ``fork`` function of this module by default, unless the environment variable ``GEVENT_NOWAITPID`` is defined before this module is imported. .. versionadded:: 1.1b2 """ result = _raw_fork() if not result: reinit() return result def fork(): """ A wrapper for :func:`fork_gevent` for non-POSIX platforms. """ return fork_gevent() if hasattr(os, 'forkpty'): _raw_forkpty = os.forkpty def forkpty_gevent(): """ Forks the process using :func:`os.forkpty` and prepares the child process to continue using gevent before returning. Returns a tuple (pid, master_fd). The `master_fd` is *not* put into non-blocking mode. Availability: Some Unix systems. .. seealso:: This function has the same limitations as :func:`fork_gevent`. .. versionadded:: 1.1b5 """ pid, master_fd = _raw_forkpty() if not pid: reinit() return pid, master_fd forkpty = forkpty_gevent __implements__.append('forkpty') __extensions__.append("forkpty_gevent") if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'): # We can only do this on POSIX import time _waitpid = os.waitpid _WNOHANG = os.WNOHANG # replaced by the signal module. _on_child_hook = lambda: None # {pid -> watcher or tuple(pid, rstatus, timestamp)} _watched_children = {} def _on_child(watcher, callback): # XXX: Could handle tracing here by not stopping # until the pid is terminated watcher.stop() _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time()) if callback: callback(watcher) # dispatch an "event"; used by gevent.signal.signal _on_child_hook() # now is as good a time as any to reap children _reap_children() def _reap_children(timeout=60): # Remove all the dead children that haven't been waited on # for the *timeout* seconds. # Some platforms queue delivery of SIGCHLD for all children that die; # in that case, a well-behaved application should call waitpid() for each # signal. # Some platforms (linux) only guarantee one delivery if multiple children # die. On that platform, the well-behave application calls waitpid() in a loop # until it gets back -1, indicating no more dead children need to be waited for. # In either case, waitpid should be called the same number of times as dead children, # thus removing all the watchers when a SIGCHLD arrives. The (generous) timeout # is to work with applications that neglect to call waitpid and prevent "unlimited" # growth. # Note that we don't watch for the case of pid wraparound. That is, we fork a new # child with the same pid as an existing watcher, but the child is already dead, # just not waited on yet. now = time.time() oldest_allowed = now - timeout dead = [pid for pid, val in _watched_children.items() if isinstance(val, tuple) and val[2] < oldest_allowed] for pid in dead: del _watched_children[pid] def waitpid(pid, options): """ Wait for a child process to finish. If the child process was spawned using :func:`fork_and_watch`, then this function behaves cooperatively. If not, it *may* have race conditions; see :func:`fork_gevent` for more information. The arguments are as for the underlying :func:`os.waitpid`. Some combinations of *options* may not be supported cooperatively (as of 1.1 that includes WUNTRACED). Using a *pid* of 0 to request waiting on only processes from the current process group is not cooperative. Availability: POSIX. .. versionadded:: 1.1b1 .. versionchanged:: 1.2a1 More cases are handled in a cooperative manner. """ # XXX Does not handle tracing children # So long as libev's loop doesn't run, it's OK to add # child watchers. The SIGCHLD handler only feeds events # for the next iteration of the loop to handle. (And the # signal handler itself is only called from the next loop # iteration.) if pid <= 0: # magic functions for multiple children. if pid == -1: # Any child. If we have one that we're watching and that finished, # we will use that one. Otherwise, let the OS take care of it. for k, v in _watched_children.items(): if isinstance(v, tuple): pid = k break if pid <= 0: # We didn't have one that was ready. If there are # no funky options set, and the pid was -1 # (meaning any process, not 0, which means process # group--- libev doesn't know about process # groups) then we can use a child watcher of pid 0; otherwise, # pass through to the OS. if pid == -1 and options == 0: hub = get_hub() watcher = hub.loop.child(0, False) hub.wait(watcher) return watcher.rpid, watcher.rstatus # There were funky options/pid, so we must go to the OS. return _waitpid(pid, options) if pid in _watched_children: # yes, we're watching it if options & _WNOHANG or isinstance(_watched_children[pid], tuple): # We're either asked not to block, or it already finished, in which # case blocking doesn't matter result = _watched_children[pid] if isinstance(result, tuple): # it finished. libev child watchers # are one-shot del _watched_children[pid] return result[:2] # it's not finished return (0, 0) else: # Ok, we need to "block". Do so via a watcher so that we're # cooperative. We know it's our child, etc, so this should work. watcher = _watched_children[pid] # We can't start a watcher that's already started, # so we can't reuse the existing watcher. new_watcher = watcher.loop.child(pid, False) get_hub().wait(new_watcher) # Ok, so now the new watcher is done. That means # the old watcher's callback (_on_child) should # have fired, potentially taking this child out of # _watched_children (but that could depend on how # many callbacks there were to run, so use the # watcher object directly; libev sets all the # watchers at the same time). return watcher.rpid, watcher.rstatus # we're not watching it and it may not even be our child, # so we must go to the OS to be sure to get the right semantics (exception) return _waitpid(pid, options) def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent): """ Fork a child process and start a child watcher for it in the parent process. This call cooperates with :func:`waitpid` to enable cooperatively waiting for children to finish. When monkey-patching, these functions are patched in as :func:`os.fork` and :func:`os.waitpid`, respectively. In the child process, this function calls :func:`gevent.hub.reinit` before returning. Availability: POSIX. :keyword callback: If given, a callable that will be called with the child watcher when the child finishes. :keyword loop: The loop to start the watcher in. Defaults to the loop of the current hub. :keyword fork: The fork function. Defaults to :func:`the one defined in this module ` (which automatically calls :func:`gevent.hub.reinit`). Pass the builtin :func:`os.fork` function if you do not need to initialize gevent in the child process. .. versionadded:: 1.1b1 .. seealso:: :func:`gevent.monkey.get_original` To access the builtin :func:`os.fork`. """ pid = fork() if pid: # parent loop = loop or get_hub().loop watcher = loop.child(pid, ref=ref) _watched_children[pid] = watcher watcher.start(_on_child, watcher, callback) return pid __extensions__.append('fork_and_watch') __extensions__.append('fork_gevent') if 'forkpty' in __implements__: def forkpty_and_watch(callback=None, loop=None, ref=False, forkpty=forkpty_gevent): """ Like :func:`fork_and_watch`, except using :func:`forkpty_gevent`. Availability: Some Unix systems. .. versionadded:: 1.1b5 """ result = [] def _fork(): pid_and_fd = forkpty() result.append(pid_and_fd) return pid_and_fd[0] fork_and_watch(callback, loop, ref, _fork) return result[0] __extensions__.append('forkpty_and_watch') # Watch children by default if not os.getenv('GEVENT_NOWAITPID'): # Broken out into separate functions instead of simple name aliases # for documentation purposes. def fork(*args, **kwargs): """ Forks a child process and starts a child watcher for it in the parent process so that ``waitpid`` and SIGCHLD work as expected. This implementation of ``fork`` is a wrapper for :func:`fork_and_watch` when the environment variable ``GEVENT_NOWAITPID`` is *not* defined. This is the default and should be used by most applications. .. versionchanged:: 1.1b2 """ # take any args to match fork_and_watch return fork_and_watch(*args, **kwargs) if 'forkpty' in __implements__: def forkpty(*args, **kwargs): """ Like :func:`fork`, but using :func:`forkpty_gevent`. This implementation of ``forkpty`` is a wrapper for :func:`forkpty_and_watch` when the environment variable ``GEVENT_NOWAITPID`` is *not* defined. This is the default and should be used by most applications. .. versionadded:: 1.1b5 """ # take any args to match fork_and_watch return forkpty_and_watch(*args, **kwargs) __implements__.append("waitpid") else: def fork(): """ Forks a child process, initializes gevent in the child, but *does not* prepare the parent to wait for the child or receive SIGCHLD. This implementation of ``fork`` is a wrapper for :func:`fork_gevent` when the environment variable ``GEVENT_NOWAITPID`` *is* defined. This is not recommended for most applications. """ return fork_gevent() if 'forkpty' in __implements__: def forkpty(): """ Like :func:`fork`, but using :func:`os.forkpty` This implementation of ``forkpty`` is a wrapper for :func:`forkpty_gevent` when the environment variable ``GEVENT_NOWAITPID`` *is* defined. This is not recommended for most applications. .. versionadded:: 1.1b5 """ return forkpty_gevent() __extensions__.append("waitpid") else: __implements__.remove('fork') __imports__ = copy_globals(os, globals(), names_to_ignore=__implements__ + __extensions__, dunder_names_to_keep=()) __all__ = list(set(__implements__ + __extensions__))