aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/hub.py
blob: 001cd064d3c7a869309eb20ed14ed83945a0c42e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
# Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details.
"""
Event-loop hub.
"""
from __future__ import absolute_import
# XXX: FIXME: Refactor to make this smaller
# pylint:disable=too-many-lines
from functools import partial as _functools_partial
import os
import sys
import traceback

from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit


# Public API of this module: the names bound by ``from gevent.hub import *``.
__all__ = [
    'getcurrent',
    'GreenletExit',
    'spawn_raw',
    'sleep',
    'kill',
    'signal',
    'reinit',
    'get_hub',
    'Hub',
    'Waiter',
]

from gevent._compat import string_types
from gevent._compat import xrange
from gevent._util import _NONE
from gevent._util import readproperty

if sys.version_info[0] <= 2:
    import thread # pylint:disable=import-error
else:
    import _thread as thread # python 2 pylint:disable=import-error

# These must be the "real" native thread versions,
# not monkey-patched.
threadlocal = thread._local


class _threadlocal(threadlocal):
    """
    Per-thread storage for the state this module tracks.

    Every attribute is initialised to ``None`` so the rest of the module
    can test ``is None`` rather than catching :exc:`AttributeError`.
    """

    def __init__(self):
        # Use a class with an initializer so that we can test
        # for 'is None' instead of catching AttributeError, making
        # the code cleaner and possibly solving some corner cases
        # (like #687)
        # NOTE: the base class is reached through the ``threadlocal`` alias
        # on purpose: the name ``_threadlocal`` is rebound to an instance
        # right after the class statement, so it cannot be used here.
        threadlocal.__init__(self)
        # The hub class to instantiate in this thread (see get_hub_class()).
        self.Hub = None
        # A loop kept from a destroyed hub so the next hub created in this
        # thread can reuse it (see Hub.__init__ and Hub.destroy).
        self.loop = None
        # The hub instance for this thread (see get_hub()/set_hub()).
        self.hub = None

# Replace the class by its single instance: from here on ``_threadlocal``
# names the storage object, not the class.
_threadlocal = _threadlocal()

get_ident = thread.get_ident
# Identifier of the thread that executed this module body; Hub.__init__
# compares against it to decide whether to ask for the default loop.
# (Presumably the main thread, as the name says -- this holds only if the
# module is first imported from the main thread.)
MAIN_THREAD = get_ident()




class LoopExit(Exception):
    """
    Raised into the hub's parent greenlet when the event loop finishes
    running (see :meth:`Hub.run`).

    Normal applications neither raise nor catch this exception. The
    implementations of functions such as :func:`join` and
    :func:`joinall` may catch it internally, but user code generally
    should not.

    .. caution::
       Programming errors can also cause this exception to be raised.
       Some examples include (but are not limited to):

       - greenlets deadlocking on a lock;
       - using a socket or other gevent object with native thread
         affinity from a different thread

    """


class BlockingSwitchOutError(AssertionError):
    """
    Raised by :meth:`Hub.switch_out`, i.e. when code running in the hub
    greenlet (an event loop callback) tries to call a blocking function.
    """


class InvalidSwitchError(AssertionError):
    """
    Raised by :meth:`Hub.wait` when the waiting greenlet is resumed with a
    value other than the one the watcher was started with.
    """


class ConcurrentObjectUseError(AssertionError):
    """
    Raised when an object is used (waited on) by two greenlets
    independently: one greenlet put the object into a blocking state and
    a second one did the same while the first was still blocked on it.
    """


def spawn_raw(function, *args, **kwargs):
    """
    Create a new :class:`greenlet.greenlet` whose parent is the hub and
    schedule it to run ``function(*args, **kwargs)``.

    The returned object is a raw :class:`~greenlet.greenlet`; it has none
    of the convenience methods of :class:`gevent.Greenlet`. Applications
    should normally prefer :func:`~gevent.spawn`, but this function can
    occasionally be useful as an optimization when very many greenlets
    are involved.

    :raises TypeError: If *function* is not callable.

    .. versionchanged:: 1.1b1
       If *function* is not callable, immediately raise a :exc:`TypeError`
       instead of spawning a greenlet that will raise an uncaught TypeError.

    .. versionchanged:: 1.1rc2
        Accept keyword arguments for ``function`` as previously (incorrectly)
        documented. Note that this may incur an additional expense.

    .. versionchanged:: 1.1a3
        Verify that ``function`` is callable, raising a TypeError if not. Previously,
        the spawned greenlet would have failed the first time it was switched to.
    """
    if not callable(function):
        raise TypeError("function must be callable")
    hub = get_hub()

    # The loop's callback objects do not accept keyword arguments (they are
    # heavily used and implemented twice, in core.ppyx and corecffi.py), so
    # when keywords are given everything is bound up front with a partial
    # and the callback is scheduled without extra arguments.
    if kwargs:
        function = _functools_partial(function, *args, **kwargs)
        args = ()

    g = RawGreenlet(function, hub)
    hub.loop.run_callback(g.switch, *args)
    return g


def sleep(seconds=0, ref=True):
    """
    Suspend the current greenlet for at least *seconds*.

    *seconds* may be an integer, or a float when fractional seconds are
    wanted.

    .. tip:: In the current implementation, a value of 0 (the default)
       means to yield execution to any other runnable greenlets, but
       this greenlet may be scheduled again before the event loop
       cycles (in an extreme case, a greenlet that repeatedly sleeps
       with 0 can prevent greenlets that are ready to do I/O from
       being scheduled for some (small) period of time); a value greater than
       0, on the other hand, will delay running this greenlet until
       the next iteration of the loop.

    If *ref* is False, the greenlet running ``sleep()`` will not prevent
    :func:`gevent.wait` from exiting.

    .. seealso:: :func:`idle`
    """
    current_hub = get_hub()
    event_loop = current_hub.loop

    if seconds <= 0:
        # Nothing to time: queue a callback that resumes us and yield.
        resume = Waiter()
        event_loop.run_callback(resume.switch)
        resume.get()
        return

    # A real delay: block on a timer watcher.
    current_hub.wait(event_loop.timer(seconds, ref=ref))


def idle(priority=0):
    """
    Block the calling greenlet until the event loop is idle.

    "Idle" means that no other events of the same or higher *priority*
    are pending: while sockets, timeouts or even signals of the same or
    higher priority are still being processed, the loop is not idle.

    .. seealso:: :func:`sleep`
    """
    current_hub = get_hub()
    idle_watcher = current_hub.loop.idle()
    # A falsy priority (the default 0) leaves the watcher's own priority
    # untouched.
    if priority:
        idle_watcher.priority = priority
    current_hub.wait(idle_watcher)


def kill(greenlet, exception=GreenletExit):
    """
    Kill *greenlet* asynchronously. The current greenlet is not unscheduled.

    Does nothing if *greenlet* is already dead.

    .. note::

        :meth:`Greenlet.kill` does the same and more (and the caveats
        listed there apply here as well). However, the MAIN greenlet -
        the one that exists initially - does not have a ``kill()``
        method, and neither do greenlets created with :func:`spawn_raw`,
        so this function has to be used for them.

    .. caution:: Use care when killing greenlets. If they are not prepared for
       exceptions, this could result in corrupted state.

    .. versionchanged:: 1.1a2
        If the ``greenlet`` has a :meth:`kill <Greenlet.kill>` method, calls it. This prevents a
        greenlet from being switched to for the first time after it's been
        killed but not yet executed.
    """
    if greenlet.dead:
        return

    if hasattr(greenlet, 'kill'):
        # This is a gevent.greenlet.Greenlet: defer to its own kill(),
        # notably so that it cannot be switched to for the first time
        # after having been killed.
        greenlet.kill(exception=exception, block=False)
        return

    # Raw greenlet: have the loop throw the exception into it.
    get_hub().loop.run_callback(greenlet.throw, exception)


class signal(object):
    """
    Call the *handler* with the *args* and *kwargs* when the process
    receives the signal *signalnum*.

    The *handler* will be run in a new greenlet when the signal is delivered.

    This returns an object with the useful method ``cancel``, which, when called,
    will prevent future deliveries of *signalnum* from calling *handler*.

    :raises TypeError: If *handler* is not callable.

    .. note::

        This may not operate correctly with SIGCHLD if libev child watchers
        are used (as they are by default with os.fork).

    .. versionchanged:: 1.2a1
       The ``handler`` argument is required to be callable at construction time.
    """

    # XXX: This is manually documented in gevent.rst while it is aliased in
    # the gevent module.

    # Class used to create the greenlet that runs the handler. Left as None
    # here and resolved to gevent.Greenlet in __init__ (imported there, not
    # at module level); a subclass may preset it to another class.
    greenlet_class = None

    def __init__(self, signalnum, handler, *args, **kwargs):
        if not callable(handler):
            raise TypeError("signal handler must be callable.")

        self.hub = get_hub()
        # ref=False: the watcher is created unreferenced.
        self.watcher = self.hub.loop.signal(signalnum, ref=False)
        self.watcher.start(self._start)
        self.handler = handler
        self.args = args
        self.kwargs = kwargs
        if self.greenlet_class is None:
            from gevent import Greenlet
            self.greenlet_class = Greenlet

    def _get_ref(self):
        return self.watcher.ref

    def _set_ref(self, value):
        self.watcher.ref = value

    # ``ref`` simply proxies the underlying watcher's ``ref`` attribute.
    ref = property(_get_ref, _set_ref)
    del _get_ref, _set_ref

    def cancel(self):
        """Stop the watcher so that *handler* is no longer called."""
        self.watcher.stop()

    def _start(self):
        """
        Watcher callback: run :meth:`handle` in a new greenlet.

        Any exception raised while creating or switching to the greenlet
        is reported to the hub's ``handle_error`` with a ``None`` context.
        """
        try:
            greenlet = self.greenlet_class(self.handle)
            greenlet.switch()
        except: # pylint:disable=bare-except
            # Was ``sys._exc_info()``, which does not exist: the resulting
            # AttributeError masked the real error and bypassed the hub.
            self.hub.handle_error(None, *sys.exc_info())

    def handle(self):
        """
        Call ``handler(*args, **kwargs)``, reporting any exception it
        raises to the hub's ``handle_error`` with a ``None`` context.
        """
        try:
            self.handler(*self.args, **self.kwargs)
        except: # pylint:disable=bare-except
            self.hub.handle_error(None, *sys.exc_info())


def reinit():
    """
    Prepare the gevent hub to run in a new (forked) process.

    Call this *immediately* after :func:`os.fork` in the child process.
    :func:`gevent.os.fork` does so automatically, as does ``os.fork`` when
    the :mod:`os` module has been monkey-patched. If it is not called in
    a forked process, symptoms may include functions like
    :func:`socket.getaddrinfo` hanging, and the hub's threadpool is
    unlikely to work.

    If no hub has been created in the current thread, this does nothing.

    .. note:: Registered fork watchers may or may not run before
       this function (and thus ``gevent.os.fork``) return. If they have
       not run, they will run "soon", after an iteration of the event loop.
       You can force this by inserting a few small (but non-zero) calls to :func:`sleep`
       after fork returns. (As of gevent 1.1 and before, fork watchers will
       not have run, but this may change in the future.)

    .. note:: This function may be removed in a future major release
       if the fork process can be more smoothly managed.

    .. warning:: See remarks in :func:`gevent.os.fork` about greenlets
       and libev watchers in the child process.
    """
    hub = _get_hub()
    if hub is None:
        return

    # The loop's reinit function in turn calls libev's ev_loop_fork
    # function. Note that the existing loop is re-initialised, not
    # destroyed. See https://github.com/gevent/gevent/issues/200.
    hub.loop.reinit()

    # libev's fork watchers are slow to fire because they only fire at the
    # beginning of a loop; because we use callbacks that run at the end
    # of the loop, that may be too late. The threadpool and resolvers
    # depend on the fork handlers having run (specifically, the
    # threadpool will fail in the forked child if there were any threads
    # in it, which there will be if the resolver_thread was in use (the
    # default) before the fork).
    #
    # If the forked process wants to use the threadpool or resolver
    # immediately (in a queued callback), it would hang.
    #
    # What follows is a workaround. Fortunately, both of these methods
    # are idempotent and can be called multiple times following a fork
    # if they suddenly started working, or were already working on some
    # platforms. Other threadpools and fork handlers will be called at
    # an arbitrary time later ('soon').
    pool = hub.threadpool
    if hasattr(pool, '_on_fork'):
        pool._on_fork()

    # resolver_ares also has a fork watcher that's not firing
    resolver = hub.resolver
    if hasattr(resolver, '_on_fork'):
        resolver._on_fork()

    # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
    # pass around before returning to this greenlet. That will allow any
    # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
    # we do this, certain tests that heavily mix threads and forking,
    # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
    # why.
    #sleep(0.00001)
    #sleep(0.00001)


def get_hub_class():
    """Return the type of hub to use for the current thread.

    When the current thread has no hub type yet, 'gevent.hub.Hub' is
    recorded as its hub type and returned.
    """
    if _threadlocal.Hub is None:
        _threadlocal.Hub = Hub
    return _threadlocal.Hub


def get_hub(*args, **kwargs):
    """
    Return the hub for the current thread.

    If the current thread has no hub yet, one is created by calling the
    type returned by :func:`get_hub_class` with ``*args`` and
    ``**kwargs``, and remembered for subsequent calls. The arguments are
    ignored when a hub already exists.
    """
    existing = _threadlocal.hub
    if existing is not None:
        return existing

    created = get_hub_class()(*args, **kwargs)
    _threadlocal.hub = created
    return created


def _get_hub():
    """Return the current thread's hub, or ``None``.

    Unlike :func:`get_hub`, this never creates a hub: ``None`` is
    returned when the thread does not have one.
    """
    current = _threadlocal.hub
    return current


def set_hub(hub):
    """Make *hub* the hub of the current thread (what :func:`get_hub` returns)."""
    _threadlocal.hub = hub


def _import(path):
    """
    Resolve *path* to an object.

    *path* may be:

    - a list: each item is tried in order and the first one that can be
      resolved is returned; the :exc:`ImportError` of the last item
      propagates if none can;
    - a string of the form ``[path/][package.]module.class``: the module
      is imported (with ``path`` temporarily placed at the front of
      ``sys.path`` if given) and the named attribute is looked up;
    - anything else: returned unchanged.

    :raises ImportError: If *path* is an empty list, is a string without
        a dotted name after the optional directory part, or names an
        attribute that does not exist.
    """
    # pylint:disable=too-many-branches
    if isinstance(path, list):
        if not path:
            raise ImportError('Cannot import from empty list: %r' % (path, ))

        for item in path[:-1]:
            try:
                return _import(item)
            except ImportError:
                pass

        return _import(path[-1])

    if not isinstance(path, string_types):
        return path

    requested = path
    if '/' in path:
        package_path, path = path.rsplit('/', 1)
    else:
        package_path = None

    # Validate the dotted part *after* removing the directory: previously a
    # dot in the directory (e.g. "some.dir/module") passed the check and
    # the code then failed with ValueError instead of ImportError, which
    # the list fallback above does not catch.
    if '.' not in path:
        raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % requested)

    if package_path is not None:
        # NOTE: this rebinds sys.path to a new list rather than mutating it
        # in place (kept as in the original implementation).
        sys.path = [package_path] + sys.path

    try:
        module = path.rsplit('.', 1)[0]
        # __import__ returns the top-level package, so walk down from there
        # through every remaining component of the dotted name.
        x = __import__(module)
        for attr in path.split('.')[1:]:
            oldx = x
            x = getattr(x, attr, _NONE)
            if x is _NONE:
                raise ImportError('Cannot import %r from %r' % (attr, oldx))
        return x
    finally:
        if package_path is not None:
            try:
                sys.path.remove(package_path)
            except ValueError:
                # Something else already removed the entry.
                pass


def config(default, envvar):
    """
    Return the setting taken from the environment variable *envvar*,
    falling back to *default* when the variable is unset or empty.

    A string result is split on commas into a list; any other value
    (for instance a *default* that is ``None`` or already a list) is
    returned unchanged.
    """
    setting = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
    return setting.split(',') if isinstance(setting, string_types) else setting


def resolver_config(default, envvar):
    """
    Like :func:`config`, but additionally expand each entry that is a
    known short resolver name (a key of ``_resolvers``) to its full
    dotted path. Other entries are kept as they are.
    """
    expanded = []
    for entry in config(default, envvar):
        expanded.append(_resolvers.get(entry, entry))
    return expanded


# Short aliases accepted by resolver_config() (used for the GEVENT_RESOLVER
# environment variable), mapped to the dotted path of the resolver class.
_resolvers = {'ares': 'gevent.resolver_ares.Resolver',
              'thread': 'gevent.resolver_thread.Resolver',
              'block': 'gevent.socket.BlockingResolver'}


# Dotted path of the event loop class used when GEVENT_LOOP is not set.
_DEFAULT_LOOP_CLASS = 'gevent.core.loop'


class Hub(RawGreenlet):
    """A greenlet that runs the event loop.

    It is created automatically by :func:`get_hub`.

    **Switching**

    Every time this greenlet (i.e., the event loop) is switched *to*, if
    the current greenlet has a ``switch_out`` method, it will be called. This
    allows a greenlet to take some cleanup actions before yielding control. This method
    should not call any gevent blocking functions.
    """

    #: If instances of these classes are raised into the event loop,
    #: they will be propagated out to the main greenlet (where they will
    #: usually be caught by Python itself)
    SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)

    #: Instances of these classes are not considered to be errors and
    #: do not get logged/printed when raised by the event loop.
    NOT_ERROR = (GreenletExit, SystemExit)

    loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP')
    # For the standard class, go ahead and import it when this class
    # is defined. This is no loss of generality because the envvar is
    # only read when this class is defined, and we know that the
    # standard class will be available. This can solve problems with
    # the class being imported from multiple threads at once, leading
    # to one of the imports failing. Only do this for the object we
    # need in the constructor, as the rest of the factories are
    # themselves handled lazily. See #687. (People using a custom loop_class
    # can probably manage to get_hub() from the main thread or otherwise import
    # that loop_class themselves.)
    if loop_class == [_DEFAULT_LOOP_CLASS]:
        loop_class = [_import(loop_class)]

    resolver_class = ['gevent.resolver_thread.Resolver',
                      'gevent.resolver_ares.Resolver',
                      'gevent.socket.BlockingResolver']
    #: The class or callable object, or the name of a factory function or class,
    #: that will be used to create :attr:`resolver`. By default, configured according to
    #: :doc:`dns`. If a list, a list of objects in preference order.
    resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER')
    threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL')
    backend = config(None, 'GEVENT_BACKEND')
    threadpool_size = 10

    # using pprint.pformat can override custom __repr__ methods on dict/list
    # subclasses, which can be a security concern
    format_context = 'pprint.saferepr'

    # Class-level fallbacks for the lazily created helpers. destroy() and
    # the property deleters ``del`` the instance attributes; without these
    # defaults any later ``repr(hub)``, ``hub.resolver``/``hub.threadpool``
    # access or second ``destroy()`` raised AttributeError.
    _resolver = None
    _threadpool = None

    def __init__(self, loop=None, default=None):
        """
        :param loop: Either an existing loop object (anything with a
            ``run`` attribute), which is used as-is, or a value passed to
            the loop class as ``flags``. If ``None``, :attr:`backend` is
            used as the flags.
        :param default: Passed to the loop class as ``default``. When not
            given, ``False`` is passed for hubs created outside the thread
            recorded in ``MAIN_THREAD``.
        :raises TypeError: If *default* is given together with a loop object.
        """
        RawGreenlet.__init__(self)
        if hasattr(loop, 'run'):
            if default is not None:
                raise TypeError("Unexpected argument: default")
            self.loop = loop
        elif _threadlocal.loop is not None:
            # Reuse a loop instance previously set by
            # destroying a hub without destroying the associated
            # loop. See #237 and #238.
            self.loop = _threadlocal.loop
        else:
            if default is None and get_ident() != MAIN_THREAD:
                default = False
            loop_class = _import(self.loop_class)
            if loop is None:
                loop = self.backend
            self.loop = loop_class(flags=loop, default=default)
        self._resolver = None
        self._threadpool = None
        # Resolve the (possibly dotted-name) class attribute to a callable
        # once, and cache it on the instance.
        self.format_context = _import(self.format_context)

    def __repr__(self):
        if self.loop is None:
            info = 'destroyed'
        else:
            try:
                info = self.loop._format()
            except Exception as ex: # pylint:disable=broad-except
                info = str(ex) or repr(ex) or 'error'
        result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
        if self._resolver is not None:
            result += ' resolver=%r' % self._resolver
        if self._threadpool is not None:
            result += ' threadpool=%r' % self._threadpool
        return result + '>'

    def handle_error(self, context, type, value, tb):
        """
        Called by the event loop when an error occurs. The arguments
        type, value, and tb are the standard tuple returned by :func:`sys.exc_info`.

        Applications can set a property on the hub with this same signature
        to override the error handling provided by this class.

        Errors that are :attr:`system errors <SYSTEM_ERROR>` are passed
        to :meth:`handle_system_error`.

        :param context: If this is ``None``, indicates a system error that
            should generally result in exiting the loop and being thrown to the
            parent greenlet.
        """
        if isinstance(value, str):
            # Cython can raise errors where the value is a plain string
            # e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
            value = type(value)
        if not issubclass(type, self.NOT_ERROR):
            self.print_exception(context, type, value, tb)
        if context is None or issubclass(type, self.SYSTEM_ERROR):
            self.handle_system_error(type, value)

    def handle_system_error(self, type, value):
        """
        Throw the error described by *type* and *value* into the parent
        greenlet.

        When called from a greenlet other than the hub or its parent, a
        callback that switches back to that greenlet is scheduled first,
        and cancelled again if the throw itself raises or returns.
        """
        current = getcurrent()
        if current is self or current is self.parent or self.loop is None:
            self.parent.throw(type, value)
        else:
            # in case system error was handled and life goes on
            # switch back to this greenlet as well
            cb = None
            try:
                cb = self.loop.run_callback(current.switch)
            except: # pylint:disable=bare-except
                traceback.print_exc(file=self.exception_stream)
            try:
                self.parent.throw(type, value)
            finally:
                if cb is not None:
                    cb.stop()

    @readproperty
    def exception_stream(self):
        """
        The stream to which exceptions will be written.
        Defaults to ``sys.stderr`` unless assigned to.

        .. versionadded:: 1.2a1
        """
        # Unwrap any FileObjectThread we have thrown around sys.stderr
        # (because it can't be used in the hub). Tricky because we are
        # called in error situations when it's not safe to import.
        stderr = sys.stderr
        if type(stderr).__name__ == 'FileObjectThread':
            stderr = stderr.io # pylint:disable=no-member
        return stderr

    def print_exception(self, context, type, value, tb):
        """
        Write the exception, a timestamp and (if *context* is not ``None``)
        a "<context> failed with <type>" line to :attr:`exception_stream`.
        """
        # Python 3 does not gracefully handle None value or tb in
        # traceback.print_exception() as previous versions did.
        # pylint:disable=no-member
        errstream = self.exception_stream

        if value is None:
            errstream.write('%s\n' % type.__name__)
        else:
            traceback.print_exception(type, value, tb, file=errstream)
        del tb

        try:
            import time
            errstream.write(time.ctime())
            errstream.write(' ' if context is not None else '\n')
        except: # pylint:disable=bare-except
            # Possible not safe to import under certain
            # error conditions in Python 2
            pass

        if context is not None:
            if not isinstance(context, str):
                try:
                    context = self.format_context(context)
                except: # pylint:disable=bare-except
                    traceback.print_exc(file=self.exception_stream)
                    context = repr(context)
            errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))

    def switch(self):
        """
        Switch to the hub, first calling the current greenlet's
        ``switch_out`` method if it has one.
        """
        switch_out = getattr(getcurrent(), 'switch_out', None)
        if switch_out is not None:
            switch_out()
        return RawGreenlet.switch(self)

    def switch_out(self):
        # Reached through switch() when the *hub itself* is the current
        # greenlet, i.e. a loop callback tried to block.
        raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')

    def wait(self, watcher):
        """
        Wait until the *watcher* (which should not be started) is ready.

        The current greenlet will be unscheduled during this time.

        :raises InvalidSwitchError: If the greenlet is resumed with a
            value other than the one this call registered with the watcher.

        .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
            :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
            :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
            :class:`gevent.core.child`, :class:`gevent.core.stat`

        """
        waiter = Waiter()
        # A fresh sentinel identifies a wake-up that really came from
        # this watcher.
        unique = object()
        watcher.start(waiter.switch, unique)
        try:
            result = waiter.get()
            if result is not unique:
                raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
        finally:
            watcher.stop()

    def cancel_wait(self, watcher, error):
        """
        Cancel an in-progress call to :meth:`wait` by throwing the given *error*
        in the waiting greenlet.
        """
        if watcher.callback is not None:
            self.loop.run_callback(self._cancel_wait, watcher, error)

    def _cancel_wait(self, watcher, error):
        # Runs as a loop callback; the watcher may have fired or been
        # stopped since cancel_wait() was called, so re-check everything.
        if watcher.active:
            switch = watcher.callback
            if switch is not None:
                # The callback is expected to be a bound ``switch`` method;
                # its ``__self__`` is the object to throw into.
                greenlet = getattr(switch, '__self__', None)
                if greenlet is not None:
                    greenlet.throw(error)

    def run(self):
        """
        Entry-point to running the loop. This method is called automatically
        when the hub greenlet is scheduled; do not call it directly.

        :raises LoopExit: If the loop finishes running. This means
           that there are no other scheduled greenlets, and no active
           watchers or servers. In some situations, this indicates a
           programming error.
        """
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            self.parent.throw(LoopExit('This operation would block forever', self))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediately

    def join(self, timeout=None):
        """Wait for the event loop to finish. Exits only when there are
        no more spawned greenlets, started servers, active timeouts or watchers.

        If *timeout* is provided, wait no longer than the specified number of seconds.

        Returns True if exited because the loop finished execution.
        Returns False if exited because of timeout expired.
        """
        assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
        if self.dead:
            return True

        waiter = Waiter()

        if timeout is not None:
            # From here on ``timeout`` is the timer watcher, not the number.
            timeout = self.loop.timer(timeout, ref=False)
            timeout.start(waiter.switch)

        try:
            try:
                waiter.get()
            except LoopExit:
                return True
        finally:
            if timeout is not None:
                timeout.stop()
        return False

    def destroy(self, destroy_loop=None):
        """
        Release the hub's resources and detach it from the current thread.

        Closes the resolver and kills the threadpool if they were created.

        :param destroy_loop: If true, the loop is destroyed; if false, it
            is stored for reuse by the next hub created in this thread.
            If ``None`` (the default), the loop is destroyed unless it is
            the default loop.

        Calling this again on an already destroyed hub does nothing.
        """
        if self._resolver is not None:
            self._resolver.close()
            del self._resolver
        if self._threadpool is not None:
            self._threadpool.kill()
            del self._threadpool

        loop = self.loop
        if loop is not None:
            if destroy_loop is None:
                destroy_loop = not loop.default
            if destroy_loop:
                if _threadlocal.loop is loop:
                    # Don't let anyone try to reuse this
                    _threadlocal.loop = None
                loop.destroy()
            else:
                # Store in case another hub is created for this
                # thread.
                _threadlocal.loop = loop

            self.loop = None

        if _threadlocal.hub is self:
            _threadlocal.hub = None

    def _get_resolver(self):
        if self._resolver is None:
            if self.resolver_class is not None:
                # Resolve the configured name(s) once and cache the result
                # on the instance.
                self.resolver_class = _import(self.resolver_class)
                self._resolver = self.resolver_class(hub=self)
        return self._resolver

    def _set_resolver(self, value):
        self._resolver = value

    def _del_resolver(self):
        del self._resolver

    #: The resolver for this hub, created on first access from
    #: :attr:`resolver_class` (``None`` if that is ``None``).
    resolver = property(_get_resolver, _set_resolver, _del_resolver)

    def _get_threadpool(self):
        if self._threadpool is None:
            if self.threadpool_class is not None:
                self.threadpool_class = _import(self.threadpool_class)
                self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
        return self._threadpool

    def _set_threadpool(self, value):
        self._threadpool = value

    def _del_threadpool(self):
        del self._threadpool

    #: The threadpool for this hub, created on first access from
    #: :attr:`threadpool_class` with :attr:`threadpool_size`
    #: (``None`` if the class is ``None``).
    threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)


class Waiter(object):
    """
    A low level communication utility for greenlets.

    Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:

    * switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
    * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
    * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
      will store the value/exception. The following :meth:`get` will return the value/raise the exception.

    The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
    The :meth:`get` method must be called from a greenlet other than :class:`Hub`.

        >>> result = Waiter()
        >>> timer = get_hub().loop.timer(0.1)
        >>> timer.start(result.switch, 'hello from Waiter')
        >>> result.get() # blocks for 0.1 seconds
        'hello from Waiter'

    If switch is called before the greenlet gets a chance to call :meth:`get` then
    :class:`Waiter` stores the value.

        >>> result = Waiter()
        >>> timer = get_hub().loop.timer(0.1)
        >>> timer.start(result.switch, 'hi from Waiter')
        >>> sleep(0.2)
        >>> result.get() # returns immediately without blocking
        'hi from Waiter'

    .. warning::

        This is a limited and dangerous way to communicate between
        greenlets. It can easily leave a greenlet unscheduled forever
        if used incorrectly. Consider using safer classes such as
        :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
        or :class:`gevent.queue.Queue`.
    """

    # ``_exception`` has three states:
    #   * ``_NONE``  - nothing has been stored yet;
    #   * ``None``   - a value was stored by switch() (see ``value``);
    #   * a tuple    - the arguments that were passed to throw().
    __slots__ = ['hub', 'greenlet', 'value', '_exception']

    def __init__(self, hub=None):
        """Create an empty waiter bound to *hub* (``get_hub()`` if *hub* is ``None``)."""
        if hub is None:
            self.hub = get_hub()
        else:
            self.hub = hub
        self.greenlet = None
        self.value = None
        self._exception = _NONE

    def clear(self):
        """Forget the waiting greenlet and any stored value or exception."""
        self.greenlet = None
        self.value = None
        self._exception = _NONE

    def __str__(self):
        if self._exception is _NONE:
            return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
        if self._exception is None:
            return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
        return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)

    def ready(self):
        """Return true if and only if it holds a value or an exception"""
        return self._exception is not _NONE

    def successful(self):
        """Return true if and only if it is ready and holds a value"""
        return self._exception is None

    @property
    def exc_info(self):
        "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
        if self._exception is not _NONE:
            return self._exception

    def switch(self, value=None):
        """Switch to the greenlet if one's available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            # Nobody is blocked in get() yet: remember the value for later.
            self.value = value
            self._exception = None
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)
            except: # pylint:disable=bare-except
                # Anything raised out of the switch is reported to the hub
                # instead of propagating to our caller.
                self.hub.handle_error(switch, *sys.exc_info())

    def switch_args(self, *args):
        """Call :meth:`switch` with the positional arguments packed into a single tuple."""
        return self.switch(args)

    def throw(self, *throw_args):
        """Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
        greenlet = self.greenlet
        if greenlet is None:
            # Nobody is blocked in get() yet: remember the arguments so
            # that get() can raise them later.
            self._exception = throw_args
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.throw method from the Hub greenlet"
            throw = greenlet.throw
            try:
                throw(*throw_args)
            except: # pylint:disable=bare-except
                # Anything raised out of the throw is reported to the hub
                # instead of propagating to our caller.
                self.hub.handle_error(throw, *sys.exc_info())

    def get(self):
        """
        If a value/an exception is stored, return/raise it. Otherwise block
        until switch() or throw() is called.

        :raises ConcurrentObjectUseError: If another greenlet is already
            waiting on this object.
        """
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent()
            try:
                return self.hub.switch()
            finally:
                # Whatever woke us up, we are no longer waiting.
                self.greenlet = None

    def __call__(self, source):
        """
        Forward the outcome of *source* to this waiter: :meth:`switch` with
        ``source.value`` when ``source.exception`` is ``None``, otherwise
        :meth:`throw` with ``source.exception``.
        """
        if source.exception is None:
            self.switch(source.value)
        else:
            self.throw(source.exception)

    # can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
    # and unwraps it in wait() thus checking that switch() was indeed called


class _MultipleWaiter(Waiter):
    """
    Internal :class:`Waiter` variant for waiting on several objects when
    greenlets may be switched out between successive waits. Every value
    handed to :meth:`switch` is queued and later returned, in arrival
    order, by :meth:`get`.

    Exceptions and the throw methods are not handled by this class.
    """
    __slots__ = ['_values']

    def __init__(self, *args, **kwargs):
        Waiter.__init__(self, *args, **kwargs)
        # Only a handful of values are normally pending at once. We pop
        # from the front, so a deque would be marginally faster, but the
        # hub avoids imports where it can to play well with
        # monkey-patching, and deferring the import to this point can be
        # impractical (see https://github.com/gevent/gevent/issues/652)
        self._values = []

    def switch(self, value): # pylint:disable=signature-differs
        """Queue *value*, then wake the waiting greenlet (if any) via :meth:`Waiter.switch`."""
        self._values.append(value)
        Waiter.switch(self, True)

    def get(self):
        """Return the oldest queued value, waiting for one to arrive if none is queued."""
        pending = self._values
        if pending:
            return pending.pop(0)

        # Nothing queued yet: wait to be switched to, then reset the base
        # waiter state so that it can be waited on again.
        Waiter.get(self)
        Waiter.clear(self)
        return pending.pop(0)


def iwait(objects, timeout=None, count=None):
    """
    Yield the given *objects* one at a time as each becomes ready, stopping
    once all of them (or *count* of them) have been yielded or *timeout*
    has expired.

    If *objects* is ``None``, the result of ``get_hub().join(timeout=timeout)``
    is yielded once and iteration ends.

    :param objects: A sequence (supporting :func:`len`) containing objects
        implementing the wait protocol (rawlink() and unlink()).
    :keyword int count: If not `None`, then a number specifying the maximum number
        of objects to wait for. If ``None`` (the default), all objects
        are waited for.
    :keyword float timeout: If given, specifies a maximum number of seconds
        to wait. If the timeout expires before the desired waited-for objects
        are available, then this method returns immediately.

    .. seealso:: :func:`wait`

    .. versionchanged:: 1.1a1
       Add the *count* parameter.
    .. versionchanged:: 1.1a2
       No longer raise :exc:`LoopExit` if our caller switches greenlets
       in between items yielded by this function.
    """
    # QQQ would be nice to support iterable here that can be generated slowly (why?)
    if objects is None:
        yield get_hub().join(timeout=timeout)
        return

    available = len(objects)
    count = available if count is None else min(count, available)
    waiter = _MultipleWaiter()
    # The very same callable is used for linking and unlinking below.
    notify = waiter.switch
    timed = timeout is not None

    if timed:
        # When the timer fires it delivers the _NONE marker to the waiter.
        timer = get_hub().loop.timer(timeout, priority=-1)
        timer.start(notify, _NONE)

    try:
        for obj in objects:
            obj.rawlink(notify)

        for _ in xrange(count):
            ready = waiter.get()
            waiter.clear()
            if ready is _NONE:
                # The timeout marker arrived: stop yielding.
                break
            yield ready
    finally:
        if timed:
            timer.stop()
        for linked in objects:
            unlink = getattr(linked, 'unlink', None)
            if not unlink:
                continue
            try:
                unlink(notify)
            except: # pylint:disable=bare-except
                # Best effort: report the failure and keep unlinking the rest.
                traceback.print_exc()


def wait(objects=None, timeout=None, count=None):
    """
    Wait for ``objects`` to become ready or for event loop to finish.

    If ``objects`` is provided, it must be a list containing objects
    implementing the wait protocol (rawlink() and unlink() methods):

    - :class:`gevent.Greenlet` instance
    - :class:`gevent.event.Event` instance
    - :class:`gevent.lock.Semaphore` instance
    - :class:`gevent.subprocess.Popen` instance

    If ``objects`` is ``None`` (the default), ``wait()`` blocks until
    the current event loop has nothing to do (or until ``timeout`` passes):

    - all greenlets have finished
    - all servers were stopped
    - all event loop watchers were stopped.

    In that case the return value is whatever
    ``get_hub().join(timeout=timeout)`` returns.

    If ``count`` is ``None`` (the default), wait for all ``objects``
    to become ready.

    If ``count`` is a number, wait for (up to) ``count`` objects to become
    ready. (For example, if count is ``1`` then the function exits
    when any object in the list is ready).

    If ``timeout`` is provided, it specifies the maximum number of
    seconds ``wait()`` will block.

    When ``objects`` is given, returns the list of ready objects, in the
    order in which they were ready.

    .. seealso:: :func:`iwait`
    """
    if objects is not None:
        # Drain the iterator so the caller gets a plain list.
        return list(iwait(objects, timeout, count))
    return get_hub().join(timeout=timeout)


class linkproxy(object):
    """
    One-shot callable that invokes ``callback(obj)`` when called, ignoring
    whatever arguments it is called with.
    """
    __slots__ = ['callback', 'obj']

    def __init__(self, callback, obj):
        self.callback, self.obj = callback, obj

    def __call__(self, *args):
        target, payload = self.callback, self.obj
        # Drop both references before making the call, so the proxy no
        # longer holds them while (or after) the callback runs.
        self.callback = self.obj = None
        target(payload)