diff options
Diffstat (limited to 'python/gevent/baseserver.py')
-rw-r--r-- | python/gevent/baseserver.py | 402 |
1 files changed, 402 insertions, 0 deletions
diff --git a/python/gevent/baseserver.py b/python/gevent/baseserver.py new file mode 100644 index 0000000..5b8bce5 --- /dev/null +++ b/python/gevent/baseserver.py @@ -0,0 +1,402 @@ +"""Base class for implementing servers""" +# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. +import sys +import _socket +import errno +from gevent.greenlet import Greenlet +from gevent.event import Event +from gevent.hub import get_hub +from gevent._compat import string_types, integer_types, xrange + + +__all__ = ['BaseServer'] + + +# We define a helper function to handle closing the socket in +# do_handle; We'd like to bind it to a kwarg to avoid *any* lookups at +# all, but that's incompatible with the calling convention of +# do_handle. On CPython, this is ~20% faster than creating and calling +# a closure and ~10% faster than using a @staticmethod. (In theory, we +# could create a closure only once in set_handle, to wrap self._handle, +# but this is safer from a backwards compat standpoint.) +# we also avoid unpacking the *args tuple when calling/spawning this object +# for a tiny improvement (benchmark shows a wash) +def _handle_and_close_when_done(handle, close, args_tuple): + try: + return handle(*args_tuple) + finally: + close(*args_tuple) + + +class BaseServer(object): + """ + An abstract base class that implements some common functionality for the servers in gevent. + + :param listener: Either be an address that the server should bind + on or a :class:`gevent.socket.socket` instance that is already + bound (and put into listening mode in case of TCP socket). + + :keyword handle: If given, the request handler. The request + handler can be defined in a few ways. Most commonly, + subclasses will implement a ``handle`` method as an + instance method. Alternatively, a function can be passed + as the ``handle`` argument to the constructor. In either + case, the handler can later be changed by calling + :meth:`set_handle`. + + When the request handler returns, the socket used for the + request will be closed. Therefore, the handler must not return if + the socket is still in use (for example, by manually spawned greenlets). + + :keyword spawn: If provided, is called to create a new + greenlet to run the handler. By default, + :func:`gevent.spawn` is used (meaning there is no + artificial limit on the number of concurrent requests). Possible values for *spawn*: + + - a :class:`gevent.pool.Pool` instance -- ``handle`` will be executed + using :meth:`gevent.pool.Pool.spawn` only if the pool is not full. + While it is full, no new connections are accepted; + - :func:`gevent.spawn_raw` -- ``handle`` will be executed in a raw + greenlet which has a little less overhead then :class:`gevent.Greenlet` instances spawned by default; + - ``None`` -- ``handle`` will be executed right away, in the :class:`Hub` greenlet. + ``handle`` cannot use any blocking functions as it would mean switching to the :class:`Hub`. + - an integer -- a shortcut for ``gevent.pool.Pool(integer)`` + + .. versionchanged:: 1.1a1 + When the *handle* function returns from processing a connection, + the client socket will be closed. This resolves the non-deterministic + closing of the socket, fixing ResourceWarnings under Python 3 and PyPy. + + """ + # pylint: disable=too-many-instance-attributes,bare-except,broad-except + + #: the number of seconds to sleep in case there was an error in accept() call + #: for consecutive errors the delay will double until it reaches max_delay + #: when accept() finally succeeds the delay will be reset to min_delay again + min_delay = 0.01 + max_delay = 1 + + #: Sets the maximum number of consecutive accepts that a process may perform on + #: a single wake up. High values give higher priority to high connection rates, + #: while lower values give higher priority to already established connections. + #: Default is 100. Note, that in case of multiple working processes on the same + #: listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it + #: to 1 when environ["wsgi.multiprocess"] is true) + max_accept = 100 + + _spawn = Greenlet.spawn + + #: the default timeout that we wait for the client connections to close in stop() + stop_timeout = 1 + + fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK) + + def __init__(self, listener, handle=None, spawn='default'): + self._stop_event = Event() + self._stop_event.set() + self._watcher = None + self._timer = None + self._handle = None + # XXX: FIXME: Subclasses rely on the presence or absence of the + # `socket` attribute to determine whether we are open/should be opened. + # Instead, have it be None. + self.pool = None + try: + self.set_listener(listener) + self.set_spawn(spawn) + self.set_handle(handle) + self.delay = self.min_delay + self.loop = get_hub().loop + if self.max_accept < 1: + raise ValueError('max_accept must be positive int: %r' % (self.max_accept, )) + except: + self.close() + raise + + def set_listener(self, listener): + if hasattr(listener, 'accept'): + if hasattr(listener, 'do_handshake'): + raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, )) + self.family = listener.family + self.address = listener.getsockname() + self.socket = listener + else: + self.family, self.address = parse_address(listener) + + def set_spawn(self, spawn): + if spawn == 'default': + self.pool = None + self._spawn = self._spawn + elif hasattr(spawn, 'spawn'): + self.pool = spawn + self._spawn = spawn.spawn + elif isinstance(spawn, integer_types): + from gevent.pool import Pool + self.pool = Pool(spawn) + self._spawn = self.pool.spawn + else: + self.pool = None + self._spawn = spawn + if hasattr(self.pool, 'full'): + self.full = self.pool.full + if self.pool is not None: + self.pool._semaphore.rawlink(self._start_accepting_if_started) + + def set_handle(self, handle): + if handle is not None: + self.handle = handle + if hasattr(self, 'handle'): + self._handle = self.handle + else: + raise TypeError("'handle' must be provided") + + def _start_accepting_if_started(self, _event=None): + if self.started: + self.start_accepting() + + def start_accepting(self): + if self._watcher is None: + # just stop watcher without creating a new one? + self._watcher = self.loop.io(self.socket.fileno(), 1) + self._watcher.start(self._do_read) + + def stop_accepting(self): + if self._watcher is not None: + self._watcher.stop() + self._watcher = None + if self._timer is not None: + self._timer.stop() + self._timer = None + + def do_handle(self, *args): + spawn = self._spawn + handle = self._handle + close = self.do_close + + try: + if spawn is None: + _handle_and_close_when_done(handle, close, args) + else: + spawn(_handle_and_close_when_done, handle, close, args) + except: + close(*args) + raise + + def do_close(self, *args): + pass + + def do_read(self): + raise NotImplementedError() + + def _do_read(self): + for _ in xrange(self.max_accept): + if self.full(): + self.stop_accepting() + return + try: + args = self.do_read() + self.delay = self.min_delay + if not args: + return + except: + self.loop.handle_error(self, *sys.exc_info()) + ex = sys.exc_info()[1] + if self.is_fatal_error(ex): + self.close() + sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex))) + return + if self.delay >= 0: + self.stop_accepting() + self._timer = self.loop.timer(self.delay) + self._timer.start(self._start_accepting_if_started) + self.delay = min(self.max_delay, self.delay * 2) + break + else: + try: + self.do_handle(*args) + except: + self.loop.handle_error((args[1:], self), *sys.exc_info()) + if self.delay >= 0: + self.stop_accepting() + self._timer = self.loop.timer(self.delay) + self._timer.start(self._start_accepting_if_started) + self.delay = min(self.max_delay, self.delay * 2) + break + + def full(self): + # copied from self.pool + # pylint: disable=method-hidden + return False + + def __repr__(self): + return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo()) + + def __str__(self): + return '<%s %s>' % (type(self).__name__, self._formatinfo()) + + def _formatinfo(self): + if hasattr(self, 'socket'): + try: + fileno = self.socket.fileno() + except Exception as ex: + fileno = str(ex) + result = 'fileno=%s ' % fileno + else: + result = '' + try: + if isinstance(self.address, tuple) and len(self.address) == 2: + result += 'address=%s:%s' % self.address + else: + result += 'address=%s' % (self.address, ) + except Exception as ex: + result += str(ex) or '<error>' + + handle = self.__dict__.get('handle') + if handle is not None: + fself = getattr(handle, '__self__', None) + try: + if fself is self: + # Checks the __self__ of the handle in case it is a bound + # method of self to prevent recursivly defined reprs. + handle_repr = '<bound method %s.%s of self>' % ( + self.__class__.__name__, + handle.__name__, + ) + else: + handle_repr = repr(handle) + + result += ' handle=' + handle_repr + except Exception as ex: + result += str(ex) or '<error>' + + return result + + @property + def server_host(self): + """IP address that the server is bound to (string).""" + if isinstance(self.address, tuple): + return self.address[0] + + @property + def server_port(self): + """Port that the server is bound to (an integer).""" + if isinstance(self.address, tuple): + return self.address[1] + + def init_socket(self): + """If the user initialized the server with an address rather than socket, + then this function will create a socket, bind it and put it into listening mode. + + It is not supposed to be called by the user, it is called by :meth:`start` before starting + the accept loop.""" + pass + + @property + def started(self): + return not self._stop_event.is_set() + + def start(self): + """Start accepting the connections. + + If an address was provided in the constructor, then also create a socket, + bind it and put it into the listening mode. + """ + self.init_socket() + self._stop_event.clear() + try: + self.start_accepting() + except: + self.close() + raise + + def close(self): + """Close the listener socket and stop accepting.""" + self._stop_event.set() + try: + self.stop_accepting() + finally: + try: + self.socket.close() + except Exception: + pass + finally: + self.__dict__.pop('socket', None) + self.__dict__.pop('handle', None) + self.__dict__.pop('_handle', None) + self.__dict__.pop('_spawn', None) + self.__dict__.pop('full', None) + if self.pool is not None: + self.pool._semaphore.unlink(self._start_accepting_if_started) + + @property + def closed(self): + return not hasattr(self, 'socket') + + def stop(self, timeout=None): + """ + Stop accepting the connections and close the listening socket. + + If the server uses a pool to spawn the requests, then + :meth:`stop` also waits for all the handlers to exit. If there + are still handlers executing after *timeout* has expired + (default 1 second, :attr:`stop_timeout`), then the currently + running handlers in the pool are killed. + + If the server does not use a pool, then this merely stops accepting connections; + any spawned greenlets that are handling requests continue running until + they naturally complete. + """ + self.close() + if timeout is None: + timeout = self.stop_timeout + if self.pool: + self.pool.join(timeout=timeout) + self.pool.kill(block=True, timeout=1) + + def serve_forever(self, stop_timeout=None): + """Start the server if it hasn't been already started and wait until it's stopped.""" + # add test that serve_forever exists on stop() + if not self.started: + self.start() + try: + self._stop_event.wait() + finally: + Greenlet.spawn(self.stop, timeout=stop_timeout).join() + + def is_fatal_error(self, ex): + return isinstance(ex, _socket.error) and ex.args[0] in self.fatal_errors + + +def _extract_family(host): + if host.startswith('[') and host.endswith(']'): + host = host[1:-1] + return _socket.AF_INET6, host + return _socket.AF_INET, host + + +def _parse_address(address): + if isinstance(address, tuple): + if not address[0] or ':' in address[0]: + return _socket.AF_INET6, address + return _socket.AF_INET, address + + if ((isinstance(address, string_types) and ':' not in address) + or isinstance(address, integer_types)): # noqa (pep8 E129) + # Just a port + return _socket.AF_INET6, ('', int(address)) + + if not isinstance(address, string_types): + raise TypeError('Expected tuple or string, got %s' % type(address)) + + host, port = address.rsplit(':', 1) + family, host = _extract_family(host) + if host == '*': + host = '' + return family, (host, int(port)) + + +def parse_address(address): + try: + return _parse_address(address) + except ValueError as ex: + raise ValueError('Failed to parse address %r: %s' % (address, ex)) |