aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/baseserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/baseserver.py')
-rw-r--r--python/gevent/baseserver.py402
1 files changed, 402 insertions, 0 deletions
diff --git a/python/gevent/baseserver.py b/python/gevent/baseserver.py
new file mode 100644
index 0000000..5b8bce5
--- /dev/null
+++ b/python/gevent/baseserver.py
@@ -0,0 +1,402 @@
+"""Base class for implementing servers"""
+# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
+import sys
+import _socket
+import errno
+from gevent.greenlet import Greenlet
+from gevent.event import Event
+from gevent.hub import get_hub
+from gevent._compat import string_types, integer_types, xrange
+
+
+__all__ = ['BaseServer']
+
+
+# We define a helper function to handle closing the socket in
+# do_handle; We'd like to bind it to a kwarg to avoid *any* lookups at
+# all, but that's incompatible with the calling convention of
+# do_handle. On CPython, this is ~20% faster than creating and calling
+# a closure and ~10% faster than using a @staticmethod. (In theory, we
+# could create a closure only once in set_handle, to wrap self._handle,
+# but this is safer from a backwards compat standpoint.)
+# we also avoid unpacking the *args tuple when calling/spawning this object
+# for a tiny improvement (benchmark shows a wash)
+def _handle_and_close_when_done(handle, close, args_tuple):
+ try:
+ return handle(*args_tuple)
+ finally:
+ close(*args_tuple)
+
+
+class BaseServer(object):
+ """
+ An abstract base class that implements some common functionality for the servers in gevent.
+
+ :param listener: Either be an address that the server should bind
+ on or a :class:`gevent.socket.socket` instance that is already
+ bound (and put into listening mode in case of TCP socket).
+
+ :keyword handle: If given, the request handler. The request
+ handler can be defined in a few ways. Most commonly,
+ subclasses will implement a ``handle`` method as an
+ instance method. Alternatively, a function can be passed
+ as the ``handle`` argument to the constructor. In either
+ case, the handler can later be changed by calling
+ :meth:`set_handle`.
+
+ When the request handler returns, the socket used for the
+ request will be closed. Therefore, the handler must not return if
+ the socket is still in use (for example, by manually spawned greenlets).
+
+ :keyword spawn: If provided, is called to create a new
+ greenlet to run the handler. By default,
+ :func:`gevent.spawn` is used (meaning there is no
+ artificial limit on the number of concurrent requests). Possible values for *spawn*:
+
+ - a :class:`gevent.pool.Pool` instance -- ``handle`` will be executed
+ using :meth:`gevent.pool.Pool.spawn` only if the pool is not full.
+ While it is full, no new connections are accepted;
+ - :func:`gevent.spawn_raw` -- ``handle`` will be executed in a raw
+ greenlet which has a little less overhead then :class:`gevent.Greenlet` instances spawned by default;
+ - ``None`` -- ``handle`` will be executed right away, in the :class:`Hub` greenlet.
+ ``handle`` cannot use any blocking functions as it would mean switching to the :class:`Hub`.
+ - an integer -- a shortcut for ``gevent.pool.Pool(integer)``
+
+ .. versionchanged:: 1.1a1
+ When the *handle* function returns from processing a connection,
+ the client socket will be closed. This resolves the non-deterministic
+ closing of the socket, fixing ResourceWarnings under Python 3 and PyPy.
+
+ """
+ # pylint: disable=too-many-instance-attributes,bare-except,broad-except
+
+ #: the number of seconds to sleep in case there was an error in accept() call
+ #: for consecutive errors the delay will double until it reaches max_delay
+ #: when accept() finally succeeds the delay will be reset to min_delay again
+ min_delay = 0.01
+ max_delay = 1
+
+ #: Sets the maximum number of consecutive accepts that a process may perform on
+ #: a single wake up. High values give higher priority to high connection rates,
+ #: while lower values give higher priority to already established connections.
+ #: Default is 100. Note, that in case of multiple working processes on the same
+ #: listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it
+ #: to 1 when environ["wsgi.multiprocess"] is true)
+ max_accept = 100
+
+ _spawn = Greenlet.spawn
+
+ #: the default timeout that we wait for the client connections to close in stop()
+ stop_timeout = 1
+
+ fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)
+
+ def __init__(self, listener, handle=None, spawn='default'):
+ self._stop_event = Event()
+ self._stop_event.set()
+ self._watcher = None
+ self._timer = None
+ self._handle = None
+ # XXX: FIXME: Subclasses rely on the presence or absence of the
+ # `socket` attribute to determine whether we are open/should be opened.
+ # Instead, have it be None.
+ self.pool = None
+ try:
+ self.set_listener(listener)
+ self.set_spawn(spawn)
+ self.set_handle(handle)
+ self.delay = self.min_delay
+ self.loop = get_hub().loop
+ if self.max_accept < 1:
+ raise ValueError('max_accept must be positive int: %r' % (self.max_accept, ))
+ except:
+ self.close()
+ raise
+
+ def set_listener(self, listener):
+ if hasattr(listener, 'accept'):
+ if hasattr(listener, 'do_handshake'):
+ raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, ))
+ self.family = listener.family
+ self.address = listener.getsockname()
+ self.socket = listener
+ else:
+ self.family, self.address = parse_address(listener)
+
+ def set_spawn(self, spawn):
+ if spawn == 'default':
+ self.pool = None
+ self._spawn = self._spawn
+ elif hasattr(spawn, 'spawn'):
+ self.pool = spawn
+ self._spawn = spawn.spawn
+ elif isinstance(spawn, integer_types):
+ from gevent.pool import Pool
+ self.pool = Pool(spawn)
+ self._spawn = self.pool.spawn
+ else:
+ self.pool = None
+ self._spawn = spawn
+ if hasattr(self.pool, 'full'):
+ self.full = self.pool.full
+ if self.pool is not None:
+ self.pool._semaphore.rawlink(self._start_accepting_if_started)
+
+ def set_handle(self, handle):
+ if handle is not None:
+ self.handle = handle
+ if hasattr(self, 'handle'):
+ self._handle = self.handle
+ else:
+ raise TypeError("'handle' must be provided")
+
+ def _start_accepting_if_started(self, _event=None):
+ if self.started:
+ self.start_accepting()
+
+ def start_accepting(self):
+ if self._watcher is None:
+ # just stop watcher without creating a new one?
+ self._watcher = self.loop.io(self.socket.fileno(), 1)
+ self._watcher.start(self._do_read)
+
+ def stop_accepting(self):
+ if self._watcher is not None:
+ self._watcher.stop()
+ self._watcher = None
+ if self._timer is not None:
+ self._timer.stop()
+ self._timer = None
+
+ def do_handle(self, *args):
+ spawn = self._spawn
+ handle = self._handle
+ close = self.do_close
+
+ try:
+ if spawn is None:
+ _handle_and_close_when_done(handle, close, args)
+ else:
+ spawn(_handle_and_close_when_done, handle, close, args)
+ except:
+ close(*args)
+ raise
+
+ def do_close(self, *args):
+ pass
+
+ def do_read(self):
+ raise NotImplementedError()
+
+ def _do_read(self):
+ for _ in xrange(self.max_accept):
+ if self.full():
+ self.stop_accepting()
+ return
+ try:
+ args = self.do_read()
+ self.delay = self.min_delay
+ if not args:
+ return
+ except:
+ self.loop.handle_error(self, *sys.exc_info())
+ ex = sys.exc_info()[1]
+ if self.is_fatal_error(ex):
+ self.close()
+ sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
+ return
+ if self.delay >= 0:
+ self.stop_accepting()
+ self._timer = self.loop.timer(self.delay)
+ self._timer.start(self._start_accepting_if_started)
+ self.delay = min(self.max_delay, self.delay * 2)
+ break
+ else:
+ try:
+ self.do_handle(*args)
+ except:
+ self.loop.handle_error((args[1:], self), *sys.exc_info())
+ if self.delay >= 0:
+ self.stop_accepting()
+ self._timer = self.loop.timer(self.delay)
+ self._timer.start(self._start_accepting_if_started)
+ self.delay = min(self.max_delay, self.delay * 2)
+ break
+
+ def full(self):
+ # copied from self.pool
+ # pylint: disable=method-hidden
+ return False
+
+ def __repr__(self):
+ return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo())
+
+ def __str__(self):
+ return '<%s %s>' % (type(self).__name__, self._formatinfo())
+
+ def _formatinfo(self):
+ if hasattr(self, 'socket'):
+ try:
+ fileno = self.socket.fileno()
+ except Exception as ex:
+ fileno = str(ex)
+ result = 'fileno=%s ' % fileno
+ else:
+ result = ''
+ try:
+ if isinstance(self.address, tuple) and len(self.address) == 2:
+ result += 'address=%s:%s' % self.address
+ else:
+ result += 'address=%s' % (self.address, )
+ except Exception as ex:
+ result += str(ex) or '<error>'
+
+ handle = self.__dict__.get('handle')
+ if handle is not None:
+ fself = getattr(handle, '__self__', None)
+ try:
+ if fself is self:
+ # Checks the __self__ of the handle in case it is a bound
+ # method of self to prevent recursivly defined reprs.
+ handle_repr = '<bound method %s.%s of self>' % (
+ self.__class__.__name__,
+ handle.__name__,
+ )
+ else:
+ handle_repr = repr(handle)
+
+ result += ' handle=' + handle_repr
+ except Exception as ex:
+ result += str(ex) or '<error>'
+
+ return result
+
+ @property
+ def server_host(self):
+ """IP address that the server is bound to (string)."""
+ if isinstance(self.address, tuple):
+ return self.address[0]
+
+ @property
+ def server_port(self):
+ """Port that the server is bound to (an integer)."""
+ if isinstance(self.address, tuple):
+ return self.address[1]
+
+ def init_socket(self):
+ """If the user initialized the server with an address rather than socket,
+ then this function will create a socket, bind it and put it into listening mode.
+
+ It is not supposed to be called by the user, it is called by :meth:`start` before starting
+ the accept loop."""
+ pass
+
+ @property
+ def started(self):
+ return not self._stop_event.is_set()
+
+ def start(self):
+ """Start accepting the connections.
+
+ If an address was provided in the constructor, then also create a socket,
+ bind it and put it into the listening mode.
+ """
+ self.init_socket()
+ self._stop_event.clear()
+ try:
+ self.start_accepting()
+ except:
+ self.close()
+ raise
+
+ def close(self):
+ """Close the listener socket and stop accepting."""
+ self._stop_event.set()
+ try:
+ self.stop_accepting()
+ finally:
+ try:
+ self.socket.close()
+ except Exception:
+ pass
+ finally:
+ self.__dict__.pop('socket', None)
+ self.__dict__.pop('handle', None)
+ self.__dict__.pop('_handle', None)
+ self.__dict__.pop('_spawn', None)
+ self.__dict__.pop('full', None)
+ if self.pool is not None:
+ self.pool._semaphore.unlink(self._start_accepting_if_started)
+
+ @property
+ def closed(self):
+ return not hasattr(self, 'socket')
+
+ def stop(self, timeout=None):
+ """
+ Stop accepting the connections and close the listening socket.
+
+ If the server uses a pool to spawn the requests, then
+ :meth:`stop` also waits for all the handlers to exit. If there
+ are still handlers executing after *timeout* has expired
+ (default 1 second, :attr:`stop_timeout`), then the currently
+ running handlers in the pool are killed.
+
+ If the server does not use a pool, then this merely stops accepting connections;
+ any spawned greenlets that are handling requests continue running until
+ they naturally complete.
+ """
+ self.close()
+ if timeout is None:
+ timeout = self.stop_timeout
+ if self.pool:
+ self.pool.join(timeout=timeout)
+ self.pool.kill(block=True, timeout=1)
+
+ def serve_forever(self, stop_timeout=None):
+ """Start the server if it hasn't been already started and wait until it's stopped."""
+ # add test that serve_forever exists on stop()
+ if not self.started:
+ self.start()
+ try:
+ self._stop_event.wait()
+ finally:
+ Greenlet.spawn(self.stop, timeout=stop_timeout).join()
+
+ def is_fatal_error(self, ex):
+ return isinstance(ex, _socket.error) and ex.args[0] in self.fatal_errors
+
+
+def _extract_family(host):
+ if host.startswith('[') and host.endswith(']'):
+ host = host[1:-1]
+ return _socket.AF_INET6, host
+ return _socket.AF_INET, host
+
+
+def _parse_address(address):
+ if isinstance(address, tuple):
+ if not address[0] or ':' in address[0]:
+ return _socket.AF_INET6, address
+ return _socket.AF_INET, address
+
+ if ((isinstance(address, string_types) and ':' not in address)
+ or isinstance(address, integer_types)): # noqa (pep8 E129)
+ # Just a port
+ return _socket.AF_INET6, ('', int(address))
+
+ if not isinstance(address, string_types):
+ raise TypeError('Expected tuple or string, got %s' % type(address))
+
+ host, port = address.rsplit(':', 1)
+ family, host = _extract_family(host)
+ if host == '*':
+ host = ''
+ return family, (host, int(port))
+
+
+def parse_address(address):
+ try:
+ return _parse_address(address)
+ except ValueError as ex:
+ raise ValueError('Failed to parse address %r: %s' % (address, ex))