diff options
Diffstat (limited to 'python/gevent/os.py')
-rw-r--r-- | python/gevent/os.py | 162 |
1 files changed, 102 insertions, 60 deletions
diff --git a/python/gevent/os.py b/python/gevent/os.py index fffd0dc..3980f32 100644 --- a/python/gevent/os.py +++ b/python/gevent/os.py @@ -45,7 +45,9 @@ from __future__ import absolute_import import os import sys -from gevent.hub import get_hub, reinit +from gevent.hub import _get_hub_noargs as get_hub +from gevent.hub import reinit +from gevent._config import config from gevent._compat import PY3 from gevent._util import copy_globals import errno @@ -72,54 +74,76 @@ if fcntl: __extensions__ += ['make_nonblocking', 'nb_read', 'nb_write'] def make_nonblocking(fd): - """Put the file descriptor *fd* into non-blocking mode if possible. + """Put the file descriptor *fd* into non-blocking mode if + possible. - :return: A boolean value that evaluates to True if successful.""" + :return: A boolean value that evaluates to True if successful. + """ flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) if not bool(flags & os.O_NONBLOCK): fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) return True def nb_read(fd, n): - """Read up to `n` bytes from file descriptor `fd`. Return a string - containing the bytes read. If end-of-file is reached, an empty string - is returned. + """ + Read up to *n* bytes from file descriptor *fd*. Return a + byte string containing the bytes read, which may be shorter than + *n*. If end-of-file is reached, an empty string is returned. The descriptor must be in non-blocking mode. """ - hub, event = None, None - while True: - try: - return _read(fd, n) - except OSError as e: - if e.errno not in ignored_errors: - raise - if not PY3: - sys.exc_clear() - if hub is None: - hub = get_hub() - event = hub.loop.io(fd, 1) - hub.wait(event) + hub = None + event = None + try: + while 1: + try: + result = _read(fd, n) + return result + except OSError as e: + if e.errno not in ignored_errors: + raise + if not PY3: + sys.exc_clear() + if hub is None: + hub = get_hub() + event = hub.loop.io(fd, 1) + hub.wait(event) + finally: + if event is not None: + event.close() + event = None + hub = None + def nb_write(fd, buf): - """Write bytes from buffer `buf` to file descriptor `fd`. Return the - number of bytes written. + """ + Write some number of bytes from buffer *buf* to file + descriptor *fd*. Return the number of bytes written, which may + be less than the length of *buf*. The file descriptor must be in non-blocking mode. """ - hub, event = None, None - while True: - try: - return _write(fd, buf) - except OSError as e: - if e.errno not in ignored_errors: - raise - if not PY3: - sys.exc_clear() - if hub is None: - hub = get_hub() - event = hub.loop.io(fd, 2) - hub.wait(event) + hub = None + event = None + try: + while 1: + try: + result = _write(fd, buf) + return result + except OSError as e: + if e.errno not in ignored_errors: + raise + if not PY3: + sys.exc_clear() + if hub is None: + hub = get_hub() + event = hub.loop.io(fd, 2) + hub.wait(event) + finally: + if event is not None: + event.close() + event = None + hub = None def tp_read(fd, n): @@ -224,13 +248,16 @@ if hasattr(os, 'fork'): # XXX: Could handle tracing here by not stopping # until the pid is terminated watcher.stop() - _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time()) - if callback: - callback(watcher) - # dispatch an "event"; used by gevent.signal.signal - _on_child_hook() - # now is as good a time as any to reap children - _reap_children() + try: + _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time()) + if callback: + callback(watcher) + # dispatch an "event"; used by gevent.signal.signal + _on_child_hook() + # now is as good a time as any to reap children + _reap_children() + finally: + watcher.close() def _reap_children(timeout=60): # Remove all the dead children that haven't been waited on @@ -277,6 +304,7 @@ if hasattr(os, 'fork'): .. versionchanged:: 1.2a1 More cases are handled in a cooperative manner. """ + # pylint: disable=too-many-return-statements # XXX Does not handle tracing children # So long as libev's loop doesn't run, it's OK to add @@ -303,14 +331,19 @@ if hasattr(os, 'fork'): # pass through to the OS. if pid == -1 and options == 0: hub = get_hub() - watcher = hub.loop.child(0, False) - hub.wait(watcher) - return watcher.rpid, watcher.rstatus + with hub.loop.child(0, False) as watcher: + hub.wait(watcher) + return watcher.rpid, watcher.rstatus # There were funky options/pid, so we must go to the OS. return _waitpid(pid, options) if pid in _watched_children: # yes, we're watching it + + # Note that the remainder of this code must be careful to NOT + # yield to the event loop except at well known times, or + # we have a race condition between the _on_child callback and the + # code here that could lead to a process to hang. if options & _WNOHANG or isinstance(_watched_children[pid], tuple): # We're either asked not to block, or it already finished, in which # case blocking doesn't matter @@ -322,25 +355,34 @@ if hasattr(os, 'fork'): return result[:2] # it's not finished return (0, 0) - else: - # Ok, we need to "block". Do so via a watcher so that we're - # cooperative. We know it's our child, etc, so this should work. - watcher = _watched_children[pid] - # We can't start a watcher that's already started, - # so we can't reuse the existing watcher. - new_watcher = watcher.loop.child(pid, False) + + # Ok, we need to "block". Do so via a watcher so that we're + # cooperative. We know it's our child, etc, so this should work. + watcher = _watched_children[pid] + # We can't start a watcher that's already started, + # so we can't reuse the existing watcher. Notice that the + # old watcher must not have fired already, or during this time, but + # only after we successfully `start()` the watcher. So this must + # not yield to the event loop. + with watcher.loop.child(pid, False) as new_watcher: get_hub().wait(new_watcher) - # Ok, so now the new watcher is done. That means - # the old watcher's callback (_on_child) should - # have fired, potentially taking this child out of - # _watched_children (but that could depend on how - # many callbacks there were to run, so use the - # watcher object directly; libev sets all the - # watchers at the same time). - return watcher.rpid, watcher.rstatus + # Ok, so now the new watcher is done. That means + # the old watcher's callback (_on_child) should + # have fired, potentially taking this child out of + # _watched_children (but that could depend on how + # many callbacks there were to run, so use the + # watcher object directly; libev sets all the + # watchers at the same time). + return watcher.rpid, watcher.rstatus # we're not watching it and it may not even be our child, # so we must go to the OS to be sure to get the right semantics (exception) + # XXX + # libuv has a race condition because the signal + # handler is a Python function, so the InterruptedError + # is raised before the signal handler runs and calls the + # child watcher + # we're not watching it return _waitpid(pid, options) def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent): @@ -401,7 +443,7 @@ if hasattr(os, 'fork'): __extensions__.append('forkpty_and_watch') # Watch children by default - if not os.getenv('GEVENT_NOWAITPID'): + if not config.disable_watch_children: # Broken out into separate functions instead of simple name aliases # for documentation purposes. def fork(*args, **kwargs): |