aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/os.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
committerJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
commit4212164e91ba2f49583cf44ad623a29b36db8f77 (patch)
tree47aefe3c0162f03e0c823b43873356f69c1cd636 /python/gevent/os.py
parent6ca20ff7010f2bafc7fefcb8cad982be27a8aeae (diff)
downloadyt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.lz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.xz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.zip
Windows: Use 32-bit distribution of python
Diffstat (limited to 'python/gevent/os.py')
-rw-r--r--python/gevent/os.py162
1 files changed, 102 insertions, 60 deletions
diff --git a/python/gevent/os.py b/python/gevent/os.py
index fffd0dc..3980f32 100644
--- a/python/gevent/os.py
+++ b/python/gevent/os.py
@@ -45,7 +45,9 @@ from __future__ import absolute_import
import os
import sys
-from gevent.hub import get_hub, reinit
+from gevent.hub import _get_hub_noargs as get_hub
+from gevent.hub import reinit
+from gevent._config import config
from gevent._compat import PY3
from gevent._util import copy_globals
import errno
@@ -72,54 +74,76 @@ if fcntl:
__extensions__ += ['make_nonblocking', 'nb_read', 'nb_write']
def make_nonblocking(fd):
- """Put the file descriptor *fd* into non-blocking mode if possible.
+ """Put the file descriptor *fd* into non-blocking mode if
+ possible.
- :return: A boolean value that evaluates to True if successful."""
+ :return: A boolean value that evaluates to True if successful.
+ """
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
if not bool(flags & os.O_NONBLOCK):
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
return True
def nb_read(fd, n):
- """Read up to `n` bytes from file descriptor `fd`. Return a string
- containing the bytes read. If end-of-file is reached, an empty string
- is returned.
+ """
+ Read up to *n* bytes from file descriptor *fd*. Return a
+ byte string containing the bytes read, which may be shorter than
+ *n*. If end-of-file is reached, an empty string is returned.
The descriptor must be in non-blocking mode.
"""
- hub, event = None, None
- while True:
- try:
- return _read(fd, n)
- except OSError as e:
- if e.errno not in ignored_errors:
- raise
- if not PY3:
- sys.exc_clear()
- if hub is None:
- hub = get_hub()
- event = hub.loop.io(fd, 1)
- hub.wait(event)
+ hub = None
+ event = None
+ try:
+ while 1:
+ try:
+ result = _read(fd, n)
+ return result
+ except OSError as e:
+ if e.errno not in ignored_errors:
+ raise
+ if not PY3:
+ sys.exc_clear()
+ if hub is None:
+ hub = get_hub()
+ event = hub.loop.io(fd, 1)
+ hub.wait(event)
+ finally:
+ if event is not None:
+ event.close()
+ event = None
+ hub = None
+
def nb_write(fd, buf):
- """Write bytes from buffer `buf` to file descriptor `fd`. Return the
- number of bytes written.
+ """
+ Write some number of bytes from buffer *buf* to file
+ descriptor *fd*. Return the number of bytes written, which may
+ be less than the length of *buf*.
The file descriptor must be in non-blocking mode.
"""
- hub, event = None, None
- while True:
- try:
- return _write(fd, buf)
- except OSError as e:
- if e.errno not in ignored_errors:
- raise
- if not PY3:
- sys.exc_clear()
- if hub is None:
- hub = get_hub()
- event = hub.loop.io(fd, 2)
- hub.wait(event)
+ hub = None
+ event = None
+ try:
+ while 1:
+ try:
+ result = _write(fd, buf)
+ return result
+ except OSError as e:
+ if e.errno not in ignored_errors:
+ raise
+ if not PY3:
+ sys.exc_clear()
+ if hub is None:
+ hub = get_hub()
+ event = hub.loop.io(fd, 2)
+ hub.wait(event)
+ finally:
+ if event is not None:
+ event.close()
+ event = None
+ hub = None
def tp_read(fd, n):
@@ -224,13 +248,16 @@ if hasattr(os, 'fork'):
# XXX: Could handle tracing here by not stopping
# until the pid is terminated
watcher.stop()
- _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
- if callback:
- callback(watcher)
- # dispatch an "event"; used by gevent.signal.signal
- _on_child_hook()
- # now is as good a time as any to reap children
- _reap_children()
+ try:
+ _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
+ if callback:
+ callback(watcher)
+ # dispatch an "event"; used by gevent.signal.signal
+ _on_child_hook()
+ # now is as good a time as any to reap children
+ _reap_children()
+ finally:
+ watcher.close()
def _reap_children(timeout=60):
# Remove all the dead children that haven't been waited on
@@ -277,6 +304,7 @@ if hasattr(os, 'fork'):
.. versionchanged:: 1.2a1
More cases are handled in a cooperative manner.
"""
+ # pylint: disable=too-many-return-statements
# XXX Does not handle tracing children
# So long as libev's loop doesn't run, it's OK to add
@@ -303,14 +331,19 @@ if hasattr(os, 'fork'):
# pass through to the OS.
if pid == -1 and options == 0:
hub = get_hub()
- watcher = hub.loop.child(0, False)
- hub.wait(watcher)
- return watcher.rpid, watcher.rstatus
+ with hub.loop.child(0, False) as watcher:
+ hub.wait(watcher)
+ return watcher.rpid, watcher.rstatus
# There were funky options/pid, so we must go to the OS.
return _waitpid(pid, options)
if pid in _watched_children:
# yes, we're watching it
+
+ # Note that the remainder of this code must be careful to NOT
+ # yield to the event loop except at well known times, or
+ # we have a race condition between the _on_child callback and the
+ # code here that could lead to a process to hang.
if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
# We're either asked not to block, or it already finished, in which
# case blocking doesn't matter
@@ -322,25 +355,34 @@ if hasattr(os, 'fork'):
return result[:2]
# it's not finished
return (0, 0)
- else:
- # Ok, we need to "block". Do so via a watcher so that we're
- # cooperative. We know it's our child, etc, so this should work.
- watcher = _watched_children[pid]
- # We can't start a watcher that's already started,
- # so we can't reuse the existing watcher.
- new_watcher = watcher.loop.child(pid, False)
+
+ # Ok, we need to "block". Do so via a watcher so that we're
+ # cooperative. We know it's our child, etc, so this should work.
+ watcher = _watched_children[pid]
+ # We can't start a watcher that's already started,
+ # so we can't reuse the existing watcher. Notice that the
+ # old watcher must not have fired already, or during this time, but
+ # only after we successfully `start()` the watcher. So this must
+ # not yield to the event loop.
+ with watcher.loop.child(pid, False) as new_watcher:
get_hub().wait(new_watcher)
- # Ok, so now the new watcher is done. That means
- # the old watcher's callback (_on_child) should
- # have fired, potentially taking this child out of
- # _watched_children (but that could depend on how
- # many callbacks there were to run, so use the
- # watcher object directly; libev sets all the
- # watchers at the same time).
- return watcher.rpid, watcher.rstatus
+ # Ok, so now the new watcher is done. That means
+ # the old watcher's callback (_on_child) should
+ # have fired, potentially taking this child out of
+ # _watched_children (but that could depend on how
+ # many callbacks there were to run, so use the
+ # watcher object directly; libev sets all the
+ # watchers at the same time).
+ return watcher.rpid, watcher.rstatus
# we're not watching it and it may not even be our child,
# so we must go to the OS to be sure to get the right semantics (exception)
+ # XXX
+ # libuv has a race condition because the signal
+ # handler is a Python function, so the InterruptedError
+ # is raised before the signal handler runs and calls the
+ # child watcher
+ # we're not watching it
return _waitpid(pid, options)
def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent):
@@ -401,7 +443,7 @@ if hasattr(os, 'fork'):
__extensions__.append('forkpty_and_watch')
# Watch children by default
- if not os.getenv('GEVENT_NOWAITPID'):
+ if not config.disable_watch_children:
# Broken out into separate functions instead of simple name aliases
# for documentation purposes.
def fork(*args, **kwargs):