aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_hub_primitives.py
blob: d8ed031432a4aa9807c939574860c31f1ca669ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# -*- coding: utf-8 -*-
# copyright (c) 2018 gevent. See  LICENSE.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,binding=True
"""
A collection of primitives used by the hub, and suitable for
compilation with Cython because of their frequency of use.


"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import traceback

from gevent.exceptions import InvalidSwitchError
from gevent.exceptions import ConcurrentObjectUseError

from gevent import _greenlet_primitives
from gevent import _waiter
from gevent._util import _NONE
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.timeout import Timeout

# In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
locals()['Waiter'] = _waiter.Waiter
locals()['MultipleWaiter'] = _waiter.MultipleWaiter
locals()['SwitchOutGreenletWithLoop'] = _greenlet_primitives.SwitchOutGreenletWithLoop

__all__ = [
    'WaitOperationsGreenlet',
    'iwait_on_objects',
    'wait_on_objects',
    'wait_read',
    'wait_write',
    'wait_readwrite',
]

class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable

    def wait(self, watcher):
        """
        Wait until the *watcher* (which must not be started) is ready.

        The current greenlet will be unscheduled during this time.
        """
        waiter = Waiter(self) # pylint:disable=undefined-variable
        watcher.start(waiter.switch, waiter)
        try:
            result = waiter.get()
            if result is not waiter:
                raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (
                    getcurrent(), # pylint:disable=undefined-variable
                    result, waiter))
        finally:
            watcher.stop()

    def cancel_wait(self, watcher, error, close_watcher=False):
        """
        Cancel an in-progress call to :meth:`wait` by throwing the given *error*
        in the waiting greenlet.

        .. versionchanged:: 1.3a1
           Added the *close_watcher* parameter. If true, the watcher
           will be closed after the exception is thrown. The watcher should then
           be discarded. Closing the watcher is important to release native resources.
        .. versionchanged:: 1.3a2
           Allow the *watcher* to be ``None``. No action is taken in that case.
        """
        if watcher is None:
            # Presumably already closed.
            # See https://github.com/gevent/gevent/issues/1089
            return
        if watcher.callback is not None:
            self.loop.run_callback(self._cancel_wait, watcher, error, close_watcher)
        elif close_watcher:
            watcher.close()

    def _cancel_wait(self, watcher, error, close_watcher):
        # We have to check again to see if it was still active by the time
        # our callback actually runs.
        active = watcher.active
        cb = watcher.callback
        if close_watcher:
            watcher.close()
        if active:
            # The callback should be greenlet.switch(). It may or may not be None.
            glet = getattr(cb, '__self__', None)
            if glet is not None:
                glet.throw(error)


class _WaitIterator(object):

    def __init__(self, objects, hub, timeout, count):
        self._hub = hub
        self._waiter = MultipleWaiter(hub) # pylint:disable=undefined-variable
        self._switch = self._waiter.switch
        self._timeout = timeout
        self._objects = objects

        self._timer = None
        self._begun = False


        # Even if we're only going to return 1 object,
        # we must still rawlink() *all* of them, so that no
        # matter which one finishes first we find it.
        self._count = len(objects) if count is None else min(count, len(objects))


    def __iter__(self):
        # When we begin iterating, we begin the timer.
        # XXX: If iteration doesn't actually happen, we
        # could leave these links around!
        if not self._begun:
            self._begun = True

            for obj in self._objects:
                obj.rawlink(self._switch)

            if self._timeout is not None:
                self._timer = self._hub.loop.timer(self._timeout, priority=-1)
                self._timer.start(self._switch, self)
        return self

    def __next__(self):
        if self._count == 0:
            # Exhausted
            self._cleanup()
            raise StopIteration()

        self._count -= 1
        try:
            item = self._waiter.get()
            self._waiter.clear()
            if item is self:
                # Timer expired, no more
                self._cleanup()
                raise StopIteration()
            return item
        except:
            self._cleanup()
            raise

    next = __next__

    def _cleanup(self):
        if self._timer is not None:
            self._timer.close()
            self._timer = None

        objs = self._objects
        self._objects = ()
        for aobj in objs:
            unlink = getattr(aobj, 'unlink', None)
            if unlink is not None:
                try:
                    unlink(self._switch)
                except: # pylint:disable=bare-except
                    traceback.print_exc()


def iwait_on_objects(objects, timeout=None, count=None):
    """
    Iteratively yield *objects* as they are ready, until all (or *count*) are ready
    or *timeout* expired.

    :param objects: A sequence (supporting :func:`len`) containing objects
        implementing the wait protocol (rawlink() and unlink()).
    :keyword int count: If not `None`, then a number specifying the maximum number
        of objects to wait for. If ``None`` (the default), all objects
        are waited for.
    :keyword float timeout: If given, specifies a maximum number of seconds
        to wait. If the timeout expires before the desired waited-for objects
        are available, then this method returns immediately.

    .. seealso:: :func:`wait`

    .. versionchanged:: 1.1a1
       Add the *count* parameter.
    .. versionchanged:: 1.1a2
       No longer raise :exc:`LoopExit` if our caller switches greenlets
       in between items yielded by this function.
    """
    # QQQ would be nice to support iterable here that can be generated slowly (why?)
    hub = get_hub()
    if objects is None:
        return [hub.join(timeout=timeout)]
    return _WaitIterator(objects, hub, timeout, count)


def wait_on_objects(objects=None, timeout=None, count=None):
    """
    Wait for ``objects`` to become ready or for event loop to finish.

    If ``objects`` is provided, it must be a list containing objects
    implementing the wait protocol (rawlink() and unlink() methods):

    - :class:`gevent.Greenlet` instance
    - :class:`gevent.event.Event` instance
    - :class:`gevent.lock.Semaphore` instance
    - :class:`gevent.subprocess.Popen` instance

    If ``objects`` is ``None`` (the default), ``wait()`` blocks until
    the current event loop has nothing to do (or until ``timeout`` passes):

    - all greenlets have finished
    - all servers were stopped
    - all event loop watchers were stopped.

    If ``count`` is ``None`` (the default), wait for all ``objects``
    to become ready.

    If ``count`` is a number, wait for (up to) ``count`` objects to become
    ready. (For example, if count is ``1`` then the function exits
    when any object in the list is ready).

    If ``timeout`` is provided, it specifies the maximum number of
    seconds ``wait()`` will block.

    Returns the list of ready objects, in the order in which they were
    ready.

    .. seealso:: :func:`iwait`
    """
    if objects is None:
        hub = get_hub()
        return hub.join(timeout=timeout) # pylint:disable=
    return list(iwait_on_objects(objects, timeout, count))

_timeout_error = Exception

def set_default_timeout_error(e):
    global _timeout_error
    _timeout_error = e

def _primitive_wait(watcher, timeout, timeout_exc, hub):
    if watcher.callback is not None:
        raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
                                       % (watcher.callback, ))

    if hub is None:
        hub = get_hub()

    if timeout is None:
        hub.wait(watcher)
        return

    timeout = Timeout._start_new_or_dummy(
        timeout,
        (timeout_exc
         if timeout_exc is not _NONE or timeout is None
         else _timeout_error('timed out')))

    with timeout:
        hub.wait(watcher)

# Suitable to be bound as an instance method
def wait_on_socket(socket, watcher, timeout_exc=None):
    _primitive_wait(watcher, socket.timeout,
                    timeout_exc if timeout_exc is not None else _NONE,
                    socket.hub)

def wait_on_watcher(watcher, timeout=None, timeout_exc=_NONE, hub=None):
    """
    wait(watcher, timeout=None, [timeout_exc=None]) -> None

    Block the current greenlet until *watcher* is ready.

    If *timeout* is non-negative, then *timeout_exc* is raised after
    *timeout* second has passed.

    If :func:`cancel_wait` is called on *io* by another greenlet,
    raise an exception in this blocking greenlet
    (``socket.error(EBADF, 'File descriptor was closed in another
    greenlet')`` by default).

    :param io: An event loop watcher, most commonly an IO watcher obtained from
        :meth:`gevent.core.loop.io`
    :keyword timeout_exc: The exception to raise if the timeout expires.
        By default, a :class:`socket.timeout` exception is raised.
        If you pass a value for this keyword, it is interpreted as for
        :class:`gevent.timeout.Timeout`.

    :raises ~gevent.hub.ConcurrentObjectUseError: If the *watcher* is
        already started.
    """
    _primitive_wait(watcher, timeout, timeout_exc, hub)


def wait_read(fileno, timeout=None, timeout_exc=_NONE):
    """
    wait_read(fileno, timeout=None, [timeout_exc=None]) -> None

    Block the current greenlet until *fileno* is ready to read.

    For the meaning of the other parameters and possible exceptions,
    see :func:`wait`.

    .. seealso:: :func:`cancel_wait`
    """
    hub = get_hub()
    io = hub.loop.io(fileno, 1)
    try:
        return wait_on_watcher(io, timeout, timeout_exc, hub)
    finally:
        io.close()


def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
    """
    wait_write(fileno, timeout=None, [timeout_exc=None]) -> None

    Block the current greenlet until *fileno* is ready to write.

    For the meaning of the other parameters and possible exceptions,
    see :func:`wait`.

    .. deprecated:: 1.1
       The keyword argument *event* is ignored. Applications should not pass this parameter.
       In the future, doing so will become an error.

    .. seealso:: :func:`cancel_wait`
    """
    # pylint:disable=unused-argument
    hub = get_hub()
    io = hub.loop.io(fileno, 2)
    try:
        return wait_on_watcher(io, timeout, timeout_exc, hub)
    finally:
        io.close()


def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
    """
    wait_readwrite(fileno, timeout=None, [timeout_exc=None]) -> None

    Block the current greenlet until *fileno* is ready to read or
    write.

    For the meaning of the other parameters and possible exceptions,
    see :func:`wait`.

    .. deprecated:: 1.1
       The keyword argument *event* is ignored. Applications should not pass this parameter.
       In the future, doing so will become an error.

    .. seealso:: :func:`cancel_wait`
    """
    # pylint:disable=unused-argument
    hub = get_hub()
    io = hub.loop.io(fileno, 3)
    try:
        return wait_on_watcher(io, timeout, timeout_exc, hub)
    finally:
        io.close()


def _init():
    greenlet_init() # pylint:disable=undefined-variable

_init()

from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__hub_primitives')