diff options
Diffstat (limited to 'python/gevent/baseserver.py')
-rw-r--r-- | python/gevent/baseserver.py | 404 |
1 files changed, 0 insertions, 404 deletions
diff --git a/python/gevent/baseserver.py b/python/gevent/baseserver.py deleted file mode 100644 index d20d936..0000000 --- a/python/gevent/baseserver.py +++ /dev/null @@ -1,404 +0,0 @@ -"""Base class for implementing servers""" -# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. -import sys -import _socket -import errno -from gevent.greenlet import Greenlet -from gevent.event import Event -from gevent.hub import get_hub -from gevent._compat import string_types, integer_types, xrange - - -__all__ = ['BaseServer'] - - -# We define a helper function to handle closing the socket in -# do_handle; We'd like to bind it to a kwarg to avoid *any* lookups at -# all, but that's incompatible with the calling convention of -# do_handle. On CPython, this is ~20% faster than creating and calling -# a closure and ~10% faster than using a @staticmethod. (In theory, we -# could create a closure only once in set_handle, to wrap self._handle, -# but this is safer from a backwards compat standpoint.) -# we also avoid unpacking the *args tuple when calling/spawning this object -# for a tiny improvement (benchmark shows a wash) -def _handle_and_close_when_done(handle, close, args_tuple): - try: - return handle(*args_tuple) - finally: - close(*args_tuple) - - -class BaseServer(object): - """ - An abstract base class that implements some common functionality for the servers in gevent. - - :param listener: Either be an address that the server should bind - on or a :class:`gevent.socket.socket` instance that is already - bound (and put into listening mode in case of TCP socket). - - :keyword handle: If given, the request handler. The request - handler can be defined in a few ways. Most commonly, - subclasses will implement a ``handle`` method as an - instance method. Alternatively, a function can be passed - as the ``handle`` argument to the constructor. In either - case, the handler can later be changed by calling - :meth:`set_handle`. - - When the request handler returns, the socket used for the - request will be closed. Therefore, the handler must not return if - the socket is still in use (for example, by manually spawned greenlets). - - :keyword spawn: If provided, is called to create a new - greenlet to run the handler. By default, - :func:`gevent.spawn` is used (meaning there is no - artificial limit on the number of concurrent requests). Possible values for *spawn*: - - - a :class:`gevent.pool.Pool` instance -- ``handle`` will be executed - using :meth:`gevent.pool.Pool.spawn` only if the pool is not full. - While it is full, no new connections are accepted; - - :func:`gevent.spawn_raw` -- ``handle`` will be executed in a raw - greenlet which has a little less overhead then :class:`gevent.Greenlet` instances spawned by default; - - ``None`` -- ``handle`` will be executed right away, in the :class:`Hub` greenlet. - ``handle`` cannot use any blocking functions as it would mean switching to the :class:`Hub`. - - an integer -- a shortcut for ``gevent.pool.Pool(integer)`` - - .. versionchanged:: 1.1a1 - When the *handle* function returns from processing a connection, - the client socket will be closed. This resolves the non-deterministic - closing of the socket, fixing ResourceWarnings under Python 3 and PyPy. - - """ - # pylint: disable=too-many-instance-attributes,bare-except,broad-except - - #: the number of seconds to sleep in case there was an error in accept() call - #: for consecutive errors the delay will double until it reaches max_delay - #: when accept() finally succeeds the delay will be reset to min_delay again - min_delay = 0.01 - max_delay = 1 - - #: Sets the maximum number of consecutive accepts that a process may perform on - #: a single wake up. High values give higher priority to high connection rates, - #: while lower values give higher priority to already established connections. - #: Default is 100. Note, that in case of multiple working processes on the same - #: listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it - #: to 1 when environ["wsgi.multiprocess"] is true) - max_accept = 100 - - _spawn = Greenlet.spawn - - #: the default timeout that we wait for the client connections to close in stop() - stop_timeout = 1 - - fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK) - - def __init__(self, listener, handle=None, spawn='default'): - self._stop_event = Event() - self._stop_event.set() - self._watcher = None - self._timer = None - self._handle = None - # XXX: FIXME: Subclasses rely on the presence or absence of the - # `socket` attribute to determine whether we are open/should be opened. - # Instead, have it be None. - self.pool = None - try: - self.set_listener(listener) - self.set_spawn(spawn) - self.set_handle(handle) - self.delay = self.min_delay - self.loop = get_hub().loop - if self.max_accept < 1: - raise ValueError('max_accept must be positive int: %r' % (self.max_accept, )) - except: - self.close() - raise - - def set_listener(self, listener): - if hasattr(listener, 'accept'): - if hasattr(listener, 'do_handshake'): - raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, )) - self.family = listener.family - self.address = listener.getsockname() - self.socket = listener - else: - self.family, self.address = parse_address(listener) - - def set_spawn(self, spawn): - if spawn == 'default': - self.pool = None - self._spawn = self._spawn - elif hasattr(spawn, 'spawn'): - self.pool = spawn - self._spawn = spawn.spawn - elif isinstance(spawn, integer_types): - from gevent.pool import Pool - self.pool = Pool(spawn) - self._spawn = self.pool.spawn - else: - self.pool = None - self._spawn = spawn - if hasattr(self.pool, 'full'): - self.full = self.pool.full - if self.pool is not None: - self.pool._semaphore.rawlink(self._start_accepting_if_started) - - def set_handle(self, handle): - if handle is not None: - self.handle = handle - if hasattr(self, 'handle'): - self._handle = self.handle - else: - raise TypeError("'handle' must be provided") - - def _start_accepting_if_started(self, _event=None): - if self.started: - self.start_accepting() - - def start_accepting(self): - if self._watcher is None: - # just stop watcher without creating a new one? - self._watcher = self.loop.io(self.socket.fileno(), 1) - self._watcher.start(self._do_read) - - def stop_accepting(self): - if self._watcher is not None: - self._watcher.stop() - self._watcher.close() - self._watcher = None - if self._timer is not None: - self._timer.stop() - self._timer.close() - self._timer = None - - def do_handle(self, *args): - spawn = self._spawn - handle = self._handle - close = self.do_close - - try: - if spawn is None: - _handle_and_close_when_done(handle, close, args) - else: - spawn(_handle_and_close_when_done, handle, close, args) - except: - close(*args) - raise - - def do_close(self, *args): - pass - - def do_read(self): - raise NotImplementedError() - - def _do_read(self): - for _ in xrange(self.max_accept): - if self.full(): - self.stop_accepting() - return - try: - args = self.do_read() - self.delay = self.min_delay - if not args: - return - except: - self.loop.handle_error(self, *sys.exc_info()) - ex = sys.exc_info()[1] - if self.is_fatal_error(ex): - self.close() - sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex))) - return - if self.delay >= 0: - self.stop_accepting() - self._timer = self.loop.timer(self.delay) - self._timer.start(self._start_accepting_if_started) - self.delay = min(self.max_delay, self.delay * 2) - break - else: - try: - self.do_handle(*args) - except: - self.loop.handle_error((args[1:], self), *sys.exc_info()) - if self.delay >= 0: - self.stop_accepting() - self._timer = self.loop.timer(self.delay) - self._timer.start(self._start_accepting_if_started) - self.delay = min(self.max_delay, self.delay * 2) - break - - def full(self): - # copied from self.pool - # pylint: disable=method-hidden - return False - - def __repr__(self): - return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo()) - - def __str__(self): - return '<%s %s>' % (type(self).__name__, self._formatinfo()) - - def _formatinfo(self): - if hasattr(self, 'socket'): - try: - fileno = self.socket.fileno() - except Exception as ex: - fileno = str(ex) - result = 'fileno=%s ' % fileno - else: - result = '' - try: - if isinstance(self.address, tuple) and len(self.address) == 2: - result += 'address=%s:%s' % self.address - else: - result += 'address=%s' % (self.address, ) - except Exception as ex: - result += str(ex) or '<error>' - - handle = self.__dict__.get('handle') - if handle is not None: - fself = getattr(handle, '__self__', None) - try: - if fself is self: - # Checks the __self__ of the handle in case it is a bound - # method of self to prevent recursivly defined reprs. - handle_repr = '<bound method %s.%s of self>' % ( - self.__class__.__name__, - handle.__name__, - ) - else: - handle_repr = repr(handle) - - result += ' handle=' + handle_repr - except Exception as ex: - result += str(ex) or '<error>' - - return result - - @property - def server_host(self): - """IP address that the server is bound to (string).""" - if isinstance(self.address, tuple): - return self.address[0] - - @property - def server_port(self): - """Port that the server is bound to (an integer).""" - if isinstance(self.address, tuple): - return self.address[1] - - def init_socket(self): - """If the user initialized the server with an address rather than socket, - then this function will create a socket, bind it and put it into listening mode. - - It is not supposed to be called by the user, it is called by :meth:`start` before starting - the accept loop.""" - pass - - @property - def started(self): - return not self._stop_event.is_set() - - def start(self): - """Start accepting the connections. - - If an address was provided in the constructor, then also create a socket, - bind it and put it into the listening mode. - """ - self.init_socket() - self._stop_event.clear() - try: - self.start_accepting() - except: - self.close() - raise - - def close(self): - """Close the listener socket and stop accepting.""" - self._stop_event.set() - try: - self.stop_accepting() - finally: - try: - self.socket.close() - except Exception: - pass - finally: - self.__dict__.pop('socket', None) - self.__dict__.pop('handle', None) - self.__dict__.pop('_handle', None) - self.__dict__.pop('_spawn', None) - self.__dict__.pop('full', None) - if self.pool is not None: - self.pool._semaphore.unlink(self._start_accepting_if_started) - - @property - def closed(self): - return not hasattr(self, 'socket') - - def stop(self, timeout=None): - """ - Stop accepting the connections and close the listening socket. - - If the server uses a pool to spawn the requests, then - :meth:`stop` also waits for all the handlers to exit. If there - are still handlers executing after *timeout* has expired - (default 1 second, :attr:`stop_timeout`), then the currently - running handlers in the pool are killed. - - If the server does not use a pool, then this merely stops accepting connections; - any spawned greenlets that are handling requests continue running until - they naturally complete. - """ - self.close() - if timeout is None: - timeout = self.stop_timeout - if self.pool: - self.pool.join(timeout=timeout) - self.pool.kill(block=True, timeout=1) - - def serve_forever(self, stop_timeout=None): - """Start the server if it hasn't been already started and wait until it's stopped.""" - # add test that serve_forever exists on stop() - if not self.started: - self.start() - try: - self._stop_event.wait() - finally: - Greenlet.spawn(self.stop, timeout=stop_timeout).join() - - def is_fatal_error(self, ex): - return isinstance(ex, _socket.error) and ex.args[0] in self.fatal_errors - - -def _extract_family(host): - if host.startswith('[') and host.endswith(']'): - host = host[1:-1] - return _socket.AF_INET6, host - return _socket.AF_INET, host - - -def _parse_address(address): - if isinstance(address, tuple): - if not address[0] or ':' in address[0]: - return _socket.AF_INET6, address - return _socket.AF_INET, address - - if ((isinstance(address, string_types) and ':' not in address) - or isinstance(address, integer_types)): # noqa (pep8 E129) - # Just a port - return _socket.AF_INET6, ('', int(address)) - - if not isinstance(address, string_types): - raise TypeError('Expected tuple or string, got %s' % type(address)) - - host, port = address.rsplit(':', 1) - family, host = _extract_family(host) - if host == '*': - host = '' - return family, (host, int(port)) - - -def parse_address(address): - try: - return _parse_address(address) - except ValueError as ex: # pylint:disable=try-except-raise - raise ValueError('Failed to parse address %r: %s' % (address, ex)) |