aboutsummaryrefslogtreecommitdiffstats
path: root/hypervideo_dl/postprocessor/common.py
blob: c3fca35689986e1e703ad4bba6fc1d245de8006f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import functools
import json
import os
import urllib.error

from ..utils import (
    PostProcessingError,
    RetryManager,
    _configuration_args,
    deprecation_warning,
    encodeFilename,
    network_exceptions,
    sanitized_Request,
)


class PostProcessorMetaClass(type):
    @staticmethod
    def run_wrapper(func):
        @functools.wraps(func)
        def run(self, info, *args, **kwargs):
            info_copy = self._copy_infodict(info)
            self._hook_progress({'status': 'started'}, info_copy)
            ret = func(self, info, *args, **kwargs)
            if ret is not None:
                _, info = ret
            self._hook_progress({'status': 'finished'}, info_copy)
            return ret
        return run

    def __new__(cls, name, bases, attrs):
        if 'run' in attrs:
            attrs['run'] = cls.run_wrapper(attrs['run'])
        return type.__new__(cls, name, bases, attrs)


class PostProcessor(metaclass=PostProcessorMetaClass):
    """Post Processor class.

    PostProcessor objects can be added to downloaders with their
    add_post_processor() method. When the downloader has finished a
    successful download, it will take its internal chain of PostProcessors
    and start calling the run() method on each one of them, first with
    an initial argument and then with the returned value of the previous
    PostProcessor.

    PostProcessor objects follow a "mutual registration" process similar
    to InfoExtractor objects.

    Optionally PostProcessor can use a list of additional command-line arguments
    with self._configuration_args.
    """

    _downloader = None

    def __init__(self, downloader=None):
        self._progress_hooks = []
        self.add_progress_hook(self.report_progress)
        self.set_downloader(downloader)
        self.PP_NAME = self.pp_key()

    @classmethod
    def pp_key(cls):
        name = cls.__name__[:-2]
        return name[6:] if name[:6].lower() == 'ffmpeg' else name

    def to_screen(self, text, prefix=True, *args, **kwargs):
        if self._downloader:
            tag = '[%s] ' % self.PP_NAME if prefix else ''
            return self._downloader.to_screen(f'{tag}{text}', *args, **kwargs)

    def report_warning(self, text, *args, **kwargs):
        if self._downloader:
            return self._downloader.report_warning(text, *args, **kwargs)

    def deprecation_warning(self, msg):
        warn = getattr(self._downloader, 'deprecation_warning', deprecation_warning)
        return warn(msg, stacklevel=1)

    def deprecated_feature(self, msg):
        if self._downloader:
            return self._downloader.deprecated_feature(msg)
        return deprecation_warning(msg, stacklevel=1)

    def report_error(self, text, *args, **kwargs):
        self.deprecation_warning('"hypervideo_dl.postprocessor.PostProcessor.report_error" is deprecated. '
                                 'raise "hypervideo_dl.utils.PostProcessingError" instead')
        if self._downloader:
            return self._downloader.report_error(text, *args, **kwargs)

    def write_debug(self, text, *args, **kwargs):
        if self._downloader:
            return self._downloader.write_debug(text, *args, **kwargs)

    def _delete_downloaded_files(self, *files_to_delete, **kwargs):
        if self._downloader:
            return self._downloader._delete_downloaded_files(*files_to_delete, **kwargs)
        for filename in set(filter(None, files_to_delete)):
            os.remove(filename)

    def get_param(self, name, default=None, *args, **kwargs):
        if self._downloader:
            return self._downloader.params.get(name, default, *args, **kwargs)
        return default

    def set_downloader(self, downloader):
        """Sets the downloader for this PP."""
        self._downloader = downloader
        for ph in getattr(downloader, '_postprocessor_hooks', []):
            self.add_progress_hook(ph)

    def _copy_infodict(self, info_dict):
        return getattr(self._downloader, '_copy_infodict', dict)(info_dict)

    @staticmethod
    def _restrict_to(*, video=True, audio=True, images=True, simulated=True):
        allowed = {'video': video, 'audio': audio, 'images': images}

        def decorator(func):
            @functools.wraps(func)
            def wrapper(self, info):
                if not simulated and (self.get_param('simulate') or self.get_param('skip_download')):
                    return [], info
                format_type = (
                    'video' if info.get('vcodec') != 'none'
                    else 'audio' if info.get('acodec') != 'none'
                    else 'images')
                if allowed[format_type]:
                    return func(self, info)
                else:
                    self.to_screen('Skipping %s' % format_type)
                    return [], info
            return wrapper
        return decorator

    def run(self, information):
        """Run the PostProcessor.

        The "information" argument is a dictionary like the ones
        composed by InfoExtractors. The only difference is that this
        one has an extra field called "filepath" that points to the
        downloaded file.

        This method returns a tuple, the first element is a list of the files
        that can be deleted, and the second of which is the updated
        information.

        In addition, this method may raise a PostProcessingError
        exception if post processing fails.
        """
        return [], information  # by default, keep file and do nothing

    def try_utime(self, path, atime, mtime, errnote='Cannot update utime of file'):
        try:
            os.utime(encodeFilename(path), (atime, mtime))
        except Exception:
            self.report_warning(errnote)

    def _configuration_args(self, exe, *args, **kwargs):
        return _configuration_args(
            self.pp_key(), self.get_param('postprocessor_args'), exe, *args, **kwargs)

    def _hook_progress(self, status, info_dict):
        if not self._progress_hooks:
            return
        status.update({
            'info_dict': info_dict,
            'postprocessor': self.pp_key(),
        })
        for ph in self._progress_hooks:
            ph(status)

    def add_progress_hook(self, ph):
        # See YoutubeDl.py (search for postprocessor_hooks) for a description of this interface
        self._progress_hooks.append(ph)

    def report_progress(self, s):
        s['_default_template'] = '%(postprocessor)s %(status)s' % s
        if not self._downloader:
            return

        progress_dict = s.copy()
        progress_dict.pop('info_dict')
        progress_dict = {'info': s['info_dict'], 'progress': progress_dict}

        progress_template = self.get_param('progress_template', {})
        tmpl = progress_template.get('postprocess')
        if tmpl:
            self._downloader.to_screen(
                self._downloader.evaluate_outtmpl(tmpl, progress_dict), skip_eol=True, quiet=False)

        self._downloader.to_console_title(self._downloader.evaluate_outtmpl(
            progress_template.get('postprocess-title') or 'hypervideo %(progress._default_template)s',
            progress_dict))

    def _retry_download(self, err, count, retries):
        # While this is not an extractor, it behaves similar to one and
        # so obey extractor_retries and "--retry-sleep extractor"
        RetryManager.report_retry(err, count, retries, info=self.to_screen, warn=self.report_warning,
                                  sleep_func=self.get_param('retry_sleep_functions', {}).get('extractor'))

    def _download_json(self, url, *, expected_http_errors=(404,)):
        self.write_debug(f'{self.PP_NAME} query: {url}')
        for retry in RetryManager(self.get_param('extractor_retries', 3), self._retry_download):
            try:
                rsp = self._downloader.urlopen(sanitized_Request(url))
            except network_exceptions as e:
                if isinstance(e, urllib.error.HTTPError) and e.code in expected_http_errors:
                    return None
                retry.error = PostProcessingError(f'Unable to communicate with {self.PP_NAME} API: {e}')
                continue
        return json.loads(rsp.read().decode(rsp.info().get_param('charset') or 'utf-8'))


class AudioConversionError(PostProcessingError):  # Deprecated
    pass