diff options
Diffstat (limited to 'python/gevent/_socket2.py')
-rw-r--r-- | python/gevent/_socket2.py | 196 |
1 files changed, 65 insertions, 131 deletions
diff --git a/python/gevent/_socket2.py b/python/gevent/_socket2.py index dbcf1f7..d2377b2 100644 --- a/python/gevent/_socket2.py +++ b/python/gevent/_socket2.py @@ -2,13 +2,15 @@ """ Python 2 socket module. """ +from __future__ import absolute_import + # Our import magic sadly makes this warning useless # pylint: disable=undefined-variable -import time from gevent import _socketcommon from gevent._util import copy_globals from gevent._compat import PYPY +from gevent.timeout import Timeout copy_globals(_socketcommon, globals(), names_to_ignore=_socketcommon.__py3_imports__ + _socketcommon.__extensions__, @@ -36,22 +38,35 @@ else: # Python 2 doesn't natively support with statements on _fileobject; # but it eases our test cases if we can do the same with on both Py3 # and Py2. Implementation copied from Python 3 - if not hasattr(_fileobject, '__enter__'): - # we could either patch in place: - #_fileobject.__enter__ = lambda self: self - #_fileobject.__exit__ = lambda self, *args: self.close() if not self.closed else None - # or we could subclass. subclassing has the benefit of not - # changing the behaviour of the stdlib if we're just imported; OTOH, - # under Python 2.6/2.7, test_urllib2net.py asserts that the class IS - # socket._fileobject (sigh), so we have to work around that. - class _fileobject(_fileobject): # pylint:disable=function-redefined - - def __enter__(self): - return self - - def __exit__(self, *args): - if not self.closed: - self.close() + assert not hasattr(_fileobject, '__enter__') + # we could either patch in place: + #_fileobject.__enter__ = lambda self: self + #_fileobject.__exit__ = lambda self, *args: self.close() if not self.closed else None + # or we could subclass. subclassing has the benefit of not + # changing the behaviour of the stdlib if we're just imported; OTOH, + # under Python 2.6/2.7, test_urllib2net.py asserts that the class IS + # socket._fileobject (sigh), so we have to work around that. + + # We also make it call our custom socket closing method that disposes + # if IO watchers but not the actual socket itself. + + # Python 2 relies on reference counting to close sockets, so this is all + # very ugly and fragile. + + class _fileobject(_fileobject): # pylint:disable=function-redefined + + def __enter__(self): + return self + + def __exit__(self, *args): + if not self.closed: + self.close() + + def close(self): + if self._sock is not None: + self._sock._drop_events() + super(_fileobject, self).close() + def _get_memory(data): try: @@ -89,6 +104,7 @@ class _closedsocket(object): timeout_default = object() +from gevent._hub_primitives import wait_on_socket as _wait_on_socket class socket(object): """ @@ -108,11 +124,13 @@ class socket(object): self.timeout = _socket.getdefaulttimeout() else: if hasattr(_sock, '_sock'): + # passed a gevent socket self._sock = _sock._sock self.timeout = getattr(_sock, 'timeout', False) if self.timeout is False: self.timeout = _socket.getdefaulttimeout() else: + # passed a native socket self._sock = _sock self.timeout = _socket.getdefaulttimeout() if PYPY: @@ -164,25 +182,7 @@ class socket(object): ref = property(_get_ref, _set_ref) - def _wait(self, watcher, timeout_exc=timeout('timed out')): - """Block the current greenlet until *watcher* has pending events. - - If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed. - By default *timeout_exc* is ``socket.timeout('timed out')``. - - If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``. - """ - if watcher.callback is not None: - raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, )) - if self.timeout is not None: - timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False) - else: - timeout = None - try: - self.hub.wait(watcher) - finally: - if timeout is not None: - timeout.cancel() + _wait = _wait_on_socket def accept(self): sock = self._sock @@ -200,10 +200,21 @@ class socket(object): client_socket._drop() return sockobj, address - def close(self, _closedsocket=_closedsocket, cancel_wait_ex=cancel_wait_ex): + def _drop_events(self, cancel_wait_ex=cancel_wait_ex): + if self._read_event is not None: + self.hub.cancel_wait(self._read_event, cancel_wait_ex, True) + self._read_event = None + if self._write_event is not None: + self.hub.cancel_wait(self._write_event, cancel_wait_ex, True) + self._write_event = None + + + def close(self, _closedsocket=_closedsocket): # This function should not reference any globals. See Python issue #808164. - self.hub.cancel_wait(self._read_event, cancel_wait_ex) - self.hub.cancel_wait(self._write_event, cancel_wait_ex) + + # Also break any reference to the loop.io objects. Our fileno, which they were + # tied to, is now free to be reused, so these objects are no longer functional. + self._drop_events() s = self._sock self._sock = _closedsocket() if PYPY: @@ -217,13 +228,9 @@ class socket(object): if self.timeout == 0.0: return self._sock.connect(address) sock = self._sock - if isinstance(address, tuple): - r = getaddrinfo(address[0], address[1], sock.family) - address = r[0][-1] - if self.timeout is not None: - timer = Timeout.start_new(self.timeout, timeout('timed out')) - else: - timer = None + address = _socketcommon._resolve_addr(sock, address) + + timer = Timeout._start_new_or_dummy(self.timeout, timeout('timed out')) try: while True: err = sock.getsockopt(SOL_SOCKET, SO_ERROR) @@ -237,8 +244,7 @@ class socket(object): else: raise error(result, strerror(result)) finally: - if timer is not None: - timer.cancel() + timer.close() def connect_ex(self, address): try: @@ -248,8 +254,7 @@ class socket(object): except error as ex: if type(ex) is error: # pylint:disable=unidiomatic-typecheck return ex.args[0] - else: - raise # gaierror is not silenced by connect_ex + raise # gaierror is not silenced by connect_ex def dup(self): """dup() -> socket object @@ -261,7 +266,14 @@ class socket(object): def makefile(self, mode='r', bufsize=-1): # Two things to look out for: # 1) Closing the original socket object should not close the - # socket (hence creating a new instance) + # fileobject (hence creating a new socket instance); + # An alternate approach is what _socket3.py does, which is to + # keep count of the times makefile objects have been opened (Py3's + # SocketIO helps with that). But the newly created socket, which + # has its own read/write watchers, does need those to be closed + # when the fileobject is; our custom subclass does that. Note that + # we can't pass the 'close=True' argument, as that causes reference counts + # to get screwed up, and Python2 sockets rely on those. # 2) The resulting fileobject must keep the timeout in order # to be compatible with the stdlib's socket.makefile. # Pass self as _sock to preserve timeout. @@ -322,7 +334,7 @@ class socket(object): try: return sock.send(data, flags) except error as ex: - if ex.args[0] != EWOULDBLOCK or timeout == 0.0: + if ex.args[0] not in _socketcommon.GSENDAGAIN or timeout == 0.0: raise sys.exc_clear() self._wait(self._write_event) @@ -333,91 +345,13 @@ class socket(object): return 0 raise - def __send_chunk(self, data_memory, flags, timeleft, end): - """ - Send the complete contents of ``data_memory`` before returning. - This is the core loop around :meth:`send`. - - :param timeleft: Either ``None`` if there is no timeout involved, - or a float indicating the timeout to use. - :param end: Either ``None`` if there is no timeout involved, or - a float giving the absolute end time. - :return: An updated value for ``timeleft`` (or None) - :raises timeout: If ``timeleft`` was given and elapsed while - sending this chunk. - """ - data_sent = 0 - len_data_memory = len(data_memory) - started_timer = 0 - while data_sent < len_data_memory: - chunk = data_memory[data_sent:] - if timeleft is None: - data_sent += self.send(chunk, flags) - elif started_timer and timeleft <= 0: - # Check before sending to guarantee a check - # happens even if each chunk successfully sends its data - # (especially important for SSL sockets since they have large - # buffers). But only do this if we've actually tried to - # send something once to avoid spurious timeouts on non-blocking - # sockets. - raise timeout('timed out') - else: - started_timer = 1 - data_sent += self.send(chunk, flags, timeout=timeleft) - timeleft = end - time.time() - - return timeleft - def sendall(self, data, flags=0): if isinstance(data, unicode): data = data.encode() # this sendall is also reused by gevent.ssl.SSLSocket subclass, # so it should not call self._sock methods directly data_memory = _get_memory(data) - len_data_memory = len(data_memory) - if not len_data_memory: - # Don't send empty data, can cause SSL EOFError. - # See issue 719 - return 0 - - # On PyPy up through 2.6.0, subviews of a memoryview() object - # copy the underlying bytes the first time the builtin - # socket.send() method is called. On a non-blocking socket - # (that thus calls socket.send() many times) with a large - # input, this results in many repeated copies of an ever - # smaller string, depending on the networking buffering. For - # example, if each send() can process 1MB of a 50MB input, and - # we naively pass the entire remaining subview each time, we'd - # copy 49MB, 48MB, 47MB, etc, thus completely killing - # performance. To workaround this problem, we work in - # reasonable, fixed-size chunks. This results in a 10x - # improvement to bench_sendall.py, while having no measurable impact on - # CPython (since it doesn't copy at all the only extra overhead is - # a few python function calls, which is negligible for large inputs). - - # See https://bitbucket.org/pypy/pypy/issues/2091/non-blocking-socketsend-slow-gevent - - # Too small of a chunk (the socket's buf size is usually too - # small) results in reduced perf due to *too many* calls to send and too many - # small copies. With a buffer of 143K (the default on my system), for - # example, bench_sendall.py yields ~264MB/s, while using 1MB yields - # ~653MB/s (matching CPython). 1MB is arbitrary and might be better - # chosen, say, to match a page size? - chunk_size = max(self.getsockopt(SOL_SOCKET, SO_SNDBUF), 1024 * 1024) # pylint:disable=no-member - - data_sent = 0 - end = None - timeleft = None - if self.timeout is not None: - timeleft = self.timeout - end = time.time() + timeleft - - while data_sent < len_data_memory: - chunk_end = min(data_sent + chunk_size, len_data_memory) - chunk = data_memory[data_sent:chunk_end] - - timeleft = self.__send_chunk(chunk, flags, timeleft, end) - data_sent += len(chunk) # Guaranteed it sent the whole thing + return _socketcommon._sendall(self, data_memory, flags) def sendto(self, *args): sock = self._sock |