aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/libuv/loop.py
blob: 0f317c00b6dd43416660c481381c8ce6378e9869 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
"""
libuv loop implementation
"""
# pylint: disable=no-member
from __future__ import absolute_import, print_function

import os
from collections import defaultdict
from collections import namedtuple
from operator import delitem
import signal

from gevent._ffi import _dbg # pylint: disable=unused-import
from gevent._ffi.loop import AbstractLoop
from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
from gevent._ffi.loop import assign_standard_callbacks
from gevent._ffi.loop import AbstractCallbacks
from gevent._util import implementer
from gevent._interfaces import ILoop

ffi = _corecffi.ffi
libuv = _corecffi.lib

__all__ = [
]


class _Callbacks(AbstractCallbacks):

    def _find_loop_from_c_watcher(self, watcher_ptr):
        loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data
        return self.from_handle(loop_handle) if loop_handle else None

    def python_sigchld_callback(self, watcher_ptr, _signum):
        self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback()

    def python_timer0_callback(self, watcher_ptr):
        return self.python_prepare_callback(watcher_ptr)

    def python_queue_callback(self, watcher_ptr, revents):
        watcher_handle = watcher_ptr.data
        the_watcher = self.from_handle(watcher_handle)

        the_watcher.loop._queue_callback(watcher_ptr, revents)


_callbacks = assign_standard_callbacks(
    ffi, libuv, _Callbacks,
    [('python_sigchld_callback', None),
     ('python_timer0_callback', None),
     ('python_queue_callback', None)])

from gevent._ffi.loop import EVENTS
GEVENT_CORE_EVENTS = EVENTS # export

from gevent.libuv import watcher as _watchers # pylint:disable=no-name-in-module

_events_to_str = _watchers._events_to_str # export

READ = libuv.UV_READABLE
WRITE = libuv.UV_WRITABLE

def get_version():
    uv_bytes = ffi.string(libuv.uv_version_string())
    if not isinstance(uv_bytes, str):
        # Py3
        uv_str = uv_bytes.decode("ascii")
    else:
        uv_str = uv_bytes

    return 'libuv-' + uv_str

def get_header_version():
    return 'libuv-%d.%d.%d' % (libuv.UV_VERSION_MAJOR, libuv.UV_VERSION_MINOR, libuv.UV_VERSION_PATCH)

def supported_backends():
    return ['default']

@implementer(ILoop)
class loop(AbstractLoop):

    # XXX: Undocumented. Maybe better named 'timer_resolution'? We can't
    # know this in general on libev
    min_sleep_time = 0.001 # 1ms

    error_handler = None

    _CHECK_POINTER = 'uv_check_t *'

    _PREPARE_POINTER = 'uv_prepare_t *'
    _PREPARE_CALLBACK_SIG = "void(*)(void*)"

    _TIMER_POINTER = _CHECK_POINTER # This is poorly named. It's for the callback "timer"

    def __init__(self, flags=None, default=None):
        AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
        self.__loop_pid = os.getpid()
        self._child_watchers = defaultdict(list)
        self._io_watchers = dict()
        self._fork_watchers = set()
        self._pid = os.getpid()
        self._default = self._ptr == libuv.uv_default_loop()
        self._queued_callbacks = []

    def _queue_callback(self, watcher_ptr, revents):
        self._queued_callbacks.append((watcher_ptr, revents))

    def _init_loop(self, flags, default):
        if default is None:
            default = True
            # Unlike libev, libuv creates a new default
            # loop automatically if the old default loop was
            # closed.

        if default:
            # XXX: If the default loop had been destroyed, this
            # will create a new one, but we won't destroy it
            ptr = libuv.uv_default_loop()
        else:
            ptr = libuv.uv_loop_new()


        if not ptr:
            raise SystemError("Failed to get loop")

        # Track whether or not any object has destroyed
        # this loop. See _can_destroy_default_loop
        ptr.data = ptr
        return ptr

    _signal_idle = None

    def _init_and_start_check(self):
        libuv.uv_check_init(self._ptr, self._check)
        libuv.uv_check_start(self._check, libuv.python_check_callback)
        libuv.uv_unref(self._check)

        # We also have to have an idle watcher to be able to handle
        # signals in a timely manner. Without them, libuv won't loop again
        # and call into its check and prepare handlers.
        # Note that this basically forces us into a busy-loop
        # XXX: As predicted, using an idle watcher causes our process
        # to eat 100% CPU time. We instead use a timer with a max of a .3 second
        # delay to notice signals. Note that this timeout also implements fork
        # watchers, effectively.

        # XXX: Perhaps we could optimize this to notice when there are other
        # timers in the loop and start/stop it then. When we have a callback
        # scheduled, this should also be the same and unnecessary?
        # libev does takes this basic approach on Windows.
        self._signal_idle = ffi.new("uv_timer_t*")
        libuv.uv_timer_init(self._ptr, self._signal_idle)
        self._signal_idle.data = self._handle_to_self
        sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback)
        libuv.uv_timer_start(self._signal_idle,
                             sig_cb,
                             300,
                             300)
        libuv.uv_unref(self._signal_idle)

    def _run_callbacks(self):
        # Manually handle fork watchers.
        curpid = os.getpid()
        if curpid != self._pid:
            self._pid = curpid
            for watcher in self._fork_watchers:
                watcher._on_fork()


        # The contents of queued_callbacks at this point should be timers
        # that expired when the loop began along with any idle watchers.
        # We need to run them so that any manual callbacks they want to schedule
        # get added to the list and ran next before we go on to poll for IO.
        # This is critical for libuv on linux: closing a socket schedules some manual
        # callbacks to actually stop the watcher; if those don't run before
        # we poll for IO, then libuv can abort the process for the closed file descriptor.

        # XXX: There's still a race condition here because we may not run *all* the manual
        # callbacks. We need a way to prioritize those.

        # Running these before the manual callbacks lead to some
        # random test failures. In test__event.TestEvent_SetThenClear
        # we would get a LoopExit sometimes. The problem occurred when
        # a timer expired on entering the first loop; we would process
        # it there, and then process the callback that it created
        # below, leaving nothing for the loop to do. Having the
        # self.run() manually process manual callbacks before
        # continuing solves the problem. (But we must still run callbacks
        # here again.)
        self._prepare_ran_callbacks = self.__run_queued_callbacks()

        super(loop, self)._run_callbacks()

    def _init_and_start_prepare(self):
        libuv.uv_prepare_init(self._ptr, self._prepare)
        libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback)
        libuv.uv_unref(self._prepare)

    def _init_callback_timer(self):
        libuv.uv_check_init(self._ptr, self._timer0)

    def _stop_callback_timer(self):
        libuv.uv_check_stop(self._timer0)

    def _start_callback_timer(self):
        # The purpose of the callback timer is to ensure that we run
        # callbacks as soon as possible on the next iteration of the event loop.

        # In libev, we set a 0 duration timer with a no-op callback.
        # This executes immediately *after* the IO poll is done (it
        # actually determines the time that the IO poll will block
        # for), so having the timer present simply spins the loop, and
        # our normal prepare watcher kicks in to run the callbacks.

        # In libuv, however, timers are run *first*, before prepare
        # callbacks and before polling for IO. So a no-op 0 duration
        # timer actually does *nothing*. (Also note that libev queues all
        # watchers found during IO poll to run at the end (I think), while libuv
        # runs them in uv__io_poll itself.)

        # From the loop inside uv_run:
        # while True:
        #   uv__update_time(loop);
        #   uv__run_timers(loop);
        #   # we don't use pending watchers. They are how libuv
        #   # implements the pipe/udp/tcp streams.
        #   ran_pending = uv__run_pending(loop);
        #   uv__run_idle(loop);
        #   uv__run_prepare(loop);
        #   ...
        #   uv__io_poll(loop, timeout); # <--- IO watchers run here!
        #   uv__run_check(loop);

        # libev looks something like this (pseudo code because the real code is
        # hard to read):
        #
        # do {
        #    run_fork_callbacks();
        #    run_prepare_callbacks();
        #    timeout = min(time of all timers or normal block time)
        #    io_poll() # <--- Only queues IO callbacks
        #    update_now(); calculate_expired_timers();
        #    run callbacks in this order: (although specificying priorities changes it)
        #        check
        #        stat
        #        child
        #        signal
        #        timer
        #        io
        # }

        # So instead of running a no-op and letting the side-effect of spinning
        # the loop run the callbacks, we must explicitly run them here.

        # If we don't, test__systemerror:TestCallback will be flaky, failing
        # one time out of ~20, depending on timing.

        # To get them to run immediately after this current loop,
        # we use a check watcher, instead of a 0 duration timer entirely.
        # If we use a 0 duration timer, we can get stuck in a timer loop.
        # Python 3.6 fails in test_ftplib.py

        # As a final note, if we have not yet entered the loop *at
        # all*, and a timer was created with a duration shorter than
        # the amount of time it took for us to enter the loop in the
        # first place, it may expire and get called before our callback
        # does. This could also lead to test__systemerror:TestCallback
        # appearing to be flaky.

        # As yet another final note, if we are currently running a
        # timer callback, meaning we're inside uv__run_timers() in C,
        # and the Python starts a new timer, if the Python code then
        # update's the loop's time, it's possible that timer will
        # expire *and be run in the same iteration of the loop*. This
        # is trivial to do: In sequential code, anything after
        # `gevent.sleep(0.1)` is running in a timer callback. Starting
        # a new timer---e.g., another gevent.sleep() call---will
        # update the time, *before* uv__run_timers exits, meaning
        # other timers get a chance to run before our check or prepare
        # watcher callbacks do. Therefore, we do indeed have to have a 0
        # timer to run callbacks---it gets inserted before any other user
        # timers---ideally, this should be especially careful about how much time
        # it runs for.

        # AND YET: We can't actually do that. We get timeouts that I haven't fully
        # investigated if we do. Probably stuck in a timer loop.

        # As a partial remedy to this, unlike libev, our timer watcher
        # class doesn't update the loop time by default.

        libuv.uv_check_start(self._timer0, libuv.python_timer0_callback)


    def _stop_aux_watchers(self):
        assert self._prepare
        assert self._check
        assert self._signal_idle
        libuv.uv_prepare_stop(self._prepare)
        libuv.uv_ref(self._prepare) # Why are we doing this?

        libuv.uv_check_stop(self._check)
        libuv.uv_ref(self._check)

        libuv.uv_timer_stop(self._signal_idle)
        libuv.uv_ref(self._signal_idle)

        libuv.uv_check_stop(self._timer0)

    def _setup_for_run_callback(self):
        self._start_callback_timer()
        libuv.uv_ref(self._timer0)


    def _can_destroy_loop(self, ptr):
        # We're being asked to destroy a loop that's,
        # at the time it was constructed, was the default loop.
        # If loop objects were constructed more than once,
        # it may have already been destroyed, though.
        # We track this in the data member.
        return ptr.data

    def _destroy_loop(self, ptr):
        ptr.data = ffi.NULL
        libuv.uv_stop(ptr)

        libuv.gevent_close_all_handles(ptr)

        closed_failed = libuv.uv_loop_close(ptr)
        if closed_failed:
            assert closed_failed == libuv.UV_EBUSY
            # We already closed all the handles. Run the loop
            # once to let them be cut off from the loop.
            ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
            if ran_has_more_callbacks:
                libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT)
            closed_failed = libuv.uv_loop_close(ptr)
            assert closed_failed == 0, closed_failed

        # Destroy the native resources *after* we have closed
        # the loop. If we do it before, walking the handles
        # attached to the loop is likely to segfault.

        libuv.gevent_zero_check(self._check)
        libuv.gevent_zero_check(self._timer0)
        libuv.gevent_zero_prepare(self._prepare)
        libuv.gevent_zero_timer(self._signal_idle)
        del self._check
        del self._prepare
        del self._signal_idle
        del self._timer0

        libuv.gevent_zero_loop(ptr)

        # Destroy any watchers we're still holding on to.
        del self._io_watchers
        del self._fork_watchers
        del self._child_watchers


    def debug(self):
        """
        Return all the handles that are open and their ref status.
        """
        handle_state = namedtuple("HandleState",
                                  ['handle',
                                   'type',
                                   'watcher',
                                   'ref',
                                   'active',
                                   'closing'])
        handles = []

        # XXX: Convert this to a modern callback.
        def walk(handle, _arg):
            data = handle.data
            if data:
                watcher = ffi.from_handle(data)
            else:
                watcher = None
            handles.append(handle_state(handle,
                                        ffi.string(libuv.uv_handle_type_name(handle.type)),
                                        watcher,
                                        libuv.uv_has_ref(handle),
                                        libuv.uv_is_active(handle),
                                        libuv.uv_is_closing(handle)))

        libuv.uv_walk(self._ptr,
                      ffi.callback("void(*)(uv_handle_t*,void*)",
                                   walk),
                      ffi.NULL)
        return handles

    def ref(self):
        pass

    def unref(self):
        # XXX: Called by _run_callbacks.
        pass

    def break_(self, how=None):
        libuv.uv_stop(self._ptr)

    def reinit(self):
        # TODO: How to implement? We probably have to simply
        # re-__init__ this whole class? Does it matter?
        # OR maybe we need to uv_walk() and close all the handles?

        # XXX: libuv < 1.12 simply CANNOT handle a fork unless you immediately
        # exec() in the child. There are multiple calls to abort() that
        # will kill the child process:
        # - The OS X poll implementation (kqueue) aborts on an error return
        # value; since kqueue FDs can't be inherited, then the next call
        # to kqueue in the child will fail and get aborted; fork() is likely
        # to be called during the gevent loop, meaning we're deep inside the
        # runloop already, so we can't even close the loop that we're in:
        # it's too late, the next call to kqueue is already scheduled.
        # - The threadpool, should it be in use, also aborts
        # (https://github.com/joyent/libuv/pull/1136)
        # - There global shared state that breaks signal handling
        # and leads to an abort() in the child, EVEN IF the loop in the parent
        # had already been closed
        # (https://github.com/joyent/libuv/issues/1405)

        # In 1.12, the uv_loop_fork function was added (by gevent!)
        libuv.uv_loop_fork(self._ptr)

    _prepare_ran_callbacks = False

    def __run_queued_callbacks(self):
        if not self._queued_callbacks:
            return False

        cbs = list(self._queued_callbacks)
        self._queued_callbacks = []

        for watcher_ptr, arg in cbs:
            handle = watcher_ptr.data
            if not handle:
                # It's been stopped and possibly closed
                assert not libuv.uv_is_active(watcher_ptr)
                continue
            val = _callbacks.python_callback(handle, arg)
            if val == -1:
                _callbacks.python_handle_error(handle, arg)
            elif val == 1:
                if not libuv.uv_is_active(watcher_ptr):
                    if watcher_ptr.data != handle:
                        if watcher_ptr.data:
                            _callbacks.python_stop(None)
                    else:
                        _callbacks.python_stop(handle)
        return True


    def run(self, nowait=False, once=False):
        # we can only respect one flag or the other.
        # nowait takes precedence because it can't block
        mode = libuv.UV_RUN_DEFAULT
        if once:
            mode = libuv.UV_RUN_ONCE
        if nowait:
            mode = libuv.UV_RUN_NOWAIT

        if mode == libuv.UV_RUN_DEFAULT:
            while self._ptr and self._ptr.data:
                # This is here to better preserve order guarantees. See _run_callbacks
                # for details.
                # It may get run again from the prepare watcher, so potentially we
                # could take twice as long as the switch interval.
                self._run_callbacks()
                self._prepare_ran_callbacks = False
                ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE)
                # Note that we run queued callbacks when the prepare watcher runs,
                # thus accounting for timers that expired before polling for IO,
                # and idle watchers. This next call should get IO callbacks and
                # callbacks from timers that expired *after* polling for IO.
                ran_callbacks = self.__run_queued_callbacks()

                if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks:
                    # A return of 0 means there are no referenced and
                    # active handles. The loop is over.
                    # If we didn't run any callbacks, then we couldn't schedule
                    # anything to switch in the future, so there's no point
                    # running again.
                    return ran_status
            return 0 # Somebody closed the loop

        result = libuv.uv_run(self._ptr, mode)
        self.__run_queued_callbacks()
        return result

    def now(self):
        # libuv's now is expressed as an integer number of
        # milliseconds, so to get it compatible with time.time units
        # that this method is supposed to return, we have to divide by 1000.0
        now = libuv.uv_now(self._ptr)
        return now / 1000.0

    def update_now(self):
        libuv.uv_update_time(self._ptr)

    def fileno(self):
        if self._ptr:
            fd = libuv.uv_backend_fd(self._ptr)
            if fd >= 0:
                return fd

    _sigchld_watcher = None
    _sigchld_callback_ffi = None

    def install_sigchld(self):
        if not self.default:
            return

        if self._sigchld_watcher:
            return

        self._sigchld_watcher = ffi.new('uv_signal_t*')
        libuv.uv_signal_init(self._ptr, self._sigchld_watcher)
        self._sigchld_watcher.data = self._handle_to_self

        libuv.uv_signal_start(self._sigchld_watcher,
                              libuv.python_sigchld_callback,
                              signal.SIGCHLD)

    def reset_sigchld(self):
        if not self.default or not self._sigchld_watcher:
            return

        libuv.uv_signal_stop(self._sigchld_watcher)
        # Must go through this to manage the memory lifetime
        # correctly. Alternately, we could just stop it and restart
        # it in install_sigchld?
        _watchers.watcher._watcher_ffi_close(self._sigchld_watcher)
        del self._sigchld_watcher


    def _sigchld_callback(self):
        # Signals can arrive at (relatively) any time. To eliminate
        # race conditions, and behave more like libev, we "queue"
        # sigchld to run when we run callbacks.
        while True:
            try:
                pid, status, _usage = os.wait3(os.WNOHANG)
            except OSError:
                # Python 3 raises ChildProcessError
                break

            if pid == 0:
                break
            children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, [])
            for watcher in children_watchers:
                self.run_callback(watcher._set_waitpid_status, pid, status)

            # Don't invoke child watchers for 0 more than once
            self._child_watchers[0] = []

    def _register_child_watcher(self, watcher):
        self._child_watchers[watcher._pid].append(watcher)

    def _unregister_child_watcher(self, watcher):
        try:
            # stop() should be idempotent
            self._child_watchers[watcher._pid].remove(watcher)
        except ValueError:
            pass

        # Now's a good time to clean up any dead lists we don't need
        # anymore
        for pid in list(self._child_watchers):
            if not self._child_watchers[pid]:
                del self._child_watchers[pid]

    def io(self, fd, events, ref=True, priority=None):
        # We rely on hard references here and explicit calls to
        # close() on the returned object to correctly manage
        # the watcher lifetimes.

        io_watchers = self._io_watchers
        try:
            io_watcher = io_watchers[fd]
            assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher)
        except KeyError:
            # Start the watcher with just the events that we're interested in.
            # as multiplexers are added, the real event mask will be updated to keep in sync.
            # If we watch for too much, we get spurious wakeups and busy loops.
            io_watcher = self._watchers.io(self, fd, 0)
            io_watchers[fd] = io_watcher
            io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd)

        return io_watcher.multiplex(events)

    def prepare(self, ref=True, priority=None):
        # We run arbitrary code in python_prepare_callback. That could switch
        # greenlets. If it does that while also manipulating the active prepare
        # watchers, we could corrupt the process state, since the prepare watcher
        # queue is iterated on the stack (on unix). We could workaround this by implementing
        # prepare watchers in pure Python.
        # See https://github.com/gevent/gevent/issues/1126
        raise TypeError("prepare watchers are not currently supported in libuv. "
                        "If you need them, please contact the maintainers.")