diff options
Diffstat (limited to 'python/gevent/_socketcommon.py')
-rw-r--r-- | python/gevent/_socketcommon.py | 251 |
1 files changed, 145 insertions, 106 deletions
diff --git a/python/gevent/_socketcommon.py b/python/gevent/_socketcommon.py
index 4da29c8..c1c778f 100644
--- a/python/gevent/_socketcommon.py
+++ b/python/gevent/_socketcommon.py
@@ -68,16 +68,15 @@ __py3_imports__ = [
 
 __imports__.extend(__py3_imports__)
 
-
+import time
 import sys
-from gevent.hub import get_hub
-from gevent.hub import ConcurrentObjectUseError
-from gevent.timeout import Timeout
+from gevent._hub_local import get_hub_noargs as get_hub
 from gevent._compat import string_types, integer_types, PY3
 from gevent._util import copy_globals
-from gevent._util import _NONE
 
 is_windows = sys.platform == 'win32'
+is_macos = sys.platform == 'darwin'
+
 # pylint:disable=no-name-in-module,unused-import
 if is_windows:
     # no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
@@ -102,6 +101,17 @@ try:
 except ImportError:
     EBADF = 9
 
+# macOS can return EPROTOTYPE when writing to a socket that is shutting
+# Down. Retrying the write should return the expected EPIPE error.
+# Downstream classes (like pywsgi) know how to handle/ignore EPIPE.
+# This set is used by socket.send() to decide whether the write should
+# be retried. The default is to retry only on EWOULDBLOCK. Here we add
+# EPROTOTYPE on macOS to handle this platform-specific race condition.
+GSENDAGAIN = (EWOULDBLOCK,)
+if is_macos:
+    from errno import EPROTOTYPE
+    GSENDAGAIN += (EPROTOTYPE,)
+
 import _socket
 _realsocket = _socket.socket
 import socket as __socket__
@@ -121,87 +131,13 @@ del _name, _value
 
 _timeout_error = timeout # pylint: disable=undefined-variable
 
+from gevent import _hub_primitives
+_hub_primitives.set_default_timeout_error(_timeout_error)
 
-def wait(io, timeout=None, timeout_exc=_NONE):
-    """
-    Block the current greenlet until *io* is ready.
-
-    If *timeout* is non-negative, then *timeout_exc* is raised after
-    *timeout* second has passed. By default *timeout_exc* is
-    ``socket.timeout('timed out')``.
-
-    If :func:`cancel_wait` is called on *io* by another greenlet,
-    raise an exception in this blocking greenlet
-    (``socket.error(EBADF, 'File descriptor was closed in another
-    greenlet')`` by default).
-
-    :param io: A libev watcher, most commonly an IO watcher obtained from
-        :meth:`gevent.core.loop.io`
-    :keyword timeout_exc: The exception to raise if the timeout expires.
-        By default, a :class:`socket.timeout` exception is raised.
-        If you pass a value for this keyword, it is interpreted as for
-        :class:`gevent.timeout.Timeout`.
-    """
-    if io.callback is not None:
-        raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (io.callback, ))
-    if timeout is not None:
-        timeout_exc = timeout_exc if timeout_exc is not _NONE else _timeout_error('timed out')
-        timeout = Timeout.start_new(timeout, timeout_exc)
-
-    try:
-        return get_hub().wait(io)
-    finally:
-        if timeout is not None:
-            timeout.cancel()
-    # rename "io" to "watcher" because wait() works with any watcher
-
-
-def wait_read(fileno, timeout=None, timeout_exc=_NONE):
-    """
-    Block the current greenlet until *fileno* is ready to read.
-
-    For the meaning of the other parameters and possible exceptions,
-    see :func:`wait`.
-
-    .. seealso:: :func:`cancel_wait`
-    """
-    io = get_hub().loop.io(fileno, 1)
-    return wait(io, timeout, timeout_exc)
-
-
-def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
-    """
-    Block the current greenlet until *fileno* is ready to write.
-
-    For the meaning of the other parameters and possible exceptions,
-    see :func:`wait`.
-
-    :keyword event: Ignored. Applications should not pass this parameter.
-       In the future, it may become an error.
-
-    .. seealso:: :func:`cancel_wait`
-    """
-    # pylint:disable=unused-argument
-    io = get_hub().loop.io(fileno, 2)
-    return wait(io, timeout, timeout_exc)
-
-
-def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
-    """
-    Block the current greenlet until *fileno* is ready to read or
-    write.
-
-    For the meaning of the other parameters and possible exceptions,
-    see :func:`wait`.
-
-    :keyword event: Ignored. Applications should not pass this parameter.
-       In the future, it may become an error.
-
-    .. seealso:: :func:`cancel_wait`
-    """
-    # pylint:disable=unused-argument
-    io = get_hub().loop.io(fileno, 3)
-    return wait(io, timeout, timeout_exc)
+wait = _hub_primitives.wait_on_watcher
+wait_read = _hub_primitives.wait_read
+wait_write = _hub_primitives.wait_write
+wait_readwrite = _hub_primitives.wait_readwrite
 
 #: The exception raised by default on a call to :func:`cancel_wait`
 class cancel_wait_ex(error): # pylint: disable=undefined-variable
@@ -216,29 +152,13 @@ def cancel_wait(watcher, error=cancel_wait_ex):
     get_hub().cancel_wait(watcher, error)
 
 
-class BlockingResolver(object):
-
-    def __init__(self, hub=None):
-        pass
-
-    def close(self):
-        pass
-
-    for method in ['gethostbyname',
-                   'gethostbyname_ex',
-                   'getaddrinfo',
-                   'gethostbyaddr',
-                   'getnameinfo']:
-        locals()[method] = staticmethod(getattr(_socket, method))
-
-
 def gethostbyname(hostname):
     """
     gethostbyname(host) -> address
 
     Return the IP address (a string of the form '255.255.255.255') for a host.
 
-    .. seealso:: :doc:`dns`
+    .. seealso:: :doc:`/dns`
     """
     return get_hub().resolver.gethostbyname(hostname)
 
@@ -251,7 +171,7 @@ def gethostbyname_ex(hostname):
     for a host. The host argument is a string giving a host name or IP number.
     Resolve host and port into list of address info entries.
 
-    .. seealso:: :doc:`dns`
+    .. seealso:: :doc:`/dns`
     """
     return get_hub().resolver.gethostbyname_ex(hostname)
 
@@ -271,7 +191,7 @@ def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
     narrow the list of addresses returned. Passing zero as a value for each of
     these arguments selects the full range of results.
 
-    .. seealso:: :doc:`dns`
+    .. seealso:: :doc:`/dns`
     """
     return get_hub().resolver.getaddrinfo(host, port, family, socktype, proto, flags)
 
@@ -297,7 +217,7 @@ def gethostbyaddr(ip_address):
     Return the true host name, a list of aliases, and a list of IP addresses,
     for a host. The host argument is a string giving a host name or IP number.
 
-    .. seealso:: :doc:`dns`
+    .. seealso:: :doc:`/dns`
     """
     return get_hub().resolver.gethostbyaddr(ip_address)
 
@@ -308,7 +228,7 @@ def getnameinfo(sockaddr, flags):
 
     Get host and port for a sockaddr.
 
-    .. seealso:: :doc:`dns`
+    .. seealso:: :doc:`/dns`
     """
     return get_hub().resolver.getnameinfo(sockaddr, flags)
 
@@ -341,3 +261,122 @@ def getfqdn(name=''):
         else:
             name = hostname
     return name
+
+def __send_chunk(socket, data_memory, flags, timeleft, end, timeout=_timeout_error):
+    """
+    Send the complete contents of ``data_memory`` before returning.
+    This is the core loop around :meth:`send`.
+
+    :param timeleft: Either ``None`` if there is no timeout involved,
+       or a float indicating the timeout to use.
+    :param end: Either ``None`` if there is no timeout involved, or
+       a float giving the absolute end time.
+    :return: An updated value for ``timeleft`` (or None)
+    :raises timeout: If ``timeleft`` was given and elapsed while
+       sending this chunk.
+    """
+    data_sent = 0
+    len_data_memory = len(data_memory)
+    started_timer = 0
+    while data_sent < len_data_memory:
+        chunk = data_memory[data_sent:]
+        if timeleft is None:
+            data_sent += socket.send(chunk, flags)
+        elif started_timer and timeleft <= 0:
+            # Check before sending to guarantee a check
+            # happens even if each chunk successfully sends its data
+            # (especially important for SSL sockets since they have large
+            # buffers). But only do this if we've actually tried to
+            # send something once to avoid spurious timeouts on non-blocking
+            # sockets.
+            raise timeout('timed out')
+        else:
+            started_timer = 1
+            data_sent += socket.send(chunk, flags, timeout=timeleft)
+            timeleft = end - time.time()
+
+    return timeleft
+
+def _sendall(socket, data_memory, flags,
+             SOL_SOCKET=__socket__.SOL_SOCKET, # pylint:disable=no-member
+             SO_SNDBUF=__socket__.SO_SNDBUF): # pylint:disable=no-member
+    """
+    Send the *data_memory* (which should be a memoryview)
+    using the gevent *socket*, performing well on PyPy.
+    """
+
+    # On PyPy up through 5.10.0, both PyPy2 and PyPy3, subviews
+    # (slices) of a memoryview() object copy the underlying bytes the
+    # first time the builtin socket.send() method is called. On a
+    # non-blocking socket (that thus calls socket.send() many times)
+    # with a large input, this results in many repeated copies of an
+    # ever smaller string, depending on the networking buffering. For
+    # example, if each send() can process 1MB of a 50MB input, and we
+    # naively pass the entire remaining subview each time, we'd copy
+    # 49MB, 48MB, 47MB, etc, thus completely killing performance. To
+    # workaround this problem, we work in reasonable, fixed-size
+    # chunks. This results in a 10x improvement to bench_sendall.py,
+    # while having no measurable impact on CPython (since it doesn't
+    # copy at all the only extra overhead is a few python function
+    # calls, which is negligible for large inputs).
+
+    # On one macOS machine, PyPy3 5.10.1 produced ~ 67.53 MB/s before this change,
+    # and ~ 616.01 MB/s after.
+
+    # See https://bitbucket.org/pypy/pypy/issues/2091/non-blocking-socketsend-slow-gevent
+
+    # Too small of a chunk (the socket's buf size is usually too
+    # small) results in reduced perf due to *too many* calls to send and too many
+    # small copies. With a buffer of 143K (the default on my system), for
+    # example, bench_sendall.py yields ~264MB/s, while using 1MB yields
+    # ~653MB/s (matching CPython). 1MB is arbitrary and might be better
+    # chosen, say, to match a page size?
+
+    len_data_memory = len(data_memory)
+    if not len_data_memory:
+        # Don't try to send empty data at all, no point, and breaks ssl
+        # See issue 719
+        return 0
+
+
+    chunk_size = max(socket.getsockopt(SOL_SOCKET, SO_SNDBUF), 1024 * 1024)
+
+    data_sent = 0
+    end = None
+    timeleft = None
+    if socket.timeout is not None:
+        timeleft = socket.timeout
+        end = time.time() + timeleft
+
+    while data_sent < len_data_memory:
+        chunk_end = min(data_sent + chunk_size, len_data_memory)
+        chunk = data_memory[data_sent:chunk_end]
+
+        timeleft = __send_chunk(socket, chunk, flags, timeleft, end)
+        data_sent += len(chunk) # Guaranteed it sent the whole thing
+
+# pylint:disable=no-member
+_RESOLVABLE_FAMILIES = (__socket__.AF_INET,)
+if __socket__.has_ipv6:
+    _RESOLVABLE_FAMILIES += (__socket__.AF_INET6,)
+
+def _resolve_addr(sock, address):
+    # Internal method: resolve the AF_INET[6] address using
+    # getaddrinfo.
+    if sock.family not in _RESOLVABLE_FAMILIES or not isinstance(address, tuple):
+        return address
+    # address is (host, port) (ipv4) or (host, port, flowinfo, scopeid) (ipv6).
+
+    # We don't pass the port to getaddrinfo because the C
+    # socket module doesn't either (on some systems its
+    # illegal to do that without also passing socket type and
+    # protocol). Instead we join the port back at the end.
+    # See https://github.com/gevent/gevent/issues/1252
+    host, port = address[:2]
+    r = getaddrinfo(host, None, sock.family)
+    address = r[0][-1]
+    if len(address) == 2:
+        address = (address[0], port)
+    else:
+        address = (address[0], port, address[2], address[3])
+    return address