aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_socket3.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_socket3.py')
-rw-r--r--python/gevent/_socket3.py1065
1 files changed, 1065 insertions, 0 deletions
diff --git a/python/gevent/_socket3.py b/python/gevent/_socket3.py
new file mode 100644
index 0000000..d659d88
--- /dev/null
+++ b/python/gevent/_socket3.py
@@ -0,0 +1,1065 @@
+# Port of Python 3.3's socket module to gevent
+"""
+Python 3 socket module.
+"""
+# Our import magic sadly makes this warning useless
+# pylint: disable=undefined-variable
+# pylint: disable=too-many-statements,too-many-branches
+# pylint: disable=too-many-public-methods,unused-argument
+from __future__ import absolute_import
+import io
+import os
+import sys
+import time
+from gevent import _socketcommon
+from gevent._util import copy_globals
+from gevent._compat import PYPY
+import _socket
+from os import dup
+
+copy_globals(_socketcommon, globals(),
+ names_to_ignore=_socketcommon.__extensions__,
+ dunder_names_to_keep=())
+
+__socket__ = _socketcommon.__socket__
+__implements__ = _socketcommon._implements
+__extensions__ = _socketcommon.__extensions__
+__imports__ = _socketcommon.__imports__
+__dns__ = _socketcommon.__dns__
+
+
+SocketIO = __socket__.SocketIO # pylint:disable=no-member
+
+
+def _get_memory(data):
+ mv = memoryview(data)
+ if mv.shape:
+ return mv
+ # No shape, probably working with a ctypes object,
+ # or something else exotic that supports the buffer interface
+ return mv.tobytes()
+
+timeout_default = object()
+
+
+class _wrefsocket(_socket.socket):
+ # Plain stdlib socket.socket objects subclass _socket.socket
+ # and add weakref ability. The ssl module, for one, counts on this.
+ # We don't create socket.socket objects (because they may have been
+ # monkey patched to be the object from this module), but we still
+ # need to make sure what we do create can be weakrefd.
+
+ __slots__ = ("__weakref__", )
+
+ if PYPY:
+ # server.py unwraps the socket object to get the raw _sock;
+ # it depends on having a timeout property alias, which PyPy does not
+ # provide.
+ timeout = property(lambda s: s.gettimeout(),
+ lambda s, nv: s.settimeout(nv))
+
+
+class socket(object):
+ """
+ gevent `socket.socket <https://docs.python.org/3/library/socket.html#socket-objects>`_
+ for Python 3.
+
+ This object should have the same API as the standard library socket linked to above. Not all
+ methods are specifically documented here; when they are they may point out a difference
+ to be aware of or may document a method the standard library does not.
+ """
+
+ # Subclasses can set this to customize the type of the
+ # native _socket.socket we create. It MUST be a subclass
+ # of _wrefsocket. (gevent internal usage only)
+ _gevent_sock_class = _wrefsocket
+
+ def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None):
+ # Take the same approach as socket2: wrap a real socket object,
+ # don't subclass it. This lets code that needs the raw _sock (not tied to the hub)
+ # get it. This shows up in tests like test__example_udp_server.
+ self._sock = self._gevent_sock_class(family, type, proto, fileno)
+ self._io_refs = 0
+ self._closed = False
+ _socket.socket.setblocking(self._sock, False)
+ fileno = _socket.socket.fileno(self._sock)
+ self.hub = get_hub()
+ io_class = self.hub.loop.io
+ self._read_event = io_class(fileno, 1)
+ self._write_event = io_class(fileno, 2)
+ self.timeout = _socket.getdefaulttimeout()
+
+ def __getattr__(self, name):
+ return getattr(self._sock, name)
+
+ if hasattr(_socket, 'SOCK_NONBLOCK'):
+ # Only defined under Linux
+ @property
+ def type(self):
+ # See https://github.com/gevent/gevent/pull/399
+ if self.timeout != 0.0:
+ return self._sock.type & ~_socket.SOCK_NONBLOCK # pylint:disable=no-member
+ return self._sock.type
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ if not self._closed:
+ self.close()
+
+ def __repr__(self):
+ """Wrap __repr__() to reveal the real class name."""
+ try:
+ s = _socket.socket.__repr__(self._sock)
+ except Exception as ex: # pylint:disable=broad-except
+ # Observed on Windows Py3.3, printing the repr of a socket
+ # that just sufferred a ConnectionResetError [WinError 10054]:
+ # "OverflowError: no printf formatter to display the socket descriptor in decimal"
+ # Not sure what the actual cause is or if there's a better way to handle this
+ s = '<socket [%r]>' % ex
+
+ if s.startswith("<socket object"):
+ s = "<%s.%s%s%s" % (self.__class__.__module__,
+ self.__class__.__name__,
+ getattr(self, '_closed', False) and " [closed] " or "",
+ s[7:])
+ return s
+
+ def __getstate__(self):
+ raise TypeError("Cannot serialize socket object")
+
+ def _get_ref(self):
+ return self._read_event.ref or self._write_event.ref
+
+ def _set_ref(self, value):
+ self._read_event.ref = value
+ self._write_event.ref = value
+
+ ref = property(_get_ref, _set_ref)
+
+ def _wait(self, watcher, timeout_exc=timeout('timed out')):
+ """Block the current greenlet until *watcher* has pending events.
+
+ If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
+ By default *timeout_exc* is ``socket.timeout('timed out')``.
+
+ If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
+ """
+ if watcher.callback is not None:
+ raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
+ if self.timeout is not None:
+ timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
+ else:
+ timeout = None
+ try:
+ self.hub.wait(watcher)
+ finally:
+ if timeout is not None:
+ timeout.cancel()
+
+ def dup(self):
+ """dup() -> socket object
+
+ Return a new socket object connected to the same system resource.
+ """
+ fd = dup(self.fileno())
+ sock = self.__class__(self.family, self.type, self.proto, fileno=fd)
+ sock.settimeout(self.gettimeout())
+ return sock
+
+ def accept(self):
+ """accept() -> (socket object, address info)
+
+ Wait for an incoming connection. Return a new socket
+ representing the connection, and the address of the client.
+ For IP sockets, the address info is a pair (hostaddr, port).
+ """
+ while True:
+ try:
+ fd, addr = self._accept()
+ break
+ except BlockingIOError:
+ if self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+ sock = socket(self.family, self.type, self.proto, fileno=fd)
+ # Python Issue #7995: if no default timeout is set and the listening
+ # socket had a (non-zero) timeout, force the new socket in blocking
+ # mode to override platform-specific socket flags inheritance.
+ # XXX do we need to do this?
+ if getdefaulttimeout() is None and self.gettimeout():
+ sock.setblocking(True)
+ return sock, addr
+
+ def makefile(self, mode="r", buffering=None, *,
+ encoding=None, errors=None, newline=None):
+ """Return an I/O stream connected to the socket
+
+ The arguments are as for io.open() after the filename,
+ except the only mode characters supported are 'r', 'w' and 'b'.
+ The semantics are similar too.
+ """
+ # (XXX refactor to share code?)
+ for c in mode:
+ if c not in {"r", "w", "b"}:
+ raise ValueError("invalid mode %r (only r, w, b allowed)")
+ writing = "w" in mode
+ reading = "r" in mode or not writing
+ assert reading or writing
+ binary = "b" in mode
+ rawmode = ""
+ if reading:
+ rawmode += "r"
+ if writing:
+ rawmode += "w"
+ raw = SocketIO(self, rawmode)
+ self._io_refs += 1
+ if buffering is None:
+ buffering = -1
+ if buffering < 0:
+ buffering = io.DEFAULT_BUFFER_SIZE
+ if buffering == 0:
+ if not binary:
+ raise ValueError("unbuffered streams must be binary")
+ return raw
+ if reading and writing:
+ buffer = io.BufferedRWPair(raw, raw, buffering)
+ elif reading:
+ buffer = io.BufferedReader(raw, buffering)
+ else:
+ assert writing
+ buffer = io.BufferedWriter(raw, buffering)
+ if binary:
+ return buffer
+ text = io.TextIOWrapper(buffer, encoding, errors, newline)
+ text.mode = mode
+ return text
+
+ def _decref_socketios(self):
+ if self._io_refs > 0:
+ self._io_refs -= 1
+ if self._closed:
+ self.close()
+
+ def _real_close(self, _ss=_socket.socket, cancel_wait_ex=cancel_wait_ex):
+ # This function should not reference any globals. See Python issue #808164.
+ self.hub.cancel_wait(self._read_event, cancel_wait_ex)
+ self.hub.cancel_wait(self._write_event, cancel_wait_ex)
+ _ss.close(self._sock)
+
+ # Break any references to the underlying socket object. Tested
+ # by test__refcount. (Why does this matter?). Be sure to
+ # preserve our same family/type/proto if possible (if we
+ # don't, we can get TypeError instead of OSError; see
+ # test_socket.SendmsgUDP6Test.testSendmsgAfterClose)... but
+ # this isn't always possible (see test_socket.test_unknown_socket_family_repr)
+ # TODO: Can we use a simpler proxy, like _socket2 does?
+ try:
+ self._sock = self._gevent_sock_class(self.family, self.type, self.proto)
+ except OSError:
+ pass
+ else:
+ _ss.close(self._sock)
+
+
+ def close(self):
+ # This function should not reference any globals. See Python issue #808164.
+ self._closed = True
+ if self._io_refs <= 0:
+ self._real_close()
+
+ @property
+ def closed(self):
+ return self._closed
+
+ def detach(self):
+ """detach() -> file descriptor
+
+ Close the socket object without closing the underlying file descriptor.
+ The object cannot be used after this call, but the file descriptor
+ can be reused for other purposes. The file descriptor is returned.
+ """
+ self._closed = True
+ return self._sock.detach()
+
+ def connect(self, address):
+ if self.timeout == 0.0:
+ return _socket.socket.connect(self._sock, address)
+ if isinstance(address, tuple):
+ r = getaddrinfo(address[0], address[1], self.family)
+ address = r[0][-1]
+ if self.timeout is not None:
+ timer = Timeout.start_new(self.timeout, timeout('timed out'))
+ else:
+ timer = None
+ try:
+ while True:
+ err = self.getsockopt(SOL_SOCKET, SO_ERROR)
+ if err:
+ raise error(err, strerror(err))
+ result = _socket.socket.connect_ex(self._sock, address)
+ if not result or result == EISCONN:
+ break
+ elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows):
+ self._wait(self._write_event)
+ else:
+ raise error(result, strerror(result))
+ finally:
+ if timer is not None:
+ timer.cancel()
+
+ def connect_ex(self, address):
+ try:
+ return self.connect(address) or 0
+ except timeout:
+ return EAGAIN
+ except gaierror:
+ # gaierror/overflowerror/typerror is not silenced by connect_ex;
+ # gaierror extends OSError (aka error) so catch it first
+ raise
+ except error as ex:
+ # error is now OSError and it has various subclasses.
+ # Only those that apply to actually connecting are silenced by
+ # connect_ex.
+ if ex.errno:
+ return ex.errno
+ raise # pragma: no cover
+
+ def recv(self, *args):
+ while True:
+ try:
+ return _socket.socket.recv(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+
+ if hasattr(_socket.socket, 'sendmsg'):
+ # Only on Unix
+
+ def recvmsg(self, *args):
+ while True:
+ try:
+ return _socket.socket.recvmsg(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+
+ def recvmsg_into(self, *args):
+ while True:
+ try:
+ return _socket.socket.recvmsg_into(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+
+ def recvfrom(self, *args):
+ while True:
+ try:
+ return _socket.socket.recvfrom(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+
+ def recvfrom_into(self, *args):
+ while True:
+ try:
+ return _socket.socket.recvfrom_into(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+
+ def recv_into(self, *args):
+ while True:
+ try:
+ return _socket.socket.recv_into(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._read_event)
+
+ def send(self, data, flags=0, timeout=timeout_default):
+ if timeout is timeout_default:
+ timeout = self.timeout
+ try:
+ return _socket.socket.send(self._sock, data, flags)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
+ raise
+ self._wait(self._write_event)
+ try:
+ return _socket.socket.send(self._sock, data, flags)
+ except error as ex2:
+ if ex2.args[0] == EWOULDBLOCK:
+ return 0
+ raise
+
+ def sendall(self, data, flags=0):
+ # XXX Now that we run on PyPy3, see the notes in _socket2.py's sendall()
+ # and implement that here if needed.
+ # PyPy3 is not optimized for performance yet, and is known to be slower than
+ # PyPy2, so it's possibly premature to do this. However, there is a 3.5 test case that
+ # possibly exposes this in a severe way.
+ data_memory = _get_memory(data)
+ len_data_memory = len(data_memory)
+ if not len_data_memory:
+ # Don't try to send empty data at all, no point, and breaks ssl
+ # See issue 719
+ return 0
+
+ if self.timeout is None:
+ data_sent = 0
+ while data_sent < len_data_memory:
+ data_sent += self.send(data_memory[data_sent:], flags)
+ else:
+ timeleft = self.timeout
+ end = time.time() + timeleft
+ data_sent = 0
+ while True:
+ data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
+ if data_sent >= len_data_memory:
+ break
+ timeleft = end - time.time()
+ if timeleft <= 0:
+ raise timeout('timed out')
+
+ def sendto(self, *args):
+ try:
+ return _socket.socket.sendto(self._sock, *args)
+ except error as ex:
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._write_event)
+ try:
+ return _socket.socket.sendto(self._sock, *args)
+ except error as ex2:
+ if ex2.args[0] == EWOULDBLOCK:
+ return 0
+ raise
+
+ if hasattr(_socket.socket, 'sendmsg'):
+ # Only on Unix
+ def sendmsg(self, buffers, ancdata=(), flags=0, address=None):
+ try:
+ return _socket.socket.sendmsg(self._sock, buffers, ancdata, flags, address)
+ except error as ex:
+ if flags & getattr(_socket, 'MSG_DONTWAIT', 0):
+ # Enable non-blocking behaviour
+ # XXX: Do all platforms that have sendmsg have MSG_DONTWAIT?
+ raise
+
+ if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
+ raise
+ self._wait(self._write_event)
+ try:
+ return _socket.socket.sendmsg(self._sock, buffers, ancdata, flags, address)
+ except error as ex2:
+ if ex2.args[0] == EWOULDBLOCK:
+ return 0
+ raise
+
+ def setblocking(self, flag):
+ if flag:
+ self.timeout = None
+ else:
+ self.timeout = 0.0
+
+ def settimeout(self, howlong):
+ if howlong is not None:
+ try:
+ f = howlong.__float__
+ except AttributeError:
+ raise TypeError('a float is required')
+ howlong = f()
+ if howlong < 0.0:
+ raise ValueError('Timeout value out of range')
+ self.__dict__['timeout'] = howlong
+
+ def gettimeout(self):
+ return self.__dict__['timeout']
+
+ def shutdown(self, how):
+ if how == 0: # SHUT_RD
+ self.hub.cancel_wait(self._read_event, cancel_wait_ex)
+ elif how == 1: # SHUT_WR
+ self.hub.cancel_wait(self._write_event, cancel_wait_ex)
+ else:
+ self.hub.cancel_wait(self._read_event, cancel_wait_ex)
+ self.hub.cancel_wait(self._write_event, cancel_wait_ex)
+ self._sock.shutdown(how)
+
+ # sendfile: new in 3.5. But there's no real reason to not
+ # support it everywhere. Note that we can't use os.sendfile()
+ # because it's not cooperative.
+ def _sendfile_use_sendfile(self, file, offset=0, count=None):
+ # This is called directly by tests
+ raise __socket__._GiveupOnSendfile() # pylint:disable=no-member
+
+ def _sendfile_use_send(self, file, offset=0, count=None):
+ self._check_sendfile_params(file, offset, count)
+ if self.gettimeout() == 0:
+ raise ValueError("non-blocking sockets are not supported")
+ if offset:
+ file.seek(offset)
+ blocksize = min(count, 8192) if count else 8192
+ total_sent = 0
+ # localize variable access to minimize overhead
+ file_read = file.read
+ sock_send = self.send
+ try:
+ while True:
+ if count:
+ blocksize = min(count - total_sent, blocksize)
+ if blocksize <= 0:
+ break
+ data = memoryview(file_read(blocksize))
+ if not data:
+ break # EOF
+ while True:
+ try:
+ sent = sock_send(data)
+ except BlockingIOError:
+ continue
+ else:
+ total_sent += sent
+ if sent < len(data):
+ data = data[sent:]
+ else:
+ break
+ return total_sent
+ finally:
+ if total_sent > 0 and hasattr(file, 'seek'):
+ file.seek(offset + total_sent)
+
+ def _check_sendfile_params(self, file, offset, count):
+ if 'b' not in getattr(file, 'mode', 'b'):
+ raise ValueError("file should be opened in binary mode")
+ if not self.type & SOCK_STREAM:
+ raise ValueError("only SOCK_STREAM type sockets are supported")
+ if count is not None:
+ if not isinstance(count, int):
+ raise TypeError(
+ "count must be a positive integer (got {!r})".format(count))
+ if count <= 0:
+ raise ValueError(
+ "count must be a positive integer (got {!r})".format(count))
+
+ def sendfile(self, file, offset=0, count=None):
+ """sendfile(file[, offset[, count]]) -> sent
+
+ Send a file until EOF is reached by using high-performance
+ os.sendfile() and return the total number of bytes which
+ were sent.
+ *file* must be a regular file object opened in binary mode.
+ If os.sendfile() is not available (e.g. Windows) or file is
+ not a regular file socket.send() will be used instead.
+ *offset* tells from where to start reading the file.
+ If specified, *count* is the total number of bytes to transmit
+ as opposed to sending the file until EOF is reached.
+ File position is updated on return or also in case of error in
+ which case file.tell() can be used to figure out the number of
+ bytes which were sent.
+ The socket must be of SOCK_STREAM type.
+ Non-blocking sockets are not supported.
+
+ .. versionadded:: 1.1rc4
+ Added in Python 3.5, but available under all Python 3 versions in
+ gevent.
+ """
+ return self._sendfile_use_send(file, offset, count)
+
+ # get/set_inheritable new in 3.4
+ if hasattr(os, 'get_inheritable') or hasattr(os, 'get_handle_inheritable'):
+ # pylint:disable=no-member
+ if os.name == 'nt':
+ def get_inheritable(self):
+ return os.get_handle_inheritable(self.fileno())
+
+ def set_inheritable(self, inheritable):
+ os.set_handle_inheritable(self.fileno(), inheritable)
+ else:
+ def get_inheritable(self):
+ return os.get_inheritable(self.fileno())
+
+ def set_inheritable(self, inheritable):
+ os.set_inheritable(self.fileno(), inheritable)
+ _added = "\n\n.. versionadded:: 1.1rc4 Added in Python 3.4"
+ get_inheritable.__doc__ = "Get the inheritable flag of the socket" + _added
+ set_inheritable.__doc__ = "Set the inheritable flag of the socket" + _added
+ del _added
+
+
+if sys.version_info[:2] == (3, 4) and sys.version_info[:3] <= (3, 4, 2):
+ # Python 3.4, up to and including 3.4.2, had a bug where the
+ # SocketType enumeration overwrote the SocketType class imported
+ # from _socket. This was fixed in 3.4.3 (http://bugs.python.org/issue20386
+ # and https://github.com/python/cpython/commit/0d2f85f38a9691efdfd1e7285c4262cab7f17db7).
+ # Prior to that, if we replace SocketType with our own class, the implementation
+ # of socket.type breaks with "OSError: [Errno 97] Address family not supported by protocol".
+ # Therefore, on these old versions, we must preserve it as an enum; while this
+ # seems like it could lead to non-green behaviour, code on those versions
+ # cannot possibly be using SocketType as a class anyway.
+ SocketType = __socket__.SocketType # pylint:disable=no-member
+ # Fixup __all__; note that we get exec'd multiple times during unit tests
+ if 'SocketType' in __implements__:
+ __implements__.remove('SocketType')
+ if 'SocketType' not in __imports__:
+ __imports__.append('SocketType')
+else:
+ SocketType = socket
+
+
+def fromfd(fd, family, type, proto=0):
+ """ fromfd(fd, family, type[, proto]) -> socket object
+
+ Create a socket object from a duplicate of the given file
+ descriptor. The remaining arguments are the same as for socket().
+ """
+ nfd = dup(fd)
+ return socket(family, type, proto, nfd)
+
+
+if hasattr(_socket.socket, "share"):
+ def fromshare(info):
+ """ fromshare(info) -> socket object
+
+ Create a socket object from a the bytes object returned by
+ socket.share(pid).
+ """
+ return socket(0, 0, 0, info)
+
+ __implements__.append('fromshare')
+
+if hasattr(_socket, "socketpair"):
+
+ def socketpair(family=None, type=SOCK_STREAM, proto=0):
+ """socketpair([family[, type[, proto]]]) -> (socket object, socket object)
+
+ Create a pair of socket objects from the sockets returned by the platform
+ socketpair() function.
+ The arguments are the same as for socket() except the default family is
+ AF_UNIX if defined on the platform; otherwise, the default is AF_INET.
+
+ .. versionchanged:: 1.2
+ All Python 3 versions on Windows supply this function (natively
+ supplied by Python 3.5 and above).
+ """
+ if family is None:
+ try:
+ family = AF_UNIX
+ except NameError:
+ family = AF_INET
+ a, b = _socket.socketpair(family, type, proto)
+ a = socket(family, type, proto, a.detach())
+ b = socket(family, type, proto, b.detach())
+ return a, b
+
+else:
+ # Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
+
+ # gevent: taken from 3.6 release. Expected to be used only on Win. Added to Win/3.5
+ # gevent: for < 3.5, pass the default value of 128 to lsock.listen()
+ # (3.5+ uses this as a default and the original code passed no value)
+
+ _LOCALHOST = '127.0.0.1'
+ _LOCALHOST_V6 = '::1'
+
+ def socketpair(family=AF_INET, type=SOCK_STREAM, proto=0):
+ if family == AF_INET:
+ host = _LOCALHOST
+ elif family == AF_INET6:
+ host = _LOCALHOST_V6
+ else:
+ raise ValueError("Only AF_INET and AF_INET6 socket address families "
+ "are supported")
+ if type != SOCK_STREAM:
+ raise ValueError("Only SOCK_STREAM socket type is supported")
+ if proto != 0:
+ raise ValueError("Only protocol zero is supported")
+
+ # We create a connected TCP socket. Note the trick with
+ # setblocking(False) that prevents us from having to create a thread.
+ lsock = socket(family, type, proto)
+ try:
+ lsock.bind((host, 0))
+ lsock.listen(128)
+ # On IPv6, ignore flow_info and scope_id
+ addr, port = lsock.getsockname()[:2]
+ csock = socket(family, type, proto)
+ try:
+ csock.setblocking(False)
+ try:
+ csock.connect((addr, port))
+ except (BlockingIOError, InterruptedError):
+ pass
+ csock.setblocking(True)
+ ssock, _ = lsock.accept()
+ except:
+ csock.close()
+ raise
+ finally:
+ lsock.close()
+ return (ssock, csock)
+
+ if sys.version_info[:2] < (3, 5):
+ # Not provided natively
+ if 'socketpair' in __implements__:
+ # Multiple imports can cause this to be missing if _socketcommon
+ # was successfully imported, leading to subsequent imports to cause
+ # ValueError
+ __implements__.remove('socketpair')
+
+
+# PyPy needs drop and reuse
+def _do_reuse_or_drop(sock, methname):
+ try:
+ method = getattr(sock, methname)
+ except (AttributeError, TypeError):
+ pass
+ else:
+ method()
+
+from io import BytesIO
+
+
+class _basefileobject(object):
+ """Faux file object attached to a socket object."""
+
+ default_bufsize = 8192
+ name = "<socket>"
+
+ __slots__ = ["mode", "bufsize", "softspace",
+ # "closed" is a property, see below
+ "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf", "_wbuf_len",
+ "_close"]
+
+ def __init__(self, sock, mode='rb', bufsize=-1, close=False):
+ _do_reuse_or_drop(sock, '_reuse')
+ self._sock = sock
+ self.mode = mode # Not actually used in this version
+ if bufsize < 0:
+ bufsize = self.default_bufsize
+ self.bufsize = bufsize
+ self.softspace = False
+ # _rbufsize is the suggested recv buffer size. It is *strictly*
+ # obeyed within readline() for recv calls. If it is larger than
+ # default_bufsize it will be used for recv calls within read().
+ if bufsize == 0:
+ self._rbufsize = 1
+ elif bufsize == 1:
+ self._rbufsize = self.default_bufsize
+ else:
+ self._rbufsize = bufsize
+ self._wbufsize = bufsize
+ # We use BytesIO for the read buffer to avoid holding a list
+ # of variously sized string objects which have been known to
+ # fragment the heap due to how they are malloc()ed and often
+ # realloc()ed down much smaller than their original allocation.
+ self._rbuf = BytesIO()
+ self._wbuf = [] # A list of strings
+ self._wbuf_len = 0
+ self._close = close
+
+ def _getclosed(self):
+ return self._sock is None
+ closed = property(_getclosed, doc="True if the file is closed")
+
+ def close(self):
+ try:
+ if self._sock:
+ self.flush()
+ finally:
+ s = self._sock
+ self._sock = None
+ if s is not None:
+ if self._close:
+ s.close()
+ else:
+ _do_reuse_or_drop(s, '_drop')
+
+ def __del__(self):
+ try:
+ self.close()
+ except: # pylint:disable=bare-except
+ # close() may fail if __init__ didn't complete
+ pass
+
+ def flush(self):
+ if self._wbuf:
+ data = b"".join(self._wbuf)
+ self._wbuf = []
+ self._wbuf_len = 0
+ buffer_size = max(self._rbufsize, self.default_bufsize)
+ data_size = len(data)
+ write_offset = 0
+ view = memoryview(data)
+ try:
+ while write_offset < data_size:
+ self._sock.sendall(view[write_offset:write_offset + buffer_size])
+ write_offset += buffer_size
+ finally:
+ if write_offset < data_size:
+ remainder = data[write_offset:]
+ del view, data # explicit free
+ self._wbuf.append(remainder)
+ self._wbuf_len = len(remainder)
+
+ def fileno(self):
+ return self._sock.fileno()
+
+ def write(self, data):
+ if not isinstance(data, bytes):
+ raise TypeError("Non-bytes data")
+ if not data:
+ return
+ self._wbuf.append(data)
+ self._wbuf_len += len(data)
+ if (self._wbufsize == 0 or (self._wbufsize == 1 and b'\n' in data) or
+ (self._wbufsize > 1 and self._wbuf_len >= self._wbufsize)):
+ self.flush()
+
+ def writelines(self, list):
+ # XXX We could do better here for very long lists
+ # XXX Should really reject non-string non-buffers
+ lines = filter(None, map(str, list))
+ self._wbuf_len += sum(map(len, lines))
+ self._wbuf.extend(lines)
+ if self._wbufsize <= 1 or self._wbuf_len >= self._wbufsize:
+ self.flush()
+
+ def read(self, size=-1):
+ # Use max, disallow tiny reads in a loop as they are very inefficient.
+ # We never leave read() with any leftover data from a new recv() call
+ # in our internal buffer.
+ rbufsize = max(self._rbufsize, self.default_bufsize)
+ # Our use of BytesIO rather than lists of string objects returned by
+ # recv() minimizes memory usage and fragmentation that occurs when
+ # rbufsize is large compared to the typical return value of recv().
+ buf = self._rbuf
+ buf.seek(0, 2) # seek end
+ if size < 0:
+ # Read until EOF
+ self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
+ while True:
+ try:
+ data = self._sock.recv(rbufsize)
+ except InterruptedError:
+ continue
+ if not data:
+ break
+ buf.write(data)
+ return buf.getvalue()
+ else:
+ # Read until size bytes or EOF seen, whichever comes first
+ buf_len = buf.tell()
+ if buf_len >= size:
+ # Already have size bytes in our buffer? Extract and return.
+ buf.seek(0)
+ rv = buf.read(size)
+ self._rbuf = BytesIO()
+ self._rbuf.write(buf.read())
+ return rv
+
+ self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
+ while True:
+ left = size - buf_len
+ # recv() will malloc the amount of memory given as its
+ # parameter even though it often returns much less data
+ # than that. The returned data string is short lived
+ # as we copy it into a BytesIO and free it. This avoids
+ # fragmentation issues on many platforms.
+ try:
+ data = self._sock.recv(left)
+ except InterruptedError:
+ continue
+
+ if not data:
+ break
+ n = len(data)
+ if n == size and not buf_len:
+ # Shortcut. Avoid buffer data copies when:
+ # - We have no data in our buffer.
+ # AND
+ # - Our call to recv returned exactly the
+ # number of bytes we were asked to read.
+ return data
+ if n == left:
+ buf.write(data)
+ del data # explicit free
+ break
+ assert n <= left, "recv(%d) returned %d bytes" % (left, n)
+ buf.write(data)
+ buf_len += n
+ del data # explicit free
+ #assert buf_len == buf.tell()
+ return buf.getvalue()
+
+ def readline(self, size=-1):
+ # pylint:disable=too-many-return-statements
+ buf = self._rbuf
+ buf.seek(0, 2) # seek end
+ if buf.tell() > 0:
+ # check if we already have it in our buffer
+ buf.seek(0)
+ bline = buf.readline(size)
+ if bline.endswith(b'\n') or len(bline) == size:
+ self._rbuf = BytesIO()
+ self._rbuf.write(buf.read())
+ return bline
+ del bline
+ if size < 0: # pylint:disable=too-many-nested-blocks
+ # Read until \n or EOF, whichever comes first
+ if self._rbufsize <= 1:
+ # Speed up unbuffered case
+ buf.seek(0)
+ buffers = [buf.read()]
+ self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
+ data = None
+ recv = self._sock.recv
+ while True:
+ try:
+ while data != b"\n":
+ data = recv(1)
+ if not data:
+ break
+ buffers.append(data)
+ except InterruptedError:
+ # The try..except to catch EINTR was moved outside the
+ # recv loop to avoid the per byte overhead.
+ continue
+
+ break
+ return b"".join(buffers)
+
+ buf.seek(0, 2) # seek end
+ self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
+ while True:
+ try:
+ data = self._sock.recv(self._rbufsize)
+ except InterruptedError:
+ continue
+
+ if not data:
+ break
+ nl = data.find(b'\n')
+ if nl >= 0:
+ nl += 1
+ buf.write(data[:nl])
+ self._rbuf.write(data[nl:])
+ del data
+ break
+ buf.write(data)
+ return buf.getvalue()
+ else:
+ # Read until size bytes or \n or EOF seen, whichever comes first
+ buf.seek(0, 2) # seek end
+ buf_len = buf.tell()
+ if buf_len >= size:
+ buf.seek(0)
+ rv = buf.read(size)
+ self._rbuf = BytesIO()
+ self._rbuf.write(buf.read())
+ return rv
+ self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
+ while True:
+ try:
+ data = self._sock.recv(self._rbufsize)
+ except InterruptedError:
+ continue
+
+ if not data:
+ break
+ left = size - buf_len
+ # did we just receive a newline?
+ nl = data.find(b'\n', 0, left)
+ if nl >= 0:
+ nl += 1
+ # save the excess data to _rbuf
+ self._rbuf.write(data[nl:])
+ if buf_len:
+ buf.write(data[:nl])
+ break
+ else:
+ # Shortcut. Avoid data copy through buf when returning
+ # a substring of our first recv().
+ return data[:nl]
+ n = len(data)
+ if n == size and not buf_len:
+ # Shortcut. Avoid data copy through buf when
+ # returning exactly all of our first recv().
+ return data
+ if n >= left:
+ buf.write(data[:left])
+ self._rbuf.write(data[left:])
+ break
+ buf.write(data)
+ buf_len += n
+ #assert buf_len == buf.tell()
+ return buf.getvalue()
+
+ def readlines(self, sizehint=0):
+ total = 0
+ list = []
+ while True:
+ line = self.readline()
+ if not line:
+ break
+ list.append(line)
+ total += len(line)
+ if sizehint and total >= sizehint:
+ break
+ return list
+
+ # Iterator protocols
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ line = self.readline()
+ if not line:
+ raise StopIteration
+ return line
+ __next__ = next
+
+try:
+ from gevent.fileobject import FileObjectPosix
+except ImportError:
+ # Manual implementation
+ _fileobject = _basefileobject
+else:
+ class _fileobject(FileObjectPosix):
+ # Add the proper drop/reuse support for pypy, and match
+ # the close=False default in the constructor
+ def __init__(self, sock, mode='rb', bufsize=-1, close=False):
+ _do_reuse_or_drop(sock, '_reuse')
+ self._sock = sock
+ FileObjectPosix.__init__(self, sock, mode, bufsize, close)
+
+ def close(self):
+ try:
+ if self._sock:
+ self.flush()
+ finally:
+ s = self._sock
+ self._sock = None
+ if s is not None:
+ if self._close:
+ FileObjectPosix.close(self)
+ else:
+ _do_reuse_or_drop(s, '_drop')
+
+ def __del__(self):
+ try:
+ self.close()
+ except: # pylint:disable=bare-except
+ # close() may fail if __init__ didn't complete
+ pass
+
+
+__all__ = __implements__ + __extensions__ + __imports__