1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
|
# Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details.
"""
Event-loop hub.
"""
from __future__ import absolute_import, print_function
# XXX: FIXME: Refactor to make this smaller
# pylint:disable=too-many-lines
from functools import partial as _functools_partial
import sys
import traceback
from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
from greenlet import GreenletExit
# Public names of this module: the only names bound by
# ``from <this module> import *``.
__all__ = [
'getcurrent',
'GreenletExit',
'spawn_raw',
'sleep',
'kill',
'signal',
'reinit',
'get_hub',
'Hub',
'Waiter',
]
from gevent._config import config as GEVENT_CONFIG
from gevent._compat import thread_mod_name
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
from gevent._ident import IdentRegistry
from gevent._hub_local import get_hub
from gevent._hub_local import get_loop
from gevent._hub_local import set_hub
from gevent._hub_local import set_loop
from gevent._hub_local import get_hub_if_exists as _get_hub
from gevent._hub_local import get_hub_noargs as _get_hub_noargs
from gevent._hub_local import set_default_hub_class
from gevent._greenlet_primitives import TrackedRawGreenlet
from gevent._hub_primitives import WaitOperationsGreenlet
# Export: re-publish the waiting helpers implemented in ``_hub_primitives``
# under the module-level names ``wait`` and ``iwait``.
from gevent import _hub_primitives
wait = _hub_primitives.wait_on_objects
iwait = _hub_primitives.iwait_on_objects
from gevent.exceptions import LoopExit
from gevent._waiter import Waiter
# Need the real get_ident. We're imported early enough (by gevent/__init__.py)
# that we can be sure nothing is monkey patched yet.
# The module to import is named by ``thread_mod_name`` (from gevent._compat).
get_thread_ident = __import__(thread_mod_name).get_ident
# Captured once, at import time; ``Hub`` compares its own ``thread_ident``
# against this value to decide whether it belongs to the main thread.
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
def spawn_raw(function, *args, **kwargs):
    """
    Create a new :class:`greenlet.greenlet` and schedule it to run
    ``function(*args, **kwargs)``.

    The object returned is a raw :class:`~greenlet.greenlet`; it lacks
    the convenience methods of :class:`gevent.Greenlet`. Applications
    should normally prefer :func:`~gevent.spawn`, but this function can
    occasionally be a worthwhile optimization when a great many
    greenlets are involved.

    :raises TypeError: If *function* is not callable.

    .. versionchanged:: 1.1a3
        Verify that ``function`` is callable, raising a TypeError if not. Previously,
        the spawned greenlet would have failed the first time it was switched to.

    .. versionchanged:: 1.1b1
       If *function* is not callable, immediately raise a :exc:`TypeError`
       instead of spawning a greenlet that will raise an uncaught TypeError.

    .. versionchanged:: 1.1rc2
        Accept keyword arguments for ``function`` as previously (incorrectly)
        documented. Note that this may incur an additional expense.

    .. versionchanged:: 1.3a2
       Populate the ``spawning_greenlet`` and ``spawn_tree_locals``
       attributes of the returned greenlet.

    .. versionchanged:: 1.3b1
       *Only* populate ``spawning_greenlet`` and ``spawn_tree_locals``
       if ``GEVENT_TRACK_GREENLET_TREE`` is enabled (the default). If not enabled,
       those attributes will not be set.
    """
    if not callable(function):
        raise TypeError("function must be callable")

    # Every raw greenlet is parented to the hub.
    hub = _get_hub_noargs()
    greenlet_type = TrackedRawGreenlet if GEVENT_CONFIG.track_greenlet_tree else RawGreenlet

    # The loop's callback objects only forward positional arguments
    # (they are heavily used and implemented twice, in core.ppyx and
    # corecffi.py), so keyword arguments are bound up front with a partial
    # and nothing extra is handed to ``switch``.
    if kwargs:
        target = _functools_partial(function, *args, **kwargs)
        switch_args = ()
    else:
        target = function
        switch_args = args

    g = greenlet_type(target, hub)
    hub.loop.run_callback(g.switch, *switch_args)
    return g
def sleep(seconds=0, ref=True):
    """
    Suspend the current greenlet for at least *seconds*.

    *seconds* may be an integer, or a float when fractional seconds
    are wanted.

    .. tip:: In the current implementation, a value of 0 (the default)
       means to yield execution to any other runnable greenlets, but
       this greenlet may be scheduled again before the event loop
       cycles (in an extreme case, a greenlet that repeatedly sleeps
       with 0 can prevent greenlets that are ready to do I/O from
       being scheduled for some (small) period of time); a value greater than
       0, on the other hand, will delay running this greenlet until
       the next iteration of the loop.

    If *ref* is False, the greenlet running ``sleep()`` will not prevent
    :func:`gevent.wait` from exiting.

    .. versionchanged:: 1.3a1
       Sleeping with a value of 0 will now be bounded to approximately block the
       loop for no longer than :func:`gevent.getswitchinterval`.

    .. seealso:: :func:`idle`
    """
    hub = _get_hub_noargs()
    loop = hub.loop

    if seconds <= 0:
        # Pure yield: queue a callback that resumes us, then block on it.
        resume = Waiter(hub)
        loop.run_callback(resume.switch, None)
        resume.get()
        return

    with loop.timer(seconds, ref=ref) as timer:
        # Sleeping is meant to be an "absolute" measure relative to
        # time.time(), not a relative one, so refresh the loop's idea of
        # "now" before waiting on the timer.
        loop.update_now()
        hub.wait(timer)
def idle(priority=0):
    """
    Block the calling greenlet until the event loop is idle.

    Idle is defined as having no other events of the same or higher
    *priority* pending. That is, as long as sockets, timeouts or even
    signals of the same or higher priority are being processed, the loop
    is not idle.

    .. seealso:: :func:`sleep`
    """
    hub = _get_hub_noargs()
    idle_watcher = hub.loop.idle()
    # A falsy priority (the default 0) leaves the watcher's own priority alone.
    if priority:
        idle_watcher.priority = priority
    hub.wait(idle_watcher)
def kill(greenlet, exception=GreenletExit):
    """
    Kill *greenlet* asynchronously. The current greenlet is not unscheduled.

    .. note::

        The method :meth:`Greenlet.kill` method does the same and
        more (and the same caveats listed there apply here). However, the MAIN
        greenlet - the one that exists initially - does not have a
        ``kill()`` method, and neither do any created with :func:`spawn_raw`,
        so you have to use this function.

    .. caution:: Use care when killing greenlets. If they are not prepared for
       exceptions, this could result in corrupted state.

    .. versionchanged:: 1.1a2
        If the ``greenlet`` has a :meth:`kill <Greenlet.kill>` method, calls it. This prevents a
        greenlet from being switched to for the first time after it's been
        killed but not yet executed.
    """
    if greenlet.dead:
        # Nothing to do for a greenlet that has already finished.
        return

    if hasattr(greenlet, 'kill'):
        # Looks like a gevent.greenlet.Greenlet: defer to its own kill(),
        # in particular so it cannot be switched to for the first time
        # after having been killed.
        greenlet.kill(exception=exception, block=False)
        return

    # Raw greenlet: have the loop throw the exception into it later.
    _get_hub_noargs().loop.run_callback(greenlet.throw, exception)
class signal(object):
    """
    Call the *handler* with the *args* and *kwargs* when the process
    receives the signal *signalnum*.

    The *handler* will be run in a new greenlet when the signal is delivered.

    This returns an object with the useful method ``cancel``, which, when called,
    will prevent future deliveries of *signalnum* from calling *handler*.

    :raises TypeError: If *handler* is not callable.

    .. note::

        This may not operate correctly with SIGCHLD if libev child watchers
        are used (as they are by default with os.fork).

    .. versionchanged:: 1.2a1
       The ``handler`` argument is required to be callable at construction time.
    """

    # XXX: This is manually documented in gevent.rst while it is aliased in
    # the gevent module.

    # Factory used to create the greenlet that runs the handler. When left
    # as None, ``__init__`` fills it in with ``gevent.Greenlet``.
    greenlet_class = None

    def __init__(self, signalnum, handler, *args, **kwargs):
        if not callable(handler):
            raise TypeError("signal handler must be callable.")

        self.hub = _get_hub_noargs()
        # ref=False: the watcher is created unreferenced; see the ``ref``
        # property below to change that.
        self.watcher = self.hub.loop.signal(signalnum, ref=False)
        self.watcher.start(self._start)
        self.handler = handler
        self.args = args
        self.kwargs = kwargs
        if self.greenlet_class is None:
            # Imported lazily, at construction time rather than module import.
            from gevent import Greenlet
            self.greenlet_class = Greenlet

    def _get_ref(self):
        return self.watcher.ref

    def _set_ref(self, value):
        self.watcher.ref = value

    # Read/write pass-through to the underlying watcher's ``ref`` flag.
    ref = property(_get_ref, _set_ref)
    del _get_ref, _set_ref

    def cancel(self):
        """Stop the underlying watcher so the handler is no longer invoked."""
        self.watcher.stop()

    def _start(self):
        """Watcher callback: run :meth:`handle` in a fresh greenlet."""
        try:
            greenlet = self.greenlet_class(self.handle)
            greenlet.switch()
        except: # pylint:disable=bare-except
            # Report through the hub with context=None. This must be
            # ``sys.exc_info``; ``sys._exc_info`` does not exist and would
            # raise AttributeError here, hiding the original failure.
            self.hub.handle_error(None, *sys.exc_info())

    def handle(self):
        """Invoke the user handler, routing any exception to the hub."""
        try:
            self.handler(*self.args, **self.kwargs)
        except: # pylint:disable=bare-except
            self.hub.handle_error(None, *sys.exc_info())
def reinit(hub=None):
    """
    reinit() -> None

    Prepare the gevent hub to run in a new (forked) process.

    This should be called *immediately* after :func:`os.fork` in the
    child process. This is done automatically by
    :func:`gevent.os.fork` or if the :mod:`os` module has been
    monkey-patched. If this function is not called in a forked
    process, symptoms may include hanging of functions like
    :func:`socket.getaddrinfo`, and the hub's threadpool is unlikely
    to work.

    .. note:: Registered fork watchers may or may not run before
       this function (and thus ``gevent.os.fork``) return. If they have
       not run, they will run "soon", after an iteration of the event loop.
       You can force this by inserting a few small (but non-zero) calls to :func:`sleep`
       after fork returns. (As of gevent 1.1 and before, fork watchers will
       not have run, but this may change in the future.)

    .. note:: This function may be removed in a future major release
       if the fork process can be more smoothly managed.

    .. warning:: See remarks in :func:`gevent.os.fork` about greenlets
       and event loop watchers in the child process.
    """
    # Note the signature line in the docstring: hub is not a public param.

    if hub is None:
        hub = _get_hub()
        if hub is None:
            # No hub to reinitialize.
            return

    # The existing loop is reinitialized in place, not destroyed; the
    # loop's reinit in turn calls libev's ev_loop_fork.
    # See https://github.com/gevent/gevent/issues/200.
    hub.loop.reinit()

    # libev's fork watchers are slow to fire because they only fire
    # at the beginning of a loop; since our callbacks run at the end of
    # the loop, that may be too late. The threadpool and resolvers depend
    # on the fork handlers having run (specifically, the threadpool will
    # fail in the forked child if there were any threads in it, which
    # there will be if the resolver_thread was in use (the default)
    # before the fork).
    #
    # If the forked process wants to use the threadpool or resolver
    # immediately (in a queued callback), it would hang.
    #
    # The loop below is a workaround. Fortunately, all of these methods
    # are idempotent and can be called multiple times following a fork
    # if they suddenly started working, or were already working on some
    # platforms. Other threadpools and fork handlers will be called at
    # an arbitrary time later ('soon').
    fork_aware = (hub._threadpool, hub._resolver, hub.periodic_monitoring_thread)
    for component in fork_aware:
        try:
            on_fork = component._on_fork
        except AttributeError:
            # Unset (None) or simply has no fork hook.
            continue
        on_fork()

    # TODO: We'd like to sleep for a non-zero amount of time to force the
    # loop to make a pass around before returning to this greenlet. That
    # would allow any user-provided fork watchers to run. (Two calls are
    # necessary.) HOWEVER, if we do this, certain tests that heavily mix
    # threads and forking, like
    # 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not
    # immediately clear why.
    #sleep(0.00001)
    #sleep(0.00001)
# Module-level registry; ``Hub.__init__`` asks it for each new hub's
# ``minimal_ident``.
hub_ident_registry = IdentRegistry()
class Hub(WaitOperationsGreenlet):
    """
    A greenlet that runs the event loop.

    It is created automatically by :func:`get_hub`.

    .. rubric:: Switching

    Every time this greenlet (i.e., the event loop) is switched *to*,
    if the current greenlet has a ``switch_out`` method, it will be
    called. This allows a greenlet to take some cleanup actions before
    yielding control. This method should not call any gevent blocking
    functions.
    """

    #: If instances of these classes are raised into the event loop,
    #: they will be propagated out to the main greenlet (where they will
    #: usually be caught by Python itself)
    SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)

    #: Instances of these classes are not considered to be errors and
    #: do not get logged/printed when raised by the event loop.
    NOT_ERROR = (GreenletExit, SystemExit)

    #: The size we use for our threadpool. Either use a subclass
    #: for this, or change it immediately after creating the hub.
    threadpool_size = 10

    # An instance of PeriodicMonitoringThread, if started.
    periodic_monitoring_thread = None

    # The ident of the thread we were created in, which should be the
    # thread that we run in.
    thread_ident = None

    #: A string giving the name of this hub. Useful for associating hubs
    #: with particular threads. Printed as part of the default repr.
    #:
    #: .. versionadded:: 1.3b1
    name = ''

    # NOTE: We cannot define a class-level 'loop' attribute
    # because that conflicts with the slot we inherit from the
    # Cythonized-bases.

    def __init__(self, loop=None, default=None):
        """
        :param loop: Either an existing loop object (anything with a ``run``
            attribute), which is used as-is, or a value passed as ``flags``
            to :attr:`loop_class`; ``None`` means use :attr:`backend`.
        :param default: Passed to :attr:`loop_class` when a new loop is
            created. Must be ``None`` when *loop* is an existing loop object.
        :raises TypeError: If *default* is given together with an existing
            loop object.
        """
        WaitOperationsGreenlet.__init__(self, None, None)
        self.thread_ident = get_thread_ident()
        if hasattr(loop, 'run'):
            if default is not None:
                raise TypeError("Unexpected argument: default")
            self.loop = loop
        elif get_loop() is not None:
            # Reuse a loop instance previously set by
            # destroying a hub without destroying the associated
            # loop. See #237 and #238.
            self.loop = get_loop()
        else:
            # Hubs created off the main thread do not ask for the default loop.
            if default is None and self.thread_ident != MAIN_THREAD_IDENT:
                default = False

            if loop is None:
                loop = self.backend
            self.loop = self.loop_class(flags=loop, default=default) # pylint:disable=not-callable

        # Both are created lazily by the ``resolver`` / ``threadpool`` properties.
        self._resolver = None
        self._threadpool = None
        self.format_context = GEVENT_CONFIG.format_context
        self.minimal_ident = hub_ident_registry.get_ident(self)

    @Lazy
    def ident_registry(self):
        return IdentRegistry()

    @property
    def loop_class(self):
        return GEVENT_CONFIG.loop

    @property
    def backend(self):
        return GEVENT_CONFIG.libev_backend

    @property
    def main_hub(self):
        """
        Is this the hub for the main thread?

        .. versionadded:: 1.3b1
        """
        return self.thread_ident == MAIN_THREAD_IDENT

    def __repr__(self):
        if self.loop is None:
            info = 'destroyed'
        else:
            try:
                info = self.loop._format()
            except Exception as ex: # pylint:disable=broad-except
                # repr() must not raise; fall back to describing the failure.
                info = str(ex) or repr(ex) or 'error'
        result = '<%s %r at 0x%x %s' % (
            self.__class__.__name__,
            self.name,
            id(self),
            info)
        if self._resolver is not None:
            result += ' resolver=%r' % self._resolver
        if self._threadpool is not None:
            result += ' threadpool=%r' % self._threadpool
        result += ' thread_ident=%s' % (hex(self.thread_ident), )
        return result + '>'

    def handle_error(self, context, type, value, tb):
        """
        Called by the event loop when an error occurs. The arguments
        type, value, and tb are the standard tuple returned by :func:`sys.exc_info`.

        Applications can set a property on the hub with this same signature
        to override the error handling provided by this class.

        Errors that are :attr:`system errors <SYSTEM_ERROR>` are passed
        to :meth:`handle_system_error`.

        :param context: If this is ``None``, indicates a system error that
            should generally result in exiting the loop and being thrown to the
            parent greenlet.
        """
        if isinstance(value, str):
            # Cython can raise errors where the value is a plain string
            # e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
            value = type(value)
        if not issubclass(type, self.NOT_ERROR):
            self.print_exception(context, type, value, tb)
        if context is None or issubclass(type, self.SYSTEM_ERROR):
            self.handle_system_error(type, value)

    def handle_system_error(self, type, value):
        """
        Called from `handle_error` when the exception type is determined
        to be a :attr:`system error <SYSTEM_ERROR>`.

        System errors cause the exception to be raised in the main
        greenlet (the parent of this hub).
        """
        current = getcurrent()
        if current is self or current is self.parent or self.loop is None:
            self.parent.throw(type, value)
        else:
            # in case system error was handled and life goes on
            # switch back to this greenlet as well
            cb = None
            try:
                cb = self.loop.run_callback(current.switch)
            except: # pylint:disable=bare-except
                traceback.print_exc(file=self.exception_stream)
            try:
                self.parent.throw(type, value)
            finally:
                # Cancel the queued switch-back if one was scheduled.
                if cb is not None:
                    cb.stop()

    @readproperty
    def exception_stream(self):
        """
        The stream to which exceptions will be written.
        Defaults to ``sys.stderr`` unless assigned to.

        .. versionadded:: 1.2a1
        """
        # Unwrap any FileObjectThread we have thrown around sys.stderr
        # (because it can't be used in the hub). Tricky because we are
        # called in error situations when it's not safe to import.
        stderr = sys.stderr
        if type(stderr).__name__ == 'FileObjectThread':
            stderr = stderr.io # pylint:disable=no-member
        return stderr

    def print_exception(self, context, type, value, tb):
        """
        Write a description of the exception, a timestamp, and (when
        *context* is not None) a "<context> failed with <type>" line to
        :attr:`exception_stream`.

        Does nothing if there is no exception stream.
        """
        # Python 3 does not gracefully handle None value or tb in
        # traceback.print_exception() as previous versions did.
        # pylint:disable=no-member
        errstream = self.exception_stream
        if errstream is None:
            # ``sys.stderr`` can legitimately be None; we are already
            # handling an error, so do not raise AttributeError on top of it.
            return

        if value is None:
            errstream.write('%s\n' % type.__name__)
        else:
            traceback.print_exception(type, value, tb, file=errstream)
        del tb

        try:
            errstream.write(gmctime())
            errstream.write(' ' if context is not None else '\n')
        except: # pylint:disable=bare-except
            # Possible not safe to import under certain
            # error conditions in Python 2
            pass

        if context is not None:
            if not isinstance(context, str):
                try:
                    context = self.format_context(context)
                except: # pylint:disable=bare-except
                    traceback.print_exc(file=self.exception_stream)
                    context = repr(context)
            errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))

    def run(self):
        """
        Entry-point to running the loop. This method is called automatically
        when the hub greenlet is scheduled; do not call it directly.

        :raises gevent.exceptions.LoopExit: If the loop finishes running. This means
           that there are no other scheduled greenlets, and no active
           watchers or servers. In some situations, this indicates a
           programming error.
        """
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        self.start_periodic_monitoring_thread()
        while 1:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            debug = []
            if hasattr(loop, 'debug'):
                debug = loop.debug()
            self.parent.throw(LoopExit('This operation would block forever', self, debug))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediately

    def start_periodic_monitoring_thread(self):
        """
        Create the periodic monitoring thread if it does not exist yet and
        ``GEVENT_CONFIG.monitor_thread`` is enabled.

        :return: The monitoring thread, or ``None`` if there is none.
        """
        if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
            # Note that it is possible for one real thread to
            # (temporarily) wind up with multiple monitoring threads,
            # if hubs are started and stopped within the thread. This shows up
            # in the threadpool tests. The monitoring threads will eventually notice their
            # hub object is gone.
            from gevent._monitor import PeriodicMonitoringThread
            from gevent.events import PeriodicMonitorThreadStartedEvent
            from gevent.events import notify_and_call_entry_points
            self.periodic_monitoring_thread = PeriodicMonitoringThread(self)

            if self.main_hub:
                self.periodic_monitoring_thread.install_monitor_memory_usage()

            notify_and_call_entry_points(PeriodicMonitorThreadStartedEvent(
                self.periodic_monitoring_thread))

        return self.periodic_monitoring_thread

    def join(self, timeout=None):
        """Wait for the event loop to finish. Exits only when there are
        no more spawned greenlets, started servers, active timeouts or watchers.

        If *timeout* is provided, wait no longer for the specified number of seconds.

        Returns True if exited because the loop finished execution.
        Returns False if exited because of timeout expired.
        """
        assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
        if self.dead:
            return True

        waiter = Waiter(self)

        if timeout is not None:
            # From here on ``timeout`` is the timer watcher, not the number.
            timeout = self.loop.timer(timeout, ref=False)
            timeout.start(waiter.switch, None)

        try:
            try:
                waiter.get()
            except LoopExit:
                return True
        finally:
            if timeout is not None:
                timeout.stop()
                timeout.close()
        return False

    def destroy(self, destroy_loop=None):
        """
        Destroy this hub and clean up its resources.

        If you manually create hubs, you *should* call this
        method before disposing of the hub object reference.

        :param destroy_loop: Whether to destroy the loop as well. When
            ``None`` (the default) the loop is destroyed unless it reports
            itself as the default loop.
        """
        if self.periodic_monitoring_thread is not None:
            self.periodic_monitoring_thread.kill()
            self.periodic_monitoring_thread = None
        if self._resolver is not None:
            self._resolver.close()
            # Reset rather than ``del``: __repr__, the ``resolver`` property
            # and reinit() all read this attribute after destruction.
            self._resolver = None
        if self._threadpool is not None:
            self._threadpool.kill()
            # Reset rather than ``del`` for the same reason as ``_resolver``.
            self._threadpool = None
        if destroy_loop is None:
            destroy_loop = not self.loop.default
        if destroy_loop:
            if get_loop() is self.loop:
                # Don't let anyone try to reuse this
                set_loop(None)
            self.loop.destroy()
        else:
            # Store in case another hub is created for this
            # thread.
            set_loop(self.loop)

        self.loop = None
        if _get_hub() is self:
            set_hub(None)

    # XXX: We can probably simplify the resolver and threadpool properties.

    @property
    def resolver_class(self):
        return GEVENT_CONFIG.resolver

    def _get_resolver(self):
        # Created on first access.
        if self._resolver is None:
            self._resolver = self.resolver_class(hub=self) # pylint:disable=not-callable
        return self._resolver

    def _set_resolver(self, value):
        self._resolver = value

    def _del_resolver(self):
        self._resolver = None

    resolver = property(_get_resolver, _set_resolver, _del_resolver,
                        """
                        The DNS resolver that the socket functions will use.

                        .. seealso:: :doc:`/dns`
                        """)

    @property
    def threadpool_class(self):
        return GEVENT_CONFIG.threadpool

    def _get_threadpool(self):
        # Created on first access, sized by ``threadpool_size``.
        if self._threadpool is None:
            # pylint:disable=not-callable
            self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
        return self._threadpool

    def _set_threadpool(self, value):
        self._threadpool = value

    def _del_threadpool(self):
        self._threadpool = None

    threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool,
                          """
                          The threadpool associated with this hub.

                          Usually this is a
                          :class:`gevent.threadpool.ThreadPool`, but
                          you :attr:`can customize that
                          <gevent._config.Config.threadpool>`.

                          Use this object to schedule blocking
                          (non-cooperative) operations in a different
                          thread to prevent them from halting the event loop.
                          """)
# Register Hub with gevent._hub_local as the default hub class
# (presumably the class get_hub() instantiates on demand; verify in _hub_local).
set_default_hub_class(Hub)
class linkproxy(object):
    """
    One-shot callable that invokes ``callback(obj)``.

    Whatever arguments the proxy itself is called with are ignored. Both
    stored references are cleared *before* the callback runs, so the proxy
    no longer holds on to either object afterwards.
    """

    __slots__ = ['callback', 'obj']

    def __init__(self, callback, obj):
        self.callback = callback
        self.obj = obj

    def __call__(self, *args):
        target, payload = self.callback, self.obj
        # Drop our references first, then deliver.
        self.callback = self.obj = None
        target(payload)
|