aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_fileobjectcommon.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gevent/_fileobjectcommon.py')
-rw-r--r--python/gevent/_fileobjectcommon.py151
1 files changed, 149 insertions, 2 deletions
diff --git a/python/gevent/_fileobjectcommon.py b/python/gevent/_fileobjectcommon.py
index 435f0d8..38b1da7 100644
--- a/python/gevent/_fileobjectcommon.py
+++ b/python/gevent/_fileobjectcommon.py
@@ -1,10 +1,20 @@
+from __future__ import absolute_import, print_function, division
try:
from errno import EBADF
except ImportError:
EBADF = 9
+import os
from io import TextIOWrapper
+import functools
+import sys
+
+
+from gevent.hub import _get_hub_noargs as get_hub
+from gevent._compat import integer_types
+from gevent._compat import reraise
+from gevent.lock import Semaphore, DummySemaphore
class cancel_wait_ex(IOError):
@@ -53,6 +63,9 @@ class FileObjectBase(object):
# Whether we are translating universal newlines or not.
_translate = False
+ _translate_encoding = None
+ _translate_errors = None
+
def __init__(self, io, closefd):
"""
:param io: An io.IOBase-like object.
@@ -63,8 +76,9 @@ class FileObjectBase(object):
self._close = closefd
if self._translate:
- # This automatically handles delegation.
- self.translate_newlines(None)
+ # This automatically handles delegation by assigning to
+ # self.io
+ self.translate_newlines(None, self._translate_encoding, self._translate_errors)
else:
self._do_delegate_methods()
@@ -126,3 +140,136 @@ class FileObjectBase(object):
def _extra_repr(self):
return ''
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+class FileObjectBlock(FileObjectBase):
+
+ def __init__(self, fobj, *args, **kwargs):
+ closefd = kwargs.pop('close', True)
+ if kwargs:
+ raise TypeError('Unexpected arguments: %r' % kwargs.keys())
+ if isinstance(fobj, integer_types):
+ if not closefd:
+ # we cannot do this, since fdopen object will close the descriptor
+ raise TypeError('FileObjectBlock does not support close=False on an fd.')
+ fobj = os.fdopen(fobj, *args)
+ super(FileObjectBlock, self).__init__(fobj, closefd)
+
+ def _do_close(self, fobj, closefd):
+ fobj.close()
+
+class FileObjectThread(FileObjectBase):
+ """
+ A file-like object wrapping another file-like object, performing all blocking
+ operations on that object in a background thread.
+
+ .. caution::
+ Attempting to change the threadpool or lock of an existing FileObjectThread
+ has undefined consequences.
+
+ .. versionchanged:: 1.1b1
+ The file object is closed using the threadpool. Note that whether or
+ not this action is synchronous or asynchronous is not documented.
+
+ """
+
+ def __init__(self, fobj, mode=None, bufsize=-1, close=True, threadpool=None, lock=True):
+ """
+ :param fobj: The underlying file-like object to wrap, or an integer fileno
+ that will be pass to :func:`os.fdopen` along with *mode* and *bufsize*.
+ :keyword bool lock: If True (the default) then all operations will
+ be performed one-by-one. Note that this does not guarantee that, if using
+ this file object from multiple threads/greenlets, operations will be performed
+ in any particular order, only that no two operations will be attempted at the
+ same time. You can also pass your own :class:`gevent.lock.Semaphore` to synchronize
+ file operations with an external resource.
+ :keyword bool close: If True (the default) then when this object is closed,
+ the underlying object is closed as well.
+ """
+ closefd = close
+ self.threadpool = threadpool or get_hub().threadpool
+ self.lock = lock
+ if self.lock is True:
+ self.lock = Semaphore()
+ elif not self.lock:
+ self.lock = DummySemaphore()
+ if not hasattr(self.lock, '__enter__'):
+ raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
+ if isinstance(fobj, integer_types):
+ if not closefd:
+ # we cannot do this, since fdopen object will close the descriptor
+ raise TypeError('FileObjectThread does not support close=False on an fd.')
+ if mode is None:
+ assert bufsize == -1, "If you use the default mode, you can't choose a bufsize"
+ fobj = os.fdopen(fobj)
+ else:
+ fobj = os.fdopen(fobj, mode, bufsize)
+
+ self.__io_holder = [fobj] # signal for _wrap_method
+ super(FileObjectThread, self).__init__(fobj, closefd)
+
+ def _do_close(self, fobj, closefd):
+ self.__io_holder[0] = None # for _wrap_method
+ try:
+ with self.lock:
+ self.threadpool.apply(fobj.flush)
+ finally:
+ if closefd:
+ # Note that we're not taking the lock; older code
+ # did fobj.close() without going through the threadpool at all,
+ # so acquiring the lock could potentially introduce deadlocks
+ # that weren't present before. Avoiding the lock doesn't make
+ # the existing race condition any worse.
+ # We wrap the close in an exception handler and re-raise directly
+ # to avoid the (common, expected) IOError from being logged by the pool
+ def close():
+ try:
+ fobj.close()
+ except: # pylint:disable=bare-except
+ return sys.exc_info()
+ exc_info = self.threadpool.apply(close)
+ if exc_info:
+ reraise(*exc_info)
+
+ def _do_delegate_methods(self):
+ super(FileObjectThread, self)._do_delegate_methods()
+ if not hasattr(self, 'read1') and 'r' in getattr(self._io, 'mode', ''):
+ self.read1 = self.read
+ self.__io_holder[0] = self._io
+
+ def _extra_repr(self):
+ return ' threadpool=%r' % (self.threadpool,)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ line = self.readline()
+ if line:
+ return line
+ raise StopIteration
+ __next__ = next
+
+ def _wrap_method(self, method):
+ # NOTE: We are careful to avoid introducing a refcycle
+ # within self. Our wrapper cannot refer to self.
+ io_holder = self.__io_holder
+ lock = self.lock
+ threadpool = self.threadpool
+
+ @functools.wraps(method)
+ def thread_method(*args, **kwargs):
+ if io_holder[0] is None:
+ # This is different than FileObjectPosix, etc,
+ # because we want to save the expensive trip through
+ # the threadpool.
+ raise FileObjectClosed()
+ with lock:
+ return threadpool.apply(method, args, kwargs)
+
+ return thread_method