diff options
Diffstat (limited to 'python/gevent/_socket3.py')
-rw-r--r-- | python/gevent/_socket3.py | 517 |
1 files changed, 104 insertions, 413 deletions
diff --git a/python/gevent/_socket3.py b/python/gevent/_socket3.py index d659d88..973b5a9 100644 --- a/python/gevent/_socket3.py +++ b/python/gevent/_socket3.py @@ -10,17 +10,27 @@ from __future__ import absolute_import import io import os import sys -import time + from gevent import _socketcommon from gevent._util import copy_globals from gevent._compat import PYPY +from gevent.timeout import Timeout import _socket from os import dup + copy_globals(_socketcommon, globals(), names_to_ignore=_socketcommon.__extensions__, dunder_names_to_keep=()) +try: + from errno import EHOSTUNREACH + from errno import ECONNREFUSED +except ImportError: + EHOSTUNREACH = -1 + ECONNREFUSED = -1 + + __socket__ = _socketcommon.__socket__ __implements__ = _socketcommon._implements __extensions__ = _socketcommon.__extensions__ @@ -58,6 +68,7 @@ class _wrefsocket(_socket.socket): timeout = property(lambda s: s.gettimeout(), lambda s, nv: s.settimeout(nv)) +from gevent._hub_primitives import wait_on_socket as _wait_on_socket class socket(object): """ @@ -74,13 +85,37 @@ class socket(object): # of _wrefsocket. (gevent internal usage only) _gevent_sock_class = _wrefsocket - def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None): - # Take the same approach as socket2: wrap a real socket object, - # don't subclass it. This lets code that needs the raw _sock (not tied to the hub) - # get it. This shows up in tests like test__example_udp_server. - self._sock = self._gevent_sock_class(family, type, proto, fileno) - self._io_refs = 0 - self._closed = False + _io_refs = 0 + _closed = False + _read_event = None + _write_event = None + + + # Take the same approach as socket2: wrap a real socket object, + # don't subclass it. This lets code that needs the raw _sock (not tied to the hub) + # get it. This shows up in tests like test__example_udp_server. + + if sys.version_info[:2] < (3, 7): + def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None): + self._sock = self._gevent_sock_class(family, type, proto, fileno) + self.timeout = None + self.__init_common() + else: + # In 3.7, socket changed to auto-detecting family, type, and proto + # when given a fileno. + def __init__(self, family=-1, type=-1, proto=-1, fileno=None): + if fileno is None: + if family == -1: + family = AF_INET + if type == -1: + type = SOCK_STREAM + if proto == -1: + proto = 0 + self._sock = self._gevent_sock_class(family, type, proto, fileno) + self.timeout = None + self.__init_common() + + def __init_common(self): _socket.socket.setblocking(self._sock, False) fileno = _socket.socket.fileno(self._sock) self.hub = get_hub() @@ -101,6 +136,16 @@ class socket(object): return self._sock.type & ~_socket.SOCK_NONBLOCK # pylint:disable=no-member return self._sock.type + def getblocking(self): + """ + Returns whether the socket will approximate blocking + behaviour. + + .. versionadded:: 1.3a2 + Added in Python 3.7. + """ + return self.timeout != 0.0 + def __enter__(self): return self @@ -114,7 +159,7 @@ class socket(object): s = _socket.socket.__repr__(self._sock) except Exception as ex: # pylint:disable=broad-except # Observed on Windows Py3.3, printing the repr of a socket - # that just sufferred a ConnectionResetError [WinError 10054]: + # that just suffered a ConnectionResetError [WinError 10054]: # "OverflowError: no printf formatter to display the socket descriptor in decimal" # Not sure what the actual cause is or if there's a better way to handle this s = '<socket [%r]>' % ex @@ -138,25 +183,7 @@ class socket(object): ref = property(_get_ref, _set_ref) - def _wait(self, watcher, timeout_exc=timeout('timed out')): - """Block the current greenlet until *watcher* has pending events. - - If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed. - By default *timeout_exc* is ``socket.timeout('timed out')``. - - If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``. - """ - if watcher.callback is not None: - raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, )) - if self.timeout is not None: - timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False) - else: - timeout = None - try: - self.hub.wait(watcher) - finally: - if timeout is not None: - timeout.cancel() + _wait = _wait_on_socket def dup(self): """dup() -> socket object @@ -237,15 +264,28 @@ class socket(object): return text def _decref_socketios(self): + # Called by SocketIO when it is closed. if self._io_refs > 0: self._io_refs -= 1 if self._closed: self.close() + def _drop_events(self): + if self._read_event is not None: + self.hub.cancel_wait(self._read_event, cancel_wait_ex, True) + self._read_event = None + if self._write_event is not None: + self.hub.cancel_wait(self._write_event, cancel_wait_ex, True) + self._write_event = None + def _real_close(self, _ss=_socket.socket, cancel_wait_ex=cancel_wait_ex): # This function should not reference any globals. See Python issue #808164. - self.hub.cancel_wait(self._read_event, cancel_wait_ex) - self.hub.cancel_wait(self._write_event, cancel_wait_ex) + + # Break any reference to the loop.io objects. Our fileno, + # which they were tied to, is now free to be reused, so these + # objects are no longer functional. + self._drop_events() + _ss.close(self._sock) # Break any references to the underlying socket object. Tested @@ -286,35 +326,41 @@ class socket(object): def connect(self, address): if self.timeout == 0.0: return _socket.socket.connect(self._sock, address) - if isinstance(address, tuple): - r = getaddrinfo(address[0], address[1], self.family) - address = r[0][-1] - if self.timeout is not None: - timer = Timeout.start_new(self.timeout, timeout('timed out')) - else: - timer = None - try: + address = _socketcommon._resolve_addr(self._sock, address) + + with Timeout._start_new_or_dummy(self.timeout, timeout("timed out")): while True: err = self.getsockopt(SOL_SOCKET, SO_ERROR) if err: raise error(err, strerror(err)) result = _socket.socket.connect_ex(self._sock, address) + if not result or result == EISCONN: break elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows): self._wait(self._write_event) else: + if (isinstance(address, tuple) + and address[0] == 'fe80::1' + and result == EHOSTUNREACH): + # On Python 3.7 on mac, we see EHOSTUNREACH + # returned for this link-local address, but it really is + # supposed to be ECONNREFUSED according to the standard library + # tests (test_socket.NetworkConnectionNoServer.test_create_connection) + # (On previous versions, that code passed the '127.0.0.1' IPv4 address, so + # ipv6 link locals were never a factor; 3.7 passes 'localhost'.) + # It is something of a mystery how the stdlib socket code doesn't + # produce EHOSTUNREACH---I (JAM) can't see how socketmodule.c would avoid + # that. The normal connect just calls connect_ex much like we do. + result = ECONNREFUSED raise error(result, strerror(result)) - finally: - if timer is not None: - timer.cancel() def connect_ex(self, address): try: return self.connect(address) or 0 except timeout: return EAGAIN - except gaierror: + except gaierror: # pylint:disable=try-except-raise # gaierror/overflowerror/typerror is not silenced by connect_ex; # gaierror extends OSError (aka error) so catch it first raise @@ -335,8 +381,9 @@ class socket(object): raise self._wait(self._read_event) - if hasattr(_socket.socket, 'sendmsg'): - # Only on Unix + if hasattr(_socket.socket, 'recvmsg'): + # Only on Unix; PyPy 3.5 5.10.0 provides sendmsg and recvmsg, but not + # recvmsg_into (at least on os x) def recvmsg(self, *args): while True: @@ -347,6 +394,8 @@ class socket(object): raise self._wait(self._read_event) + if hasattr(_socket.socket, 'recvmsg_into'): + def recvmsg_into(self, *args): while True: try: @@ -389,7 +438,7 @@ class socket(object): try: return _socket.socket.send(self._sock, data, flags) except error as ex: - if ex.args[0] != EWOULDBLOCK or timeout == 0.0: + if ex.args[0] not in _socketcommon.GSENDAGAIN or timeout == 0.0: raise self._wait(self._write_event) try: @@ -406,27 +455,7 @@ class socket(object): # PyPy2, so it's possibly premature to do this. However, there is a 3.5 test case that # possibly exposes this in a severe way. data_memory = _get_memory(data) - len_data_memory = len(data_memory) - if not len_data_memory: - # Don't try to send empty data at all, no point, and breaks ssl - # See issue 719 - return 0 - - if self.timeout is None: - data_sent = 0 - while data_sent < len_data_memory: - data_sent += self.send(data_memory[data_sent:], flags) - else: - timeleft = self.timeout - end = time.time() + timeleft - data_sent = 0 - while True: - data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft) - if data_sent >= len_data_memory: - break - timeleft = end - time.time() - if timeleft <= 0: - raise timeout('timed out') + return _socketcommon._sendall(self, data_memory, flags) def sendto(self, *args): try: @@ -464,6 +493,11 @@ class socket(object): raise def setblocking(self, flag): + # Beginning in 3.6.0b3 this is supposed to raise + # if the file descriptor is closed, but the test for it + # involves closing the fileno directly. Since we + # don't touch the fileno here, it doesn't make sense for + # us. if flag: self.timeout = None else: @@ -659,7 +693,7 @@ if hasattr(_socket, "socketpair"): b = socket(family, type, proto, b.detach()) return a, b -else: +else: # pragma: no cover # Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain. # gevent: taken from 3.6 release. Expected to be used only on Win. Added to Win/3.5 @@ -715,351 +749,8 @@ else: __implements__.remove('socketpair') -# PyPy needs drop and reuse -def _do_reuse_or_drop(sock, methname): - try: - method = getattr(sock, methname) - except (AttributeError, TypeError): - pass - else: - method() - -from io import BytesIO - - -class _basefileobject(object): - """Faux file object attached to a socket object.""" - - default_bufsize = 8192 - name = "<socket>" - - __slots__ = ["mode", "bufsize", "softspace", - # "closed" is a property, see below - "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf", "_wbuf_len", - "_close"] - - def __init__(self, sock, mode='rb', bufsize=-1, close=False): - _do_reuse_or_drop(sock, '_reuse') - self._sock = sock - self.mode = mode # Not actually used in this version - if bufsize < 0: - bufsize = self.default_bufsize - self.bufsize = bufsize - self.softspace = False - # _rbufsize is the suggested recv buffer size. It is *strictly* - # obeyed within readline() for recv calls. If it is larger than - # default_bufsize it will be used for recv calls within read(). - if bufsize == 0: - self._rbufsize = 1 - elif bufsize == 1: - self._rbufsize = self.default_bufsize - else: - self._rbufsize = bufsize - self._wbufsize = bufsize - # We use BytesIO for the read buffer to avoid holding a list - # of variously sized string objects which have been known to - # fragment the heap due to how they are malloc()ed and often - # realloc()ed down much smaller than their original allocation. - self._rbuf = BytesIO() - self._wbuf = [] # A list of strings - self._wbuf_len = 0 - self._close = close - - def _getclosed(self): - return self._sock is None - closed = property(_getclosed, doc="True if the file is closed") - - def close(self): - try: - if self._sock: - self.flush() - finally: - s = self._sock - self._sock = None - if s is not None: - if self._close: - s.close() - else: - _do_reuse_or_drop(s, '_drop') - - def __del__(self): - try: - self.close() - except: # pylint:disable=bare-except - # close() may fail if __init__ didn't complete - pass - - def flush(self): - if self._wbuf: - data = b"".join(self._wbuf) - self._wbuf = [] - self._wbuf_len = 0 - buffer_size = max(self._rbufsize, self.default_bufsize) - data_size = len(data) - write_offset = 0 - view = memoryview(data) - try: - while write_offset < data_size: - self._sock.sendall(view[write_offset:write_offset + buffer_size]) - write_offset += buffer_size - finally: - if write_offset < data_size: - remainder = data[write_offset:] - del view, data # explicit free - self._wbuf.append(remainder) - self._wbuf_len = len(remainder) - - def fileno(self): - return self._sock.fileno() - - def write(self, data): - if not isinstance(data, bytes): - raise TypeError("Non-bytes data") - if not data: - return - self._wbuf.append(data) - self._wbuf_len += len(data) - if (self._wbufsize == 0 or (self._wbufsize == 1 and b'\n' in data) or - (self._wbufsize > 1 and self._wbuf_len >= self._wbufsize)): - self.flush() - - def writelines(self, list): - # XXX We could do better here for very long lists - # XXX Should really reject non-string non-buffers - lines = filter(None, map(str, list)) - self._wbuf_len += sum(map(len, lines)) - self._wbuf.extend(lines) - if self._wbufsize <= 1 or self._wbuf_len >= self._wbufsize: - self.flush() - - def read(self, size=-1): - # Use max, disallow tiny reads in a loop as they are very inefficient. - # We never leave read() with any leftover data from a new recv() call - # in our internal buffer. - rbufsize = max(self._rbufsize, self.default_bufsize) - # Our use of BytesIO rather than lists of string objects returned by - # recv() minimizes memory usage and fragmentation that occurs when - # rbufsize is large compared to the typical return value of recv(). - buf = self._rbuf - buf.seek(0, 2) # seek end - if size < 0: - # Read until EOF - self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. - while True: - try: - data = self._sock.recv(rbufsize) - except InterruptedError: - continue - if not data: - break - buf.write(data) - return buf.getvalue() - else: - # Read until size bytes or EOF seen, whichever comes first - buf_len = buf.tell() - if buf_len >= size: - # Already have size bytes in our buffer? Extract and return. - buf.seek(0) - rv = buf.read(size) - self._rbuf = BytesIO() - self._rbuf.write(buf.read()) - return rv - - self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. - while True: - left = size - buf_len - # recv() will malloc the amount of memory given as its - # parameter even though it often returns much less data - # than that. The returned data string is short lived - # as we copy it into a BytesIO and free it. This avoids - # fragmentation issues on many platforms. - try: - data = self._sock.recv(left) - except InterruptedError: - continue - - if not data: - break - n = len(data) - if n == size and not buf_len: - # Shortcut. Avoid buffer data copies when: - # - We have no data in our buffer. - # AND - # - Our call to recv returned exactly the - # number of bytes we were asked to read. - return data - if n == left: - buf.write(data) - del data # explicit free - break - assert n <= left, "recv(%d) returned %d bytes" % (left, n) - buf.write(data) - buf_len += n - del data # explicit free - #assert buf_len == buf.tell() - return buf.getvalue() - - def readline(self, size=-1): - # pylint:disable=too-many-return-statements - buf = self._rbuf - buf.seek(0, 2) # seek end - if buf.tell() > 0: - # check if we already have it in our buffer - buf.seek(0) - bline = buf.readline(size) - if bline.endswith(b'\n') or len(bline) == size: - self._rbuf = BytesIO() - self._rbuf.write(buf.read()) - return bline - del bline - if size < 0: # pylint:disable=too-many-nested-blocks - # Read until \n or EOF, whichever comes first - if self._rbufsize <= 1: - # Speed up unbuffered case - buf.seek(0) - buffers = [buf.read()] - self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. - data = None - recv = self._sock.recv - while True: - try: - while data != b"\n": - data = recv(1) - if not data: - break - buffers.append(data) - except InterruptedError: - # The try..except to catch EINTR was moved outside the - # recv loop to avoid the per byte overhead. - continue - - break - return b"".join(buffers) - - buf.seek(0, 2) # seek end - self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. - while True: - try: - data = self._sock.recv(self._rbufsize) - except InterruptedError: - continue - - if not data: - break - nl = data.find(b'\n') - if nl >= 0: - nl += 1 - buf.write(data[:nl]) - self._rbuf.write(data[nl:]) - del data - break - buf.write(data) - return buf.getvalue() - else: - # Read until size bytes or \n or EOF seen, whichever comes first - buf.seek(0, 2) # seek end - buf_len = buf.tell() - if buf_len >= size: - buf.seek(0) - rv = buf.read(size) - self._rbuf = BytesIO() - self._rbuf.write(buf.read()) - return rv - self._rbuf = BytesIO() # reset _rbuf. we consume it via buf. - while True: - try: - data = self._sock.recv(self._rbufsize) - except InterruptedError: - continue - - if not data: - break - left = size - buf_len - # did we just receive a newline? - nl = data.find(b'\n', 0, left) - if nl >= 0: - nl += 1 - # save the excess data to _rbuf - self._rbuf.write(data[nl:]) - if buf_len: - buf.write(data[:nl]) - break - else: - # Shortcut. Avoid data copy through buf when returning - # a substring of our first recv(). - return data[:nl] - n = len(data) - if n == size and not buf_len: - # Shortcut. Avoid data copy through buf when - # returning exactly all of our first recv(). - return data - if n >= left: - buf.write(data[:left]) - self._rbuf.write(data[left:]) - break - buf.write(data) - buf_len += n - #assert buf_len == buf.tell() - return buf.getvalue() - - def readlines(self, sizehint=0): - total = 0 - list = [] - while True: - line = self.readline() - if not line: - break - list.append(line) - total += len(line) - if sizehint and total >= sizehint: - break - return list - - # Iterator protocols - - def __iter__(self): - return self - - def next(self): - line = self.readline() - if not line: - raise StopIteration - return line - __next__ = next - -try: - from gevent.fileobject import FileObjectPosix -except ImportError: - # Manual implementation - _fileobject = _basefileobject -else: - class _fileobject(FileObjectPosix): - # Add the proper drop/reuse support for pypy, and match - # the close=False default in the constructor - def __init__(self, sock, mode='rb', bufsize=-1, close=False): - _do_reuse_or_drop(sock, '_reuse') - self._sock = sock - FileObjectPosix.__init__(self, sock, mode, bufsize, close) - - def close(self): - try: - if self._sock: - self.flush() - finally: - s = self._sock - self._sock = None - if s is not None: - if self._close: - FileObjectPosix.close(self) - else: - _do_reuse_or_drop(s, '_drop') - - def __del__(self): - try: - self.close() - except: # pylint:disable=bare-except - # close() may fail if __init__ didn't complete - pass - +if hasattr(__socket__, 'close'): # Python 3.7b1+ + close = __socket__.close # pylint:disable=no-member + __imports__ += ['close'] __all__ = __implements__ + __extensions__ + __imports__ |