aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_socket3.py
diff options
context:
space:
mode:
authorJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
committerJames Taylor <user234683@users.noreply.github.com>2018-09-14 19:32:27 -0700
commit4212164e91ba2f49583cf44ad623a29b36db8f77 (patch)
tree47aefe3c0162f03e0c823b43873356f69c1cd636 /python/gevent/_socket3.py
parent6ca20ff7010f2bafc7fefcb8cad982be27a8aeae (diff)
downloadyt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.lz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.tar.xz
yt-local-4212164e91ba2f49583cf44ad623a29b36db8f77.zip
Windows: Use 32-bit distribution of python
Diffstat (limited to 'python/gevent/_socket3.py')
-rw-r--r--python/gevent/_socket3.py517
1 files changed, 104 insertions, 413 deletions
diff --git a/python/gevent/_socket3.py b/python/gevent/_socket3.py
index d659d88..973b5a9 100644
--- a/python/gevent/_socket3.py
+++ b/python/gevent/_socket3.py
@@ -10,17 +10,27 @@ from __future__ import absolute_import
import io
import os
import sys
-import time
+
from gevent import _socketcommon
from gevent._util import copy_globals
from gevent._compat import PYPY
+from gevent.timeout import Timeout
import _socket
from os import dup
+
copy_globals(_socketcommon, globals(),
names_to_ignore=_socketcommon.__extensions__,
dunder_names_to_keep=())
+try:
+ from errno import EHOSTUNREACH
+ from errno import ECONNREFUSED
+except ImportError:
+ EHOSTUNREACH = -1
+ ECONNREFUSED = -1
+
+
__socket__ = _socketcommon.__socket__
__implements__ = _socketcommon._implements
__extensions__ = _socketcommon.__extensions__
@@ -58,6 +68,7 @@ class _wrefsocket(_socket.socket):
timeout = property(lambda s: s.gettimeout(),
lambda s, nv: s.settimeout(nv))
+from gevent._hub_primitives import wait_on_socket as _wait_on_socket
class socket(object):
"""
@@ -74,13 +85,37 @@ class socket(object):
# of _wrefsocket. (gevent internal usage only)
_gevent_sock_class = _wrefsocket
- def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None):
- # Take the same approach as socket2: wrap a real socket object,
- # don't subclass it. This lets code that needs the raw _sock (not tied to the hub)
- # get it. This shows up in tests like test__example_udp_server.
- self._sock = self._gevent_sock_class(family, type, proto, fileno)
- self._io_refs = 0
- self._closed = False
+ _io_refs = 0
+ _closed = False
+ _read_event = None
+ _write_event = None
+
+
+ # Take the same approach as socket2: wrap a real socket object,
+ # don't subclass it. This lets code that needs the raw _sock (not tied to the hub)
+ # get it. This shows up in tests like test__example_udp_server.
+
+ if sys.version_info[:2] < (3, 7):
+ def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None):
+ self._sock = self._gevent_sock_class(family, type, proto, fileno)
+ self.timeout = None
+ self.__init_common()
+ else:
+ # In 3.7, socket changed to auto-detecting family, type, and proto
+ # when given a fileno.
+ def __init__(self, family=-1, type=-1, proto=-1, fileno=None):
+ if fileno is None:
+ if family == -1:
+ family = AF_INET
+ if type == -1:
+ type = SOCK_STREAM
+ if proto == -1:
+ proto = 0
+ self._sock = self._gevent_sock_class(family, type, proto, fileno)
+ self.timeout = None
+ self.__init_common()
+
+ def __init_common(self):
_socket.socket.setblocking(self._sock, False)
fileno = _socket.socket.fileno(self._sock)
self.hub = get_hub()
@@ -101,6 +136,16 @@ class socket(object):
return self._sock.type & ~_socket.SOCK_NONBLOCK # pylint:disable=no-member
return self._sock.type
+ def getblocking(self):
+ """
+ Returns whether the socket will approximate blocking
+ behaviour.
+
+ .. versionadded:: 1.3a2
+ Added in Python 3.7.
+ """
+ return self.timeout != 0.0
+
def __enter__(self):
return self
@@ -114,7 +159,7 @@ class socket(object):
s = _socket.socket.__repr__(self._sock)
except Exception as ex: # pylint:disable=broad-except
# Observed on Windows Py3.3, printing the repr of a socket
- # that just sufferred a ConnectionResetError [WinError 10054]:
+ # that just suffered a ConnectionResetError [WinError 10054]:
# "OverflowError: no printf formatter to display the socket descriptor in decimal"
# Not sure what the actual cause is or if there's a better way to handle this
s = '<socket [%r]>' % ex
@@ -138,25 +183,7 @@ class socket(object):
ref = property(_get_ref, _set_ref)
- def _wait(self, watcher, timeout_exc=timeout('timed out')):
- """Block the current greenlet until *watcher* has pending events.
-
- If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
- By default *timeout_exc* is ``socket.timeout('timed out')``.
-
- If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
- """
- if watcher.callback is not None:
- raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
- if self.timeout is not None:
- timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
- else:
- timeout = None
- try:
- self.hub.wait(watcher)
- finally:
- if timeout is not None:
- timeout.cancel()
+ _wait = _wait_on_socket
def dup(self):
"""dup() -> socket object
@@ -237,15 +264,28 @@ class socket(object):
return text
def _decref_socketios(self):
+ # Called by SocketIO when it is closed.
if self._io_refs > 0:
self._io_refs -= 1
if self._closed:
self.close()
+ def _drop_events(self):
+ if self._read_event is not None:
+ self.hub.cancel_wait(self._read_event, cancel_wait_ex, True)
+ self._read_event = None
+ if self._write_event is not None:
+ self.hub.cancel_wait(self._write_event, cancel_wait_ex, True)
+ self._write_event = None
+
def _real_close(self, _ss=_socket.socket, cancel_wait_ex=cancel_wait_ex):
# This function should not reference any globals. See Python issue #808164.
- self.hub.cancel_wait(self._read_event, cancel_wait_ex)
- self.hub.cancel_wait(self._write_event, cancel_wait_ex)
+
+ # Break any reference to the loop.io objects. Our fileno,
+ # which they were tied to, is now free to be reused, so these
+ # objects are no longer functional.
+ self._drop_events()
+
_ss.close(self._sock)
# Break any references to the underlying socket object. Tested
@@ -286,35 +326,41 @@ class socket(object):
def connect(self, address):
if self.timeout == 0.0:
return _socket.socket.connect(self._sock, address)
- if isinstance(address, tuple):
- r = getaddrinfo(address[0], address[1], self.family)
- address = r[0][-1]
- if self.timeout is not None:
- timer = Timeout.start_new(self.timeout, timeout('timed out'))
- else:
- timer = None
- try:
+ address = _socketcommon._resolve_addr(self._sock, address)
+
+ with Timeout._start_new_or_dummy(self.timeout, timeout("timed out")):
while True:
err = self.getsockopt(SOL_SOCKET, SO_ERROR)
if err:
raise error(err, strerror(err))
result = _socket.socket.connect_ex(self._sock, address)
+
if not result or result == EISCONN:
break
elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows):
self._wait(self._write_event)
else:
+ if (isinstance(address, tuple)
+ and address[0] == 'fe80::1'
+ and result == EHOSTUNREACH):
+ # On Python 3.7 on mac, we see EHOSTUNREACH
+ # returned for this link-local address, but it really is
+ # supposed to be ECONNREFUSED according to the standard library
+ # tests (test_socket.NetworkConnectionNoServer.test_create_connection)
+ # (On previous versions, that code passed the '127.0.0.1' IPv4 address, so
+ # ipv6 link locals were never a factor; 3.7 passes 'localhost'.)
+ # It is something of a mystery how the stdlib socket code doesn't
+ # produce EHOSTUNREACH---I (JAM) can't see how socketmodule.c would avoid
+ # that. The normal connect just calls connect_ex much like we do.
+ result = ECONNREFUSED
raise error(result, strerror(result))
- finally:
- if timer is not None:
- timer.cancel()
def connect_ex(self, address):
try:
return self.connect(address) or 0
except timeout:
return EAGAIN
- except gaierror:
+ except gaierror: # pylint:disable=try-except-raise
# gaierror/overflowerror/typerror is not silenced by connect_ex;
# gaierror extends OSError (aka error) so catch it first
raise
@@ -335,8 +381,9 @@ class socket(object):
raise
self._wait(self._read_event)
- if hasattr(_socket.socket, 'sendmsg'):
- # Only on Unix
+ if hasattr(_socket.socket, 'recvmsg'):
+ # Only on Unix; PyPy 3.5 5.10.0 provides sendmsg and recvmsg, but not
+ # recvmsg_into (at least on os x)
def recvmsg(self, *args):
while True:
@@ -347,6 +394,8 @@ class socket(object):
raise
self._wait(self._read_event)
+ if hasattr(_socket.socket, 'recvmsg_into'):
+
def recvmsg_into(self, *args):
while True:
try:
@@ -389,7 +438,7 @@ class socket(object):
try:
return _socket.socket.send(self._sock, data, flags)
except error as ex:
- if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
+ if ex.args[0] not in _socketcommon.GSENDAGAIN or timeout == 0.0:
raise
self._wait(self._write_event)
try:
@@ -406,27 +455,7 @@ class socket(object):
# PyPy2, so it's possibly premature to do this. However, there is a 3.5 test case that
# possibly exposes this in a severe way.
data_memory = _get_memory(data)
- len_data_memory = len(data_memory)
- if not len_data_memory:
- # Don't try to send empty data at all, no point, and breaks ssl
- # See issue 719
- return 0
-
- if self.timeout is None:
- data_sent = 0
- while data_sent < len_data_memory:
- data_sent += self.send(data_memory[data_sent:], flags)
- else:
- timeleft = self.timeout
- end = time.time() + timeleft
- data_sent = 0
- while True:
- data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
- if data_sent >= len_data_memory:
- break
- timeleft = end - time.time()
- if timeleft <= 0:
- raise timeout('timed out')
+ return _socketcommon._sendall(self, data_memory, flags)
def sendto(self, *args):
try:
@@ -464,6 +493,11 @@ class socket(object):
raise
def setblocking(self, flag):
+ # Beginning in 3.6.0b3 this is supposed to raise
+ # if the file descriptor is closed, but the test for it
+ # involves closing the fileno directly. Since we
+ # don't touch the fileno here, it doesn't make sense for
+ # us.
if flag:
self.timeout = None
else:
@@ -659,7 +693,7 @@ if hasattr(_socket, "socketpair"):
b = socket(family, type, proto, b.detach())
return a, b
-else:
+else: # pragma: no cover
# Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
# gevent: taken from 3.6 release. Expected to be used only on Win. Added to Win/3.5
@@ -715,351 +749,8 @@ else:
__implements__.remove('socketpair')
-# PyPy needs drop and reuse
-def _do_reuse_or_drop(sock, methname):
- try:
- method = getattr(sock, methname)
- except (AttributeError, TypeError):
- pass
- else:
- method()
-
-from io import BytesIO
-
-
-class _basefileobject(object):
- """Faux file object attached to a socket object."""
-
- default_bufsize = 8192
- name = "<socket>"
-
- __slots__ = ["mode", "bufsize", "softspace",
- # "closed" is a property, see below
- "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf", "_wbuf_len",
- "_close"]
-
- def __init__(self, sock, mode='rb', bufsize=-1, close=False):
- _do_reuse_or_drop(sock, '_reuse')
- self._sock = sock
- self.mode = mode # Not actually used in this version
- if bufsize < 0:
- bufsize = self.default_bufsize
- self.bufsize = bufsize
- self.softspace = False
- # _rbufsize is the suggested recv buffer size. It is *strictly*
- # obeyed within readline() for recv calls. If it is larger than
- # default_bufsize it will be used for recv calls within read().
- if bufsize == 0:
- self._rbufsize = 1
- elif bufsize == 1:
- self._rbufsize = self.default_bufsize
- else:
- self._rbufsize = bufsize
- self._wbufsize = bufsize
- # We use BytesIO for the read buffer to avoid holding a list
- # of variously sized string objects which have been known to
- # fragment the heap due to how they are malloc()ed and often
- # realloc()ed down much smaller than their original allocation.
- self._rbuf = BytesIO()
- self._wbuf = [] # A list of strings
- self._wbuf_len = 0
- self._close = close
-
- def _getclosed(self):
- return self._sock is None
- closed = property(_getclosed, doc="True if the file is closed")
-
- def close(self):
- try:
- if self._sock:
- self.flush()
- finally:
- s = self._sock
- self._sock = None
- if s is not None:
- if self._close:
- s.close()
- else:
- _do_reuse_or_drop(s, '_drop')
-
- def __del__(self):
- try:
- self.close()
- except: # pylint:disable=bare-except
- # close() may fail if __init__ didn't complete
- pass
-
- def flush(self):
- if self._wbuf:
- data = b"".join(self._wbuf)
- self._wbuf = []
- self._wbuf_len = 0
- buffer_size = max(self._rbufsize, self.default_bufsize)
- data_size = len(data)
- write_offset = 0
- view = memoryview(data)
- try:
- while write_offset < data_size:
- self._sock.sendall(view[write_offset:write_offset + buffer_size])
- write_offset += buffer_size
- finally:
- if write_offset < data_size:
- remainder = data[write_offset:]
- del view, data # explicit free
- self._wbuf.append(remainder)
- self._wbuf_len = len(remainder)
-
- def fileno(self):
- return self._sock.fileno()
-
- def write(self, data):
- if not isinstance(data, bytes):
- raise TypeError("Non-bytes data")
- if not data:
- return
- self._wbuf.append(data)
- self._wbuf_len += len(data)
- if (self._wbufsize == 0 or (self._wbufsize == 1 and b'\n' in data) or
- (self._wbufsize > 1 and self._wbuf_len >= self._wbufsize)):
- self.flush()
-
- def writelines(self, list):
- # XXX We could do better here for very long lists
- # XXX Should really reject non-string non-buffers
- lines = filter(None, map(str, list))
- self._wbuf_len += sum(map(len, lines))
- self._wbuf.extend(lines)
- if self._wbufsize <= 1 or self._wbuf_len >= self._wbufsize:
- self.flush()
-
- def read(self, size=-1):
- # Use max, disallow tiny reads in a loop as they are very inefficient.
- # We never leave read() with any leftover data from a new recv() call
- # in our internal buffer.
- rbufsize = max(self._rbufsize, self.default_bufsize)
- # Our use of BytesIO rather than lists of string objects returned by
- # recv() minimizes memory usage and fragmentation that occurs when
- # rbufsize is large compared to the typical return value of recv().
- buf = self._rbuf
- buf.seek(0, 2) # seek end
- if size < 0:
- # Read until EOF
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- try:
- data = self._sock.recv(rbufsize)
- except InterruptedError:
- continue
- if not data:
- break
- buf.write(data)
- return buf.getvalue()
- else:
- # Read until size bytes or EOF seen, whichever comes first
- buf_len = buf.tell()
- if buf_len >= size:
- # Already have size bytes in our buffer? Extract and return.
- buf.seek(0)
- rv = buf.read(size)
- self._rbuf = BytesIO()
- self._rbuf.write(buf.read())
- return rv
-
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- left = size - buf_len
- # recv() will malloc the amount of memory given as its
- # parameter even though it often returns much less data
- # than that. The returned data string is short lived
- # as we copy it into a BytesIO and free it. This avoids
- # fragmentation issues on many platforms.
- try:
- data = self._sock.recv(left)
- except InterruptedError:
- continue
-
- if not data:
- break
- n = len(data)
- if n == size and not buf_len:
- # Shortcut. Avoid buffer data copies when:
- # - We have no data in our buffer.
- # AND
- # - Our call to recv returned exactly the
- # number of bytes we were asked to read.
- return data
- if n == left:
- buf.write(data)
- del data # explicit free
- break
- assert n <= left, "recv(%d) returned %d bytes" % (left, n)
- buf.write(data)
- buf_len += n
- del data # explicit free
- #assert buf_len == buf.tell()
- return buf.getvalue()
-
- def readline(self, size=-1):
- # pylint:disable=too-many-return-statements
- buf = self._rbuf
- buf.seek(0, 2) # seek end
- if buf.tell() > 0:
- # check if we already have it in our buffer
- buf.seek(0)
- bline = buf.readline(size)
- if bline.endswith(b'\n') or len(bline) == size:
- self._rbuf = BytesIO()
- self._rbuf.write(buf.read())
- return bline
- del bline
- if size < 0: # pylint:disable=too-many-nested-blocks
- # Read until \n or EOF, whichever comes first
- if self._rbufsize <= 1:
- # Speed up unbuffered case
- buf.seek(0)
- buffers = [buf.read()]
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- data = None
- recv = self._sock.recv
- while True:
- try:
- while data != b"\n":
- data = recv(1)
- if not data:
- break
- buffers.append(data)
- except InterruptedError:
- # The try..except to catch EINTR was moved outside the
- # recv loop to avoid the per byte overhead.
- continue
-
- break
- return b"".join(buffers)
-
- buf.seek(0, 2) # seek end
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- try:
- data = self._sock.recv(self._rbufsize)
- except InterruptedError:
- continue
-
- if not data:
- break
- nl = data.find(b'\n')
- if nl >= 0:
- nl += 1
- buf.write(data[:nl])
- self._rbuf.write(data[nl:])
- del data
- break
- buf.write(data)
- return buf.getvalue()
- else:
- # Read until size bytes or \n or EOF seen, whichever comes first
- buf.seek(0, 2) # seek end
- buf_len = buf.tell()
- if buf_len >= size:
- buf.seek(0)
- rv = buf.read(size)
- self._rbuf = BytesIO()
- self._rbuf.write(buf.read())
- return rv
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- try:
- data = self._sock.recv(self._rbufsize)
- except InterruptedError:
- continue
-
- if not data:
- break
- left = size - buf_len
- # did we just receive a newline?
- nl = data.find(b'\n', 0, left)
- if nl >= 0:
- nl += 1
- # save the excess data to _rbuf
- self._rbuf.write(data[nl:])
- if buf_len:
- buf.write(data[:nl])
- break
- else:
- # Shortcut. Avoid data copy through buf when returning
- # a substring of our first recv().
- return data[:nl]
- n = len(data)
- if n == size and not buf_len:
- # Shortcut. Avoid data copy through buf when
- # returning exactly all of our first recv().
- return data
- if n >= left:
- buf.write(data[:left])
- self._rbuf.write(data[left:])
- break
- buf.write(data)
- buf_len += n
- #assert buf_len == buf.tell()
- return buf.getvalue()
-
- def readlines(self, sizehint=0):
- total = 0
- list = []
- while True:
- line = self.readline()
- if not line:
- break
- list.append(line)
- total += len(line)
- if sizehint and total >= sizehint:
- break
- return list
-
- # Iterator protocols
-
- def __iter__(self):
- return self
-
- def next(self):
- line = self.readline()
- if not line:
- raise StopIteration
- return line
- __next__ = next
-
-try:
- from gevent.fileobject import FileObjectPosix
-except ImportError:
- # Manual implementation
- _fileobject = _basefileobject
-else:
- class _fileobject(FileObjectPosix):
- # Add the proper drop/reuse support for pypy, and match
- # the close=False default in the constructor
- def __init__(self, sock, mode='rb', bufsize=-1, close=False):
- _do_reuse_or_drop(sock, '_reuse')
- self._sock = sock
- FileObjectPosix.__init__(self, sock, mode, bufsize, close)
-
- def close(self):
- try:
- if self._sock:
- self.flush()
- finally:
- s = self._sock
- self._sock = None
- if s is not None:
- if self._close:
- FileObjectPosix.close(self)
- else:
- _do_reuse_or_drop(s, '_drop')
-
- def __del__(self):
- try:
- self.close()
- except: # pylint:disable=bare-except
- # close() may fail if __init__ didn't complete
- pass
-
+if hasattr(__socket__, 'close'): # Python 3.7b1+
+ close = __socket__.close # pylint:disable=no-member
+ __imports__ += ['close']
__all__ = __implements__ + __extensions__ + __imports__