diff options
Diffstat (limited to 'yt_dlp/downloader')
-rw-r--r-- | yt_dlp/downloader/__init__.py | 12 | ||||
-rw-r--r-- | yt_dlp/downloader/dash.py | 68 | ||||
-rw-r--r-- | yt_dlp/downloader/f4m.py | 2 | ||||
-rw-r--r-- | yt_dlp/downloader/fragment.py | 47 |
4 files changed, 91 insertions, 38 deletions
diff --git a/yt_dlp/downloader/__init__.py b/yt_dlp/downloader/__init__.py index 5270e8081..acc19f43a 100644 --- a/yt_dlp/downloader/__init__.py +++ b/yt_dlp/downloader/__init__.py @@ -12,10 +12,15 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N info_copy = info_dict.copy() info_copy['to_stdout'] = to_stdout - downloaders = [_get_suitable_downloader(info_copy, proto, params, default) - for proto in (protocol or info_copy['protocol']).split('+')] + protocols = (protocol or info_copy['protocol']).split('+') + downloaders = [_get_suitable_downloader(info_copy, proto, params, default) for proto in protocols] + if set(downloaders) == {FFmpegFD} and FFmpegFD.can_merge_formats(info_copy, params): return FFmpegFD + elif (set(downloaders) == {DashSegmentsFD} + and not (to_stdout and len(protocols) > 1) + and set(protocols) == {'http_dash_segments_generator'}): + return DashSegmentsFD elif len(downloaders) == 1: return downloaders[0] return None @@ -49,6 +54,7 @@ PROTOCOL_MAP = { 'rtsp': RtspFD, 'f4m': F4mFD, 'http_dash_segments': DashSegmentsFD, + 'http_dash_segments_generator': DashSegmentsFD, 'ism': IsmFD, 'mhtml': MhtmlFD, 'niconico_dmc': NiconicoDmcFD, @@ -63,6 +69,7 @@ def shorten_protocol_name(proto, simplify=False): 'm3u8_native': 'm3u8_n', 'rtmp_ffmpeg': 'rtmp_f', 'http_dash_segments': 'dash', + 'http_dash_segments_generator': 'dash_g', 'niconico_dmc': 'dmc', 'websocket_frag': 'WSfrag', } @@ -71,6 +78,7 @@ def shorten_protocol_name(proto, simplify=False): 'https': 'http', 'ftps': 'ftp', 'm3u8_native': 'm3u8', + 'http_dash_segments_generator': 'dash', 'rtmp_ffmpeg': 'rtmp', 'm3u8_frag_urls': 'm3u8', 'dash_frag_urls': 'dash', diff --git a/yt_dlp/downloader/dash.py b/yt_dlp/downloader/dash.py index 6444ad692..8dd43f4fa 100644 --- a/yt_dlp/downloader/dash.py +++ b/yt_dlp/downloader/dash.py @@ -1,4 +1,5 @@ from __future__ import unicode_literals +import time from ..downloader import get_suitable_downloader from .fragment import FragmentFD @@ -15,27 +16,53 @@ class DashSegmentsFD(FragmentFD): FD_NAME = 'dashsegments' def real_download(self, filename, info_dict): - if info_dict.get('is_live'): + if info_dict.get('is_live') and set(info_dict['protocol'].split('+')) != {'http_dash_segments_generator'}: self.report_error('Live DASH videos are not supported') - fragment_base_url = info_dict.get('fragment_base_url') - fragments = info_dict['fragments'][:1] if self.params.get( - 'test', False) else info_dict['fragments'] - + real_start = time.time() real_downloader = get_suitable_downloader( info_dict, self.params, None, protocol='dash_frag_urls', to_stdout=(filename == '-')) - ctx = { - 'filename': filename, - 'total_frags': len(fragments), - } + requested_formats = [{**info_dict, **fmt} for fmt in info_dict.get('requested_formats', [])] + args = [] + for fmt in requested_formats or [info_dict]: + try: + fragment_count = 1 if self.params.get('test') else len(fmt['fragments']) + except TypeError: + fragment_count = None + ctx = { + 'filename': fmt.get('filepath') or filename, + 'live': 'is_from_start' if fmt.get('is_from_start') else fmt.get('is_live'), + 'total_frags': fragment_count, + } + + if real_downloader: + self._prepare_external_frag_download(ctx) + else: + self._prepare_and_start_frag_download(ctx, fmt) + ctx['start'] = real_start + + fragments_to_download = self._get_fragments(fmt, ctx) + + if real_downloader: + self.to_screen( + '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename())) + info_dict['fragments'] = fragments_to_download + fd = real_downloader(self.ydl, self.params) + return fd.real_download(filename, info_dict) + + args.append([ctx, fragments_to_download, fmt]) - if real_downloader: - self._prepare_external_frag_download(ctx) - else: - self._prepare_and_start_frag_download(ctx, info_dict) + return self.download_and_append_fragments_multiple(*args) + + def _resolve_fragments(self, fragments, ctx): + fragments = fragments(ctx) if callable(fragments) else fragments + return [next(fragments)] if self.params.get('test') else fragments + + def _get_fragments(self, fmt, ctx): + fragment_base_url = fmt.get('fragment_base_url') + fragments = self._resolve_fragments(fmt['fragments'], ctx) - fragments_to_download = [] frag_index = 0 for i, fragment in enumerate(fragments): frag_index += 1 @@ -46,17 +73,8 @@ class DashSegmentsFD(FragmentFD): assert fragment_base_url fragment_url = urljoin(fragment_base_url, fragment['path']) - fragments_to_download.append({ + yield { 'frag_index': frag_index, 'index': i, 'url': fragment_url, - }) - - if real_downloader: - self.to_screen( - '[%s] Fragment downloads will be delegated to %s' % (self.FD_NAME, real_downloader.get_basename())) - info_dict['fragments'] = fragments_to_download - fd = real_downloader(self.ydl, self.params) - return fd.real_download(filename, info_dict) - - return self.download_and_append_fragments(ctx, fragments_to_download, info_dict) + } diff --git a/yt_dlp/downloader/f4m.py b/yt_dlp/downloader/f4m.py index 9da2776d9..0008b7c28 100644 --- a/yt_dlp/downloader/f4m.py +++ b/yt_dlp/downloader/f4m.py @@ -366,7 +366,7 @@ class F4mFD(FragmentFD): ctx = { 'filename': filename, 'total_frags': total_frags, - 'live': live, + 'live': bool(live), } self._prepare_frag_download(ctx) diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py index 04b0f68c0..79c6561c7 100644 --- a/yt_dlp/downloader/fragment.py +++ b/yt_dlp/downloader/fragment.py @@ -1,9 +1,10 @@ from __future__ import division, unicode_literals +import http.client +import json +import math import os import time -import json -from math import ceil try: import concurrent.futures @@ -15,6 +16,7 @@ from .common import FileDownloader from .http import HttpFD from ..aes import aes_cbc_decrypt_bytes from ..compat import ( + compat_os_name, compat_urllib_error, compat_struct_pack, ) @@ -90,7 +92,7 @@ class FragmentFD(FileDownloader): self._start_frag_download(ctx, info_dict) def __do_ytdl_file(self, ctx): - return not ctx['live'] and not ctx['tmpfilename'] == '-' and not self.params.get('_no_ytdl_file') + return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file') def _read_ytdl_file(self, ctx): assert 'ytdl_corrupt' not in ctx @@ -375,17 +377,20 @@ class FragmentFD(FileDownloader): @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ... all args must be either tuple or list ''' + interrupt_trigger = [True] max_progress = len(args) if max_progress == 1: return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func) - max_workers = self.params.get('concurrent_fragment_downloads', max_progress) + max_workers = self.params.get('concurrent_fragment_downloads', 1) if max_progress > 1: self._prepare_multiline_status(max_progress) def thread_func(idx, ctx, fragments, info_dict, tpe): ctx['max_progress'] = max_progress ctx['progress_idx'] = idx - return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe) + return self.download_and_append_fragments( + ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, + tpe=tpe, interrupt_trigger=interrupt_trigger) class FTPE(concurrent.futures.ThreadPoolExecutor): # has to stop this or it's going to wait on the worker thread itself @@ -393,8 +398,11 @@ class FragmentFD(FileDownloader): pass spins = [] + if compat_os_name == 'nt': + self.report_warning('Ctrl+C does not work on Windows when used with parallel threads. ' + 'This is a known issue and patches are welcome') for idx, (ctx, fragments, info_dict) in enumerate(args): - tpe = FTPE(ceil(max_workers / max_progress)) + tpe = FTPE(math.ceil(max_workers / max_progress)) job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe) spins.append((tpe, job)) @@ -402,18 +410,32 @@ class FragmentFD(FileDownloader): for tpe, job in spins: try: result = result and job.result() + except KeyboardInterrupt: + interrupt_trigger[0] = False finally: tpe.shutdown(wait=True) + if not interrupt_trigger[0]: + raise KeyboardInterrupt() return result - def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None): + def download_and_append_fragments( + self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, + tpe=None, interrupt_trigger=None): + if not interrupt_trigger: + interrupt_trigger = (True, ) + fragment_retries = self.params.get('fragment_retries', 0) - is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True) + is_fatal = ( + ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0)) + if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)) + if not pack_func: pack_func = lambda frag_content, _: frag_content def download_fragment(fragment, ctx): frag_index = ctx['fragment_index'] = fragment['frag_index'] + if not interrupt_trigger[0]: + return False, frag_index headers = info_dict.get('http_headers', {}).copy() byte_range = fragment.get('byte_range') if byte_range: @@ -428,7 +450,7 @@ class FragmentFD(FileDownloader): if not success: return False, frag_index break - except compat_urllib_error.HTTPError as err: + except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err: # Unavailable (possibly temporary) fragments may be served. # First we try to retry then either skip or abort. # See https://github.com/ytdl-org/youtube-dl/issues/10165, @@ -466,7 +488,8 @@ class FragmentFD(FileDownloader): decrypt_fragment = self.decrypter(info_dict) - max_workers = self.params.get('concurrent_fragment_downloads', 1) + max_workers = math.ceil( + self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1)) if can_threaded_download and max_workers > 1: def _download_fragment(fragment): @@ -477,6 +500,8 @@ class FragmentFD(FileDownloader): self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome') with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool: for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments): + if not interrupt_trigger[0]: + break ctx['fragment_filename_sanitized'] = frag_filename ctx['fragment_index'] = frag_index result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx) @@ -484,6 +509,8 @@ class FragmentFD(FileDownloader): return False else: for fragment in fragments: + if not interrupt_trigger[0]: + break frag_content, frag_index = download_fragment(fragment, ctx) result = append_fragment(decrypt_fragment(fragment, frag_content), frag_index, ctx) if not result: |