aboutsummaryrefslogtreecommitdiffstats
path: root/python/gevent/_ffi/watcher.py
blob: 3f880cef83ba2e3f0950830bb7eb02944d13e602 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
"""
Useful base classes for watchers. The available
watchers will depend on the specific event loop.
"""
# pylint:disable=not-callable
from __future__ import absolute_import, print_function

import signal as signalmodule
import functools
import warnings

from gevent._config import config

try:
    from tracemalloc import get_object_traceback

    def tracemalloc(init):
        # PYTHONTRACEMALLOC env var controls this on Python 3.
        return init
except ImportError: # Python < 3.4

    if config.trace_malloc:
        # Use the same env var to turn this on for Python 2
        import traceback

        class _TB(object):
            __slots__ = ('lines',)

            def __init__(self, lines):
                # These end in newlines, which we don't want for consistency
                self.lines = [x.rstrip() for x in lines]

            def format(self):
                return self.lines

        def tracemalloc(init):
            @functools.wraps(init)
            def traces(self, *args, **kwargs):
                init(self, *args, **kwargs)
                self._captured_malloc = _TB(traceback.format_stack())
            return traces

        def get_object_traceback(obj):
            return obj._captured_malloc

    else:
        def get_object_traceback(_obj):
            return None

        def tracemalloc(init):
            return init

from gevent._compat import fsencode

from gevent._ffi import _dbg # pylint:disable=unused-import
from gevent._ffi import GEVENT_DEBUG_LEVEL
from gevent._ffi import DEBUG
from gevent._ffi.loop import GEVENT_CORE_EVENTS
from gevent._ffi.loop import _NOARGS

ALLOW_WATCHER_DEL = GEVENT_DEBUG_LEVEL >= DEBUG

__all__ = [

]

try:
    ResourceWarning
except NameError:
    class ResourceWarning(Warning):
        "Python 2 fallback"

class _NoWatcherResult(int):

    def __repr__(self):
        return "<NoWatcher>"

_NoWatcherResult = _NoWatcherResult(0)

def events_to_str(event_field, all_events):
    result = []
    for (flag, string) in all_events:
        c_flag = flag
        if event_field & c_flag:
            result.append(string)
            event_field = event_field & (~c_flag)
        if not event_field:
            break
    if event_field:
        result.append(hex(event_field))
    return '|'.join(result)


def not_while_active(func):
    @functools.wraps(func)
    def nw(self, *args, **kwargs):
        if self.active:
            raise ValueError("not while active")
        func(self, *args, **kwargs)
    return nw

def only_if_watcher(func):
    @functools.wraps(func)
    def if_w(self):
        if self._watcher:
            return func(self)
        return _NoWatcherResult
    return if_w


class LazyOnClass(object):

    @classmethod
    def lazy(cls, cls_dict, func):
        "Put a LazyOnClass object in *cls_dict* with the same name as *func*"
        cls_dict[func.__name__] = cls(func)

    def __init__(self, func, name=None):
        self.name = name or func.__name__
        self.func = func

    def __get__(self, inst, klass):
        if inst is None: # pragma: no cover
            return self

        val = self.func(inst)
        setattr(klass, self.name, val)
        return val


class AbstractWatcherType(type):
    """
    Base metaclass for watchers.

    To use, you will:

    - subclass the watcher class defined from this type.
    - optionally subclass this type
    """
    # pylint:disable=bad-mcs-classmethod-argument

    _FFI = None
    _LIB = None

    def __new__(cls, name, bases, cls_dict):
        if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'):
            cls._fill_watcher(name, bases, cls_dict)
        if '__del__' in cls_dict and not ALLOW_WATCHER_DEL: # pragma: no cover
            raise TypeError("CFFI watchers are not allowed to have __del__")
        return type.__new__(cls, name, bases, cls_dict)

    @classmethod
    def _fill_watcher(cls, name, bases, cls_dict):
        # TODO: refactor smaller
        # pylint:disable=too-many-locals
        if name.endswith('_'):
            # Strip trailing _ added to avoid keyword duplications
            # e.g., async_
            name = name[:-1]

        def _mro_get(attr, bases, error=True):
            for b in bases:
                try:
                    return getattr(b, attr)
                except AttributeError:
                    continue
            if error: # pragma: no cover
                raise AttributeError(attr)
        _watcher_prefix = cls_dict.get('_watcher_prefix') or _mro_get('_watcher_prefix', bases)

        if '_watcher_type' not in cls_dict:
            watcher_type = _watcher_prefix + '_' + name
            cls_dict['_watcher_type'] = watcher_type
        elif not cls_dict['_watcher_type'].startswith(_watcher_prefix):
            watcher_type = _watcher_prefix + '_' + cls_dict['_watcher_type']
            cls_dict['_watcher_type'] = watcher_type

        active_name = _watcher_prefix + '_is_active'

        def _watcher_is_active(self):
            return getattr(self._LIB, active_name)

        LazyOnClass.lazy(cls_dict, _watcher_is_active)

        watcher_struct_name = cls_dict.get('_watcher_struct_name')
        if not watcher_struct_name:
            watcher_struct_pattern = (cls_dict.get('_watcher_struct_pattern')
                                      or _mro_get('_watcher_struct_pattern', bases, False)
                                      or 'struct %s')
            watcher_struct_name = watcher_struct_pattern % (watcher_type,)

        def _watcher_struct_pointer_type(self):
            return self._FFI.typeof(watcher_struct_name + ' *')

        LazyOnClass.lazy(cls_dict, _watcher_struct_pointer_type)

        callback_name = (cls_dict.get('_watcher_callback_name')
                         or _mro_get('_watcher_callback_name', bases, False)
                         or '_gevent_generic_callback')

        def _watcher_callback(self):
            return self._FFI.addressof(self._LIB, callback_name)

        LazyOnClass.lazy(cls_dict, _watcher_callback)

        def _make_meth(name, watcher_name):
            def meth(self):
                lib_name = self._watcher_type + '_' + name
                return getattr(self._LIB, lib_name)
            meth.__name__ = watcher_name
            return meth

        for meth_name in 'start', 'stop', 'init':
            watcher_name = '_watcher' + '_' + meth_name
            if watcher_name not in cls_dict:
                LazyOnClass.lazy(cls_dict, _make_meth(meth_name, watcher_name))

    def new_handle(cls, obj):
        return cls._FFI.new_handle(obj)

    def new(cls, kind):
        return cls._FFI.new(kind)

class watcher(object):

    _callback = None
    _args = None
    _watcher = None
    # self._handle has a reference to self, keeping it alive.
    # We must keep self._handle alive for ffi.from_handle() to be
    # able to work. We only fill this in when we are started,
    # and when we are stopped we destroy it.
    # NOTE: This is a GC cycle, so we keep it around for as short
    # as possible.
    _handle = None

    @tracemalloc
    def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
        self.loop = _loop
        self.__init_priority = priority
        self.__init_args = args
        self.__init_ref = ref
        self._watcher_full_init()


    def _watcher_full_init(self):
        priority = self.__init_priority
        ref = self.__init_ref
        args = self.__init_args

        self._watcher_create(ref)

        if priority is not None:
            self._watcher_ffi_set_priority(priority)

        try:
            self._watcher_ffi_init(args)
        except:
            # Let these be GC'd immediately.
            # If we keep them around to when *we* are gc'd,
            # they're probably invalid, meaning any native calls
            # we do then to close() them are likely to fail
            self._watcher = None
            raise
        self._watcher_ffi_set_init_ref(ref)

    @classmethod
    def _watcher_ffi_close(cls, ffi_watcher):
        pass

    def _watcher_create(self, ref): # pylint:disable=unused-argument
        self._watcher = self._watcher_new()

    def _watcher_new(self):
        return type(self).new(self._watcher_struct_pointer_type) # pylint:disable=no-member

    def _watcher_ffi_set_init_ref(self, ref):
        pass

    def _watcher_ffi_set_priority(self, priority):
        pass

    def _watcher_ffi_init(self, args):
        raise NotImplementedError()

    def _watcher_ffi_start(self):
        raise NotImplementedError()

    def _watcher_ffi_stop(self):
        self._watcher_stop(self.loop._ptr, self._watcher)

    def _watcher_ffi_ref(self):
        raise NotImplementedError()

    def _watcher_ffi_unref(self):
        raise NotImplementedError()

    def _watcher_ffi_start_unref(self):
        # While a watcher is active, we don't keep it
        # referenced. This allows a timer, for example, to be started,
        # and still allow the loop to end if there is nothing
        # else to do. see test__order.TestSleep0 for one example.
        self._watcher_ffi_unref()

    def _watcher_ffi_stop_ref(self):
        self._watcher_ffi_ref()

    # A string identifying the type of libev object we watch, e.g., 'ev_io'
    # This should be a class attribute.
    _watcher_type = None
    # A class attribute that is the callback on the libev object that init's the C struct,
    # e.g., libev.ev_io_init. If None, will be set by _init_subclasses.
    _watcher_init = None
    # A class attribute that is the callback on the libev object that starts the C watcher,
    # e.g., libev.ev_io_start. If None, will be set by _init_subclasses.
    _watcher_start = None
    # A class attribute that is the callback on the libev object that stops the C watcher,
    # e.g., libev.ev_io_stop. If None, will be set by _init_subclasses.
    _watcher_stop = None
    # A cffi ctype object identifying the struct pointer we create.
    # This is a class attribute set based on the _watcher_type
    _watcher_struct_pointer_type = None
    # The attribute of the libev object identifying the custom
    # callback function for this type of watcher. This is a class
    # attribute set based on the _watcher_type in _init_subclasses.
    _watcher_callback = None
    _watcher_is_active = None

    def close(self):
        if self._watcher is None:
            return

        self.stop()
        _watcher = self._watcher
        self._watcher = None
        self._watcher_set_data(_watcher, self._FFI.NULL) # pylint: disable=no-member
        self._watcher_ffi_close(_watcher)
        self.loop = None

    def _watcher_set_data(self, the_watcher, data):
        # This abstraction exists for the sole benefit of
        # libuv.watcher.stat, which "subclasses" uv_handle_t.
        # Can we do something to avoid this extra function call?
        the_watcher.data = data
        return data

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        self.close()

    if ALLOW_WATCHER_DEL:
        def __del__(self):
            if self._watcher:
                tb = get_object_traceback(self)
                tb_msg = ''
                if tb is not None:
                    tb_msg = '\n'.join(tb.format())
                    tb_msg = '\nTraceback:\n' + tb_msg
                warnings.warn("Failed to close watcher %r%s" % (self, tb_msg),
                              ResourceWarning)

                # may fail if __init__ did; will be harmlessly printed
                self.close()


    def __repr__(self):
        formats = self._format()
        result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats)
        if self.pending:
            result += " pending"
        if self.callback is not None:
            fself = getattr(self.callback, '__self__', None)
            if fself is self:
                result += " callback=<bound method %s of self>" % (self.callback.__name__)
            else:
                result += " callback=%r" % (self.callback, )
        if self.args is not None:
            result += " args=%r" % (self.args, )
        if self.callback is None and self.args is None:
            result += " stopped"
        result += " watcher=%s" % (self._watcher)
        result += " handle=%s" % (self._watcher_handle)
        result += " ref=%s" % (self.ref)
        return result + ">"

    @property
    def _watcher_handle(self):
        if self._watcher:
            return self._watcher.data

    def _format(self):
        return ''

    @property
    def ref(self):
        raise NotImplementedError()

    def _get_callback(self):
        return self._callback

    def _set_callback(self, cb):
        if not callable(cb) and cb is not None:
            raise TypeError("Expected callable, not %r" % (cb, ))
        if cb is None:
            if '_callback' in self.__dict__:
                del self._callback
        else:
            self._callback = cb
    callback = property(_get_callback, _set_callback)

    def _get_args(self):
        return self._args

    def _set_args(self, args):
        if not isinstance(args, tuple) and args is not None:
            raise TypeError("args must be a tuple or None")
        if args is None:
            if '_args' in self.__dict__:
                del self._args
        else:
            self._args = args

    args = property(_get_args, _set_args)

    def start(self, callback, *args):
        if callback is None:
            raise TypeError('callback must be callable, not None')
        self.callback = callback
        self.args = args or _NOARGS
        self.loop._keepaliveset.add(self)
        self._handle = self._watcher_set_data(self._watcher, type(self).new_handle(self)) # pylint:disable=no-member
        self._watcher_ffi_start()
        self._watcher_ffi_start_unref()

    def stop(self):
        if self._callback is None:
            assert self.loop is None or self not in self.loop._keepaliveset
            return
        self._watcher_ffi_stop_ref()
        self._watcher_ffi_stop()
        self.loop._keepaliveset.discard(self)
        self._handle = None
        self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member
        self.callback = None
        self.args = None

    def _get_priority(self):
        return None

    @not_while_active
    def _set_priority(self, priority):
        pass

    priority = property(_get_priority, _set_priority)


    @property
    def active(self):
        if self._watcher is not None and self._watcher_is_active(self._watcher):
            return True
        return False

    @property
    def pending(self):
        return False

watcher = AbstractWatcherType('watcher', (object,), dict(watcher.__dict__))

class IoMixin(object):

    EVENT_MASK = 0

    def __init__(self, loop, fd, events, ref=True, priority=None, _args=None):
        # Win32 only works with sockets, and only when we use libuv, because
        # we don't use _open_osfhandle. See libuv/watchers.py:io for a description.
        if fd < 0:
            raise ValueError('fd must be non-negative: %r' % fd)
        if events & ~self.EVENT_MASK:
            raise ValueError('illegal event mask: %r' % events)
        self._fd = fd
        super(IoMixin, self).__init__(loop, ref=ref, priority=priority,
                                      args=_args or (fd, events))

    def start(self, callback, *args, **kwargs):
        args = args or _NOARGS
        if kwargs.get('pass_events'):
            args = (GEVENT_CORE_EVENTS, ) + args
        super(IoMixin, self).start(callback, *args)

    def _format(self):
        return ' fd=%d' % self._fd

class TimerMixin(object):
    _watcher_type = 'timer'

    def __init__(self, loop, after=0.0, repeat=0.0, ref=True, priority=None):
        if repeat < 0.0:
            raise ValueError("repeat must be positive or zero: %r" % repeat)
        self._after = after
        self._repeat = repeat
        super(TimerMixin, self).__init__(loop, ref=ref, priority=priority, args=(after, repeat))

    def start(self, callback, *args, **kw):
        update = kw.get("update", self.loop.starting_timer_may_update_loop_time)
        if update:
            # Quoth the libev doc: "This is a costly operation and is
            # usually done automatically within ev_run(). This
            # function is rarely useful, but when some event callback
            # runs for a very long time without entering the event
            # loop, updating libev's idea of the current time is a
            # good idea."

            # 1.3 changed the default for this to False *unless* the loop is
            # running a callback; see libuv for details. Note that
            # starting Timeout objects still sets this to true.

            self.loop.update_now()
        super(TimerMixin, self).start(callback, *args)

    def again(self, callback, *args, **kw):
        raise NotImplementedError()


class SignalMixin(object):
    _watcher_type = 'signal'

    def __init__(self, loop, signalnum, ref=True, priority=None):
        if signalnum < 1 or signalnum >= signalmodule.NSIG:
            raise ValueError('illegal signal number: %r' % signalnum)
        # still possible to crash on one of libev's asserts:
        # 1) "libev: ev_signal_start called with illegal signal number"
        #    EV_NSIG might be different from signal.NSIG on some platforms
        # 2) "libev: a signal must not be attached to two different loops"
        #    we probably could check that in LIBEV_EMBED mode, but not in general
        self._signalnum = signalnum
        super(SignalMixin, self).__init__(loop, ref=ref, priority=priority, args=(signalnum, ))


class IdleMixin(object):
    _watcher_type = 'idle'


class PrepareMixin(object):
    _watcher_type = 'prepare'


class CheckMixin(object):
    _watcher_type = 'check'


class ForkMixin(object):
    _watcher_type = 'fork'


class AsyncMixin(object):
    _watcher_type = 'async'

    def send(self):
        raise NotImplementedError()

    @property
    def pending(self):
        raise NotImplementedError()


class ChildMixin(object):

    # hack for libuv which doesn't extend watcher
    _CALL_SUPER_INIT = True

    def __init__(self, loop, pid, trace=0, ref=True):
        if not loop.default:
            raise TypeError('child watchers are only available on the default loop')
        loop.install_sigchld()
        self._pid = pid
        if self._CALL_SUPER_INIT:
            super(ChildMixin, self).__init__(loop, ref=ref, args=(pid, trace))

    def _format(self):
        return ' pid=%r rstatus=%r' % (self.pid, self.rstatus)

    @property
    def pid(self):
        return self._pid

    @property
    def rpid(self):
        # The received pid, the result of the waitpid() call.
        return self._rpid

    _rpid = None
    _rstatus = 0

    @property
    def rstatus(self):
        return self._rstatus

class StatMixin(object):

    @staticmethod
    def _encode_path(path):
        return fsencode(path)

    def __init__(self, _loop, path, interval=0.0, ref=True, priority=None):
        # Store the encoded path in the same attribute that corecext does
        self._paths = self._encode_path(path)

        # Keep the original path to avoid re-encoding, especially on Python 3
        self._path = path

        # Although CFFI would automatically convert a bytes object into a char* when
        # calling ev_stat_init(..., char*, ...), on PyPy the char* pointer is not
        # guaranteed to live past the function call. On CPython, only with a constant/interned
        # bytes object is the pointer guaranteed to last path the function call. (And since
        # Python 3 is pretty much guaranteed to produce a newly-encoded bytes object above, thats
        # rarely the case). Therefore, we must keep a reference to the produced cdata object
        # so that the struct ev_stat_watcher's `path` pointer doesn't become invalid/deallocated
        self._cpath = self._FFI.new('char[]', self._paths)

        self._interval = interval
        super(StatMixin, self).__init__(_loop, ref=ref, priority=priority,
                                        args=(self._cpath,
                                              interval))

    @property
    def path(self):
        return self._path

    @property
    def attr(self):
        raise NotImplementedError

    @property
    def prev(self):
        raise NotImplementedError

    @property
    def interval(self):
        return self._interval