aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_socketcommon.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_socketcommon.py')
-rw-r--r--python/gevent/_socketcommon.py251
1 files changed, 145 insertions, 106 deletions
diff --git a/python/gevent/_socketcommon.py b/python/gevent/_socketcommon.py
index 4da29c8..c1c778f 100644
--- a/python/gevent/_socketcommon.py
+++ b/python/gevent/_socketcommon.py
@@ -68,16 +68,15 @@ __py3_imports__ = [
__imports__.extend(__py3_imports__)
-
+import time
import sys
-from gevent.hub import get_hub
-from gevent.hub import ConcurrentObjectUseError
-from gevent.timeout import Timeout
+from gevent._hub_local import get_hub_noargs as get_hub
from gevent._compat import string_types, integer_types, PY3
from gevent._util import copy_globals
-from gevent._util import _NONE
is_windows = sys.platform == 'win32'
+is_macos = sys.platform == 'darwin'
+
# pylint:disable=no-name-in-module,unused-import
if is_windows:
# no such thing as WSAEPERM or error code 10001 according to winsock.h or MSDN
@@ -102,6 +101,17 @@ try:
except ImportError:
EBADF = 9
+# macOS can return EPROTOTYPE when writing to a socket that is shutting
+# Down. Retrying the write should return the expected EPIPE error.
+# Downstream classes (like pywsgi) know how to handle/ignore EPIPE.
+# This set is used by socket.send() to decide whether the write should
+# be retried. The default is to retry only on EWOULDBLOCK. Here we add
+# EPROTOTYPE on macOS to handle this platform-specific race condition.
+GSENDAGAIN = (EWOULDBLOCK,)
+if is_macos:
+ from errno import EPROTOTYPE
+ GSENDAGAIN += (EPROTOTYPE,)
+
import _socket
_realsocket = _socket.socket
import socket as __socket__
@@ -121,87 +131,13 @@ del _name, _value
_timeout_error = timeout # pylint: disable=undefined-variable
+from gevent import _hub_primitives
+_hub_primitives.set_default_timeout_error(_timeout_error)
-def wait(io, timeout=None, timeout_exc=_NONE):
- """
- Block the current greenlet until *io* is ready.
-
- If *timeout* is non-negative, then *timeout_exc* is raised after
- *timeout* second has passed. By default *timeout_exc* is
- ``socket.timeout('timed out')``.
-
- If :func:`cancel_wait` is called on *io* by another greenlet,
- raise an exception in this blocking greenlet
- (``socket.error(EBADF, 'File descriptor was closed in another
- greenlet')`` by default).
-
- :param io: A libev watcher, most commonly an IO watcher obtained from
- :meth:`gevent.core.loop.io`
- :keyword timeout_exc: The exception to raise if the timeout expires.
- By default, a :class:`socket.timeout` exception is raised.
- If you pass a value for this keyword, it is interpreted as for
- :class:`gevent.timeout.Timeout`.
- """
- if io.callback is not None:
- raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (io.callback, ))
- if timeout is not None:
- timeout_exc = timeout_exc if timeout_exc is not _NONE else _timeout_error('timed out')
- timeout = Timeout.start_new(timeout, timeout_exc)
-
- try:
- return get_hub().wait(io)
- finally:
- if timeout is not None:
- timeout.cancel()
- # rename "io" to "watcher" because wait() works with any watcher
-
-
-def wait_read(fileno, timeout=None, timeout_exc=_NONE):
- """
- Block the current greenlet until *fileno* is ready to read.
-
- For the meaning of the other parameters and possible exceptions,
- see :func:`wait`.
-
- .. seealso:: :func:`cancel_wait`
- """
- io = get_hub().loop.io(fileno, 1)
- return wait(io, timeout, timeout_exc)
-
-
-def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
- """
- Block the current greenlet until *fileno* is ready to write.
-
- For the meaning of the other parameters and possible exceptions,
- see :func:`wait`.
-
- :keyword event: Ignored. Applications should not pass this parameter.
- In the future, it may become an error.
-
- .. seealso:: :func:`cancel_wait`
- """
- # pylint:disable=unused-argument
- io = get_hub().loop.io(fileno, 2)
- return wait(io, timeout, timeout_exc)
-
-
-def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
- """
- Block the current greenlet until *fileno* is ready to read or
- write.
-
- For the meaning of the other parameters and possible exceptions,
- see :func:`wait`.
-
- :keyword event: Ignored. Applications should not pass this parameter.
- In the future, it may become an error.
-
- .. seealso:: :func:`cancel_wait`
- """
- # pylint:disable=unused-argument
- io = get_hub().loop.io(fileno, 3)
- return wait(io, timeout, timeout_exc)
+wait = _hub_primitives.wait_on_watcher
+wait_read = _hub_primitives.wait_read
+wait_write = _hub_primitives.wait_write
+wait_readwrite = _hub_primitives.wait_readwrite
#: The exception raised by default on a call to :func:`cancel_wait`
class cancel_wait_ex(error): # pylint: disable=undefined-variable
@@ -216,29 +152,13 @@ def cancel_wait(watcher, error=cancel_wait_ex):
get_hub().cancel_wait(watcher, error)
-class BlockingResolver(object):
-
- def __init__(self, hub=None):
- pass
-
- def close(self):
- pass
-
- for method in ['gethostbyname',
- 'gethostbyname_ex',
- 'getaddrinfo',
- 'gethostbyaddr',
- 'getnameinfo']:
- locals()[method] = staticmethod(getattr(_socket, method))
-
-
def gethostbyname(hostname):
"""
gethostbyname(host) -> address
Return the IP address (a string of the form '255.255.255.255') for a host.
- .. seealso:: :doc:`dns`
+ .. seealso:: :doc:`/dns`
"""
return get_hub().resolver.gethostbyname(hostname)
@@ -251,7 +171,7 @@ def gethostbyname_ex(hostname):
for a host. The host argument is a string giving a host name or IP number.
Resolve host and port into list of address info entries.
- .. seealso:: :doc:`dns`
+ .. seealso:: :doc:`/dns`
"""
return get_hub().resolver.gethostbyname_ex(hostname)
@@ -271,7 +191,7 @@ def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
narrow the list of addresses returned. Passing zero as a value for each of
these arguments selects the full range of results.
- .. seealso:: :doc:`dns`
+ .. seealso:: :doc:`/dns`
"""
return get_hub().resolver.getaddrinfo(host, port, family, socktype, proto, flags)
@@ -297,7 +217,7 @@ def gethostbyaddr(ip_address):
Return the true host name, a list of aliases, and a list of IP addresses,
for a host. The host argument is a string giving a host name or IP number.
- .. seealso:: :doc:`dns`
+ .. seealso:: :doc:`/dns`
"""
return get_hub().resolver.gethostbyaddr(ip_address)
@@ -308,7 +228,7 @@ def getnameinfo(sockaddr, flags):
Get host and port for a sockaddr.
- .. seealso:: :doc:`dns`
+ .. seealso:: :doc:`/dns`
"""
return get_hub().resolver.getnameinfo(sockaddr, flags)
@@ -341,3 +261,122 @@ def getfqdn(name=''):
else:
name = hostname
return name
+
+def __send_chunk(socket, data_memory, flags, timeleft, end, timeout=_timeout_error):
+ """
+ Send the complete contents of ``data_memory`` before returning.
+ This is the core loop around :meth:`send`.
+
+ :param timeleft: Either ``None`` if there is no timeout involved,
+ or a float indicating the timeout to use.
+ :param end: Either ``None`` if there is no timeout involved, or
+ a float giving the absolute end time.
+ :return: An updated value for ``timeleft`` (or None)
+ :raises timeout: If ``timeleft`` was given and elapsed while
+ sending this chunk.
+ """
+ data_sent = 0
+ len_data_memory = len(data_memory)
+ started_timer = 0
+ while data_sent < len_data_memory:
+ chunk = data_memory[data_sent:]
+ if timeleft is None:
+ data_sent += socket.send(chunk, flags)
+ elif started_timer and timeleft <= 0:
+ # Check before sending to guarantee a check
+ # happens even if each chunk successfully sends its data
+ # (especially important for SSL sockets since they have large
+ # buffers). But only do this if we've actually tried to
+ # send something once to avoid spurious timeouts on non-blocking
+ # sockets.
+ raise timeout('timed out')
+ else:
+ started_timer = 1
+ data_sent += socket.send(chunk, flags, timeout=timeleft)
+ timeleft = end - time.time()
+
+ return timeleft
+
+def _sendall(socket, data_memory, flags,
+ SOL_SOCKET=__socket__.SOL_SOCKET, # pylint:disable=no-member
+ SO_SNDBUF=__socket__.SO_SNDBUF): # pylint:disable=no-member
+ """
+ Send the *data_memory* (which should be a memoryview)
+ using the gevent *socket*, performing well on PyPy.
+ """
+
+ # On PyPy up through 5.10.0, both PyPy2 and PyPy3, subviews
+ # (slices) of a memoryview() object copy the underlying bytes the
+ # first time the builtin socket.send() method is called. On a
+ # non-blocking socket (that thus calls socket.send() many times)
+ # with a large input, this results in many repeated copies of an
+ # ever smaller string, depending on the networking buffering. For
+ # example, if each send() can process 1MB of a 50MB input, and we
+ # naively pass the entire remaining subview each time, we'd copy
+ # 49MB, 48MB, 47MB, etc, thus completely killing performance. To
+ # workaround this problem, we work in reasonable, fixed-size
+ # chunks. This results in a 10x improvement to bench_sendall.py,
+ # while having no measurable impact on CPython (since it doesn't
+ # copy at all the only extra overhead is a few python function
+ # calls, which is negligible for large inputs).
+
+ # On one macOS machine, PyPy3 5.10.1 produced ~ 67.53 MB/s before this change,
+ # and ~ 616.01 MB/s after.
+
+ # See https://bitbucket.org/pypy/pypy/issues/2091/non-blocking-socketsend-slow-gevent
+
+ # Too small of a chunk (the socket's buf size is usually too
+ # small) results in reduced perf due to *too many* calls to send and too many
+ # small copies. With a buffer of 143K (the default on my system), for
+ # example, bench_sendall.py yields ~264MB/s, while using 1MB yields
+ # ~653MB/s (matching CPython). 1MB is arbitrary and might be better
+ # chosen, say, to match a page size?
+
+ len_data_memory = len(data_memory)
+ if not len_data_memory:
+ # Don't try to send empty data at all, no point, and breaks ssl
+ # See issue 719
+ return 0
+
+
+ chunk_size = max(socket.getsockopt(SOL_SOCKET, SO_SNDBUF), 1024 * 1024)
+
+ data_sent = 0
+ end = None
+ timeleft = None
+ if socket.timeout is not None:
+ timeleft = socket.timeout
+ end = time.time() + timeleft
+
+ while data_sent < len_data_memory:
+ chunk_end = min(data_sent + chunk_size, len_data_memory)
+ chunk = data_memory[data_sent:chunk_end]
+
+ timeleft = __send_chunk(socket, chunk, flags, timeleft, end)
+ data_sent += len(chunk) # Guaranteed it sent the whole thing
+
+# pylint:disable=no-member
+_RESOLVABLE_FAMILIES = (__socket__.AF_INET,)
+if __socket__.has_ipv6:
+ _RESOLVABLE_FAMILIES += (__socket__.AF_INET6,)
+
+def _resolve_addr(sock, address):
+ # Internal method: resolve the AF_INET[6] address using
+ # getaddrinfo.
+ if sock.family not in _RESOLVABLE_FAMILIES or not isinstance(address, tuple):
+ return address
+ # address is (host, port) (ipv4) or (host, port, flowinfo, scopeid) (ipv6).
+
+ # We don't pass the port to getaddrinfo because the C
+ # socket module doesn't either (on some systems its
+ # illegal to do that without also passing socket type and
+ # protocol). Instead we join the port back at the end.
+ # See https://github.com/gevent/gevent/issues/1252
+ host, port = address[:2]
+ r = getaddrinfo(host, None, sock.family)
+ address = r[0][-1]
+ if len(address) == 2:
+ address = (address[0], port)
+ else:
+ address = (address[0], port, address[2], address[3])
+ return address