diff options
author | James Taylor <user234683@users.noreply.github.com> | 2018-07-12 23:40:30 -0700 |
---|---|---|
committer | James Taylor <user234683@users.noreply.github.com> | 2018-07-12 23:41:07 -0700 |
commit | c3b9f8c4582882cd1f768b0727eca75475bb4f94 (patch) | |
tree | 5b4a1c693fd5b7416f1d5a75862e633502e77ca7 /python/gevent/event.py | |
parent | fe9fe8257740529f5880693992e4eeca35c7ea3e (diff) | |
download | yt-local-c3b9f8c4582882cd1f768b0727eca75475bb4f94.tar.lz yt-local-c3b9f8c4582882cd1f768b0727eca75475bb4f94.tar.xz yt-local-c3b9f8c4582882cd1f768b0727eca75475bb4f94.zip |
track embedded python distribution
Diffstat (limited to 'python/gevent/event.py')
-rw-r--r-- | python/gevent/event.py | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/python/gevent/event.py b/python/gevent/event.py new file mode 100644 index 0000000..f888b6e --- /dev/null +++ b/python/gevent/event.py @@ -0,0 +1,448 @@ +# Copyright (c) 2009-2016 Denis Bilenko, gevent contributors. See LICENSE for details. +"""Basic synchronization primitives: Event and AsyncResult""" +from __future__ import print_function +import sys +from gevent.hub import get_hub, getcurrent, _NONE +from gevent._compat import reraise +from gevent.hub import InvalidSwitchError +from gevent.timeout import Timeout +from gevent._tblib import dump_traceback, load_traceback + +__all__ = ['Event', 'AsyncResult'] + + +class _AbstractLinkable(object): + # Encapsulates the standard parts of the linking and notifying protocol + # common to both repeatable events and one-time events (AsyncResult). + + _notifier = None + + def __init__(self): + # Also previously, AsyncResult maintained the order of notifications, but Event + # did not; this implementation does not. (Event also only call callbacks one + # time (set), but AsyncResult permitted duplicates.) + + # HOWEVER, gevent.queue.Queue does guarantee the order of getters relative + # to putters. Some existing documentation out on the net likes to refer to + # gevent as "deterministic", such that running the same program twice will + # produce results in the same order (so long as I/O isn't involved). This could + # be an argument to maintain order. (One easy way to do that while guaranteeing + # uniqueness would be with a 2.7+ OrderedDict.) + self._links = set() + self.hub = get_hub() + + def ready(self): + # Instances must define this + raise NotImplementedError() + + def _check_and_notify(self): + # If this object is ready to be notified, begin the process. + if self.ready(): + if self._links and not self._notifier: + self._notifier = self.hub.loop.run_callback(self._notify_links) + + def rawlink(self, callback): + """ + Register a callback to call when this object is ready. + + *callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API. + *callback* will be passed one argument: this instance. + """ + if not callable(callback): + raise TypeError('Expected callable: %r' % (callback, )) + self._links.add(callback) + self._check_and_notify() + + def unlink(self, callback): + """Remove the callback set by :meth:`rawlink`""" + try: + self._links.remove(callback) + except KeyError: + pass + + def _notify_links(self): + # Actually call the notification callbacks. Those callbacks in todo that are + # still in _links are called. This method is careful to avoid iterating + # over self._links, because links could be added or removed while this + # method runs. Only links present when this method begins running + # will be called; if a callback adds a new link, it will not run + # until the next time notify_links is activated + + # We don't need to capture self._links as todo when establishing + # this callback; any links removed between now and then are handled + # by the `if` below; any links added are also grabbed + todo = set(self._links) + for link in todo: + # check that link was not notified yet and was not removed by the client + # We have to do this here, and not as part of the 'for' statement because + # a previous link(self) call might have altered self._links + if link in self._links: + try: + link(self) + except: # pylint:disable=bare-except + self.hub.handle_error((link, self), *sys.exc_info()) + if getattr(link, 'auto_unlink', None): + # This attribute can avoid having to keep a reference to the function + # *in* the function, which is a cycle + self.unlink(link) + + # save a tiny bit of memory by letting _notifier be collected + # bool(self._notifier) would turn to False as soon as we exit this + # method anyway. + del todo + del self._notifier + + def _wait_core(self, timeout, catch=Timeout): + # The core of the wait implementation, handling + # switching and linking. If *catch* is set to (), + # a timeout that elapses will be allowed to be raised. + # Returns a true value if the wait succeeded without timing out. + switch = getcurrent().switch + self.rawlink(switch) + try: + timer = Timeout._start_new_or_dummy(timeout) + try: + try: + result = self.hub.switch() + if result is not self: # pragma: no cover + raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, )) + return True + except catch as ex: + if ex is not timer: + raise + # test_set_and_clear and test_timeout in test_threading + # rely on the exact return values, not just truthish-ness + return False + finally: + timer.cancel() + finally: + self.unlink(switch) + + def _wait_return_value(self, waited, wait_success): + # pylint:disable=unused-argument + return None + + def _wait(self, timeout=None): + if self.ready(): + return self._wait_return_value(False, False) + + gotit = self._wait_core(timeout) + return self._wait_return_value(True, gotit) + + +class Event(_AbstractLinkable): + """A synchronization primitive that allows one greenlet to wake up one or more others. + It has the same interface as :class:`threading.Event` but works across greenlets. + + An event object manages an internal flag that can be set to true with the + :meth:`set` method and reset to false with the :meth:`clear` method. The :meth:`wait` method + blocks until the flag is true. + + .. note:: + The order and timing in which waiting greenlets are awakened is not determined. + As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a + undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets + (those not waiting to be awakened) may run between the current greenlet yielding and + the waiting greenlets being awakened. These details may change in the future. + """ + + _flag = False + + def __str__(self): + return '<%s %s _links[%s]>' % (self.__class__.__name__, (self._flag and 'set') or 'clear', len(self._links)) + + def is_set(self): + """Return true if and only if the internal flag is true.""" + return self._flag + + isSet = is_set # makes it a better drop-in replacement for threading.Event + ready = is_set # makes it compatible with AsyncResult and Greenlet (for example in wait()) + + def set(self): + """ + Set the internal flag to true. + + All greenlets waiting for it to become true are awakened in + some order at some time in the future. Greenlets that call + :meth:`wait` once the flag is true will not block at all + (until :meth:`clear` is called). + """ + self._flag = True + self._check_and_notify() + + def clear(self): + """ + Reset the internal flag to false. + + Subsequently, threads calling :meth:`wait` will block until + :meth:`set` is called to set the internal flag to true again. + """ + self._flag = False + + def _wait_return_value(self, waited, wait_success): + # To avoid the race condition outlined in http://bugs.python.org/issue13502, + # if we had to wait, then we need to return whether or not + # the condition got changed. Otherwise we simply echo + # the current state of the flag (which should be true) + if not waited: + flag = self._flag + assert flag, "if we didn't wait we should already be set" + return flag + + return wait_success + + def wait(self, timeout=None): + """ + Block until the internal flag is true. + + If the internal flag is true on entry, return immediately. Otherwise, + block until another thread (greenlet) calls :meth:`set` to set the flag to true, + or until the optional timeout occurs. + + When the *timeout* argument is present and not ``None``, it should be a + floating point number specifying a timeout for the operation in seconds + (or fractions thereof). + + :return: This method returns true if and only if the internal flag has been set to + true, either before the wait call or after the wait starts, so it will + always return ``True`` except if a timeout is given and the operation + times out. + + .. versionchanged:: 1.1 + The return value represents the flag during the elapsed wait, not + just after it elapses. This solves a race condition if one greenlet + sets and then clears the flag without switching, while other greenlets + are waiting. When the waiters wake up, this will return True; previously, + they would still wake up, but the return value would be False. This is most + noticeable when the *timeout* is present. + """ + return self._wait(timeout) + + def _reset_internal_locks(self): # pragma: no cover + # for compatibility with threading.Event (only in case of patch_all(Event=True), by default Event is not patched) + # Exception AttributeError: AttributeError("'Event' object has no attribute '_reset_internal_locks'",) + # in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored + pass + + +class AsyncResult(_AbstractLinkable): + """A one-time event that stores a value or an exception. + + Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception` + is called. Waiters may receive the passed value or exception by calling :meth:`get` + instead of :meth:`wait`. An :class:`AsyncResult` instance cannot be reset. + + To pass a value call :meth:`set`. Calls to :meth:`get` (those that are currently blocking as well as + those made in the future) will return the value: + + >>> result = AsyncResult() + >>> result.set(100) + >>> result.get() + 100 + + To pass an exception call :meth:`set_exception`. This will cause :meth:`get` to raise that exception: + + >>> result = AsyncResult() + >>> result.set_exception(RuntimeError('failure')) + >>> result.get() + Traceback (most recent call last): + ... + RuntimeError: failure + + :class:`AsyncResult` implements :meth:`__call__` and thus can be used as :meth:`link` target: + + >>> import gevent + >>> result = AsyncResult() + >>> gevent.spawn(lambda : 1/0).link(result) + >>> try: + ... result.get() + ... except ZeroDivisionError: + ... print('ZeroDivisionError') + ZeroDivisionError + + .. note:: + The order and timing in which waiting greenlets are awakened is not determined. + As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a + undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets + (those not waiting to be awakened) may run between the current greenlet yielding and + the waiting greenlets being awakened. These details may change in the future. + + .. versionchanged:: 1.1 + The exact order in which waiting greenlets are awakened is not the same + as in 1.0. + .. versionchanged:: 1.1 + Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are + merged. + """ + + _value = _NONE + _exc_info = () + _notifier = None + + @property + def _exception(self): + return self._exc_info[1] if self._exc_info else _NONE + + @property + def value(self): + """ + Holds the value passed to :meth:`set` if :meth:`set` was called. Otherwise, + ``None`` + """ + return self._value if self._value is not _NONE else None + + @property + def exc_info(self): + """ + The three-tuple of exception information if :meth:`set_exception` was called. + """ + if self._exc_info: + return (self._exc_info[0], self._exc_info[1], load_traceback(self._exc_info[2])) + return () + + def __str__(self): + result = '<%s ' % (self.__class__.__name__, ) + if self.value is not None or self._exception is not _NONE: + result += 'value=%r ' % self.value + if self._exception is not None and self._exception is not _NONE: + result += 'exception=%r ' % self._exception + if self._exception is _NONE: + result += 'unset ' + return result + ' _links[%s]>' % len(self._links) + + def ready(self): + """Return true if and only if it holds a value or an exception""" + return self._exc_info or self._value is not _NONE + + def successful(self): + """Return true if and only if it is ready and holds a value""" + return self._value is not _NONE + + @property + def exception(self): + """Holds the exception instance passed to :meth:`set_exception` if :meth:`set_exception` was called. + Otherwise ``None``.""" + if self._exc_info: + return self._exc_info[1] + + def set(self, value=None): + """Store the value and wake up any waiters. + + All greenlets blocking on :meth:`get` or :meth:`wait` are awakened. + Subsequent calls to :meth:`wait` and :meth:`get` will not block at all. + """ + self._value = value + self._check_and_notify() + + def set_exception(self, exception, exc_info=None): + """Store the exception and wake up any waiters. + + All greenlets blocking on :meth:`get` or :meth:`wait` are awakened. + Subsequent calls to :meth:`wait` and :meth:`get` will not block at all. + + :keyword tuple exc_info: If given, a standard three-tuple of type, value, :class:`traceback` + as returned by :func:`sys.exc_info`. This will be used when the exception + is re-raised to propagate the correct traceback. + """ + if exc_info: + self._exc_info = (exc_info[0], exc_info[1], dump_traceback(exc_info[2])) + else: + self._exc_info = (type(exception), exception, dump_traceback(None)) + + self._check_and_notify() + + def _raise_exception(self): + reraise(*self.exc_info) + + def get(self, block=True, timeout=None): + """Return the stored value or raise the exception. + + If this instance already holds a value or an exception, return or raise it immediatelly. + Otherwise, block until another greenlet calls :meth:`set` or :meth:`set_exception` or + until the optional timeout occurs. + + When the *timeout* argument is present and not ``None``, it should be a + floating point number specifying a timeout for the operation in seconds + (or fractions thereof). If the *timeout* elapses, the *Timeout* exception will + be raised. + + :keyword bool block: If set to ``False`` and this instance is not ready, + immediately raise a :class:`Timeout` exception. + """ + if self._value is not _NONE: + return self._value + if self._exc_info: + return self._raise_exception() + + if not block: + # Not ready and not blocking, so immediately timeout + raise Timeout() + + # Wait, raising a timeout that elapses + self._wait_core(timeout, ()) + + # by definition we are now ready + return self.get(block=False) + + def get_nowait(self): + """ + Return the value or raise the exception without blocking. + + If this object is not yet :meth:`ready <ready>`, raise + :class:`gevent.Timeout` immediately. + """ + return self.get(block=False) + + def _wait_return_value(self, waited, wait_success): + # pylint:disable=unused-argument + # Always return the value. Since this is a one-shot event, + # no race condition should reset it. + return self.value + + def wait(self, timeout=None): + """Block until the instance is ready. + + If this instance already holds a value, it is returned immediately. If this + instance already holds an exception, ``None`` is returned immediately. + + Otherwise, block until another greenlet calls :meth:`set` or :meth:`set_exception` + (at which point either the value or ``None`` will be returned, respectively), + or until the optional timeout expires (at which point ``None`` will also be + returned). + + When the *timeout* argument is present and not ``None``, it should be a + floating point number specifying a timeout for the operation in seconds + (or fractions thereof). + + .. note:: If a timeout is given and expires, ``None`` will be returned + (no timeout exception will be raised). + + """ + return self._wait(timeout) + + # link protocol + def __call__(self, source): + if source.successful(): + self.set(source.value) + else: + self.set_exception(source.exception, getattr(source, 'exc_info', None)) + + # Methods to make us more like concurrent.futures.Future + + def result(self, timeout=None): + return self.get(timeout=timeout) + + set_result = set + + def done(self): + return self.ready() + + # we don't support cancelling + + def cancel(self): + return False + + def cancelled(self): + return False + + # exception is a method, we use it as a property |